跳转到主要内容

一个与asyncio和trio兼容的Python并发调度库

项目描述

aiometer

Build Status Coverage Python versions Package version

aiometer 是一个与 asynciotrio 兼容的并发调度库,灵感来自 Trimeter。它使执行大量任务并发变得更容易,同时控制并发限制(即应用 背压)并以可预测的方式收集结果。

内容

示例

让我们使用 HTTPX 来并发执行网络请求...

使用 IPython 交互式尝试此代码。

>>> import asyncio
>>> import functools
>>> import random
>>> import aiometer
>>> import httpx
>>>
>>> client = httpx.AsyncClient()
>>>
>>> async def fetch(client, request):
...     response = await client.send(request)
...     # Simulate extra processing...
...     await asyncio.sleep(2 * random.random())
...     return response.json()["json"]
...
>>> requests = [
...     httpx.Request("POST", "https://httpbin.org/anything", json={"index": index})
...     for index in range(100)
... ]
...
>>> # Send requests, and process responses as they're made available:
>>> async with aiometer.amap(
...     functools.partial(fetch, client),
...     requests,
...     max_at_once=10, # Limit maximum number of concurrently running tasks.
...     max_per_second=5,  # Limit request rate to not overload the server.
... ) as results:
...     async for data in results:
...         print(data)
...
{'index': 3}
{'index': 4}
{'index': 1}
{'index': 2}
{'index': 0}
...
>>> # Alternatively, fetch and aggregate responses into an (ordered) list...
>>> jobs = [functools.partial(fetch, client, request) for request in requests]
>>> results = await aiometer.run_all(jobs, max_at_once=10, max_per_second=5)
>>> results
[{'index': 0}, {'index': 1}, {'index': 2}, {'index': 3}, {'index': 4}, ...]

安装

此项目处于测试版并正在成熟。请确保将任何依赖项固定到最新的次要版本。

pip install "aiometer==0.5.*"

特性

  • 并发管理和节流助手。
  • 支持 asynciotrio
  • 完全类型注解。
  • 100% 测试覆盖率。

用法

流控制

aiometer 的关键亮点是允许您应用流控制策略来限制程序并发程度。

有两个旋钮可以调整以微调并发

  • max_at_once:用于限制在任何给定时间点同时运行的任务的最大数量。(如果你有100个任务,并设置max_at_once=10,则aiometer将确保不会同时运行超过10个。)
  • max_per_second:此选项限制每秒生成的任务数量。这有助于避免过载I/O资源,例如可能实施速率限制策略的服务器。

示例用法

>>> import asyncio
>>> import aiometer
>>> async def make_query(query):
...     await asyncio.sleep(0.05)  # Simulate a database request.
...
>>> queries = ['SELECT * from authors'] * 1000
>>> # Allow at most 5 queries to run concurrently at any given time:
>>> await aiometer.run_on_each(make_query, queries, max_at_once=5)
...
>>> # Make at most 10 queries per second:
>>> await aiometer.run_on_each(make_query, queries, max_per_second=10)
...
>>> # Run at most 10 concurrent jobs, spawning new ones at least every 5 seconds:
>>> async def job(id):
...     await asyncio.sleep(10)  # A very long task.
...
>>> await aiometer.run_on_each(job, range(100),  max_at_once=10, max_per_second=0.2)

运行任务

