Python异步任务适用于AsyncIO
项目描述
PATIO
PATIO是Python异步任务(AsyncIO)的缩写。
动机
我希望创建一个易于扩展的库,用于分布式任务执行,如celery
,仅针对asyncio作为主要设计方法。
设计上,该库应适用于小型项目和真正的大型分布式项目。总体想法是用户只需将项目代码库拆分为两种角色的函数——后台任务和这些后台任务的触发器。这也应该有助于您的项目进行横向扩展。它允许通过嵌入TCP或使用插件通过现有的消息传递基础设施进行通信,在网络中提供工作者或调用者。
快速入门
执行线程池中的任务的简单示例
import asyncio
from functools import reduce
from patio import Registry
from patio.broker import MemoryBroker
from patio.executor import ThreadPoolExecutor
rpc = Registry()
@rpc("mul")
def multiply(*args: int) -> int:
return reduce(lambda x, y: x * y, args)
async def main():
async with ThreadPoolExecutor(rpc, max_workers=4) as executor:
async with MemoryBroker(executor) as broker:
print(
await asyncio.gather(
*[broker.call("mul", 1, 2, 3) for _ in range(100)]
)
)
if __name__ == '__main__':
asyncio.run(main())
本例中的ThreadPoolExecutor
是执行任务的实体。如果您的项目中的任务异步,您可以选择AsyncExecutor
,然后代码将如下所示
import asyncio
from functools import reduce
from patio import Registry
from patio.broker import MemoryBroker
from patio.executor import AsyncExecutor
rpc = Registry()
@rpc("mul")
async def multiply(*args: int) -> int:
# do something asynchronously
await asyncio.sleep(0)
return reduce(lambda x, y: x * y, args)
async def main():
async with AsyncExecutor(rpc, max_workers=4) as executor:
async with MemoryBroker(executor) as broker:
print(
await asyncio.gather(
*[broker.call("mul", 1, 2, 3) for _ in range(100)]
)
)
if __name__ == '__main__':
asyncio.run(main())
这些示例可能看起来很复杂,但不用担心,下一节详细介绍了通用概念,希望您会对此有更多了解。
主要概念
在开发这个库时,主要想法是创建一个最大程度模块化和可扩展的系统,可以通过第三方集成或直接在用户的代码中进行扩展。
构成一切的基本元素是
注册表
- 函数的键值对存储执行器
- 执行从注册表中的函数的对象代理
- 在您的分布式(或本地)系统中分配任务的行为者。
注册表
这是一个函数存储容器,用于后续执行。您可以通过特定的名称或无名称注册函数,在这种情况下,函数将被分配一个唯一名称,该名称取决于函数的源代码。
此注册表不一定需要在调用和被调用方匹配,但对于您无名称注册的函数必须匹配,然后您在调用时不需要传递函数名,而是函数本身。
必须将注册表的实例传递给代理,在设置过程中的第一个代理将阻止注册表写入,即,注册新函数将是不可能的。
可选的 项目
参数,这本质上类似于命名空间,有助于避免不同项目中具有相同名称的函数冲突。建议指定它,并且代理也应该使用此参数,因此它应在同一项目内具有相同的值。
您可以选择手动注册元素或使用注册表实例作为装饰器
from patio import Registry
rpc = Registry(project="example")
# Will be registered with auto generated name
@rpc
def mul(a, b):
return a * b
@rpc('div')
def div(a, b):
return a / b
def pow(a, b):
return a ** b
def sub(a, b):
return a - b
# Register with auto generated name
rpc.register(pow)
rpc.register(sub, "sub")
或者使用 register
方法
from patio import Registry
rpc = Registry(project="example")
def pow(a, b):
return a ** b
def sub(a, b):
return a - b
# Register with auto generated name
rpc.register(pow)
rpc.register(sub, "sub")
最后,您可以显式注册函数,就像它们只是一个字典一样
from patio import Registry
rpc = Registry(project="example")
def mul(a, b):
return a * b
rpc['mul'] = mul
执行器
执行器是执行注册表中本地函数的实体。以下执行器在包中实现
AsyncExecutor
- 实现异步任务池ThreadPoolExecutor
- 实现线程池ProcessPoolExecutor
- 实现进程池NullExecutor
- 不执行任何操作,仅用于禁止显式执行。
它的作用是可靠地执行作业,而不占用太多资源,以免造成服务中断或过度消耗内存。
执行器实例传递给代理,通常应用于整个注册表。因此,您应该了解注册表中必须包含哪些函数,以便选择执行器的类型。
代理
将任务分配的责任转移到用户实现的基方法是,通过这种方式,任务分配可以通过第三方代理、数据库或其他方式实现。
该包通过以下代理实现
MemoryBroker
- 在单个进程中分配任务。一个非常简单的实现,如果您还不确定应用程序将如何发展,只想稍后决定使用哪个代理,同时为切换到另一个代理奠定基础。TCPBroker
- 仅使用TCP实现的简单代理,支持服务器和客户端模式,适用于任务执行器和任务提供者。
MemoryBroker
如果您目前不需要以分布式方式分配任务,这是一个很好的起点。
实际上,这是一种在项目的其他地方运行执行器中的任务的方法。
TCPBroker
它允许您在不求助于外部消息代理的情况下使任务分布式。
TCP代理实现的基本思想是,在执行任务方面,它们之间没有区别,这只是一个建立连接的方式,服务器和客户端都可以是执行任务的一方,也可以是设置任务的一方,也可以是混合模式。
换句话说,决定在您的系统中谁是服务器和谁是客户端,只是连接和找到彼此的一种方式,在您的分布式系统中。
以下是服务器与客户端之间组织通信的几种方式。
以服务器为中心的方案示例
该图描述了一个简单示例,假设有一个服务器和一个客户端通过TCP交换消息。
一个客户端多个服务器示例
这是一个客户端如何建立与一组服务器连接的示例。
全网状示例
全网状方案,所有客户端都连接到所有服务器。
授权
授权发生在连接开始时,为此,参数 key=
(默认为 b''
)必须包含客户端和服务器相同的密钥。
重要的是要理解这并不能100%防止MITM等攻击。
只有当客户端和服务器位于受信任的网络中时,才应使用此方法。为了确保流量在互联网上的安全传输,应将 ssl_context=
参数添加到服务器和客户端(见下例)。
示例
以下示例可能会帮助您理解。
服务器执行任务
from functools import reduce
import asyncio
from patio import Registry
from patio.broker.tcp import TCPServerBroker
from patio.executor import ThreadPoolExecutor
rpc = Registry(project="test", auto_naming=False)
def mul(*args):
return reduce(lambda x, y: x * y, args)
async def main():
rpc.register(mul, "mul")
async with ThreadPoolExecutor(rpc) as executor:
async with TCPServerBroker(executor) as broker:
# Start IPv4 server
await broker.listen(address='127.0.0.1')
# Start IPv6 server
await broker.listen(address='::1', port=12345)
await broker.join()
if __name__ == "__main__":
asyncio.run(main())
客户端远程调用任务
import asyncio
from patio import Registry
from patio.broker.tcp import TCPClientBroker
from patio.executor import NullExecutor
rpc = Registry(project="test", auto_naming=False)
async def main():
async with NullExecutor(rpc) as executor:
async with TCPClientBroker(executor) as broker:
# Connect to the IPv4 address
await broker.connect(address='127.0.0.1')
# Connect to the IPv6 address (optional)
await broker.connect(address='::1', port=12345)
print(
await asyncio.gather(*[
broker.call('mul', i, i) for i in range(10)
]),
)
if __name__ == "__main__":
asyncio.run(main())
带有SSL的示例
任务归结为将ssl上下文传递给服务器和客户端。
以下是一个如何制作一些自签名证书和授权CA的示例。原始帖子 在这里。
这只是一个示例,如果您想使用自己的证书,只需根据您的安全策略创建所需的ssl上下文。
证书授权机构创建
注意:这是用于签名证书请求的密钥,任何持有此密钥的人都可以代表您签名证书。所以请将其保存在安全的地方!
openssl req -x509 \
-sha256 -days 3650 \
-nodes \
-newkey rsa:2048 \
-subj "/CN=Patio Example CA/C=CC/L=West Island" \
-keyout CA.key -out CA.pem
创建服务器证书
首先创建服务器私钥
openssl genrsa -out server.key 2048
然后创建一个证书请求并使用此密钥签名
openssl req \
-new -sha256 \
-key server.key \
-subj "/CN=server.example.net/C=CC/L=West Island" \
-out server.csr
由CA签名此请求
openssl x509 -req \
-days 365 -sha256 \
-in server.csr \
-CA CA.pem \
-CAkey CA.key \
-CAcreateserial \
-out server.pem
这应该足以加密流量。
带有SSL的服务器执行任务
from functools import reduce
import asyncio
import ssl
from patio import Registry
from patio.broker.tcp import TCPServerBroker
from patio.executor import ThreadPoolExecutor
rpc = Registry(project="test", auto_naming=False)
def mul(*args):
return reduce(lambda x, y: x * y, args)
async def main():
rpc.register(mul, "mul")
ssl_context = ssl.SSLContext()
ssl_context.load_verify_locations("path/to/CA.pem")
ssl_context.load_cert_chain("path/to/server.pem", "path/to/server.key")
async with ThreadPoolExecutor(rpc) as executor:
async with TCPServerBroker(executor, ssl_context=ssl_context) as broker:
# Start IPv4 server
await broker.listen(address='127.0.0.1')
# Start IPv6 server
await broker.listen(address='::1', port=12345)
await broker.join()
if __name__ == "__main__":
asyncio.run(main())
客户端远程调用任务
import asyncio
import ssl
from patio import Registry
from patio.broker.tcp import TCPClientBroker
from patio.executor import NullExecutor
rpc = Registry(project="test", auto_naming=False)
async def main():
ssl_context = ssl.create_default_context(cafile="path/to/CA.pem")
async with NullExecutor(rpc) as executor:
async with TCPClientBroker(executor, ssl_context=ssl_context) as broker:
# Connect to the IPv4 address
await broker.connect(address='127.0.0.1')
# Connect to the IPv6 address (optional)
await broker.connect(address='::1', port=12345)
print(
await asyncio.gather(*[
broker.call('mul', i, i) for i in range(10)
]),
)
if __name__ == "__main__":
asyncio.run(main())
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。