跳转到主要内容

习惯性的asyncio工具

项目描述

aiotools

PyPI release version Supported Python versions Test Status Code Coverage

习惯性的asyncio utilties

注意: 此项目处于早期开发阶段。公共API可能会在版本间发生变化。

模块

我还推荐您尝试以下asyncio库,以使您的生活更加愉快。

  • async_timeout:提供轻量级超时包装,不启动子任务。
  • aiojobs:提供异步任务的无缝关闭的并发限制调度器。
  • trio:Python异步IO堆栈的替代实现,重点在于取消作用域和称为“nursery”的任务组。

示例

Async上下文管理器

这是contextlib.contextmanager的异步版本,以便更容易编写不创建样板类的异步上下文管理器。

import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
   await asyncio.sleep(1)
   yield a + 1
   await asyncio.sleep(1)

async def somewhere():
   async with mygen(1) as b:
       assert b == 2

请注意,您需要用try-finally块包装yield,以确保即使在异步with块内部发生异常时也能释放资源(例如,锁)。

import asyncio
import aiotools

lock = asyncio.Lock()

@aiotools.actxmgr
async def mygen(a):
   await lock.acquire()
   try:
       yield a + 1
   finally:
       lock.release()

async def somewhere():
   try:
       async with mygen(1) as b:
           raise RuntimeError('oops')
   except RuntimeError:
       print('caught!')  # you can catch exceptions here.

您还可以创建一组异步上下文管理器,这些管理器可以使用asyncio.gather()同时进入/退出。

import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
   yield a + 10

async def somewhere():
   ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))
   async with ctxgrp as values:
       assert len(values) == 10
       for i in range(10):
           assert values[i] == i + 10

Async服务器

这实现了启动基于asyncio的服务守护进程的常见模式。

import asyncio
import aiotools

async def echo(reader, writer):
   data = await reader.read(100)
   writer.write(data)
   await writer.drain()
   writer.close()

@aiotools.server
async def myworker(loop, pidx, args):
   server = await asyncio.start_server(echo, '0.0.0.0', 8888,
       reuse_port=True, loop=loop)
   print(f'[{pidx}] started')
   yield  # wait until terminated
   server.close()
   await server.wait_closed()
   print(f'[{pidx}] terminated')

if __name__ == '__main__':
   # Run the above server using 4 worker processes.
   aiotools.start_server(myworker, num_workers=4)

它自动处理SIGINT/SIGTERM信号以停止服务器,以及运行在多个进程上的事件循环的生命周期管理。内部使用aiotools.fork模块获取内核支持,通过支持版本的PID文件描述符来解决潜在的信号/PID相关的竞争条件(Python 3.9+和Linux内核5.4+)。

Async任务组

TaskGroup对象通过异步上下文管理器管理通过其create_task()方法创建的子任务的生命周期,只有当所有子任务都完成或取消时,上下文管理器才会退出。

这受自trio的nursery API,并从EdgeDB的Python客户端库中采用了草稿实现。

import aiotools

async def do():
    async with aiotools.TaskGroup() as tg:
        tg.create_task(...)
        tg.create_task(...)
        ...
    # at this point, all subtasks are either cancelled or done.

Async定时器

import aiotools

i = 0

async def mytick(interval):
   print(i)
   i += 1

async def somewhere():
   t = aiotools.create_timer(mytick, 1.0)
   ...
   t.cancel()
   await t

t是一个asyncio.Task对象。要停止计时器,请调用t.cancel(); await t。请记住不要忘记awaitt,因为它需要额外的步骤来取消和等待所有挂起的任务。为了使计时器函数可取消,请添加一个捕获asyncio.CancelledError的try-except子句,因为我们将其用作终止信号。

您可以将TimerDelayPolicy参数添加到控制计时器触发的任务执行时间超过计时器间隔时的行为。默认情况下是累积它们,并在计时器取消时一次性取消所有剩余的计时器。如果选择CANCEL,则在每个间隔取消任何挂起的先前触发的任务。

import asyncio
import aiotools

async def mytick(interval):
   await asyncio.sleep(100)  # cancelled on every next interval.

async def somewhere():
   t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)
   ...
   t.cancel()
   await t

虚拟时钟

它提供了一个虚拟时钟,通过临时修补事件循环选择器,在多个协程任务中的任何组合的asyncio.sleep()调用时立即推进事件循环时间。

这也在我们的计时器测试套件中使用。

import aiotools
import pytest

@pytest.mark.asyncio
async def test_sleeps():
    loop = aiotools.compat.get_running_loop()
    vclock = aiotools.VirtualClock()
    with vclock.patch_loop():
        print(loop.time())  # -> prints 0
        await asyncio.sleep(3600)
        print(loop.time())  # -> prints 3600

变更日志

1.7.0 (2023-08-25)

破坏性更改

  • 已停止对Python 3.7的支持,因为它已达到生命周期的终点。

