用于编写异步程序的Python库。
项目描述
Async-Tamer
您不需要分割代码库或跳过繁琐的步骤来使用async def
。毕竟异步函数只是具有超级能力的普通函数... Async-Tamer通过将async def
视为什么来帮助您利用这些超级能力:标记并减少忙等待的周期。(详情请参阅此readme的末尾。)
功能
- ✅ 100% python
- ✅ 100% free (BSD 3-clause)
- ✅ 100% lean (无依赖项)
安装
pip install async-tamer
使用方法
简而言之,您可以将@tamed
添加到异步函数中,以便从同步和异步上下文直接调用它。您还可以将@tamed
函数分配给AsyncScope
以获得结构化生命周期管理。
import asyncio
from tamer import tamed, AsyncScope
@tamed # <-- Notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
slow_echo("sync > DELAY(.5)", 0.5)
with AsyncScope() as scope:
slow_echo("scope > DELAY(.2)", .2, _async_scope=scope)
slow_echo("scope > DELAY(.1)", .1, _async_scope=scope)
# implicit await :)
# Output
# ------
#
# sync > DELAY(.5)
# scope > DELAY(.1)
# scope > DELAY(.2)
@tamed
装饰器
一个@tamed
异步函数会根据其被调用的上下文改变其执行策略(行为方式)。在同步上下文中,它表现得像普通函数(阻塞)。在异步上下文中,它表现得像普通协程(非阻塞),并且当分配给AsyncScope
时,它遵循作用域上下文管理器(非阻塞)。
import asyncio
from tamer import tamed, AsyncScope
@tamed # <-- notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
# ============================
# Asynchronous Execution
# ============================
async def main():
first = slow_echo("async > DELAY(1)", 1)
second = slow_echo("async > DELAY(.1)", 0.1)
third = slow_echo("async > DELAY(.5)", 0.5)
# Don't forget the await!
await second
await asyncio.gather(first, third)
asyncio.run(main())
# Output
# ------
#
# async > DELAY(.1)
# async > DELAY(.5)
# async > DELAY(1)
# ============================
# Synchronous Execution
# ============================
slow_echo("sync > DELAY(1)", 1)
slow_echo("sync > DELAY(.1)", 0.1)
slow_echo("sync > DELAY(.5)", 0.5)
# Output
# ------
#
# sync > DELAY(1)
# sync > DELAY(.1)
# sync > DELAY(.5)
# ============================
# AsyncScope Execution
# ============================
with AsyncScope() as scope:
slow_echo("scope > DELAY(1)", 1, _async_scope=scope)
slow_echo("scope > DELAY(.1)", 0.1, _async_scope=scope)
slow_echo("scope > DELAY(.5)", 0.5, _async_scope=scope)
# Output
# ------
#
# scope > DELAY(.1)
# scope > DELAY(.5)
# scope > DELAY(1)
注意:
_async_scope
关键字参数由@tamed
装饰器注入,用于将@tamed
函数添加到AsyncScope
中。您可能想要这样做的原因在AsyncScope
部分中通过示例进行了说明。
返回结果
@tamed
函数知道您何时(以及如何)期望返回结果。
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def slow_io():
await asyncio.sleep(0.1)
return 200, "Time to be awesome!"
# ============================
# Asynchronous Execution
# ============================
async def main():
coro = slow_io() # <-- normal coroutine
return_code, msg = await coro # <-- await the result
print(f"Status {return_code}: `{msg}`")
asyncio.run(main())
# Output
# ------
#
# Status 200: `Time to be awesome!`
# ============================
# Synchronous Execution
# ============================
return_code, msg = slow_io() # <-- immediate result
print(f"Status {return_code}: `{msg}`")
# Output
# ------
#
# Status 200: `Time to be awesome!`
# ============================
# AsyncScope Execution
# ============================
with AsyncScope() as scope:
delayed_result = slow_io(_async_scope=scope)
# <-- implicit await on exit
return_code, msg = delayed_result.value
print(f"Status {return_code}: `{msg}`")
# Output
# ------
#
# Status 200: `Time to be awesome!`
当与AsyncScope
一起使用时,一个@tamed
函数将返回一个DelayedResult
实例。该对象代表@tamed
函数的结果,不应与类似的概念混淆,如Future
、asyncio.Task
或Coroutine
,它们代表并发执行的代码。虽然这些是相关对象,但DelayedResult
更为简单。例如,与代码不同,结果不会执行。因此,您无法取消它们,也无法链式调用回调。它们(结果)只是函数输出的值,而在DelayedResult
的情况下,它是一个后来才到达的值。
您可以使用DelayedResult
在异步上下文中对其进行await
操作,或者使用它来使用.block()
同步上下文,直到它可用。此外,您可以检查其.value
,它将返回结果或如果结果不可用则引发AttributeError
。
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def request(delay:int):
await asyncio.sleep(delay)
return 200, "You are awesome!"
@tamed
async def post_process(raw_result):
ret_code, msg = await raw_result # <-- awaitable in async context
a, b = msg.rsplit(" ", 1)
return ret_code, " ".join((a, "very", b))
with AsyncScope() as scope:
raw_result = request(0.1, _async_scope=scope)
result = post_process(raw_result, _async_scope=scope) # <-- pass it around
try:
return_code, msg = result.value
except AttributeError: # <-- AttributeError, not attived yet
print(f"scope > result: Not yet available.")
result.block() # <-- block in sync context
return_code, msg = result.value
print(f"scope > result: Status {return_code}: `{msg}`")
# Output
# ------
#
# scope > result: Not yet available.
# scope > result: Status 200: `You are very awesome!`
AsyncScope
AsyncScope
管理一组@tamed
函数并控制其生命周期。这是一种将异步部分添加到程序中的结构化方法。简而言之,您需要了解3个关键字
- 调度:启动
@tamed
函数的代码行。 - 切换:切换到另一个执行上下文的代码行。
- 清理:处理错误处理的代码行。
您通过将_async_scope
设置为有意义的值来调用@tamed
函数以进行调度,而AsyncScope
有助于切换和清理。为此,它保证当作用域退出时,作用域内的所有函数都已完成。为了实现这一点,它会在作用域结束时在异步上下文之间进行切换,直到所有函数都完成。请注意,这里的完成并不意味着成功;函数可能会引发异常或被取消。这就是清理部分发挥作用的地方,我们将在“异常管理”部分中稍后介绍。
此外,您可以嵌套作用域。分配给outer_scope
的@tamed
函数独立执行,并与其他来自inner_scope
的@tamed
函数并行执行,并且可以在它们之间进行任意的切换。然而,由于inner_scope
在切换回同步上下文之前等待其所有函数完成,因此在inner_scope
之后调度的新的函数将等待inner_scope
的完成。
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
with AsyncScope() as outer_scope:
slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
with AsyncScope() as inner_scope:
slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
# await inner_scope functions
# Note: scheduled after inner scope has finished
slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)
# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)
这与@tamed
和DelayedResult
一样,不仅适用于同步上下文(with
),也适用于异步上下文(async with
)。
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
@tamed
async def slow_bulk_echo() -> None:
async with AsyncScope() as outer_scope: # <-- `async with` in async contexts
slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
async with AsyncScope() as inner_scope:
slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
# await inner_scope functions
# Note: scheduled after inner scope has finished
slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)
slow_bulk_echo()
# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)
能够嵌套AsyncScopes
的功能与它的关键字参数特别有用:exit_mode
和error_mode
。正如其名称所暗示的,exit_mode
控制作用域退出时发生的事情,而error_mode
控制分配的函数产生异常时发生的事情。
默认情况下,这些设置为exit_mode="wait"
和error_mode="cancel"
。前者将在作用域结束时“等待”未完成的函数。后者将在其中一个函数失败时“取消”其他未完成的函数。这种行为与asyncio.TaskGroup
或trio.Nursery
匹配。当您批量调用函数时很有用,例如,当进行多个Web API调用或从磁盘读取一批图像时。
或者,您可以使用exit_mode="cancel"
,这将“取消”作用域结束时未完成的函数。这对于关闭“无限循环”或取消您认为需要但实际不需要的数据请求非常有用。
import asyncio
from datetime import datetime
from tamer import tamed, AsyncScope
class RateLimiter:
def __init__(self):
# allow an initial burst
self.max_tokens = 3
self.tokens = self.max_tokens
@tamed
async def generate_tokens(self, delay:int):
while True: # <-- generate new tokens forever
await asyncio.sleep(delay)
self.tokens = min(self.tokens + 1, self.max_tokens)
@tamed
async def get_token(self):
# Note: This would not work with threads, but is perfectly
# fine in asyncio
while self.tokens == 0:
await asyncio.sleep(0)
self.tokens -= 1
return True
@tamed
async def fake_request(rate_limiter):
await rate_limiter.get_token()
print(datetime.now().strftime("%H:%M:%S.%f"), "Requesting...")
throttle = RateLimiter()
with AsyncScope(exit_mode="cancel") as service_layer:
throttle.generate_tokens(1, _async_scope=service_layer)
with AsyncScope() as batch:
for _ in range(6):
fake_request(throttle, _async_scope=batch)
# <-- wait for all requests to finish
# <-- cancel the rate limiter
# Output
# ------
# 00:22:28.348290 Requesting...
# 00:22:28.348436 Requesting...
# 00:22:28.348564 Requesting...
# 00:22:29.347495 Requesting...
# 00:22:30.347555 Requesting...
# 00:22:31.347597 Requesting...
异常管理
不幸的是,事情总是会出错。如果发生这种情况,Python会引发异常,而您作为程序的作者必须决定如何响应。@tamed
异步函数遵循同样的规则,它们与普通函数没有区别。
from tamer import tamed, AsyncScope
@tamed
async def faulty_function()
raise RuntimeError("Oh no!")
# ============================
# Asynchronous Execution
# ============================
async def main():
coro = faulty_function()
try:
await coro
except RuntimeError:
print("Actually, I'm good.")
asyncio.run(main())
# Output
# ------
#
# Actually, I'm good.
# ============================
# Synchronous Execution
# ============================
try:
faulty_function()
except RuntimeError:
print("Actually, I'm good.")
# Output
# ------
#
# Actually, I'm good.
# ============================
# AsyncScope Execution
# ============================
with AsyncScope() as scope:
delayed_result = faulty_function(_async_scope=scope)
try:
delayed_result.block()
except RuntimeError:
print("Actually, I'm good.")
# Output
# ------
#
# Actually, I'm good.
一个特殊案例是 AsyncScope
中的函数。在这里,您使用 DelayedResult.value
消费结果,但在等待结果时通过 DelayedResult.block()
或 await delayed_result
处理异常。这是故意的,因为任何在上述语句之后的代码现在都可以断言结果已经 成功 到达。
AsyncScope
结尾处的隐式 await
作为通用捕获,可以引发您未显式等待的所有异常。这确保了没有异常被遗漏,并且您的程序不会产生意外的副作用。
from tamer import tamed, AsyncScope
@tamed
async def faulty_function()
raise RuntimeError("Oh no!")
with AsyncScope() as scope:
result = faulty_function(_async_scope=scope)
# Output (excerpt)
# ----------------
#
# Traceback (most recent call last):
# [...]
# RuntimeError: Oh no!
作者注
注意:这一节相当哲学性,更多是关于这个包为什么存在,而不是如何使用它。这是“一个老头的絮叨”,所以如果你选择跳过,我不会告诉任何人 :)
如果您在网上阅读有关 Python 的 asyncio 库的内容,您会发现人们通常以性能为导向来处理这个库。然而,只有一些人真正看到了它的好处。这导致了对该库的混杂,但通常是负面的情绪,从 asyncio 被称为“虚假并行”,“太复杂而难以使用”,到“有用但非常窄领域”。
在我的理解中,这种情绪通常是由三个原因混合造成的,导致用户认为 asyncio 应该像线程一样使用。
- 用户认为 asyncio 在底层使用线程级并行,或者用于实现绿色线程。
- Asyncio 使用类似线程的语义(请参阅文末表格)。
- 许多在线文档和教程指出,
async/await
与基于线程的 Web 服务器实现相比提供了巨大的性能提升。
在思考编写线程时编写异步代码并不错……但这是不完整的,并且在我看来是误导的。为什么?因为它们是两种不同类型的并行。线程将一大块工作分成小块,并在多个核心上并行处理多个小块。Async/await
将一大块工作分成小块,标记忙等待的时段,并重新排序指令以最小化空闲时间。换句话说,线程使用线程级并行(废话!),而 async/await
使用指令级并行。两者都是并行形式,但它们 不 是同一件事。
线程 | Async/Await | |
---|---|---|
编排 | 主线程 + 无限循环 | 事件循环 |
创建 | tid = Thread(fn, ...) |
coro = fn(...) |
同步 | tid.join() |
await coro |
数据交换 | 共享内存 | 共享内存 |
开销 | ~50 µs (操作系统级线程) | 无(函数调用) |
并发 | 抢占式 | 合作式 |
隐式并行
我们不应该将 async/await
视为多线程代码,而应该将其视为分割数据加载和消费的代码。它知道输入数据来自外部系统(数据库、套接字、文件系统、blob 存储等),这些外部系统速度慢,并且程序的大部分时间都花在等待数据上。
asyncio 的巧妙之处在于意识到我们只能逐条执行指令(感谢 GIL),但我们可以并行等待任意数量的外部系统。传统的(同步)代码会请求一块数据,在它被准备期间(忙等待)什么也不做,消费它,然后继续请求下一块数据。异步代码首先请求所有数据,并行忙等待所有数据到达,然后逐个消费数据,而不再等待。这减少了空闲时间从(同步)的 sum(load_times)
到(异步)的 max(load_times)
;这是一个被称为隐式并行或如果对编译器和 CPU 的工作原理感兴趣,则是指令级并行的技巧。
此外,如果我们的工作负载允许处理无序数据,我们将在开始时请求所有数据,忙等直到第一份数据到来,处理它,然后转到下一份数据或继续忙等,直到再次接收到数据。这样,我们在min(load_times)
和max(load_times)
之间忙等,与同步情况下的sum(load_times)
相比,可以大幅提高速度。
为什么创建 Async-Tamer
理解async/await
背后的并行工作原理,并摆脱线程式设计,可以简化代码及其设计。加载数据的函数变为async def
,而消费数据的函数保持不变。不幸的是,目前使用asyncio表达这一点并不简单。我们无法在async
函数外使用await
,执行async
函数需要启动事件循环并协调它。
因此,我编写了async-tamer。它限制了async
在代码库中传播的范围,即它@tamed
了async
关键字。在等待数据时,我们需要事件循环及其隐含的并行性。在本地处理数据时,我们不需要事件循环,如果我们尝试这样做,就会陷入困境。程序的两个部分应该清晰地分离,async def
(等待外部事物)与def
(处理本地事物)非常适合这种需求。
库的其余部分实际上是语法糖,并尝试删除尽可能多的样板代码。我非常喜欢trio中的结构化并发概念,这正是我们在加载数据的“初始批次”时所需要的。因此,AsyncScopes
以类似的方式工作。然而,我也认为Nurseries
和(asyncio)TaskGroups
缺乏细粒度控制,添加生命周期kwargs感觉是这一想法的自然扩展。
感谢您一直看到最后,祝您编码愉快!
项目详情
下载文件
下载适合您平台的应用程序。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分布
构建分布
async_tamer-1.0.0.tar.gz的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | dfa1dead8d3f95126172d5013697771e579e4b52a21fa92460e06883d949dc46 |
|
MD5 | 7732de31150e5c8da8ab6fd76550313e |
|
BLAKE2b-256 | 582da70f527c7d9e90b7a2d5af18e7b8fe7782e0d70a926ef06c359ac52816dd |
async_tamer-1.0.0-py3-none-any.whl的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | a6e5856ffa78a59d6b3cc627ddb61f900276615c745205c2bc86d93e273e48b1 |
|
MD5 | 71b6ce8b3f77e516d64c8922885b863e |
|
BLAKE2b-256 | c456bd3cce51c8202e56c43b3f98aaff45fd66b1e3b339a70a748583ab18d5ef |