跳转到主要内容

结构化并发 "actors"

项目描述

logo tractor: Python并行计算的下一代

gh_actions Documentation Status

tractor 是一个基于 结构化并发多进程 运行时,构建在 trio 之上。

基本上,tractor 通过 trio-“actors”提供并行性:我们的 nurseries 允许您生成新的Python进程,每个进程运行一个 trio 计划的运行时 - 一个对 trio.run() 的调用。

我们相信该系统遵循了“actor模型”的三个公理,但可能看起来并不像您可能想象中的“actor模型”那样,这是故意的。

理解“tractor”的第一步是掌握trio的基础。一个好的起点是查看trio文档和这篇博客文章

特性

  • 它仅仅是一个trio API

  • 无限可嵌套的过程树

  • 内置的IPC流API,带有任务扇出广播

  • 使用pdb++的(可能是第一个?)“原生”多核调试器用户界面

  • 支持可交换的、与操作系统特定的进程启动层

  • 模块化传输堆栈,允许自定义序列化(例如,使用msgspec)、通信协议和环境特定的IPC原语

  • 支持通过“感染asyncio”模式生成进程级-SC,循环间的点对点任务定向asyncio演员

  • 从头开始的structured concurrency

在进程中运行一个函数

使用trio的将tasks视为函数的风格

"""
Run with a process monitor from a terminal using::

    $TERM -e watch -n 0.1  "pstree -a $$" \
        & python examples/parallelism/single_func.py \
        && kill $!

"""
import os

import tractor
import trio


async def burn_cpu():

    pid = os.getpid()

    # burn a core @ ~ 50kHz
    for _ in range(50000):
        await trio.sleep(1/50000/50)

    return os.getpid()


async def main():

    async with tractor.open_nursery() as n:

        portal = await n.run_in_actor(burn_cpu)

        #  burn rubber in the parent too
        await burn_cpu()

        # wait on result from target function
        pid = await portal.result()

    # end of nursery block
    print(f"Collected subproc {pid}")


if __name__ == '__main__':
    trio.run(main)

这将在新的进程中运行burn_cpu(),并在nursery块完成时回收它。

如果您只需要运行一个同步函数并检索一个结果,您可能想看看trio-parallel

僵尸安全:销毁进程树

tractor试图保护您免受僵尸进程的影响,无论发生什么情况。

"""
Run with a process monitor from a terminal using::

    $TERM -e watch -n 0.1  "pstree -a $$" \
        & python examples/parallelism/we_are_processes.py \
        && kill $!

"""
from multiprocessing import cpu_count
import os

import tractor
import trio


async def target():
    print(
        f"Yo, i'm '{tractor.current_actor().name}' "
        f"running in pid {os.getpid()}"
    )

   await trio.sleep_forever()


async def main():

    async with tractor.open_nursery() as n:

        for i in range(cpu_count()):
            await n.run_in_actor(target, name=f'worker_{i}')

        print('This process tree will self-destruct in 1 sec...')
        await trio.sleep(1)

        # raise an error in root actor/process and trigger
        # reaping of all minions
        raise Exception('Self Destructed')


if __name__ == '__main__':
    try:
        trio.run(main)
    except Exception:
        print('Zombies Contained')

如果您可以创建僵尸子进程(不使用系统信号),那么它就是一个错误

“原生”多进程调试

通过pdb++的魔法和我们的内部IPC,我们已经能够为您的tractor树中的任何(子)进程创建原生感觉的调试体验。

from os import getpid

import tractor
import trio


async def breakpoint_forever():
    "Indefinitely re-enter debugger in child actor."
    while True:
        yield 'yo'
        await tractor.breakpoint()


async def name_error():
    "Raise a ``NameError``"
    getattr(doggypants)


async def main():
    """Test breakpoint in a streaming actor.
    """
    async with tractor.open_nursery(
        debug_mode=True,
        loglevel='error',
    ) as n:

        p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
        p1 = await n.start_actor('name_error', enable_modules=[__name__])

        # retreive results
        stream = await p0.run(breakpoint_forever)
        await p1.run(name_error)


if __name__ == '__main__':
    trio.run(main)

您可以通过以下方式运行此命令

>>> python examples/debugging/multi_daemon_subactors.py

