使用asyncio的异步分布式进程池
项目描述
简介
Distex提供了一个分布式进程池,用于利用多个CPU或机器。它使用asyncio来高效管理工作进程。
特性
可扩展到1到1000多个处理器;
每秒可以处理大约50,000个小任务;
易于与SSH(安全外壳)主机一起使用;
完全支持异步;
映射到无界可迭代对象;
安装
pip3 install -U distex
当使用远程主机时,distex也必须安装在这些主机上。请确保distex_proc脚本可以在路径中找到。
对于SSH主机:由于没有密码支持,应使用SSH密钥进行身份验证。可以使用以下命令测试远程安装:
ssh <host> distex_proc
依赖关系
Python版本3.6或更高;
在Unix上推荐安装uvloop包:pip3 install uvloop
SSH客户端和服务器(可选)。
示例
进程池可以包含本地和远程工作者。以下是一个使用4个本地工作者的进程池
from distex import Pool
def f(x):
return x*x
pool = Pool(4)
for y in pool.map(f, range(100)):
print(y)
要创建一个在主机 maxi 上也使用8个工作者的进程池,使用ssh
pool = Pool(4, 'ssh://maxi/8')
结合使用 eventkit 的进程池
from distex import Pool
import eventkit as ev
import bz2
pool = Pool()
# await pool # un-comment in Jupyter
data = [b'A' * 1000000] * 1000
pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()
print(pipe.run()) # in Jupyter: print(await pipe)
pool.shutdown()
对每个可想象到的异步构造都有全面的支持
import asyncio
from distex import Pool
def init():
# pool initializer: set the start time for every worker
import time
import builtins
builtins.t0 = time.time()
async def timer(i=0):
# async code running in the pool
import time
import asyncio
await asyncio.sleep(1)
return time.time() - t0
async def ait():
# async iterator running on the user side
for i in range(20):
await asyncio.sleep(0.1)
yield i
async def main():
async with Pool(4, initializer=init, qsize=1) as pool:
async for t in pool.map_async(timer, ait()):
print(t)
print(await pool.run_on_all_async(timer))
asyncio.run(main())
高级架构
Distex不使用远程“任务服务器”。相反,它是反过来进行的:首先启动本地服务器;然后启动本地和远程工作者,每个工作者都将自行连接回服务器。当所有工作者都连接上后,进程池就准备好投入使用了。
每个工作者由一个运行asyncio事件循环的单线程进程组成。这个循环用于通信和运行异步任务。同步任务以阻塞方式运行。
使用ssh时,从远程(或“反向”)Unix套接字到本地服务器监听的本地Unix套接字创建了一个远程(或“反向”)隧道。远程机器上的多个工作者将使用相同的Unix套接字并共享相同的ssh隧道。
使用的是普通的 ssh 可执行文件,而不是像 AsyncSSH 这样的更优雅的解决方案。这是为了将加密/解密的计算量保持在事件循环之外,并将其卸载到 ssh 进程。
文档
- 作者::
Ewald de Wit <ewald.de.wit@gmail.com>
项目详情
下载文件
下载适用于您平台的应用程序。如果您不确定选择哪个,请了解更多关于 安装包 的信息。