跳转到主要内容

asyncio的任务实用工具集合

项目描述

管理并发asyncio任务的实用工具。

您可以使用 concurrent async生成器来并发运行asyncio任务。

它的工作方式与 asyncio.as_available 类似,但有一些不同。

  • coros 可以是任何 iterables,包括同步/异步 generators

  • limit 可以提供以指定最大并发任务数

limit 设置为 -1 将使所有任务并发运行。

默认的 limit核心数 + 4,最多 32。这(有些任意)反映了asyncio的 ThreadPoolExecutor 的默认设置。

对于网络任务,如果打开许多并发连接会触发速率限制或消耗带宽,则可能有必要将并发 limit 设置得低于默认值。

如果在迭代提供的coroutines时引发错误,则错误将包装在 ConcurrentIteratorError 中,并立即引发。

在这种情况下,不会进行进一步的处理,并且 yield_exceptions 无效。

在尝试创建或运行任务时引发的任何错误都将包装在 ConcurrentError 中。

在任务执行过程中引发的任何错误都将包装在 ConcurrentExecutionError 中。

如果您将 yield_exceptions 设置为 True,则包装的错误将作为结果返回。

如果 yield_exceptions 为 False(默认值),则包装的错误将立即引发。

如果您使用任何类型的 生成器AsyncGenerator 来生成可等待对象,并且 yield_exceptions 设置为 False,在发生错误的情况下,您有责任关闭可能已创建但尚未触发的剩余可等待对象。

此实用程序适用于 io-bound(与 cpu-bound 相反)任务的并发性。

用法

首先创建一个协程,它在随机时间内等待,然后返回其 ID 和等待时间。

>>> import random

>>> async def task_to_run(task_id):
...     print(f"{task_id} starting")
...     wait = random.random() * 5
...     await asyncio.sleep(wait)
...     return task_id, wait

接下来创建一个异步生成器,产生 10 个协程。

请注意,协程不会被等待,它们将以任务的形式被创建。

>>> def provider():
...     for task_id in range(0, 10):
...         yield task_to_run(task_id)

最后,创建一个用于异步迭代结果并使用生成器调用的函数。

由于我们将并发性限制为 3,因此首先启动前 3 个任务,一旦第一个返回,下一个就会触发。

一直持续到所有任务都完成。

>>> import asyncio
>>> from aio.tasks import concurrent

>>> async def run(coros):
...     async for (task_id, wait) in concurrent(coros, limit=3):
...         print(f"{task_id} waited {wait}")

>>> asyncio.run(run(provider()))
0 starting
1 starting
2 starting
... waited ...
3 starting
... waited ...
...
... waited ...

项目详情


下载文件

下载适合您平台文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分布

aio.tasks-0.0.5.tar.gz (8.1 kB 查看哈希值)

上传时间 源代码

构建分布

aio.tasks-0.0.5-py3-none-any.whl (8.2 kB 查看哈希值)

上传时间 Python 3

由以下机构支持