跳转到主要内容

用于编写异步程序的Python库。

项目描述

Async-Tamer

您不需要分割代码库或跳过繁琐的步骤来使用async def。毕竟异步函数只是具有超级能力的普通函数... Async-Tamer通过将async def视为什么来帮助您利用这些超级能力:标记并减少忙等待的周期。(详情请参阅此readme的末尾。)

功能

  • ✅ 100% python
  • ✅ 100% free (BSD 3-clause)
  • ✅ 100% lean (无依赖项)

安装

pip install async-tamer

使用方法

简而言之,您可以将@tamed添加到异步函数中,以便从同步和异步上下文直接调用它。您还可以将@tamed函数分配给AsyncScope以获得结构化生命周期管理。

import asyncio
from tamer import tamed, AsyncScope

@tamed # <-- Notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

slow_echo("sync > DELAY(.5)", 0.5)

with AsyncScope() as scope:
    slow_echo("scope > DELAY(.2)", .2, _async_scope=scope)
    slow_echo("scope > DELAY(.1)", .1, _async_scope=scope)
# implicit await :)


# Output
# ------
#
# sync > DELAY(.5)
# scope > DELAY(.1)
# scope > DELAY(.2)

@tamed装饰器

一个@tamed异步函数会根据其被调用的上下文改变其执行策略(行为方式)。在同步上下文中,它表现得像普通函数(阻塞)。在异步上下文中,它表现得像普通协程(非阻塞),并且当分配给AsyncScope时,它遵循作用域上下文管理器(非阻塞)。

import asyncio
from tamer import tamed, AsyncScope

@tamed  # <-- notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

# ============================
# Asynchronous Execution
# ============================

async def main():
    first = slow_echo("async > DELAY(1)", 1)
    second = slow_echo("async > DELAY(.1)", 0.1)
    third = slow_echo("async > DELAY(.5)", 0.5)

    # Don't forget the await!
    await second
    await asyncio.gather(first, third)

asyncio.run(main())

# Output
# ------
#
# async > DELAY(.1)
# async > DELAY(.5)
# async > DELAY(1)


# ============================
# Synchronous Execution
# ============================

slow_echo("sync > DELAY(1)", 1)
slow_echo("sync > DELAY(.1)", 0.1)
slow_echo("sync > DELAY(.5)", 0.5)

# Output
# ------
#
# sync > DELAY(1)
# sync > DELAY(.1)
# sync > DELAY(.5)

# ============================
# AsyncScope Execution
# ============================

with AsyncScope() as scope:
    slow_echo("scope > DELAY(1)", 1, _async_scope=scope)
    slow_echo("scope > DELAY(.1)", 0.1, _async_scope=scope)
    slow_echo("scope > DELAY(.5)", 0.5, _async_scope=scope)

# Output
# ------
#
# scope > DELAY(.1)
# scope > DELAY(.5)
# scope > DELAY(1)

注意: _async_scope关键字参数由@tamed装饰器注入,用于将@tamed函数添加到AsyncScope中。您可能想要这样做的原因在AsyncScope部分中通过示例进行了说明。

返回结果

@tamed函数知道您何时(以及如何)期望返回结果。

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def slow_io():
    await asyncio.sleep(0.1)
    return 200, "Time to be awesome!"

# ============================
# Asynchronous Execution
# ============================

async def main():
    coro = slow_io()  # <-- normal coroutine

    return_code, msg = await coro  # <-- await the result
    print(f"Status {return_code}: `{msg}`")

asyncio.run(main())

# Output
# ------
#
# Status 200: `Time to be awesome!`


# ============================
# Synchronous Execution
# ============================

return_code, msg = slow_io()  # <-- immediate result
print(f"Status {return_code}: `{msg}`")

# Output
# ------
#
# Status 200: `Time to be awesome!`

# ============================
# AsyncScope Execution
# ============================

with AsyncScope() as scope:
    delayed_result = slow_io(_async_scope=scope)
# <-- implicit await on exit