修复

  • 修正了create_timer()中回调参数的类型注解(#61

1.6.1 (2023-05-02)

修复

  • PersistentTaskGroup不再存储未处理的异常的历史记录,而是将它们作为异常组抛出,以防止内存泄漏(#54

1.6.0 (2023-03-14)

新功能

  • 添加了as_completed_safe(),它使用PersistentTaskGroup增强了asyncio.as_completed()#52

1.5.9 (2022-04-26)

修复

  • 改进了pidfd可用性的检查,以避免在Linux内核5.1和5.2上的边缘情况失败,在这些内核上,signal.pidfd_send_signal()可用,但os.pidfd_open()不可用(#51

1.5.8 (2022-04-25)

修复

  • 在首次初始化时显式地将事件循环附加到PidfdChildWatcher#50

1.5.7 (2022-04-12)

修复

  • 通过删除在#47中造成误导性修复的未使用代码,修复了macOS上默认导入的回归(#49

1.5.6 (2022-04-11)

新功能

  • 除了aclosing()之外,还添加了异步上下文管理器closing_async()#48

修复

  • 允许在Windows平台上导入aiotools,从默认的__all__导入列表中删除不兼容的模块(#47

1.5.5 (2022-03-22)

新功能

  • wait_timeout选项添加到start_server()中(#46

修复

  • 通过最小化在afork()子进程中暴露事件循环来解决信号竞争问题(#46

杂项

  • 现在CI使用Python 3.11a6或更高版本运行,支持stdlib的asyncio.TaskGroup#45

1.5.4 (2022-03-10)

新功能

  • 如果create_task()的调用者通过await等待它们,则通过单独的未来实例传播任务结果和异常,除了调用任务组异常处理器。请注意,在Python 3.6中,等待这些未来实例会无限期地挂起,但我们不会修复它,因为Python 3.6自2021年12月起已达到生命周期的终点。(#44

1.5.3 (2022-03-07)

修复

  • 修复了ExceptionGroup的特征检测,并将MultiError继承自ExceptionGroup而不是BaseExceptionGroup (#42)

1.5.2 (2022-03-06)

修复

  • 为了向后兼容,恢复了MultiError的默认导出 (#40)
  • 仅在通过async with语句使用PersistentTaskGroup时设置current_ptaskgroup。 (#41)

1.5.1 (2022-03-06)

修复

  • 修复了Python 3.11中TaskGroup缺少命名支持的问题 (#39)

1.5.0 (2022-03-06)

新功能

  • 为Python 3.9的Task.cancel()添加了对msg参数的支持 (#32)
  • 修复了TaskGroup中的“意外取消”错误 (#35)
  • 重写PersistentTaskGroup,使用Python 3.11的最新功能如Task.uncancel()Task.cancelling(),同时仍然支持较旧的Python版本 (#36)
  • 添加了PersistentTaskGroup.all()以列出所有未终止的持久化任务组 (#38)

1.4.0 (2022-01-10)

新功能

  • ptaskgroup: 实现PersistentTaskGroup (#30)
  • server: 为工作进程公开process_index上下文变量 (#31)

1.3.0 (2021-12-19)

修复

  • 添加了对Python 3.10的支持 (#28)

文档更改

  • 通过删除第三方autodoc-typehints扩展和自定义样式表覆盖,修复了在Python 3.10和Sphinx 4.x上构建文档的问题 (#28)

1.2.2 (2021-06-07)

修复

  • fork:PidfdChildProcess中显式处理子进程的段错误(使用信号生成核心转储) (#27)

1.2.1 (2021-01-12)

修复

  • 避免自定义clone()函数的副作用,并暂时回到os.fork()os.pidfd_open()的组合 (#25)

1.2.0 (2021-01-12)

重大更改

  • server: 完全弃用了start_server()use_threading参数 (#23)

新功能

  • 现在的主要目标是Python 3.9,尽管我们仍然支持从Python 3.6开始 (#22)
  • fork: 添加了一个新的模块fork,以支持Linux 5.4+中的PID文件描述符和POSIX兼容的回退,以异步地在不发生信号/PID竞态的情况下分叉Python进程 (#22)
  • server: 使用新的fork模块重新编写了模块,并处理了各种边缘情况,例如兄弟子进程的异步失败 (#23)

1.1.1 (2020-12-16)

修复

  • 修复了当使用长时间运行的异步任务时,TaskGroup可能出现的内存泄漏 (#21)

1.1.0 (2020-10-18)

新功能

  • 将任务组模块中的current_taskgroup上下文变量添加到任务组模块(仅适用于Python 3.7或更高版本)

修复

  • 修复了在aiotools根包中缺少taskgroup模块导出的自动导入

1.0.0 (2020-10-18)

新功能

  • 采用来自EdgeDB的EdgeDB的任务组API实现作为aiotools.taskgroup (#18)
  • 添加了timer.VirtualClock,它提供了一个虚拟时钟,使得使用asyncio.sleep()的asyncio代码块可以立即且确定性地完成 (#19)

杂项

  • 采用towncrier进行更改日志管理 (#15)
  • 迁移到GitHub Actions进行CI (#19)

0.9.1 (2020-02-25)

  • 一个维护版本,修复了aiotools命名空间中的defer模块导出

0.9.0 (2020-02-25)

  • defer: 一个新模块,使用asyncio感知模拟Golang的defer() API。

0.8.5 (2019-11-19)

  • 服务器: 将工作主函数的内部重写为使用原生 async with 而不是手动展开 __aenter__()__aexit__() 特殊方法,以保持代码简单并避免潜在的错误。

0.8.4 (2019-11-18)

  • Python 3.8 现已官方支持。
  • 服务器: 修复当使用 multiprocessing.set_start_method("spawn") 时的错误。
    • 注意:自 Python 3.8 以来,这已成为 macOS 的默认设置。
    • 已知问题:#12
  • __init__.py 中移除一些打包黑客技巧,并让 setuptools 从单独的 aiotools/VERSION 文本文件中读取版本。

0.8.3 (2019-10-07)

  • 上下文: 修复 aclosing()__aexit__() 异常参数。

0.8.2 (2019-08-28)

  • 上下文服务器: 在 Python 3.6、3.7 和 3.8 中,捕获 asyncio.CancelledError 和 BaseException 以保持取消行为的一致性。

0.8.1 (2019-02-24)

  • 服务器: 修复在主/工作上下文管理器中使用线程式工作时接收到的停止信号的输出。

0.8.0 (2018-11-18)

  • 服务器: 更新停止信号处理,现在用户定义的工作/主上下文管理器可以区分接收到的停止信号。有关更多详细信息,请参阅更新后的文档。

0.7.3 (2018-10-16)

  • 这是一个技术版本,用于修复阻止自动化 CI 发布流程的测试用例。

0.7.2 (2018-10-16)

  • 使用针对 asyncio 的小型兼容性模块改进对 Python 3.6/3.7 的支持。
  • func:将 expire_after 选项添加到 lru_cache() 函数。

0.7.1 (2018-08-24)

  • 文档的少量更新

0.7.0 (2018-08-24)

  • 添加对 Python 3.7 的支持
  • 上下文: 更新以类似于 Python 3.7 的方式工作
  • 上下文: 在 Python 3.7+ 中弃用 AsyncContextDecorator 功能
  • 上下文: 在标准库中添加了对 contextlib.AsyncExitStack 的别名。

0.6.0 (2018-04-10)

  • 引入一个新的模块 aiotools.iter,其中包含 aiter() 函数,它与内置的异步版本 iter() 对应。

0.5.4 (2018-02-01)

  • 服务器: 移除不必要的 setpgrp 系统调用,该系统调用也由 Docker 的默认 seccomp 配置阻止!

0.5.3 (2018-01-12)

  • 服务器: 哎呀!(一个 finally 块应该是一个 else 块)

0.5.2 (2018-01-12)

  • 服务器: 改善内在美(代码可读性)
  • 服务器: 改善工作到主中断的可靠性和可移植性

0.5.1 (2018-01-11)

  • 服务器: 修复与多个工作者的初始化错误处理相关的竞争条件

0.5.0 (2017-11-08)

  • func: 添加 lru_cache(),它是 functools.lru_cache() 的协程版本

0.4.5 (2017-10-14)

  • 服务器: 修复在模块终止期间信号处理相关的竞争条件
  • 服务器: 改善初始化工作者时的错误处理(在记录异常后自动关闭其他工作者和主循环)

0.4.4 (2017-09-12)

  • 添加一个新的模块 aiotools.func,其中包含 apartial() 函数,它是标准库中 functools.partial() 的异步版本

0.4.3 (2017-08-06)

  • 添加类似于标准库中的 closing()aclosing() 上下文管理器
  • 加快 Travis CI 打包构建速度
  • 现在同时提供 rst 格式的 README 和 CHANGES(此文件)

0.4.2 (2017-08-01)

  • server:修复子工作者中启动子进程
  • 添加对 uvloop 的支持

0.4.0 (2017-08-01)

  • use_threading 参数添加到
  • 添加初始文档(由于 Python 版本问题,目前未在 readthedocs.io 上提供)

0.3.2 (2017-07-31)

  • extra_procs 参数添加到 start_server() 函数
  • 添加套接字和 ZeroMQ 服务器示例
  • 改进 CI 配置

0.3.1 (2017-07-26)

  • 改进 CI 脚本
  • 采用 editorconfig

0.3.0 (2017-04-26)

  • 使用多进程和自动子生命周期管理添加 start_server() 函数
  • 使用 asyncio.gather()return_exceptions=True 明确 AsyncContextGroup 的语义

0.2.0 (2017-04-20)

  • 添加 AsyncContextManager 的抽象类型
  • AsyncGenContextManager 重命名为 AsyncContextManager
  • 添加 AsyncContextGroup

0.1.1 (2017-04-14)

  • 初始发布

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

aiotools-1.7.0.tar.gz (60.9 kB 查看散列)

上传

构建分发

aiotools-1.7.0-py3-none-any.whl (37.8 kB 查看散列)

上传 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面