跳转到主要内容

AnyIO的任务作用域

项目描述

此库实现了作用域任务组/托儿所。

理由

可组合性

大型程序通常由相互依赖的构建块组成。这些依赖关系可能很复杂,不一定总是线性的,通常形成某种有向无环图,而不是漂亮的线性或分层关系集合。

让我们举一个例子。

您的服务器包含一些管理模块,该模块需要一个支持库,该库连接到数据库。在半途中遇到错误,因此加载了一个错误处理器,该处理器也使用数据库。

接下来,您的服务器的一个客户端做了些可疑的操作,因此您想记录下来,由于加载错误处理器和连接到数据库成本较高,您希望重用已经拥有的处理器。

后来,管理代码终止。但是,它不应该卸载错误处理器,因为其他代码仍然需要它。

这是一个问题,因为您喜欢使用结构化编程原则,该原则规定,如果您启动了它,则需要停止它。因此,您需要跳过一些步骤,将所有这些连接起来,跟踪每个模块的用户,并按正确的顺序关闭事物。

更糟的是:假设您的代码因致命异常而死亡。这个异常通常在您的所有代码中传播,因此往往会取消数据库连接,在错误处理器有机会记录问题之前。更糟的是,如果日志记录器中的异常发生在 finally: 块中,它基本上会替换原始异常,因此您将有很多乐趣尝试调试所有这些。

AsyncScope可以帮助您。

AsyncScope 跟踪程序构建块。它记得哪些部分依赖于哪些其他部分,防止循环依赖,并在不再有人使用时立即终止作用域。

现在您的错误处理程序将保持与您所需的时间一样长,数据库连接在错误处理程序(或任何其他代码)需要时不会死亡,您的错误会正确记录,并且您可以找到问题。

多个服务

某些程序需要任意数量的异步上下文,例如在同一个方法中与任何数量的服务器通信的客户端。在这种情况下创建服务器上下文通常很尴尬;您需要一个子任务或一个 contextlib.AsyncExitStack 作为上下文的“守护者”。然而,设置子任务有很多样板代码;退出堆栈在您需要动态删除服务器时没有帮助。

AsyncScope 通过将代码结构与服务使用解耦来帮助,同时确保外部连接和其他子任务在其最后一个用户终止时不会重复且干净地结束。

用法

主代码

将您的主代码包装在 async with asyncscope.main_scope('NAME'): ... 块中。(如果省略,则名称默认为 _main。)

此调用初始化全局 AsyncScope.scope 对象。它始终引用当前服务(即,最初是您的主代码)。

包装服务

“服务”被定义为任何可能在程序中的多个上下文中使用的非平凡对象。这可能是一个 HTTP 会话,或数据库连接,任何创建起来相当麻烦的对象...

AsyncScope 要求您将服务包装在一个具有常规调用约定的函数中,因为它不知道(实际上也不想知道)如何设置和拆卸服务对象。

以下是一些示例。

如果服务使用 run 方法,您将这样做

from asyncscope import scope

async def some_service(*p, **kw):
   srv = your_service(*p, **kw)
   async with anyio.create_task_group() as tg:
      await tg.start(srv.run)

      scope.register(srv)
      await scope.no_more_dependents()

      await srv.stop_running()
      tg.cancel_scope.cancel()

或者,如果服务作为一个异步上下文管理器运行

from asyncscope import scope

async def some_service(*p, **kw):
   async with your_service(*p, **kw) as srv:
      # NB: some services use "async with await …"
      scope.register(srv)
      await scope.no_more_dependents()

或者²,它可能作为一个无上下文的背景服务运行

from asyncscope import scope

async def some_service(*p, **kw):
   srv = your_service(*p, **kw)
   srv = await srv.start()

   scope.register(srv)
   await scope.no_more_dependents()

   await srv.aclose()

或者³,如果服务是一个设置麻烦的对象

from asyncscope import scope

async def some_service(*p, **kw):
   srv = SomeObject(*p, **kw)
   await SomeObject.costly_setup()

   scope.register(srv)
   try:
      await scope.no_more_dependents()
   finally:
      srv.teardown()
   # use this to e.g. clean up circular references within your object

接下来,我们将看到如何使用这些对象。

使用服务

使用 AsyncScope,服务以两种方式之一使用。

  • 在上下文中

    from asyncscope import scope
    
    async with scope.using_scope():
        srv = await scope.service(name, some_service, *p, **kw)
        ...
  • 直到调用者的作用域结束 您显式释放它

    from asyncscope import scope
    
    srv = await scope.service(name, some_service, *p, **kw)
    ...
    del srv  # don't hog the memory!
    scope.release(name)

您还可以检查是否存在具有特定名称的服务

from asyncscope import scope

try:
    srv = scope.lookup(name)
except KeyError:
    pass  # no it does not