是的,还有一个内置的崩溃处理模式B)

我们希望很快添加一个从REPL重新启动的系统!

SC兼容的双向流

是的,您在这里首先看到了;我们提供了可靠、可传递的设置/销毁语义的2向流。

我们的API初稿类似于trio.Nursery.start()风格的调用

import trio
import tractor


@tractor.context
async def simple_rpc(

    ctx: tractor.Context,
    data: int,

) -> None:
    '''Test a small ping-pong 2-way streaming server.

    '''
    # signal to parent that we're up much like
    # ``trio_typing.TaskStatus.started()``
    await ctx.started(data + 1)

    async with ctx.open_stream() as stream:

        count = 0
        async for msg in stream:

            assert msg == 'ping'
            await stream.send('pong')
            count += 1

        else:
            assert count == 10


async def main() -> None:

    async with tractor.open_nursery() as n:

        portal = await n.start_actor(
            'rpc_server',
            enable_modules=[__name__],
        )

        # XXX: this syntax requires py3.9
        async with (

            portal.open_context(
                simple_rpc,
                data=10,
            ) as (ctx, sent),

            ctx.open_stream() as stream,
        ):

            assert sent == 11

            count = 0
            # receive msgs using async for style
            await stream.send('ping')

            async for msg in stream:
                assert msg == 'pong'
                await stream.send('ping')
                count += 1

                if count >= 9:
                    break


        # explicitly teardown the daemon-actor
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)

请参阅#53中的原始提案和讨论,以及我们希望听到您意见的#223后续改进!

工作池很简单

大多数新用户的初始请求是“我如何创建一个工作池?”

tractor设计用来处理您能想象到的任何SC(结构化并发)进程树;“工作池”模式是一个简单的特例。

我们有完整的worker pool重实现了std-lib的concurrent.futures.ProcessPoolExecutor示例,供参考。

您可以通过以下方式运行它(从该目录运行)以实时查看进程树

$TERM -e watch -n 0.1  "pstree -a $$" \
    & python examples/parallelism/concurrent_actors_primes.py \
    && kill $!

这不需要额外的线程、复杂的信号量或期货;我们需要的只是tractor的IPC!

“感染asyncio”模式

您有一堆希望强制在进程级别执行同步代码的 asyncio 代码吗?

请查看我们的实验性系统,用于控制 guest-modeasyncio 演员

import asyncio
from statistics import mean
import time

import trio
import tractor


async def aio_echo_server(
    to_trio: trio.MemorySendChannel,
    from_trio: asyncio.Queue,
) -> None:

    # a first message must be sent **from** this ``asyncio``
    # task or the ``trio`` side will never unblock from
    # ``tractor.to_asyncio.open_channel_from():``
    to_trio.send_nowait('start')

    # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
    # should probably offer something better.
    while True:
        # echo the msg back
        to_trio.send_nowait(await from_trio.get())
        await asyncio.sleep(0)


@tractor.context
async def trio_to_aio_echo_server(
    ctx: tractor.Context,
):
    # this will block until the ``asyncio`` task sends a "first"
    # message.
    async with tractor.to_asyncio.open_channel_from(
        aio_echo_server,
    ) as (first, chan):

        assert first == 'start'
        await ctx.started(first)

        async with ctx.open_stream() as stream:

            async for msg in stream:
                await chan.send(msg)

                out = await chan.receive()
                # echo back to parent actor-task
                await stream.send(out)


async def main():

    async with tractor.open_nursery() as n:
        p = await n.start_actor(
            'aio_server',
            enable_modules=[__name__],
            infect_asyncio=True,
        )
        async with p.open_context(
            trio_to_aio_echo_server,
        ) as (ctx, first):

            assert first == 'start'

            count = 0
            async with ctx.open_stream() as stream:

                delays = []
                send = time.time()

                await stream.send(count)
                async for msg in stream:
                    recv = time.time()
                    delays.append(recv - send)
                    assert msg == count
                    count += 1
                    send = time.time()
                    await stream.send(count)

                    if count >= 1e3:
                        break

        print(f'mean round trip rate (Hz): {1/mean(delays)}')
        await p.cancel_actor()


if __name__ == '__main__':
    trio.run(main)

