通用的类型安全异步工作进程运行器。
项目描述
jockey
通用的Python库,用于运行异步工作进程。适用于构建事件处理程序、网络框架等。您编写一个简单的适配器,jockey将负责注册处理程序、路由事件、并行运行作业、设置优先级、限制最大工作进程数、取消作业、等待退出等。
功能
- 灵活,可以用作网络框架、事件消费者、后台作业处理、并行计算等。
- 可以在asyncio事件循环、线程或进程中运行作业。
- 100%类型安全。
- 可靠、经过实战检验,且100%测试覆盖率。
- 支持作业优先级和所有类型的工作进程数限制。
如果您需要一个适用于分布式系统的基于事件的框架,请看看 walnats。这是一个基于 nats.py 的类型安全的 Python 库,旨在提供安全、快速和可靠的解决方案。Jockey 基于walnats,这两个项目有很多共同的目标和设计选择,甚至包括一些API。
用法
让我们创建一组简单的函数来处理数学运算。
jockey 提供的大部分类都是泛型的,需要参数化。您需要定义3个类型变量
Payload
是处理程序的输入参数。它是Web框架的请求或事件处理程序的消息Key
是路由键,用于选择处理程序。对于Web框架,它是URL路径;对于事件处理程序,它是路由键。Result
是处理程序的响应。它是Web框架的响应;对于事件处理程序,它是None。
为了方便,我们可以将它们定义为类型别名
# each math operation accepts 2 integer numbers
Payload = tuple[int, int]
# each math operation is identified by a symbol (like "+" or "/")
Key = str
# each math operation returns a float
Result = float
需要定义的最重要组件是适配器。它是一组回调,告诉如何将原始消息转换为路由键和负载,以及如何处理成功和失败。在我们的案例中,我们将简单地从所有回调中打印。
import asyncio
from dataclasses import dataclass
from typing import Iterator
import jockey
@dataclass
class Message(jockey.Adapter[Payload, Key, Result]):
left: int
op: Key
right: int
def get_keys(self) -> Iterator[Key]:
yield self.op
async def get_payload(self) -> Payload:
return (self.left, self.right)
async def on_success(self, result: Result) -> None:
print(f'SUCCESS: {self.left} {self.op} {self.right} = {result}')
async def on_failure(self, exc: Exception) -> None:
print(f'FAILURE: {self.left} {self.op} {self.right} caused {exc!r}')
async def on_cancel(self, exc: asyncio.CancelledError) -> None:
print(f'CANCELED: {self.left} {self.op} {self.right}')
接下来,我们创建一个数学操作的注册表
class Registry(jockey.Registry[Payload, Key, Result]):
pass
registry = Registry()
在这个注册表中,我们可以注册所有的数学操作(“处理程序”)
@registry.add('+')
def _add(payload: Payload) -> Result:
left, right = payload
return left + right
您可以告诉 jockey 在单独的进程(或线程)中执行任务
@registry.add('/', execute_in=jockey.ExecuteIn.PROCESS)
def _div(payload: Payload) -> Result:
left, right = payload
return left / right
或者将处理程序异步化
@registry.add('-')
async def _sub(payload: Payload) -> Result:
left, right = payload
await asyncio.sleep(1)
return left / right
最后,我们创建一个执行器并安排消息
async def main() -> None:
async with jockey.Executor(registry).run() as executor:
messages = [
Message(3, '-', 2),
Message(4, '+', 5),
Message(3, '/', 2),
Message(3, '/', 0),
Message(3, '+', 0),
]
for msg in messages:
await executor.execute(msg)
if __name__ == '__main__':
asyncio.run(main())
就这样!输出应该看起来像这样
SUCCESS: 4 + 5 = 9
SUCCESS: 3 + 0 = 3
SUCCESS: 3 / 2 = 1.5
FAILURE: 3 / 0 caused ZeroDivisionError('division by zero')
SUCCESS: 3 - 2 = 1.5
请注意,我们首先发送 Message(3, '-', 2)
,但由于 await asyncio.sleep(1)
,它最后到达。这展示了两个重要的事情
executor.execute
并行运行所有消息。因此,当第一个消息被阻塞时,其余的消息可以进行处理。您可以使用wait_for
参数指定执行器返回多长时间。- 离开上下文时,执行器将阻塞并等待所有消息完成。同样,如果执行器被取消,它将确保取消所有正在运行的处理程序(并执行
Adapter.on_cancel
)。
更多示例
在 examples 目录中,有一些您可以运行并看到项目实际操作的示例。
- math.py 是上面用法部分的代码。
- rabbitmq_consumer.py 展示了如何使用 jockey 和 aio-pika 从 RabbitMQ 消费和处理消息。
- web_framework.py 展示了如何使用 jockey 和 uvicorn 编写简单的Web框架。
高级用法
该库可以做很多意想不到的事情,但没有人会读一大堆关于做什么和为什么的文档。相反,我们提供了一些示例(见上面)和每个公共方法的详尽文档字符串。因此,开始的最佳方式是取一个示例,根据您的需求进行调整,然后查看您使用的功能的文档字符串,以了解您可以和应该配置什么。
问答
- 它是维护的吗? 项目功能基本完成,因此我没有每天提交和发布的内容。但是,我接受贡献(见下文)。
- 如果我发现了bug怎么办? 分支项目,修复bug,编写一些测试,然后打开一个Pull Request。我通常在一天内合并和发布任何贡献。
- 是否支持重试? 网络框架不进行重试,事件消费者在消息代理端处理重试,所以对于大多数用例来说,在jockey本身中拥有重试功能是多余的。您始终可以通过从
Adapter.on_failure
发送新消息来实现自己的重试逻辑。
项目详情
下载文件
下载适用于您的平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。
源代码分发
构建分发
jockey-1.0.0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 3c7de864d7cb91a3b19ad4556e229819b8f8bb18afb9f72f190c2e4ce2287e27 |
|
MD5 | 23c14a586722e77549757ee81c02a6a8 |
|
BLAKE2b-256 | 5c01502b749a8ee8d81864719f50bd3c832285da20347647c29e5685e274e86b |
jockey-1.0.0-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 721fc5e7214ca381724feade034f93998c08568cf0ea6cc3f8521bb3f41655aa |
|
MD5 | 61606e7ae67e9bdd5d400f621ad5dc80 |
|
BLAKE2b-256 | 9b614e262baf6d2dc9484517b7ac364a030c654ef80a0374b954b5053abea02f |