return_code, msg = delayed_result.value
print(f"Status {return_code}: `{msg}`")

# Output
# ------
#
# Status 200: `Time to be awesome!`

当与AsyncScope一起使用时,一个@tamed函数将返回一个DelayedResult实例。该对象代表@tamed函数的结果,不应与类似的概念混淆,如Futureasyncio.TaskCoroutine,它们代表并发执行的代码。虽然这些是相关对象,但DelayedResult更为简单。例如,与代码不同,结果不会执行。因此,您无法取消它们,也无法链式调用回调。它们(结果)只是函数输出的值,而在DelayedResult的情况下,它是一个后来才到达的值。

您可以使用DelayedResult在异步上下文中对其进行await操作,或者使用它来使用.block()同步上下文,直到它可用。此外,您可以检查其.value,它将返回结果或如果结果不可用则引发AttributeError

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def request(delay:int):
    await asyncio.sleep(delay)
    return 200, "You are awesome!"

@tamed
async def post_process(raw_result):
    ret_code, msg = await raw_result  # <-- awaitable in async context
    a, b = msg.rsplit(" ", 1)
    return ret_code, " ".join((a, "very", b))

with AsyncScope() as scope:
    raw_result = request(0.1, _async_scope=scope)
    result = post_process(raw_result, _async_scope=scope)  # <-- pass it around

    try:
        return_code, msg = result.value
    except AttributeError:  # <-- AttributeError, not attived yet
        print(f"scope > result: Not yet available.")

    result.block()  # <-- block in sync context
    return_code, msg = result.value
    print(f"scope > result: Status {return_code}: `{msg}`")

# Output
# ------
#
# scope > result: Not yet available.
# scope > result: Status 200: `You are very awesome!`

AsyncScope

AsyncScope管理一组@tamed函数并控制其生命周期。这是一种将异步部分添加到程序中的结构化方法。简而言之,您需要了解3个关键字

  1. 调度:启动@tamed函数的代码行。
  2. 切换:切换到另一个执行上下文的代码行。
  3. 清理:处理错误处理的代码行。

您通过将_async_scope设置为有意义的值来调用@tamed函数以进行调度,而AsyncScope有助于切换清理。为此,它保证当作用域退出时,作用域内的所有函数都已完成。为了实现这一点,它会在作用域结束时在异步上下文之间进行切换,直到所有函数都完成。请注意,这里的完成并不意味着成功;函数可能会引发异常或被取消。这就是清理部分发挥作用的地方,我们将在“异常管理”部分中稍后介绍。

此外,您可以嵌套作用域。分配给outer_scope@tamed函数独立执行,并与其他来自inner_scope@tamed函数并行执行,并且可以在它们之间进行任意的切换。然而,由于inner_scope在切换回同步上下文之前等待其所有函数完成,因此在inner_scope之后调度的新的函数将等待inner_scope的完成。

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

with AsyncScope() as outer_scope:
    slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
    slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
    
    with AsyncScope() as inner_scope:
        slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
        slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
    # await inner_scope functions

    # Note: scheduled after inner scope has finished
    slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)

# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)

这与@tamedDelayedResult一样,不仅适用于同步上下文(with),也适用于异步上下文(async with)。

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

@tamed
async def slow_bulk_echo() -> None:
    async with AsyncScope() as outer_scope:  # <-- `async with` in async contexts
        slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
        slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
        
        async with AsyncScope() as inner_scope:
            slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
            slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
        # await inner_scope functions

        # Note: scheduled after inner scope has finished
        slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)

slow_bulk_echo()

# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)

能够嵌套AsyncScopes的功能与它的关键字参数特别有用:exit_modeerror_mode。正如其名称所暗示的,exit_mode控制作用域退出时发生的事情,而error_mode控制分配的函数产生异常时发生的事情。

默认情况下,这些设置为exit_mode="wait"error_mode="cancel"。前者将在作用域结束时“等待”未完成的函数。后者将在其中一个函数失败时“取消”其他未完成的函数。这种行为与asyncio.TaskGrouptrio.Nursery匹配。当您批量调用函数时很有用,例如,当进行多个Web API调用或从磁盘读取一批图像时。

