习惯性的asyncio工具
项目描述
aiotools
习惯性的asyncio utilties
注意: 此项目处于早期开发阶段。公共API可能会在版本间发生变化。
模块
我还推荐您尝试以下asyncio库,以使您的生活更加愉快。
- async_timeout:提供轻量级超时包装,不启动子任务。
- aiojobs:提供异步任务的无缝关闭的并发限制调度器。
- trio:Python异步IO堆栈的替代实现,重点在于取消作用域和称为“nursery”的任务组。
示例
Async上下文管理器
这是contextlib.contextmanager的异步版本,以便更容易编写不创建样板类的异步上下文管理器。
import asyncio
import aiotools
@aiotools.actxmgr
async def mygen(a):
await asyncio.sleep(1)
yield a + 1
await asyncio.sleep(1)
async def somewhere():
async with mygen(1) as b:
assert b == 2
请注意,您需要用try-finally块包装yield,以确保即使在异步with块内部发生异常时也能释放资源(例如,锁)。
import asyncio
import aiotools
lock = asyncio.Lock()
@aiotools.actxmgr
async def mygen(a):
await lock.acquire()
try:
yield a + 1
finally:
lock.release()
async def somewhere():
try:
async with mygen(1) as b:
raise RuntimeError('oops')
except RuntimeError:
print('caught!') # you can catch exceptions here.
您还可以创建一组异步上下文管理器,这些管理器可以使用asyncio.gather()同时进入/退出。
import asyncio
import aiotools
@aiotools.actxmgr
async def mygen(a):
yield a + 10
async def somewhere():
ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))
async with ctxgrp as values:
assert len(values) == 10
for i in range(10):
assert values[i] == i + 10
Async服务器
这实现了启动基于asyncio的服务守护进程的常见模式。
import asyncio
import aiotools
async def echo(reader, writer):
data = await reader.read(100)
writer.write(data)
await writer.drain()
writer.close()
@aiotools.server
async def myworker(loop, pidx, args):
server = await asyncio.start_server(echo, '0.0.0.0', 8888,
reuse_port=True, loop=loop)
print(f'[{pidx}] started')
yield # wait until terminated
server.close()
await server.wait_closed()
print(f'[{pidx}] terminated')
if __name__ == '__main__':
# Run the above server using 4 worker processes.
aiotools.start_server(myworker, num_workers=4)
它自动处理SIGINT/SIGTERM信号以停止服务器,以及运行在多个进程上的事件循环的生命周期管理。内部使用aiotools.fork
模块获取内核支持,通过支持版本的PID文件描述符来解决潜在的信号/PID相关的竞争条件(Python 3.9+和Linux内核5.4+)。
Async任务组
TaskGroup
对象通过异步上下文管理器管理通过其create_task()
方法创建的子任务的生命周期,只有当所有子任务都完成或取消时,上下文管理器才会退出。
这受自trio的nursery API,并从EdgeDB的Python客户端库中采用了草稿实现。
import aiotools
async def do():
async with aiotools.TaskGroup() as tg:
tg.create_task(...)
tg.create_task(...)
...
# at this point, all subtasks are either cancelled or done.
Async定时器
import aiotools
i = 0
async def mytick(interval):
print(i)
i += 1
async def somewhere():
t = aiotools.create_timer(mytick, 1.0)
...
t.cancel()
await t
t
是一个asyncio.Task
对象。要停止计时器,请调用t.cancel(); await t
。请记住不要忘记await
t
,因为它需要额外的步骤来取消和等待所有挂起的任务。为了使计时器函数可取消,请添加一个捕获asyncio.CancelledError
的try-except子句,因为我们将其用作终止信号。
您可以将TimerDelayPolicy
参数添加到控制计时器触发的任务执行时间超过计时器间隔时的行为。默认情况下是累积它们,并在计时器取消时一次性取消所有剩余的计时器。如果选择CANCEL
,则在每个间隔取消任何挂起的先前触发的任务。
import asyncio
import aiotools
async def mytick(interval):
await asyncio.sleep(100) # cancelled on every next interval.
async def somewhere():
t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)
...
t.cancel()
await t
虚拟时钟
它提供了一个虚拟时钟,通过临时修补事件循环选择器,在多个协程任务中的任何组合的asyncio.sleep()
调用时立即推进事件循环时间。
这也在我们的计时器测试套件中使用。
import aiotools
import pytest
@pytest.mark.asyncio
async def test_sleeps():
loop = aiotools.compat.get_running_loop()
vclock = aiotools.VirtualClock()
with vclock.patch_loop():
print(loop.time()) # -> prints 0
await asyncio.sleep(3600)
print(loop.time()) # -> prints 3600
变更日志
1.7.0 (2023-08-25)
破坏性更改
- 已停止对Python 3.7的支持,因为它已达到生命周期的终点。
修复
- 修正了
create_timer()
中回调参数的类型注解(#61)
1.6.1 (2023-05-02)
修复
- PersistentTaskGroup不再存储未处理的异常的历史记录,而是将它们作为异常组抛出,以防止内存泄漏(#54)
1.6.0 (2023-03-14)
新功能
- 添加了
as_completed_safe()
,它使用PersistentTaskGroup
增强了asyncio.as_completed()
(#52)
1.5.9 (2022-04-26)
修复
- 改进了pidfd可用性的检查,以避免在Linux内核5.1和5.2上的边缘情况失败,在这些内核上,
signal.pidfd_send_signal()
可用,但os.pidfd_open()
不可用(#51)
1.5.8 (2022-04-25)
修复
- 在首次初始化时显式地将事件循环附加到
PidfdChildWatcher
(#50)
1.5.7 (2022-04-12)
修复
- 通过删除在#47中造成误导性修复的未使用代码,修复了macOS上默认导入的回归(#49)
1.5.6 (2022-04-11)
新功能
- 除了
aclosing()
之外,还添加了异步上下文管理器closing_async()
(#48)
修复
- 允许在Windows平台上导入aiotools,从默认的
__all__
导入列表中删除不兼容的模块(#47)
1.5.5 (2022-03-22)
新功能
- 将
wait_timeout
选项添加到start_server()
中(#46)
修复
- 通过最小化在
afork()
子进程中暴露事件循环来解决信号竞争问题(#46)
杂项
- 现在CI使用Python 3.11a6或更高版本运行,支持stdlib的
asyncio.TaskGroup
(#45)
1.5.4 (2022-03-10)
新功能
- 如果
create_task()
的调用者通过await
等待它们,则通过单独的未来实例传播任务结果和异常,除了调用任务组异常处理器。请注意,在Python 3.6中,等待这些未来实例会无限期地挂起,但我们不会修复它,因为Python 3.6自2021年12月起已达到生命周期的终点。(#44)
1.5.3 (2022-03-07)
修复
- 修复了
ExceptionGroup
的特征检测,并将MultiError
继承自ExceptionGroup
而不是BaseExceptionGroup
(#42)
1.5.2 (2022-03-06)
修复
1.5.1 (2022-03-06)
修复
- 修复了Python 3.11中
TaskGroup
缺少命名支持的问题 (#39)
1.5.0 (2022-03-06)
新功能
- 为Python 3.9的
Task.cancel()
添加了对msg
参数的支持 (#32) - 修复了
TaskGroup
中的“意外取消”错误 (#35) - 重写PersistentTaskGroup,使用Python 3.11的最新功能如
Task.uncancel()
和Task.cancelling()
,同时仍然支持较旧的Python版本 (#36) - 添加了
PersistentTaskGroup.all()
以列出所有未终止的持久化任务组 (#38)
1.4.0 (2022-01-10)
新功能
1.3.0 (2021-12-19)
修复
- 添加了对Python 3.10的支持 (#28)
文档更改
- 通过删除第三方autodoc-typehints扩展和自定义样式表覆盖,修复了在Python 3.10和Sphinx 4.x上构建文档的问题 (#28)
1.2.2 (2021-06-07)
修复
- fork: 在
PidfdChildProcess
中显式处理子进程的段错误(使用信号生成核心转储) (#27)
1.2.1 (2021-01-12)
修复
- 避免自定义
clone()
函数的副作用,并暂时回到os.fork()
和os.pidfd_open()
的组合 (#25)
1.2.0 (2021-01-12)
重大更改
- server: 完全弃用了
start_server()
的use_threading
参数 (#23)
新功能
- 现在的主要目标是Python 3.9,尽管我们仍然支持从Python 3.6开始 (#22)
- fork: 添加了一个新的模块
fork
,以支持Linux 5.4+中的PID文件描述符和POSIX兼容的回退,以异步地在不发生信号/PID竞态的情况下分叉Python进程 (#22) - server: 使用新的
fork
模块重新编写了模块,并处理了各种边缘情况,例如兄弟子进程的异步失败 (#23)
1.1.1 (2020-12-16)
修复
- 修复了当使用长时间运行的异步任务时,
TaskGroup
可能出现的内存泄漏 (#21)
1.1.0 (2020-10-18)
新功能
- 将任务组模块中的
current_taskgroup
上下文变量添加到任务组模块(仅适用于Python 3.7或更高版本)
修复
- 修复了在
aiotools
根包中缺少taskgroup
模块导出的自动导入
1.0.0 (2020-10-18)
新功能
- 采用来自EdgeDB的EdgeDB的任务组API实现作为
aiotools.taskgroup
(#18) - 添加了
timer.VirtualClock
,它提供了一个虚拟时钟,使得使用asyncio.sleep()
的asyncio代码块可以立即且确定性地完成 (#19)
杂项
0.9.1 (2020-02-25)
- 一个维护版本,修复了
aiotools
命名空间中的defer
模块导出
0.9.0 (2020-02-25)
- defer: 一个新模块,使用asyncio感知模拟Golang的
defer()
API。
0.8.5 (2019-11-19)
- 服务器: 将工作主函数的内部重写为使用原生
async with
而不是手动展开__aenter__()
和__aexit__()
特殊方法,以保持代码简单并避免潜在的错误。
0.8.4 (2019-11-18)
- Python 3.8 现已官方支持。
- 服务器: 修复当使用
multiprocessing.set_start_method("spawn")
时的错误。- 注意:自 Python 3.8 以来,这已成为 macOS 的默认设置。
- 已知问题:#12
- 从
__init__.py
中移除一些打包黑客技巧,并让 setuptools 从单独的aiotools/VERSION
文本文件中读取版本。
0.8.3 (2019-10-07)
- 上下文: 修复
aclosing()
的__aexit__()
异常参数。
0.8.2 (2019-08-28)
- 上下文,服务器: 在 Python 3.6、3.7 和 3.8 中,捕获 asyncio.CancelledError 和 BaseException 以保持取消行为的一致性。
0.8.1 (2019-02-24)
- 服务器: 修复在主/工作上下文管理器中使用线程式工作时接收到的停止信号的输出。
0.8.0 (2018-11-18)
- 服务器: 更新停止信号处理,现在用户定义的工作/主上下文管理器可以区分接收到的停止信号。有关更多详细信息,请参阅更新后的文档。
0.7.3 (2018-10-16)
- 这是一个技术版本,用于修复阻止自动化 CI 发布流程的测试用例。
0.7.2 (2018-10-16)
- 使用针对 asyncio 的小型兼容性模块改进对 Python 3.6/3.7 的支持。
- func:将
expire_after
选项添加到lru_cache()
函数。
0.7.1 (2018-08-24)
- 文档的少量更新
0.7.0 (2018-08-24)
- 添加对 Python 3.7 的支持
- 上下文: 更新以类似于 Python 3.7 的方式工作
- 上下文: 在 Python 3.7+ 中弃用
AsyncContextDecorator
功能 - 上下文: 在标准库中添加了对
contextlib.AsyncExitStack
的别名。
0.6.0 (2018-04-10)
- 引入一个新的模块
aiotools.iter
,其中包含aiter()
函数,它与内置的异步版本iter()
对应。
0.5.4 (2018-02-01)
- 服务器: 移除不必要的 setpgrp 系统调用,该系统调用也由 Docker 的默认 seccomp 配置阻止!
0.5.3 (2018-01-12)
- 服务器: 哎呀!(一个 finally 块应该是一个 else 块)
0.5.2 (2018-01-12)
- 服务器: 改善内在美(代码可读性)
- 服务器: 改善工作到主中断的可靠性和可移植性
0.5.1 (2018-01-11)
- 服务器: 修复与多个工作者的初始化错误处理相关的竞争条件
0.5.0 (2017-11-08)
- func: 添加
lru_cache()
,它是functools.lru_cache()
的协程版本
0.4.5 (2017-10-14)
- 服务器: 修复在模块终止期间信号处理相关的竞争条件
- 服务器: 改善初始化工作者时的错误处理(在记录异常后自动关闭其他工作者和主循环)
0.4.4 (2017-09-12)
- 添加一个新的模块
aiotools.func
,其中包含apartial()
函数,它是标准库中functools.partial()
的异步版本
0.4.3 (2017-08-06)
- 添加类似于标准库中的
closing()
的aclosing()
上下文管理器 - 加快 Travis CI 打包构建速度
- 现在同时提供 rst 格式的 README 和 CHANGES(此文件)
0.4.2 (2017-08-01)
server
:修复子工作者中启动子进程- 添加对
uvloop
的支持
0.4.0 (2017-08-01)
- 将
use_threading
参数添加到 - 添加初始文档(由于 Python 版本问题,目前未在 readthedocs.io 上提供)
0.3.2 (2017-07-31)
- 将
extra_procs
参数添加到start_server()
函数 - 添加套接字和 ZeroMQ 服务器示例
- 改进 CI 配置
0.3.1 (2017-07-26)
- 改进 CI 脚本
- 采用 editorconfig
0.3.0 (2017-04-26)
- 使用多进程和自动子生命周期管理添加
start_server()
函数 - 使用
asyncio.gather()
和return_exceptions=True
明确AsyncContextGroup
的语义
0.2.0 (2017-04-20)
- 添加
AsyncContextManager
的抽象类型 - 将
AsyncGenContextManager
重命名为AsyncContextManager
- 添加
AsyncContextGroup
0.1.1 (2017-04-14)
- 初始发布
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源分发
构建分发
aiotools-1.7.0.tar.gz 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 2f348af526e6e02485341924af88daaf79a1f7b2e9284544ffbf759080d38581 |
|
MD5 | 70a54cd128c1a1b09d88da14bcbc0231 |
|
BLAKE2b-256 | 57fbdb6aacb1683517d6178eb7ae8ac09e125f78a4fd4d97b2dc7ea55c108b6d |
aiotools-1.7.0-py3-none-any.whl 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 008e4b448869e00352034106fcab4ed849e62d0ec7c89661d5650006f1a7b3a9 |
|
MD5 | 1fc3696ba24c9b5e5d7f671f6e4f05c9 |
|
BLAKE2b-256 | 7f099c8a0985b01b94974f1cca1ea35f5937ed77a62c13e01491efc15719b70f |