一个与asyncio和trio兼容的Python并发调度库
项目描述
aiometer
aiometer
是一个与 asyncio
和 trio
兼容的并发调度库,灵感来自 Trimeter。它使执行大量任务并发变得更容易,同时控制并发限制(即应用 背压)并以可预测的方式收集结果。
内容
示例
让我们使用 HTTPX 来并发执行网络请求...
使用 IPython 交互式尝试此代码。
>>> import asyncio
>>> import functools
>>> import random
>>> import aiometer
>>> import httpx
>>>
>>> client = httpx.AsyncClient()
>>>
>>> async def fetch(client, request):
... response = await client.send(request)
... # Simulate extra processing...
... await asyncio.sleep(2 * random.random())
... return response.json()["json"]
...
>>> requests = [
... httpx.Request("POST", "https://httpbin.org/anything", json={"index": index})
... for index in range(100)
... ]
...
>>> # Send requests, and process responses as they're made available:
>>> async with aiometer.amap(
... functools.partial(fetch, client),
... requests,
... max_at_once=10, # Limit maximum number of concurrently running tasks.
... max_per_second=5, # Limit request rate to not overload the server.
... ) as results:
... async for data in results:
... print(data)
...
{'index': 3}
{'index': 4}
{'index': 1}
{'index': 2}
{'index': 0}
...
>>> # Alternatively, fetch and aggregate responses into an (ordered) list...
>>> jobs = [functools.partial(fetch, client, request) for request in requests]
>>> results = await aiometer.run_all(jobs, max_at_once=10, max_per_second=5)
>>> results
[{'index': 0}, {'index': 1}, {'index': 2}, {'index': 3}, {'index': 4}, ...]
安装
此项目处于测试版并正在成熟。请确保将任何依赖项固定到最新的次要版本。
pip install "aiometer==0.5.*"
特性
- 并发管理和节流助手。
- 支持
asyncio
和trio
。 - 完全类型注解。
- 100% 测试覆盖率。
用法
流控制
aiometer
的关键亮点是允许您应用流控制策略来限制程序并发程度。
有两个旋钮可以调整以微调并发
max_at_once
:用于限制在任何给定时间点同时运行的任务的最大数量。(如果你有100个任务,并设置max_at_once=10
,则aiometer
将确保不会同时运行超过10个。)max_per_second
:此选项限制每秒生成的任务数量。这有助于避免过载I/O资源,例如可能实施速率限制策略的服务器。
示例用法
>>> import asyncio
>>> import aiometer
>>> async def make_query(query):
... await asyncio.sleep(0.05) # Simulate a database request.
...
>>> queries = ['SELECT * from authors'] * 1000
>>> # Allow at most 5 queries to run concurrently at any given time:
>>> await aiometer.run_on_each(make_query, queries, max_at_once=5)
...
>>> # Make at most 10 queries per second:
>>> await aiometer.run_on_each(make_query, queries, max_per_second=10)
...
>>> # Run at most 10 concurrent jobs, spawning new ones at least every 5 seconds:
>>> async def job(id):
... await asyncio.sleep(10) # A very long task.
...
>>> await aiometer.run_on_each(job, range(100), max_at_once=10, max_per_second=0.2)
运行任务
aiometer
提供4种不同的并发运行任务的方式,分别以4个不同的运行函数的形式。每个函数都接受在流控制中记录的所有选项,并以略微不同的方式运行任务,从而可以解决各种用例。下面是一个方便的表格供参考(参见API参考)
入口点 | 用例 |
---|---|
run_on_each() |
以任何顺序执行异步回调。 |
run_all() |
按顺序返回结果列表。 |
amap() |
按结果可用顺序迭代。 |
run_any() |
返回第一个完成的函数的结果。 |
为了说明每个运行函数的行为,我们首先设置一个hello world异步程序
>>> import asyncio
>>> import random
>>> from functools import partial
>>> import aiometer
>>>
>>> async def get_greeting(name):
... await asyncio.sleep(random.random()) # Simulate I/O
... return f"Hello, {name}"
...
>>> async def greet(name):
... greeting = await get_greeting(name)
... print(greeting)
...
>>> names = ["Robert", "Carmen", "Lucas"]
让我们从run_on_each()
开始。它对作为参数传递的列表中的每个项目执行一次异步函数
>>> await aiometer.run_on_each(greet, names)
'Hello, Robert!'
'Hello, Lucas!'
'Hello, Carmen!'
如果我们想按与names
相同的顺序获取问候语列表,类似于Promise.all()
,我们可以使用run_all()
>>> await aiometer.run_all([partial(get_greeting, name) for name in names])
['Hello, Robert', 'Hello, Carmen!', 'Hello, Lucas!']
amap()
允许我们按结果可用顺序处理每个问候语(这意味着不保证保持顺序)
>>> async with aiometer.amap(get_greeting, names) as greetings:
... async for greeting in greetings:
... print(greeting)
'Hello, Lucas!'
'Hello, Robert!'
'Hello, Carmen!'
最后,可以使用run_any()
来运行异步函数,直到第一个完成,类似于Promise.any()
>>> await aiometer.run_any([partial(get_greeting, name) for name in names])
'Hello, Carmen!'
作为一个有趣的例子,让我们使用amap()
来实现sleep sort的无线程异步版本
>>> import asyncio
>>> from functools import partial
>>> import aiometer
>>> numbers = [0.3, 0.1, 0.6, 0.2, 0.7, 0.5, 0.5, 0.2]
>>> async def process(n):
... await asyncio.sleep(n)
... return n
...
>>> async with aiometer.amap(process, numbers) as results:
... sorted_numbers = [n async for n in results]
...
>>> sorted_numbers
[0.1, 0.2, 0.2, 0.3, 0.5, 0.5, 0.6, 0.7]
如何使用
在run_on_each
和amap
中多次使用参数化值
run_on_each
和amap
仅接受接受单个位置参数的函数(即(Any) -> Awaitable
)。
因此,如果你有一个多值参数化的函数,你应该重构它以匹配此形式。
这通常可以这样实现
- 构建一个代理容器类型(例如,一个
namedtuple
),例如T
。 - 重构你的函数,使其签名现在为
(T) -> Awaitable
。 - 构建这些代理容器的列表,并将其传递给
aiometer
。
例如,假设你有一个处理X/Y坐标的函数...
async def process(x: float, y: float) -> None:
pass
xs = list(range(100))
ys = list(range(100))
for x, y in zip(xs, ys):
await process(x, y)
你可以通过如下重构使用它与amap
from typing import NamedTuple
# Proxy container type:
class Point(NamedTuple):
x: float
y: float
# Rewrite to accept a proxy as a single positional argument:
async def process(point: Point) -> None:
x = point.x
y = point.y
...
xs = list(range(100))
ys = list(range(100))
# Build a list of proxy containers:
points = [Point(x, y) for x, y in zip(x, y)]
# Use it:
async with aiometer.amap(process, points) as results:
...
API参考
常见选项
max_at_once
(可选,int
):任何给定时间点同时运行的任务的最大数量。max_per_second
(可选,int
):每秒生成的任务的最大数量。
aiometer.run_on_each()
签名:async aiometer.run_on_each(async_fn, args, *, max_at_once=None, max_per_second=None) -> None
并发运行等价于async_fn(arg) for arg in args
的任务。不返回任何值。要获取返回值,请使用aiometer.run_all()
。
aiometer.run_all()
签名:async aiometer.run_all(async_fns, max_at_once=None, max_per_second=None) -> list
并发运行async_fns
函数,并按相同顺序返回结果列表。
aiometer.amap()
签名:async aiometer.amap(async_fn, args, max_at_once=None, max_per_second=None) -> async iterator
同时运行 async_fn(arg) for arg in args
的等效操作,并返回一个异步迭代器,它会随着结果可用而产出结果。
aiometer.run_any()
签名: async aiometer.run_any(async_fns, max_at_once=None, max_per_second=None) -> Any
同时运行 async_fns
函数,并返回第一个可用的结果。
贡献
参见 CONTRIBUTING.md。
许可证
MIT
变更日志
本项目所有显著的变更都将记录在此文件中。
格式基于 Keep a Changelog。
0.5.0 - 2023-12-11
已移除
- 停止支持 Python 3.7,因为它已达到生命周期结束。 (Pull #44)
已添加
- 添加对 Python 3.12 的官方支持。 (Pull #44)
- 添加对 anyio 4 的支持。这允许使用本地的 ExceptionGroup 捕获异常组。在 anyio 3.2+ 上,anyio 会抛出自己的 ExceptionGroup 类型。保留了与 anyio 3.2+ 的兼容性。 (Pull #43)
0.4.0 - 2023-01-18
已移除
- 停止支持 Python 3.6,因为它已达到生命周期结束。 (Pull #38)
已添加
- 添加对 Python 3.10 和 3.11 的官方支持。 (Pull #38)
修复
- 放宽
typing_extensions
的版本要求,并解决mypy>=0.981
严格的可选更改。 (Pull #38)
0.3.0 - 2021-07-06
变更
- 更新
anyio
依赖到 v3(之前是 v1)。(Pull #25)- 注意:没有 API 变更,但可能存在依赖不匹配。在升级
aiometer
之前,请确保将代码库迁移到 anyio v3。
- 注意:没有 API 变更,但可能存在依赖不匹配。在升级
已添加
- 添加对 Python 3.6 的支持(在 3.6 中安装
contextlib2
回退库)。(Pull #26) - 官方支持 Python 3.9。(Pull #26)
0.2.1 - 2020-03-26
修复
- 通过使用通用的细胞速率算法 (GCRA) 而不是漏桶算法来改进
max_per_second
实现的健壮性。(Pull #5)
0.2.0 - 2020-03-22
已添加
- 添加对 Python 3.7 的支持。(Pull #3)
0.1.0 - 2020-03-21
已添加
- 添加
run_on_each()
、run_all()
、amap()
和run_any()
,带有max_at_once
和max_per_second
选项。(Pull #1)
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。