或者,您可以使用exit_mode="cancel",这将“取消”作用域结束时未完成的函数。这对于关闭“无限循环”或取消您认为需要但实际不需要的数据请求非常有用。

import asyncio
from datetime import datetime
from tamer import tamed, AsyncScope

class RateLimiter:
    def __init__(self):
        # allow an initial burst
        self.max_tokens = 3
        self.tokens = self.max_tokens

    @tamed
    async def generate_tokens(self, delay:int):
        while True:  # <-- generate new tokens forever
            await asyncio.sleep(delay)
            self.tokens = min(self.tokens + 1, self.max_tokens)

    @tamed
    async def get_token(self):
        # Note: This would not work with threads, but is perfectly 
        # fine in asyncio
        while self.tokens == 0:
            await asyncio.sleep(0)
        self.tokens -= 1
        return True

@tamed
async def fake_request(rate_limiter):
    await rate_limiter.get_token()
    print(datetime.now().strftime("%H:%M:%S.%f"), "Requesting...")

throttle = RateLimiter()
with AsyncScope(exit_mode="cancel") as service_layer:
    throttle.generate_tokens(1, _async_scope=service_layer)

    with AsyncScope() as batch:
        for _ in range(6):
            fake_request(throttle, _async_scope=batch)
    # <-- wait for all requests to finish
# <-- cancel the rate limiter

# Output
# ------
# 00:22:28.348290 Requesting...
# 00:22:28.348436 Requesting...
# 00:22:28.348564 Requesting...
# 00:22:29.347495 Requesting...
# 00:22:30.347555 Requesting...
# 00:22:31.347597 Requesting...

异常管理

不幸的是,事情总是会出错。如果发生这种情况,Python会引发异常,而您作为程序的作者必须决定如何响应。@tamed异步函数遵循同样的规则,它们与普通函数没有区别。

from tamer import tamed, AsyncScope

@tamed
async def faulty_function()
    raise RuntimeError("Oh no!")

# ============================
# Asynchronous Execution
# ============================

async def main():
    coro = faulty_function()

    try:
        await coro
    except RuntimeError:
        print("Actually, I'm good.")

asyncio.run(main())

# Output
# ------
#
# Actually, I'm good.


# ============================
# Synchronous Execution
# ============================

try:
    faulty_function()
except RuntimeError:
    print("Actually, I'm good.")

# Output
# ------
#
# Actually, I'm good.


# ============================
# AsyncScope Execution
# ============================

with AsyncScope() as scope:
    delayed_result = faulty_function(_async_scope=scope)

    try:
        delayed_result.block()
    except RuntimeError: 
        print("Actually, I'm good.")

# Output
# ------
#
# Actually, I'm good.

一个特殊案例是 AsyncScope 中的函数。在这里,您使用 DelayedResult.value 消费结果,但在等待结果时通过 DelayedResult.block()await delayed_result 处理异常。这是故意的,因为任何在上述语句之后的代码现在都可以断言结果已经 成功 到达。

AsyncScope 结尾处的隐式 await 作为通用捕获,可以引发您未显式等待的所有异常。这确保了没有异常被遗漏,并且您的程序不会产生意外的副作用。

from tamer import tamed, AsyncScope

@tamed
async def faulty_function()
    raise RuntimeError("Oh no!")

with AsyncScope() as scope:
    result = faulty_function(_async_scope=scope)

# Output (excerpt)
# ----------------
#
# Traceback (most recent call last):
#    [...]
# RuntimeError: Oh no!

作者注

注意:这一节相当哲学性,更多是关于这个包为什么存在,而不是如何使用它。这是“一个老头的絮叨”,所以如果你选择跳过,我不会告诉任何人 :)

