Functionality for running multiple subprocesses in parallel
Project description
Utility classes and functions for running subprocess commands in parallel
>>> from random import randrange
>>> from commandpool import run
>>> commands = ['sleep %s' % randrange(5) for _ in range(100)]
>>> for proc, cmd in run(commands):
...     print(proc.returncode, proc, cmd, sep=', ')
0, <subprocess.Popen object at 0x7fa470b5e278>, sleep 1
0, <subprocess.Popen object at 0x7fa470b449b0>, sleep 2
0, <subprocess.Popen object at 0x7fa470b53d30>, sleep 2
0, <subprocess.Popen object at 0x7fa470b44b70>, sleep 3
0, <subprocess.Popen object at 0x7fa470b53cf8>, sleep 3
0, <subprocess.Popen object at 0x7fa470b53d68>, sleep 4
One way to think of the functionality provided by this library is as the subprocess equivalent of:
echo $commands | xargs -P $concurrency sh -c
The library works by periodically checking whether started processes have finished, and then starting new processes in their place.
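The core loop can be sketched in a few lines of Python. The sketch below only illustrates that polling idea and is not the library's actual implementation; the helper name run_sketch and its details are hypothetical, and commands are assumed to be shell-style strings such as 'sleep 1'.

import subprocess
import time

def run_sketch(commands, concurrency=None, sleep_seconds=0.1):
    # Hypothetical illustration of the polling approach described above.
    pending = list(commands)
    running = {}  # maps each Popen object to the command it was started from

    while pending or running:
        # Reap processes that have finished and yield them to the caller.
        for proc in [p for p in running if p.poll() is not None]:
            yield proc, running.pop(proc)

        # Start new commands in the slots that have freed up.
        while pending and (concurrency is None or len(running) < concurrency):
            cmd = pending.pop(0)
            running[subprocess.Popen(cmd.split())] = cmd

        time.sleep(sleep_seconds)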
Installation
The latest stable version of commandpool can be installed from PyPI:
$ pip install commandpool
Usage
Functional
import subprocess
from commandpool import run
# Run at most 5 commands at a time.
run(commands, concurrency=5)
# Start all commands at the same time (this is the default).
run(commands, concurrency=None)
# The duration between 'ticks' is configurable.
run(commands, sleep_seconds=0.1)
# Process commands as they finish.
for proc, cmd in run(commands):
    assert isinstance(proc, subprocess.Popen)
# The way commands are started is configurable through `start_command`.
from subprocess import Popen, PIPE
commands = {i: ('echo', str(i * i)) for i in range(5, 10)}
start_command = lambda num: Popen(commands[num], stdout=PIPE)

for proc, cmd in run(commands, start_command=start_command):
    print(proc.stdout.read().strip(), cmd, commands[cmd], sep=', ')
# b'25', 5, ('echo', '25')
# b'36', 6, ('echo', '36')
# ...
Subclassing
from commandpool import ConcurrentCommandRunner
class MyCommandRunner(ConcurrentCommandRunner):
    def start_command(self, cmd):
        ...

    def command_finished(self, proc, cmd):
        ...

runner = MyCommandRunner(commands, sleep_interval=1.0)
runner.run()
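For illustration, a filled-in subclass might look like the sketch below. It assumes that start_command is expected to return the started subprocess.Popen and that command_finished receives the finished process together with its command, as the signatures above and the functional API suggest; the class name LoggingCommandRunner is made up.

import subprocess
from commandpool import ConcurrentCommandRunner

class LoggingCommandRunner(ConcurrentCommandRunner):
    def start_command(self, cmd):
        # Assumes each command is an argument list, e.g. ['sleep', '1'].
        return subprocess.Popen(cmd)

    def command_finished(self, proc, cmd):
        # Report the exit status of each command as it completes.
        print(cmd, 'exited with', proc.returncode)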
Todo
Add tests.
Finish the documentation.
Alternatives
ConcurrentCommandRunner could be implemented in a few lines with the help of concurrent.futures, assuming that spawning a thread per command is acceptable. This approach also has the advantage of yielding commands (wrapped in futures) as soon as they finish, instead of at sleep_seconds intervals, as is the case with ConcurrentCommandRunner.
from concurrent.futures import ThreadPoolExecutor, as_completed
from subprocess import run
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = {pool.submit(run, cmd): cmd for cmd in commands}
    for future in as_completed(futures):
        print(futures[future], future.result().returncode)
License
Released under the Revised BSD License.