跳转到主要内容

Python异步任务适用于AsyncIO

项目描述

PyPI - License Wheel Mypy PyPI PyPI Coverage Status tox

PATIO

PATIO是Python异步任务(AsyncIO)的缩写。

动机

我希望创建一个易于扩展的库,用于分布式任务执行,如celery,仅针对asyncio作为主要设计方法。

设计上,该库应适用于小型项目和真正的大型分布式项目。总体想法是用户只需将项目代码库拆分为两种角色的函数——后台任务和这些后台任务的触发器。这也应该有助于您的项目进行横向扩展。它允许通过嵌入TCP或使用插件通过现有的消息传递基础设施进行通信,在网络中提供工作者或调用者。

快速入门

执行线程池中的任务的简单示例

import asyncio
from functools import reduce

from patio import Registry
from patio.broker import MemoryBroker
from patio.executor import ThreadPoolExecutor


rpc = Registry()


@rpc("mul")
def multiply(*args: int) -> int:
    return reduce(lambda x, y: x * y, args)


async def main():
    async with ThreadPoolExecutor(rpc, max_workers=4) as executor:
        async with MemoryBroker(executor) as broker:
            print(
                await asyncio.gather(
                    *[broker.call("mul", 1, 2, 3) for _ in range(100)]
                )
            )


if __name__ == '__main__':
    asyncio.run(main())

本例中的ThreadPoolExecutor是执行任务的实体。如果您的项目中的任务异步,您可以选择AsyncExecutor,然后代码将如下所示

import asyncio
from functools import reduce

from patio import Registry
from patio.broker import MemoryBroker
from patio.executor import AsyncExecutor


rpc = Registry()


@rpc("mul")
async def multiply(*args: int) -> int:
    # do something asynchronously
    await asyncio.sleep(0)
    return reduce(lambda x, y: x * y, args)


async def main():
    async with AsyncExecutor(rpc, max_workers=4) as executor:
        async with MemoryBroker(executor) as broker:
            print(
                await asyncio.gather(
                    *[broker.call("mul", 1, 2, 3) for _ in range(100)]
                )
            )


if __name__ == '__main__':
    asyncio.run(main())

这些示例可能看起来很复杂,但不用担心,下一节详细介绍了通用概念,希望您会对此有更多了解。

主要概念

在开发这个库时,主要想法是创建一个最大程度模块化和可扩展的系统,可以通过第三方集成或直接在用户的代码中进行扩展。

构成一切的基本元素是

  • 注册表 - 函数的键值对存储
  • 执行器 - 执行从注册表中的函数的对象
  • 代理 - 在您的分布式(或本地)系统中分配任务的行为者。

注册表

这是一个函数存储容器,用于后续执行。您可以通过特定的名称或无名称注册函数,在这种情况下,函数将被分配一个唯一名称,该名称取决于函数的源代码。

此注册表不一定需要在调用和被调用方匹配,但对于您无名称注册的函数必须匹配,然后您在调用时不需要传递函数名,而是函数本身。

必须将注册表的实例传递给代理,在设置过程中的第一个代理将阻止注册表写入,即,注册新函数将是不可能的。

可选的 项目 参数,这本质上类似于命名空间,有助于避免不同项目中具有相同名称的函数冲突。建议指定它,并且代理也应该使用此参数,因此它应在同一项目内具有相同的值。

您可以选择手动注册元素或使用注册表实例作为装饰器

from patio import Registry

rpc = Registry(project="example")

# Will be registered with auto generated name
@rpc
def mul(a, b):
    return a * b

@rpc('div')
def div(a, b):
    return a / b

def pow(a, b):
    return a ** b

def sub(a, b):
    return a - b

# Register with auto generated name
rpc.register(pow)

rpc.register(sub, "sub")

或者使用 register 方法

from patio import Registry

rpc = Registry(project="example")

def pow(a, b):
    return a ** b

def sub(a, b):
    return a - b

# Register with auto generated name
rpc.register(pow)

rpc.register(sub, "sub")

最后,您可以显式注册函数,就像它们只是一个字典一样

from patio import Registry

rpc = Registry(project="example")

def mul(a, b):
    return a * b

rpc['mul'] = mul

执行器

执行器是执行注册表中本地函数的实体。以下执行器在包中实现

  • AsyncExecutor - 实现异步任务池
  • ThreadPoolExecutor - 实现线程池
  • ProcessPoolExecutor - 实现进程池
  • NullExecutor - 不执行任何操作,仅用于禁止显式执行。

它的作用是可靠地执行作业,而不占用太多资源,以免造成服务中断或过度消耗内存。

执行器实例传递给代理,通常应用于整个注册表。因此,您应该了解注册表中必须包含哪些函数,以便选择执行器的类型。

代理

将任务分配的责任转移到用户实现的基方法是,通过这种方式,任务分配可以通过第三方代理、数据库或其他方式实现。

该包通过以下代理实现

  • MemoryBroker - 在单个进程中分配任务。一个非常简单的实现,如果您还不确定应用程序将如何发展,只想稍后决定使用哪个代理,同时为切换到另一个代理奠定基础。
  • TCPBroker - 仅使用TCP实现的简单代理,支持服务器和客户端模式,适用于任务执行器和任务提供者。

MemoryBroker

如果您目前不需要以分布式方式分配任务,这是一个很好的起点。

实际上,这是一种在项目的其他地方运行执行器中的任务的方法。

TCPBroker

它允许您在不求助于外部消息代理的情况下使任务分布式。

