跳转到主要内容

使用asyncio的异步分布式进程池

项目描述

Build PyPi Documentation

简介

Distex提供了一个分布式进程池,用于利用多个CPU或机器。它使用asyncio来高效管理工作进程。

特性

  • 可扩展到1到1000多个处理器;

  • 每秒可以处理大约50,000个小任务;

  • 易于与SSH(安全外壳)主机一起使用;

  • 完全支持异步;

  • 映射到无界可迭代对象;

  • concurrent.futures.ProcessPool(或PEP3148)兼容。

安装

pip3 install -U distex

当使用远程主机时,distex也必须安装在这些主机上。请确保distex_proc脚本可以在路径中找到。

对于SSH主机:由于没有密码支持,应使用SSH密钥进行身份验证。可以使用以下命令测试远程安装:

ssh <host> distex_proc

依赖关系

  • Python版本3.6或更高;

  • 在Unix上推荐安装uvloop包:pip3 install uvloop

  • SSH客户端和服务器(可选)。

示例

进程池可以包含本地和远程工作者。以下是一个使用4个本地工作者的进程池

from distex import Pool

def f(x):
    return x*x

pool = Pool(4)
for y in pool.map(f, range(100)):
    print(y)

要创建一个在主机 maxi 上也使用8个工作者的进程池,使用ssh

pool = Pool(4, 'ssh://maxi/8')

结合使用 eventkit 的进程池

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
pool.shutdown()

对每个可想象到的异步构造都有全面的支持

import asyncio
from distex import Pool

def init():
    # pool initializer: set the start time for every worker
    import time
    import builtins
    builtins.t0 = time.time()

async def timer(i=0):
    # async code running in the pool
    import time
    import asyncio
    await asyncio.sleep(1)
    return time.time() - t0

async def ait():
    # async iterator running on the user side
    for i in range(20):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async with Pool(4, initializer=init, qsize=1) as pool:
        async for t in pool.map_async(timer, ait()):
            print(t)
        print(await pool.run_on_all_async(timer))


asyncio.run(main())

高级架构

Distex不使用远程“任务服务器”。相反,它是反过来进行的:首先启动本地服务器;然后启动本地和远程工作者,每个工作者都将自行连接回服务器。当所有工作者都连接上后,进程池就准备好投入使用了。

每个工作者由一个运行asyncio事件循环的单线程进程组成。这个循环用于通信和运行异步任务。同步任务以阻塞方式运行。

使用ssh时,从远程(或“反向”)Unix套接字到本地服务器监听的本地Unix套接字创建了一个远程(或“反向”)隧道。远程机器上的多个工作者将使用相同的Unix套接字并共享相同的ssh隧道。

使用的是普通的 ssh 可执行文件,而不是像 AsyncSSH 这样的更优雅的解决方案。这是为了将加密/解密的计算量保持在事件循环之外,并将其卸载到 ssh 进程。

文档

Distex文档

作者:

Ewald de Wit <ewald.de.wit@gmail.com>

项目详情


下载文件

下载适用于您平台的应用程序。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

distex-0.7.2.tar.gz (18.2 kB 查看散列)

上传时间

构建分发

distex-0.7.2-py3-none-any.whl (19.3 kB 查看散列)

上传时间 Python 3

支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面