else:
    ...
    del srv
    scope.release(name)

在这三种情况下 srv 是您的 some_service 代码传递给 AsyncScope.Scope.register 的对象。

服务命名

AsyncScope 使用 name 来发现服务是否已经在运行。如果是这样,它将记录当前作用域也在使用这个命名的服务,并简单地返回它。

名称必须是全局唯一的。为了避免冲突,根据使用情况,将对象类、标识符如 id(YourServiceClass)id(container_object) 添加到其中。

AsyncScope 不会从其参数中尝试推导出唯一性,因为任意的命名约定可能不适合所有人。区分潜在冲突的一种简单方法是在名称中包含 id(some_service)

影响

调用 Scope.serviceScope.using_service 并不能保证服务在您调用时启动:它可能已经运行了。同样,离开 async with 块或退出调用者的作用域可能不会停止服务:可能还有其他用户,或者某些缓存机制会延迟关闭它。

调用这些函数两次/嵌套 Scope.using_service 调用是可以的。使用周期(服务 A 启动服务 B,而后来 B 又需要 A)是禁止的,并且会被检测到。

每个作用域都包含一个任务组,您可以使用常用的 startstart_soon 方法来访问它。您还可以调用 scope.spawn()。此函数返回一个包装新任务的 CancelScope,因此如果需要,您可以取消它。以这种方式启动的所有任务在作用域退出时也会自动取消。

您的 some_service 代码 必须 正确调用 scope.register() 一次,否则等待它启动的作用域将永远等待。(如果您的范围的主要任务在这样做之前退出,它们将被取消。)

当前作用域作为 scope 上下文变量可用。

“examples” 目录包含一些示例代码。

日志记录

scope.logger 是一个标准的 logging.Logger 对象,命名为 scope.NAME

多线程

AsyncScope 与多线程不兼容。从多个线程使用单个主作用域 导致数据不一致、死锁和其他难以发现的错误。

如果您在一个新线程中启动单独的异步主循环,您必须在进入线程的主作用域之前调用 scope.thread_reset()。您还应该将特定于线程的名称传递给 main_scope

不要在线程之间共享服务。它们通常不具有多线程意识,并且 AsyncScope 可能会在任何时间终止它们。

异常处理

本节描述了从服务的主要任务逃逸的异常的效果,导致其终止。

不是 ExceptionBaseException 子类永远不会被捕获。如果服务尚未调用 Scope.register,它们可能收到一个 concurrent.Futures.CancelledError 或异步框架的取消异常。

在服务调用 Scope.register 之后抛出的 Exception 不会被处理。它们最终会传播出 AsyncScope.main_scope 块。

否则,错误会传播到等待其 Scope.register 调用的调用者。

否则,异常将不会被处理;其影响在下一节中描述。

取消语义

当作用域退出(无论是干净地退出还是当它引发逃逸其任务组的错误)时,依赖于它的作用域将立即并行取消。然后,它所依赖的作用域将被干净且有序地终止,前提是它们不是由其他作用域使用的。

当作用域的主要任务结束时,也会发生这种情况。

“干净终止”意味着作用域对 no_more_dependents() 的调用返回。如果没有这样的调用,则取消作用域的任务。

待办事项:编写一个服务,您的代码可以使用它来使另一个服务保持活跃一段时间。

代码结构

作用域的主要代码通常如下所示

  • 执行启动服务所需的所有操作。此代码可以启动它所依赖的其他作用域。请注意,如果作用域已经运行,则 service 将简单地返回其现有的服务对象。

  • 调用 scope.register(service_object)

  • 调用 await scope.no_more_dependents()(从属任务)或等待SIGTERM(守护进程主任务)或终止(主任务已完成)

  • 干净地停止你的服务。

如果不使用 no_more_dependents,则你的代码将被取消。

范围通常不需要访问其自身的范围对象。它存储在contextvar中,如果需要可以由 scope.get() 获取。然而,对于大多数用途,asyncscope的全局 scope 对象会透明地访问当前范围。

临时服务

有些服务不需要一直运行。为了提前释放服务,使用 async with scope.using_scope():。这创建了一个嵌套范围。在嵌套范围内启动的服务,在上下文退出时会自动释放,前提是没有其他代码使用它们(通常如此)。

如果嵌套范围使用的范围处理器退出,嵌套范围内的代码会像往常一样被取消。然后离开嵌套范围会触发一个 ScopeDied 异常。

项目详情


下载文件

下载适合您平台的应用程序文件。如果您不确定选择哪一个,请了解更多关于 安装包 的信息。

源分布

asyncscope-0.11.2.tar.gz (18.2 kB 查看哈希值)

上传时间

构建分布

asyncscope-0.11.2-py3-none-any.whl (13.7 kB 查看哈希值)

上传时间 Python 3

支持者