结构化并发 "actors"
项目描述
tractor: Python并行计算的下一代
tractor 是一个基于 结构化并发、多进程 运行时,构建在 trio 之上。
基本上,tractor 通过 trio-“actors”提供并行性:我们的 nurseries 允许您生成新的Python进程,每个进程运行一个 trio 计划的运行时 - 一个对 trio.run() 的调用。
我们相信该系统遵循了“actor模型”的三个公理,但可能看起来并不像您可能想象中的“actor模型”那样,这是故意的。
理解“tractor”的第一步是掌握trio的基础。一个好的起点是查看trio文档和这篇博客文章。
特性
在进程中运行一个函数
使用trio的将tasks视为函数的风格
"""
Run with a process monitor from a terminal using::
$TERM -e watch -n 0.1 "pstree -a $$" \
& python examples/parallelism/single_func.py \
&& kill $!
"""
import os
import tractor
import trio
async def burn_cpu():
pid = os.getpid()
# burn a core @ ~ 50kHz
for _ in range(50000):
await trio.sleep(1/50000/50)
return os.getpid()
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(burn_cpu)
# burn rubber in the parent too
await burn_cpu()
# wait on result from target function
pid = await portal.result()
# end of nursery block
print(f"Collected subproc {pid}")
if __name__ == '__main__':
trio.run(main)
这将在新的进程中运行burn_cpu(),并在nursery块完成时回收它。
如果您只需要运行一个同步函数并检索一个结果,您可能想看看trio-parallel。
僵尸安全:销毁进程树
tractor试图保护您免受僵尸进程的影响,无论发生什么情况。
"""
Run with a process monitor from a terminal using::
$TERM -e watch -n 0.1 "pstree -a $$" \
& python examples/parallelism/we_are_processes.py \
&& kill $!
"""
from multiprocessing import cpu_count
import os
import tractor
import trio
async def target():
print(
f"Yo, i'm '{tractor.current_actor().name}' "
f"running in pid {os.getpid()}"
)
await trio.sleep_forever()
async def main():
async with tractor.open_nursery() as n:
for i in range(cpu_count()):
await n.run_in_actor(target, name=f'worker_{i}')
print('This process tree will self-destruct in 1 sec...')
await trio.sleep(1)
# raise an error in root actor/process and trigger
# reaping of all minions
raise Exception('Self Destructed')
if __name__ == '__main__':
try:
trio.run(main)
except Exception:
print('Zombies Contained')
如果您可以创建僵尸子进程(不使用系统信号),那么它就是一个错误。
“原生”多进程调试
通过pdb++的魔法和我们的内部IPC,我们已经能够为您的tractor树中的任何(子)进程创建原生感觉的调试体验。
from os import getpid
import tractor
import trio
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
yield 'yo'
await tractor.breakpoint()
async def name_error():
"Raise a ``NameError``"
getattr(doggypants)
async def main():
"""Test breakpoint in a streaming actor.
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='error',
) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
p1 = await n.start_actor('name_error', enable_modules=[__name__])
# retreive results
stream = await p0.run(breakpoint_forever)
await p1.run(name_error)
if __name__ == '__main__':
trio.run(main)
您可以通过以下方式运行此命令
>>> python examples/debugging/multi_daemon_subactors.py
是的,还有一个内置的崩溃处理模式B)
我们希望很快添加一个从REPL重新启动的系统!
SC兼容的双向流
是的,您在这里首先看到了;我们提供了可靠、可传递的设置/销毁语义的2向流。
我们的API初稿类似于trio.Nursery.start()风格的调用
import trio
import tractor
@tractor.context
async def simple_rpc(
ctx: tractor.Context,
data: int,
) -> None:
'''Test a small ping-pong 2-way streaming server.
'''
# signal to parent that we're up much like
# ``trio_typing.TaskStatus.started()``
await ctx.started(data + 1)
async with ctx.open_stream() as stream:
count = 0
async for msg in stream:
assert msg == 'ping'
await stream.send('pong')
count += 1
else:
assert count == 10
async def main() -> None:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'rpc_server',
enable_modules=[__name__],
)
# XXX: this syntax requires py3.9
async with (
portal.open_context(
simple_rpc,
data=10,
) as (ctx, sent),
ctx.open_stream() as stream,
):
assert sent == 11
count = 0
# receive msgs using async for style
await stream.send('ping')
async for msg in stream:
assert msg == 'pong'
await stream.send('ping')
count += 1
if count >= 9:
break
# explicitly teardown the daemon-actor
await portal.cancel_actor()
if __name__ == '__main__':
trio.run(main)
工作池很简单
大多数新用户的初始请求是“我如何创建一个工作池?”
tractor设计用来处理您能想象到的任何SC(结构化并发)进程树;“工作池”模式是一个简单的特例。
我们有完整的worker pool重实现了std-lib的concurrent.futures.ProcessPoolExecutor示例,供参考。
您可以通过以下方式运行它(从该目录运行)以实时查看进程树
$TERM -e watch -n 0.1 "pstree -a $$" \ & python examples/parallelism/concurrent_actors_primes.py \ && kill $!
这不需要额外的线程、复杂的信号量或期货;我们需要的只是tractor的IPC!
“感染asyncio”模式
您有一堆希望强制在进程级别执行同步代码的 asyncio 代码吗?
请查看我们的实验性系统,用于控制 guest-mode 的 asyncio 演员
import asyncio
from statistics import mean
import time
import trio
import tractor
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:
# a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start')
# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True:
# echo the msg back
to_trio.send_nowait(await from_trio.get())
await asyncio.sleep(0)
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
# this will block until the ``asyncio`` task sends a "first"
# message.
async with tractor.to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
assert first == 'start'
await ctx.started(first)
async with ctx.open_stream() as stream:
async for msg in stream:
await chan.send(msg)
out = await chan.receive()
# echo back to parent actor-task
await stream.send(out)
async def main():
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
trio_to_aio_echo_server,
) as (ctx, first):
assert first == 'start'
count = 0
async with ctx.open_stream() as stream:
delays = []
send = time.time()
await stream.send(count)
async for msg in stream:
recv = time.time()
delays.append(recv - send)
assert msg == count
count += 1
send = time.time()
await stream.send(count)
if count >= 1e3:
break
print(f'mean round trip rate (Hz): {1/mean(delays)}')
await p.cancel_actor()
if __name__ == '__main__':
trio.run(main)
是的,我们启动了一个 Python 进程,运行 asyncio,在 asyncio 循环上启动 trio,然后向 trio 安排的任务发送命令,告诉 asyncio 任务该做什么 XD
我们需要帮助改进 asyncio 端的通道 API,使其更像 trio。欢迎您在 #273 上提出您的意见!
高级“集群”API
为了更加简洁,tractor 开发者开始尝试为管理演员树/集群开发一些“高级”API。目前,这些接口应被视为临时性的,但我们鼓励您尝试它们并提供反馈。这是一个新的 API,让您可以快速启动一个扁平集群
import trio
import tractor
async def sleepy_jane():
uid = tractor.current_actor().uid
print(f'Yo i am actor {uid}')
await trio.sleep_forever()
async def main():
'''
Spawn a flat actor cluster, with one process per
detected core.
'''
portal_map: dict[str, tractor.Portal]
results: dict[str, str]
# look at this hip new syntax!
async with (
tractor.open_actor_cluster(
modules=[__name__]
) as portal_map,
trio.open_nursery() as n,
):
for (name, portal) in portal_map.items():
n.start_soon(portal.run, sleepy_jane)
await trio.sleep(0.5)
# kill the cluster with a cancel
raise KeyboardInterrupt
if __name__ == '__main__':
try:
trio.run(main)
except KeyboardInterrupt:
pass
安装
从 PyPi 安装
pip install tractor
从 git 安装
pip install git+git://github.com/goodboy/tractor.git
底层
tractor 试图将 trionic 结构化并发 与分布式 Python 相结合。您可以将其视为 trio 跨进程 或简单地将其视为对 stdlib 的 multiprocessing 的有见地的替代品,但它是从底层构建的异步编程原语。
不要被这个描述吓到。 tractor 只是 trio,但具有进程管理和可取消流式 IPC 的孵化器。如果您了解如何使用 trio,tractor 将为您提供可能需要的并行性。
等等!我以为“演员”有消息、邮箱和其他东西呢?
让我们停下来问问您实际阅读了多少个标准演员模型论文 ;)
根据我们的经验,许多“演员系统”并不是真正的“演员模型”,因为它们 没有遵循 3 个公理,并且对 无界非确定性 问题的关注甚至更少(这正是模型最初创建的初衷)。
tractor 遵循所谓的“演员模型”的基本要求
In response to a message, an actor may: - send a finite number of new messages - create a finite number of new actors - designate a new behavior to process subsequent messages
并且 不需要做出任何进一步 API 改变即可实现这一点。
如果您想进一步讨论这个问题,请随时在我们的聊天中发言或在以下问题中讨论 (在您阅读完它们之后)
让我们澄清我们的术语
无论 tractor 是否具有“演员”都应基本上与用户无关,除了用于引用我们主要运行时原语之间的交互:每个 Python 进程 + trio.run() + 周围的 IPC 机制。这些是我们的高级、基本 运行时抽象单元,它们既是(在 Python 中尽可能多)也将被称为我们的 “演员”。
拖拉机的核心目标是允许开发高度分布式的软件,通过遵循结构化并发,使系统以可预测、可恢复甚至可能可理解的方式失败;“actor模型”只是描述系统特性的方式之一。
待办事项
帮助我们推动分布式Python的未来。
想打个招呼吗?
该项目与trio(即tractor从那个杰出的社区中获得大部分想法)的持续发展紧密相关。如果您想帮忙,有建议或只是想打个招呼,请随时通过我们的matrix频道联系我们。如果matrix看起来太前卫,我们大多数人也在trio gitter频道上!
项目详情
tractor-0.1.0a5.tar.gz的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 20390371dff39659dbe4d2a4088a27a571570cedd23fb5714bc033c9e883eea1 |
|
MD5 | 095545ceae1e62ecb3f4c5947e8d0988 |
|
BLAKE2b-256 | 20a410f84b3003768c8880f8ca848a0b87f0324f5d94960dcd2b1d1aee8bacb8 |