跳转到主要内容

并行运行多个子进程的功能

项目描述

Latest version released on PyPi Build status BSD 3-Clause

提供并行运行子进程命令的实用类和函数

>>> from random import randrange
>>> from commandpool import run

>>> commands = ['sleep %s' % randrange(5) for _ in range(100)]

>>> for proc, cmd in run(commands):
...    print(proc.returncode, proc, cmd, sep=', ')
0, <subprocess.Popen object at 0x7fa470b5e278>, sleep 1
0, <subprocess.Popen object at 0x7fa470b449b0>, sleep 2
0, <subprocess.Popen object at 0x7fa470b53d30>, sleep 2
0, <subprocess.Popen object at 0x7fa470b44b70>, sleep 3
0, <subprocess.Popen object at 0x7fa470b53cf8>, sleep 3
0, <subprocess.Popen object at 0x7fa470b53d68>, sleep 4

查看此库提供的功能的一种方式是将它视为subprocess的等价物。

echo $commands | xargs -P $concurrency sh -c

此库通过定期检查启动的进程是否已完成,然后在其位置启动新的进程来工作。

安装

可以从pypi安装commandpool的最新稳定版本

$ pip install commandpool

用法

功能性

from commandpool import run

# Run at most 5 commands at a time.
run(commands, concurrency=5)

# Start all commands at the same time (this is the default).
run(commands, concurrency=None)

# The duration between 'ticks' is configurable.
run(commands, sleep_seconds=0.1)

# Processing commands as they are finished.
for proc, cmd in run(commands):
    assert isinstance(proc, subprocess.Popen)

# The way commands are started is configurable through `start_command`.
from subprocess import Popen, PIPE

commands = {i: ('echo', i*i) for i in range(5, 10)}
start_command = lambda num: Popen(commands[num], stdout=PIPE)

for proc, cmd in run(commands, start_command=start_command):
    print(proc.stdout, cmd, commands[cmd])
# b'25', 5, ('echo', 25)
# b'36', 6, ('echo', 36)
# ...

子类化

from commandpool import ConcurrentCommandRunner

class MyCommandRunner(ConcurrentCommandRunner):
   def start_command(self, cmd):
       ...

   def command_finished(self, proc, cmd):
       ...

runner = MyCommandRunner(commands, sleep_interval=1.0)
runner.run()

待办事项

  • 添加测试。

  • 完成文档。

替代方案

ConcurrentCommandRunner可以通过使用concurrent.futures在几行内实现,假设为每个命令创建一个线程是可以接受的。这也具有的优点,即当命令(包装在future中)完成时立即产生,而不是像ConcurrentCommandRunner那样在sleep_seconds间隔时产生。

from concurrent.futures import ThreadPoolExecutor, as_completed
from subprocess import run

with ThreadPoolExecutor(max_workers=10) as pool:
   futures = {pool.submit(run, cmd): cmd for cmd in commands}
   for res in as_completed(futures):
       print(futures[res], res.returncode)

许可

修订版BSD许可下发布。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪一个,请了解更多关于 安装软件包 的信息。

源代码发行版

commandpool-0.0.0.tar.gz (4.8 kB 查看哈希值)

上传时间 源代码

构建发行版

commandpool-0.0.0-py2.py3-none-any.whl (7.2 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下支持

AWSAWS云计算和安全赞助商DatadogDatadog监控FastlyFastlyCDNGoogleGoogle下载分析MicrosoftMicrosoftPSF赞助商PingdomPingdom监控SentrySentry错误记录StatusPageStatusPage状态页面