aiometer提供4种不同的并发运行任务的方式,分别以4个不同的运行函数的形式。每个函数都接受在流控制中记录的所有选项,并以略微不同的方式运行任务,从而可以解决各种用例。下面是一个方便的表格供参考(参见API参考

入口点 用例
run_on_each() 以任何顺序执行异步回调。
run_all() 按顺序返回结果列表。
amap() 按结果可用顺序迭代。
run_any() 返回第一个完成的函数的结果。

为了说明每个运行函数的行为,我们首先设置一个hello world异步程序

>>> import asyncio
>>> import random
>>> from functools import partial
>>> import aiometer
>>>
>>> async def get_greeting(name):
...     await asyncio.sleep(random.random())  # Simulate I/O
...     return f"Hello, {name}"
...
>>> async def greet(name):
...     greeting = await get_greeting(name)
...     print(greeting)
...
>>> names = ["Robert", "Carmen", "Lucas"]

让我们从run_on_each()开始。它对作为参数传递的列表中的每个项目执行一次异步函数

>>> await aiometer.run_on_each(greet, names)
'Hello, Robert!'
'Hello, Lucas!'
'Hello, Carmen!'

如果我们想按与names相同的顺序获取问候语列表,类似于Promise.all(),我们可以使用run_all()

>>> await aiometer.run_all([partial(get_greeting, name) for name in names])
['Hello, Robert', 'Hello, Carmen!', 'Hello, Lucas!']

amap()允许我们按结果可用顺序处理每个问候语(这意味着不保证保持顺序)

>>> async with aiometer.amap(get_greeting, names) as greetings:
...     async for greeting in greetings:
...         print(greeting)
'Hello, Lucas!'
'Hello, Robert!'
'Hello, Carmen!'

最后,可以使用run_any()来运行异步函数,直到第一个完成,类似于Promise.any()

>>> await aiometer.run_any([partial(get_greeting, name) for name in names])
'Hello, Carmen!'

作为一个有趣的例子,让我们使用amap()来实现sleep sort的无线程异步版本

>>> import asyncio
>>> from functools import partial
>>> import aiometer
>>> numbers = [0.3, 0.1, 0.6, 0.2, 0.7, 0.5, 0.5, 0.2]
>>> async def process(n):
...     await asyncio.sleep(n)
...     return n
...
>>> async with aiometer.amap(process, numbers) as results:
...     sorted_numbers = [n async for n in results]
...
>>> sorted_numbers
[0.1, 0.2, 0.2, 0.3, 0.5, 0.5, 0.6, 0.7]

如何使用

run_on_eachamap中多次使用参数化值

run_on_eachamap仅接受接受单个位置参数的函数(即(Any) -> Awaitable)。

因此,如果你有一个多值参数化的函数,你应该重构它以匹配此形式。

这通常可以这样实现

  1. 构建一个代理容器类型(例如,一个namedtuple),例如T
  2. 重构你的函数,使其签名现在为(T) -> Awaitable
  3. 构建这些代理容器的列表,并将其传递给aiometer

例如,假设你有一个处理X/Y坐标的函数...

async def process(x: float, y: float) -> None:
    pass

xs = list(range(100))
ys = list(range(100))

for x, y in zip(xs, ys):
    await process(x, y)

你可以通过如下重构使用它与amap

from typing import NamedTuple

# Proxy container type:
class Point(NamedTuple):
    x: float
    y: float

# Rewrite to accept a proxy as a single positional argument:
async def process(point: Point) -> None:
    x = point.x
    y = point.y
    ...

xs = list(range(100))
ys = list(range(100))

# Build a list of proxy containers:
points = [Point(x, y) for x, y in zip(x, y)]

# Use it:
async with aiometer.amap(process, points) as results:
    ...

API参考

常见选项

  • max_at_once可选int):任何给定时间点同时运行的任务的最大数量。
  • max_per_second可选int):每秒生成的任务的最大数量。

aiometer.run_on_each()

签名async aiometer.run_on_each(async_fn, args, *, max_at_once=None, max_per_second=None) -> None

并发运行等价于async_fn(arg) for arg in args的任务。不返回任何值。要获取返回值,请使用aiometer.run_all()

aiometer.run_all()

签名async aiometer.run_all(async_fns, max_at_once=None, max_per_second=None) -> list

并发运行async_fns函数,并按相同顺序返回结果列表。

aiometer.amap()

签名async aiometer.amap(async_fn, args, max_at_once=None, max_per_second=None) -> async iterator

同时运行 async_fn(arg) for arg in args 的等效操作,并返回一个异步迭代器,它会随着结果可用而产出结果。

aiometer.run_any()

签名: async aiometer.run_any(async_fns, max_at_once=None, max_per_second=None) -> Any

同时运行 async_fns 函数,并返回第一个可用的结果。

贡献

参见 CONTRIBUTING.md

许可证

MIT

变更日志

本项目所有显著的变更都将记录在此文件中。

格式基于 Keep a Changelog

0.5.0 - 2023-12-11

已移除

  • 停止支持 Python 3.7,因为它已达到生命周期结束。 (Pull #44)

已添加

  • 添加对 Python 3.12 的官方支持。 (Pull #44)
  • 添加对 anyio 4 的支持。这允许使用本地的 ExceptionGroup 捕获异常组。在 anyio 3.2+ 上,anyio 会抛出自己的 ExceptionGroup 类型。保留了与 anyio 3.2+ 的兼容性。 (Pull #43)

0.4.0 - 2023-01-18

已移除

  • 停止支持 Python 3.6,因为它已达到生命周期结束。 (Pull #38)

已添加

  • 添加对 Python 3.10 和 3.11 的官方支持。 (Pull #38)

修复

  • 放宽 typing_extensions 的版本要求,并解决 mypy>=0.981 严格的可选更改。 (Pull #38)

0.3.0 - 2021-07-06

变更

  • 更新 anyio 依赖到 v3(之前是 v1)。(Pull #25)
    • 注意:没有 API 变更,但可能存在依赖不匹配。在升级 aiometer 之前,请确保将代码库迁移到 anyio v3。

已添加

  • 添加对 Python 3.6 的支持(在 3.6 中安装 contextlib2 回退库)。(Pull #26)
  • 官方支持 Python 3.9。(Pull #26)

0.2.1 - 2020-03-26

修复

  • 通过使用通用的细胞速率算法 (GCRA) 而不是漏桶算法来改进 max_per_second 实现的健壮性。(Pull #5)

0.2.0 - 2020-03-22

已添加

  • 添加对 Python 3.7 的支持。(Pull #3)

0.1.0 - 2020-03-21

已添加

  • 添加 run_on_each()run_all()amap()run_any(),带有 max_at_oncemax_per_second 选项。(Pull #1)

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

aiometer-0.5.0.tar.gz (18.3 kB 查看哈希值)

上传时间

构建分布

aiometer-0.5.0-py3-none-any.whl (12.3 kB 查看哈希值)

上传时间 Python 3

支持者