TCP代理实现的基本思想是,在执行任务方面,它们之间没有区别,这只是一个建立连接的方式,服务器和客户端都可以是执行任务的一方,也可以是设置任务的一方,也可以是混合模式。

换句话说,决定在您的系统中谁是服务器和谁是客户端,只是连接和找到彼此的一种方式,在您的分布式系统中。

以下是服务器与客户端之间组织通信的几种方式。

以服务器为中心的方案示例

server centric

该图描述了一个简单示例,假设有一个服务器和一个客户端通过TCP交换消息。

一个客户端多个服务器示例

multiple servers

这是一个客户端如何建立与一组服务器连接的示例。

全网状示例

full mesh

全网状方案,所有客户端都连接到所有服务器。

授权

授权发生在连接开始时,为此,参数 key=(默认为 b'')必须包含客户端和服务器相同的密钥。

重要的是要理解这并不能100%防止MITM等攻击。

只有当客户端和服务器位于受信任的网络中时,才应使用此方法。为了确保流量在互联网上的安全传输,应将 ssl_context= 参数添加到服务器和客户端(见下例)。

示例

以下示例可能会帮助您理解。

服务器执行任务
from functools import reduce

import asyncio

from patio import Registry
from patio.broker.tcp import TCPServerBroker
from patio.executor import ThreadPoolExecutor

rpc = Registry(project="test", auto_naming=False)


def mul(*args):
    return reduce(lambda x, y: x * y, args)


async def main():
    rpc.register(mul, "mul")

    async with ThreadPoolExecutor(rpc) as executor:
        async with TCPServerBroker(executor) as broker:
            # Start IPv4 server
            await broker.listen(address='127.0.0.1')

            # Start IPv6 server
            await broker.listen(address='::1', port=12345)

            await broker.join()


if __name__ == "__main__":
    asyncio.run(main())
客户端远程调用任务
import asyncio

from patio import Registry
from patio.broker.tcp import TCPClientBroker
from patio.executor import NullExecutor

rpc = Registry(project="test", auto_naming=False)


async def main():
    async with NullExecutor(rpc) as executor:
        async with TCPClientBroker(executor) as broker:
            # Connect to the IPv4 address
            await broker.connect(address='127.0.0.1')

            # Connect to the IPv6 address (optional)
            await broker.connect(address='::1', port=12345)

            print(
                await asyncio.gather(*[
                    broker.call('mul', i, i) for i in range(10)
                ]),
            )


if __name__ == "__main__":
    asyncio.run(main())
带有SSL的示例

任务归结为将ssl上下文传递给服务器和客户端。

以下是一个如何制作一些自签名证书和授权CA的示例。原始帖子 在这里

这只是一个示例,如果您想使用自己的证书,只需根据您的安全策略创建所需的ssl上下文。

证书授权机构创建

注意:这是用于签名证书请求的密钥,任何持有此密钥的人都可以代表您签名证书。所以请将其保存在安全的地方!

openssl req -x509 \
  -sha256 -days 3650 \
  -nodes \
  -newkey rsa:2048 \
  -subj "/CN=Patio Example CA/C=CC/L=West Island" \
  -keyout CA.key -out CA.pem
创建服务器证书

首先创建服务器私钥

openssl genrsa -out server.key 2048

然后创建一个证书请求并使用此密钥签名

openssl req \
  -new -sha256 \
  -key server.key \
  -subj "/CN=server.example.net/C=CC/L=West Island" \
  -out server.csr

由CA签名此请求

openssl x509 -req \
  -days 365 -sha256 \
  -in server.csr \
  -CA CA.pem \
  -CAkey CA.key \
  -CAcreateserial \
  -out server.pem

这应该足以加密流量。

带有SSL的服务器执行任务
from functools import reduce

import asyncio
import ssl

from patio import Registry
from patio.broker.tcp import TCPServerBroker
from patio.executor import ThreadPoolExecutor

rpc = Registry(project="test", auto_naming=False)


def mul(*args):
    return reduce(lambda x, y: x * y, args)


async def main():
    rpc.register(mul, "mul")

    ssl_context = ssl.SSLContext()
    ssl_context.load_verify_locations("path/to/CA.pem")
    ssl_context.load_cert_chain("path/to/server.pem", "path/to/server.key")

    async with ThreadPoolExecutor(rpc) as executor:
        async with TCPServerBroker(executor, ssl_context=ssl_context) as broker:
            # Start IPv4 server
            await broker.listen(address='127.0.0.1')

            # Start IPv6 server
            await broker.listen(address='::1', port=12345)

            await broker.join()


if __name__ == "__main__":
    asyncio.run(main())
客户端远程调用任务
import asyncio
import ssl

from patio import Registry
from patio.broker.tcp import TCPClientBroker
from patio.executor import NullExecutor


rpc = Registry(project="test", auto_naming=False)


async def main():
    ssl_context = ssl.create_default_context(cafile="path/to/CA.pem")

    async with NullExecutor(rpc) as executor:
        async with TCPClientBroker(executor, ssl_context=ssl_context) as broker:
            # Connect to the IPv4 address
            await broker.connect(address='127.0.0.1')

            # Connect to the IPv6 address (optional)
            await broker.connect(address='::1', port=12345)

            print(
                await asyncio.gather(*[
                    broker.call('mul', i, i) for i in range(10)
                ]),
            )


if __name__ == "__main__":
    asyncio.run(main())

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

patio-0.1.4.tar.gz (19.2 kB 查看哈希值)

上传时间

构建分布

patio-0.1.4-py3-none-any.whl (21.6 kB 查看哈希值)

上传时间 Python 3

由以下支持