跳转到主要内容

asyncio服务管理器

项目描述

面元

asyncio服务管理器(自0.10.0版本起支持经典阻塞代码)。

Github actions status for master branch Codecov coverage for master branch Pypi version Pypi downloads count

原因

Asyncio

mode 尝试做太多工作

  • 混乱的回调(on_starton_startedon_crashed等)。
  • 继承限制命名并强制调用 super()
  • 强制使用日志模块和日志配置。

阻塞代码

  • ExitStack 对于管理服务来说太低级。
  • 异步和阻塞世界的通用API。

特性

  • 简单(startstopdependenciesadd_task)。
  • 可通过继承配置(优雅关闭超时)。
  • 混入(不需要 super())。
  • 不需要运行器引擎(如 WorkerRunner 等),只需直接 awaitasync with/with

许可证

facet 在MIT许可证下提供。

需求

  • python 3.11+

支持python 3.6+的最后版本是0.9.1

使用方法

Asyncio

import asyncio
from facet import AsyncioServiceMixin

class B(AsyncioServiceMixin):
    def __init__(self):
        self.value = 0

    async def start(self):
        self.value += 1
        print("b started")

    async def stop(self):
        self.value -= 1
        print("b stopped")

class A(AsyncioServiceMixin):
    def __init__(self):
        self.b = B()

    @property
    def dependencies(self):
        return [self.b]

    async def start(self):
        print("a started")

    async def stop(self):
        print("a stopped")

asyncio.run(A().run())

这将产生

b started
a started

启动和停止顺序由严格规则确定:**必须首先启动依赖项,最后停止依赖项**。这就是为什么 BA 之前启动的原因。因为 A 可能在其 start 例程中使用 B

ctrl-c 并你将看到

a stopped
b stopped
Traceback (most recent call last):
  ...
KeyboardInterrupt

停止顺序相反,因为 A 可能在其 stop 例程中使用 B。任何抛出的异常都会传播到上级上下文。facet 不尝试过于智能。

服务可以用作上下文管理器。而不是

asyncio.run(A().run())

代码可以看起来像

async def main():
    async with A() as a:
        assert a.b.value == 1
        await a.wait()

asyncio.run(main())

另一个服务特性是 add_task 方法

class A(AsyncioServiceMixin):
    async def task(self):
        await asyncio.sleep(1)
        print("task done")

    async def start(self):
        self.add_task(self.task())
        print("start done")

asyncio.run(A().run())

这会导致后台任务创建和处理

start done
task done

后台任务中任何未处理的异常都会导致整个服务栈崩溃。这也是一个快速且响亮失败的关键特性。

服务停止时,所有后台任务都将被取消并等待。

您可以管理依赖项的启动/停止,以顺序、并行或混合方式启动。例如

class A(AsyncioServiceMixin):
    def __init__(self):
        self.b = B()
        self.c = C()
        self.d = D()

    @property
    def dependencies(self):
        return [
            [self.b, self.c],
            self.d,
        ]

这导致首先并行启动 bc,在它们成功启动后,d 将尝试启动,然后调用 a 的自身启动。在停止例程中,首先调用 a 的停止,然后调用 d 的停止,最后并行停止 bc

这里的规则是 第一层嵌套是顺序的,第二层嵌套是并行的

阻塞代码

从版本 0.10.0 开始,facet 可以在具有类似规则的阻塞代码中使用。 但API有限。例如

from facet import BlockingServiceMixin

class B(BlockingServiceMixin):
    def __init__(self):
        self.value = 0

    def start(self):
        self.value += 1
        print("b started")

    def stop(self):
        self.value -= 1
        print("b stopped")

class A(BlockingServiceMixin):
    def __init__(self):
        self.b = B()

    @property
    def dependencies(self):
        return [self.b]

    def start(self):
        print("a started")

    def stop(self):
        print("a stopped")

with A() as a:
    assert a.b.value == 1

这将产生

b started
a started
a stopped
b stopped

如您所见,没有 wait 方法。等待和后台任务由用户负责,技术上可以使用 concurrent.futures 模块实现。但是,facet 不提供此类功能,因为有很多方法可以实现:threading/multiprocessing 及其原语。

此外,没有“依赖项的顺序、并行和混合启动/停止”功能。因此,只需将依赖项作为 dependencies 属性的普通 list 放置,它们将顺序启动/停止。

API

Asyncio

以下是在继承/混合中获得的方法

start

async def start(self):
    pass

启动例程。

stop

async def stop(self):
    pass

停止例程。

dependencies

@property
def dependencies(self) -> list[AsyncioServiceMixin | list[AsyncioServiceMixin]]:
    return []

应返回当前服务依赖项实例的可迭代对象。

add_task

def add_task(self, coroutine: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:

添加后台任务。

run

async def run(self) -> None:

运行服务并等待它停止。

wait

async def wait(self) -> None:

等待服务停止。服务必须已启动。这对于将服务用作上下文管理器很有用。

graceful_shutdown_timeout

@property
def graceful_shutdown_timeout(self) -> int:
    return 10

等待停止例程的总秒数。此属性可以由子类覆盖

class CustomServiceMixin(AsyncioServiceMixin):
    @property
    def graceful_shutdown_timeout(self):
        return 60

running

@property
def running(self) -> bool:

检查服务是否正在运行

阻塞代码

start

def start(self):
    pass

启动例程。

stop

def stop(self):
    pass

停止例程。

dependencies

@property
def dependencies(self) -> list[BlockingServiceMixin | list[BlockingServiceMixin]]:
    return []

应返回当前服务依赖项实例的可迭代对象。

running

@property
def running(self) -> bool:

检查服务是否正在运行

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

facet-0.10.1.tar.gz (8.9 kB 查看散列)

上传时间

构建分发

facet-0.10.1-py3-none-any.whl (6.6 kB 查看散列)

上传时间 Python 3

由以下支持