跳转到主要内容

通用的类型安全异步工作进程运行器。

项目描述

jockey

通用的Python库,用于运行异步工作进程。适用于构建事件处理程序、网络框架等。您编写一个简单的适配器,jockey将负责注册处理程序、路由事件、并行运行作业、设置优先级、限制最大工作进程数、取消作业、等待退出等。

功能

  • 灵活,可以用作网络框架、事件消费者、后台作业处理、并行计算等。
  • 可以在asyncio事件循环、线程或进程中运行作业。
  • 100%类型安全。
  • 可靠、经过实战检验,且100%测试覆盖率。
  • 支持作业优先级和所有类型的工作进程数限制。

如果您需要一个适用于分布式系统的基于事件的框架,请看看 walnats。这是一个基于 nats.py 的类型安全的 Python 库,旨在提供安全、快速和可靠的解决方案。Jockey 基于walnats,这两个项目有很多共同的目标和设计选择,甚至包括一些API。

用法

让我们创建一组简单的函数来处理数学运算。

jockey 提供的大部分类都是泛型的,需要参数化。您需要定义3个类型变量

  • Payload 是处理程序的输入参数。它是Web框架的请求或事件处理程序的消息
  • Key 是路由键,用于选择处理程序。对于Web框架,它是URL路径;对于事件处理程序,它是路由键。
  • Result 是处理程序的响应。它是Web框架的响应;对于事件处理程序,它是None。

为了方便,我们可以将它们定义为类型别名

# each math operation accepts 2 integer numbers
Payload = tuple[int, int]
# each math operation is identified by a symbol (like "+" or "/")
Key = str
# each math operation returns a float
Result = float

需要定义的最重要组件是适配器。它是一组回调,告诉如何将原始消息转换为路由键和负载,以及如何处理成功和失败。在我们的案例中,我们将简单地从所有回调中打印。

import asyncio
from dataclasses import dataclass
from typing import Iterator
import jockey

@dataclass
class Message(jockey.Adapter[Payload, Key, Result]):
    left: int
    op: Key
    right: int

    def get_keys(self) -> Iterator[Key]:
        yield self.op

    async def get_payload(self) -> Payload:
        return (self.left, self.right)

    async def on_success(self, result: Result) -> None:
        print(f'SUCCESS: {self.left} {self.op} {self.right} = {result}')

    async def on_failure(self, exc: Exception) -> None:
        print(f'FAILURE: {self.left} {self.op} {self.right} caused {exc!r}')

    async def on_cancel(self, exc: asyncio.CancelledError) -> None:
        print(f'CANCELED: {self.left} {self.op} {self.right}')

接下来,我们创建一个数学操作的注册表

class Registry(jockey.Registry[Payload, Key, Result]):
    pass

registry = Registry()

在这个注册表中,我们可以注册所有的数学操作(“处理程序”)

@registry.add('+')
def _add(payload: Payload) -> Result:
    left, right = payload
    return left + right

您可以告诉 jockey 在单独的进程(或线程)中执行任务

@registry.add('/', execute_in=jockey.ExecuteIn.PROCESS)
def _div(payload: Payload) -> Result:
    left, right = payload
    return left / right

或者将处理程序异步化

@registry.add('-')
async def _sub(payload: Payload) -> Result:
    left, right = payload
    await asyncio.sleep(1)
    return left / right

最后,我们创建一个执行器并安排消息

async def main() -> None:
    async with jockey.Executor(registry).run() as executor:
        messages = [
            Message(3, '-', 2),
            Message(4, '+', 5),
            Message(3, '/', 2),
            Message(3, '/', 0),
            Message(3, '+', 0),
        ]
        for msg in messages:
            await executor.execute(msg)

if __name__ == '__main__':
    asyncio.run(main())

就这样!输出应该看起来像这样

SUCCESS: 4 + 5 = 9
SUCCESS: 3 + 0 = 3
SUCCESS: 3 / 2 = 1.5
FAILURE: 3 / 0 caused ZeroDivisionError('division by zero')
SUCCESS: 3 - 2 = 1.5

请注意,我们首先发送 Message(3, '-', 2),但由于 await asyncio.sleep(1),它最后到达。这展示了两个重要的事情

  1. executor.execute 并行运行所有消息。因此,当第一个消息被阻塞时,其余的消息可以进行处理。您可以使用 wait_for 参数指定执行器返回多长时间。
  2. 离开上下文时,执行器将阻塞并等待所有消息完成。同样,如果执行器被取消,它将确保取消所有正在运行的处理程序(并执行 Adapter.on_cancel)。

更多示例

examples 目录中,有一些您可以运行并看到项目实际操作的示例。

高级用法

该库可以做很多意想不到的事情,但没有人会读一大堆关于做什么和为什么的文档。相反,我们提供了一些示例(见上面)和每个公共方法的详尽文档字符串。因此,开始的最佳方式是取一个示例,根据您的需求进行调整,然后查看您使用的功能的文档字符串,以了解您可以和应该配置什么。

问答

  1. 它是维护的吗? 项目功能基本完成,因此我没有每天提交和发布的内容。但是,我接受贡献(见下文)。
  2. 如果我发现了bug怎么办? 分支项目,修复bug,编写一些测试,然后打开一个Pull Request。我通常在一天内合并和发布任何贡献。
  3. 是否支持重试? 网络框架不进行重试,事件消费者在消息代理端处理重试,所以对于大多数用例来说,在jockey本身中拥有重试功能是多余的。您始终可以通过从Adapter.on_failure发送新消息来实现自己的重试逻辑。

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。

源代码分发

jockey-1.0.0.tar.gz (19.4 kB 查看哈希值)

上传时间 源代码

构建分发

jockey-1.0.0-py3-none-any.whl (14.0 kB 查看哈希值)

上传时间 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面