是的,我们启动了一个 Python 进程,运行 asyncio,在 asyncio 循环上启动 trio,然后向 trio 安排的任务发送命令,告诉 asyncio 任务该做什么 XD

我们需要帮助改进 asyncio 端的通道 API,使其更像 trio。欢迎您在 #273 上提出您的意见!

高级“集群”API

为了更加简洁,tractor 开发者开始尝试为管理演员树/集群开发一些“高级”API。目前,这些接口应被视为临时性的,但我们鼓励您尝试它们并提供反馈。这是一个新的 API,让您可以快速启动一个扁平集群

import trio
import tractor


async def sleepy_jane():
    uid = tractor.current_actor().uid
    print(f'Yo i am actor {uid}')
    await trio.sleep_forever()


async def main():
    '''
    Spawn a flat actor cluster, with one process per
    detected core.

    '''
    portal_map: dict[str, tractor.Portal]
    results: dict[str, str]

    # look at this hip new syntax!
    async with (

        tractor.open_actor_cluster(
            modules=[__name__]
        ) as portal_map,

        trio.open_nursery() as n,
    ):

        for (name, portal) in portal_map.items():
            n.start_soon(portal.run, sleepy_jane)

        await trio.sleep(0.5)

        # kill the cluster with a cancel
        raise KeyboardInterrupt


if __name__ == '__main__':
    try:
        trio.run(main)
    except KeyboardInterrupt:
        pass

安装

从 PyPi 安装

pip install tractor

从 git 安装

pip install git+git://github.com/goodboy/tractor.git

底层

tractor 试图将 trionic 结构化并发 与分布式 Python 相结合。您可以将其视为 trio 跨进程 或简单地将其视为对 stdlib 的 multiprocessing 的有见地的替代品,但它是从底层构建的异步编程原语。

不要被这个描述吓到。 tractor 只是 trio,但具有进程管理和可取消流式 IPC 的孵化器。如果您了解如何使用 triotractor 将为您提供可能需要的并行性。

等等!我以为“演员”有消息、邮箱和其他东西呢?

让我们停下来问问您实际阅读了多少个标准演员模型论文 ;)

根据我们的经验,许多“演员系统”并不是真正的“演员模型”,因为它们 没有遵循 3 个公理,并且对 无界非确定性 问题的关注甚至更少(这正是模型最初创建的初衷)。

从作者口中,唯一的要求遵循 3 个公理仅此而已

tractor 遵循所谓的“演员模型”的基本要求

In response to a message, an actor may:

- send a finite number of new messages
- create a finite number of new actors
- designate a new behavior to process subsequent messages

并且 不需要做出任何进一步 API 改变即可实现这一点。

如果您想进一步讨论这个问题,请随时在我们的聊天中发言或在以下问题中讨论 (在您阅读完它们之后)

让我们澄清我们的术语

无论 tractor 是否具有“演员”都应基本上与用户无关,除了用于引用我们主要运行时原语之间的交互:每个 Python 进程 + trio.run() + 周围的 IPC 机制。这些是我们的高级、基本 运行时抽象单元,它们既是(在 Python 中尽可能多)也将被称为我们的 “演员”

拖拉机的核心目标是允许开发高度分布式的软件,通过遵循结构化并发,使系统以可预测、可恢复甚至可能可理解的方式失败;“actor模型”只是描述系统特性的方式之一。

待办事项

帮助我们推动分布式Python的未来。

  • 通过组合上下文管理器实现Erlang风格的监督器(见#22

  • 类型化消息协议(例如,通过msgspec.Struct,见#36

  • 类型化能力基础(对话)协议(见#196,在#311中开始草案工作)

想打个招呼吗?

该项目与trio(即tractor从那个杰出的社区中获得大部分想法)的持续发展紧密相关。如果您想帮忙,有建议或只是想打个招呼,请随时通过我们的matrix频道联系我们。如果matrix看起来太前卫,我们大多数人也在trio gitter频道上!

项目详情


下载文件

下载适用于您平台的应用程序。如果您不确定选择哪一个,请了解更多关于安装包的信息。

源分布

tractor-0.1.0a5.tar.gz (106.7 kB 查看散列)

上传时间

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面