如果您在网上阅读有关 Python 的 asyncio 库的内容,您会发现人们通常以性能为导向来处理这个库。然而,只有一些人真正看到了它的好处。这导致了对该库的混杂,但通常是负面的情绪,从 asyncio 被称为“虚假并行”,“太复杂而难以使用”,到“有用但非常窄领域”。

在我的理解中,这种情绪通常是由三个原因混合造成的,导致用户认为 asyncio 应该像线程一样使用。

  1. 用户认为 asyncio 在底层使用线程级并行,或者用于实现绿色线程。
  2. Asyncio 使用类似线程的语义(请参阅文末表格)。
  3. 许多在线文档和教程指出,async/await 与基于线程的 Web 服务器实现相比提供了巨大的性能提升。

在思考编写线程时编写异步代码并不错……但这是不完整的,并且在我看来是误导的。为什么?因为它们是两种不同类型的并行。线程将一大块工作分成小块,并在多个核心上并行处理多个小块。Async/await 将一大块工作分成小块,标记忙等待的时段,并重新排序指令以最小化空闲时间。换句话说,线程使用线程级并行(废话!),而 async/await 使用指令级并行。两者都是并行形式,但它们 是同一件事。

线程 Async/Await
编排 主线程 + 无限循环 事件循环
创建 tid = Thread(fn, ...) coro = fn(...)
同步 tid.join() await coro
数据交换 共享内存 共享内存
开销 ~50 µs (操作系统级线程) 无(函数调用)
并发 抢占式 合作式

隐式并行

我们不应该将 async/await 视为多线程代码,而应该将其视为分割数据加载和消费的代码。它知道输入数据来自外部系统(数据库、套接字、文件系统、blob 存储等),这些外部系统速度慢,并且程序的大部分时间都花在等待数据上。

asyncio 的巧妙之处在于意识到我们只能逐条执行指令(感谢 GIL),但我们可以并行等待任意数量的外部系统。传统的(同步)代码会请求一块数据,在它被准备期间(忙等待)什么也不做,消费它,然后继续请求下一块数据。异步代码首先请求所有数据,并行忙等待所有数据到达,然后逐个消费数据,而不再等待。这减少了空闲时间从(同步)的 sum(load_times) 到(异步)的 max(load_times);这是一个被称为隐式并行或如果对编译器和 CPU 的工作原理感兴趣,则是指令级并行的技巧。

此外,如果我们的工作负载允许处理无序数据,我们将在开始时请求所有数据,忙等直到第一份数据到来,处理它,然后转到下一份数据或继续忙等,直到再次接收到数据。这样,我们在min(load_times)max(load_times)之间忙等,与同步情况下的sum(load_times)相比,可以大幅提高速度。

为什么创建 Async-Tamer

理解async/await背后的并行工作原理,并摆脱线程式设计,可以简化代码及其设计。加载数据的函数变为async def,而消费数据的函数保持不变。不幸的是,目前使用asyncio表达这一点并不简单。我们无法在async函数外使用await,执行async函数需要启动事件循环并协调它。

因此,我编写了async-tamer。它限制了async在代码库中传播的范围,即它@tamedasync关键字。在等待数据时,我们需要事件循环及其隐含的并行性。在本地处理数据时,我们不需要事件循环,如果我们尝试这样做,就会陷入困境。程序的两个部分应该清晰地分离,async def(等待外部事物)与def(处理本地事物)非常适合这种需求。

库的其余部分实际上是语法糖,并尝试删除尽可能多的样板代码。我非常喜欢trio中的结构化并发概念,这正是我们在加载数据的“初始批次”时所需要的。因此,AsyncScopes以类似的方式工作。然而,我也认为Nurseries和(asyncio)TaskGroups缺乏细粒度控制,添加生命周期kwargs感觉是这一想法的自然扩展。

感谢您一直看到最后,祝您编码愉快!

项目详情


下载文件

下载适合您平台的应用程序。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

async_tamer-1.0.0.tar.gz (17.9 kB 查看散列)

上传时间

构建分布

async_tamer-1.0.0-py3-none-any.whl (13.4 kB 查看散列)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面