跳转到主要内容

未提供项目描述

项目描述

dagorama (WIP)

Dagorama是一个用于后台处理的具有偏见的计算库。您可以将它看作是Celery、Airflow和Dask之间的混合体。

其主要设计目标是让用户能够在纯Python中编写链式数据处理逻辑,并轻松扩展到外部专用机器。此外,它还力求提供

  • 在纯Python中易于理解的流程控制,无需学习新的语言ORM
  • 为现代Python编写,支持IDE建议和mypy类型提示
  • 支持动态逻辑,如分支if语句,它根据某些运行时条件决定子树执行
  • 在测试时易于集成到一台机器上的单元测试
  • 为后台处理提供合理的默认值

快速入门

让我们以这个简单的示例为例,它代表了我们想要执行一系列长时间运行的阻塞任务。这些任务可能是ML推理、图像渲染或数据聚合。在任何情况下,它都需要处理几秒钟才能返回结果。特别是我们关注的是充分利用当前进程计算资源的逻辑。这与网络请求不同,网络请求有网络引入的延迟,可以通过asyncio轻松并行化。

from collections import Counter
from time import sleep, time

class MyTask:
    def entrypoint(self):
        results = [
            self.perform_work(i)
            for i in range(4)
        ]

        return self.rollup_statuses(results)

    def perform_work(self, identifier: int) -> int:
        sleep(2)
        return identifier // 2

    def rollup_statuses(self, responses: list[int]):
        return Counter([status for status in responses])

def main():
    start = time()
    task = MyTask()
    response = task.entrypoint()
    end = time()
    print(f"Response: {response} in {round(end-start, 2)}s")
    assert response == Counter({0: 2, 1: 2})

if __name__ == "__main__":
    main()

如果您运行此代码,您应该在处理一段时间后看到回显计数器。

$ python samples/test1.py
Response: Counter({0: 2, 1: 2}) in 8.01s

此代码还具有有效的类型提示,因此mypy在类型提示期间很满意。

$ mypy samples/test1.py
Success: no issues found in 1 source file

自然地,我们希望尽快获得结果,同时扩展到我们计算集群中机器(或机器)的资源。这几乎适合像Celery队列这样的东西 - 我们将为每个perform_work函数启动四个单独的工作。但我们如何处理聚合阶段?我们是否在主进程中阻塞,直到它完成?状态是如何管理的?如果主进程在我们完成之前退出,我们如何从我们之前的位置继续?

这种依赖链对于队列来说实际上并不好。相反,你可能需要一个更接近计算图或DAG的东西,它可以基于之前函数的成功完成来条件化后续函数。以下是你在dagorama中如何编写相同内容的示例。

from dagorama.decorators import dagorama
from dagorama.definition import DAGDefinition, resolve
from dagorama.runner import launch_workers
from dagorama_broker.launch import launch_broker

from collections import Counter
from time import sleep, time

class MyTask(DAGDefinition):
    @dagorama().syncfn
    def entrypoint(self):
        results = [
            self.perform_work(i)
            for i in range(4)
        ]

        return self.rollup_statuses(results)

    @dagorama().syncfn
    def perform_work(self, identifier: int) -> int:
        sleep(2)
        return identifier // 2

    @dagorama().syncfn
    def rollup_statuses(self, responses: list[int]):
        return Counter([status for status in responses])

def main():
    with launch_broker():
        start = time()
        task_definition = MyTask()
        instance, promise = task_definition()

        with launch_workers(4, exit_on_completion=True) as workers:
            # Wait for each to quit
            for worker in workers:
                worker.join()
        response = resolve(instance, promise)
        end = time() 

    print(f"Response: {response} in {round(end-start, 2)}s")
    assert response == Counter({0: 2, 1: 2})

if __name__ == "__main__":
    main()

为了将此逻辑放入一个脚本中,这里有一些不同的事情在进行。

  1. 我们用@dagorama装饰器包装了每个函数。这个装饰器表示应该在一个单独的worker节点上执行像self.perform_work()self.rollup_statuses()这样的函数执行。这类似于在传统队列中启动一个新的任务。
  2. 我们用with launch_broker()启动后台代理。这将启动一个单独的代理进程,它会在多个worker之间进行协调。
  3. 我们用launch_workers启动worker本身。在这种情况下,我们在单独的进程中执行工作。这也可以轻松地在不更改API或代码的情况下在单独的机器上执行。

与之前不同,我们现在完成的时间大约是主要工作的时间。

$ python run samples/test2.py
Response: Counter({0: 2, 1: 2}) in 3.03s

Mypy对我们的DAG定义也很满意。

$ mypy samples/test2.py
Success: no issues found in 1 source file

你会注意到核心MyTask类的差异非常小。

$ diff samples/test1.py samples/test2.py 
1c1,2
< class MyTask:
---
> class MyTask(DAGDefinition):
>     @dagorama().syncfn
9a11
>     @dagorama().syncfn
13a16
>     @dagorama().syncfn

这是dagorama的核心设计目标:编写纯Python代码并轻松扩展。

API说明

本节旨在成为你日常开发中需要了解的唯一部分,而无需偏离主题。

你希望相互流动的逻辑代码组应包含在一个继承自DAGDefinition的类中。你希望这段代码在每个worker节点上部署,以便它们能够访问相同的核心定义文件。Docker是首选机制,以确保相同的代码在每个设备上镜像,计算将按预期进行。当你用入口参数调用定义,如dag(1),它将为执行创建一个新的DAG实例。这个实例将包含DAG内的函数。例如,如果你正在处理5个单独的输入图像,你将希望启动5个DAGInstances。

dagorama代理将确保在调用较晚的DAG实例之前完成较早的DAG实例。优先级方案是按DAG实例化的顺序FIFO。这在需要从处理开始到DAG完成以用于近实时逻辑的情况中很有用。

你希望在每个单独的机器上执行的每个函数都应使用@dagorama装饰器包装。对这个函数的调用将被添加到计算图中,并适当分配。未装饰的类函数将在当前执行器内联运行。对于同步worker函数,使用@dagorama().syncfn装饰你的代码,对于异步函数,使用@dagorama().asyncfn

使用@dagorama装饰的函数将看起来像它返回类型提示的值给静态分析器,如mypy。这允许你通过传递逻辑值来编写更可解释的代码。然而,实际上,@dagorama函数在运行时将返回一个DAGPromise。这个DAGPromise没有意义——它还没有值,因为它还没有被传递给运行器。这些响应值应该只作为函数参数传递给其他使用@dagorama装饰的函数。当这样做时,worker将只执行该函数,一旦所有依赖项都已实现。

在某些情况下,你可能希望限制在特定硬件上运行的函数。一个常见的情况是,对于只应在GPU加速设备上运行的ML函数。我们遵循Kubernetes模型,为不应默认部署的每个函数添加一个污点,例如@dagorama(taint_name="GPU")。要从这个队列中拉取,worker必须明确指定这个污点,否则它们不会从支持队列中拉取。

要启动工作函数,请在您的虚拟环境中安装python包并运行

worker [--include-queue {queue}] [--exclude-queue {queue}] [--toleration {toleration}]

此工作应用程序支持以下环境变量

DAGORAMA_HOST - (可选)dagorama代理的主机 DAGORAMA_PORT - (可选)dagorama代理的端口

生产部署(进行中)

在本地测试之外,您可能不想在自己的机器上运行工作进程。相反,您可能希望将它们分布到多台机器上。在这种配置中,我们建议

  • 1个代理:在虚拟机、kubernetes pod或docker内部运行的Go可执行文件。由持久数据库支持,以便在运行时中断后恢复状态。
  • N个工人。执行计算的任务设备。可以是相同的物理硬件配置,也可以根据用例不同。
  • M个孵化器。第一个实例化DAG的服务。这通常是后端服务器等端应用程序。

Docker

为了方便起见,我们将代理打包为Docker镜像。此镜像基于alpine发行版,仅包含代理可执行文件,因此大小不到10MB。

docker pull piercefreeman/dagorama-broker:latest

类型提示

Dagorama应像普通函数一样满足类型提示。换句话说,您可以在将其传递到主DAG下游的其他函数之前将返回的DAGPromise视为已解决的值。

开发

鼓励对dagorama进行黑客攻击。以下是快速入门的一些步骤。

安装Python依赖项

我们使用Poetry管理依赖项。虽然这并不是严格必要的(pyproject.toml应在标准虚拟环境中通过pip安装),但是。

如果您还没有Poetry,请在这里安装它 这里

poetry install

安装gRPC

客户端通过gRPC与代理通信。您需要支持在Golang和Python中生成protobuf文件。

Golang快速入门: https://grpc.org.cn/docs/languages/go/quickstart/

go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
export PATH="$PATH:$(go env GOPATH)/bin"

当您更新grpc文件时,通过以下方式重新生成客户端和服务器定义文件

./build_protobuf.sh

单元测试

如果您想运行单元测试,还需要安装dagorama-broker。此便利包允许测试通过pytest固定点动态生成和销毁代理。

首先,在Python目录内创建golang代理的符号链接。这允许pip拥有可执行文件的构建逻辑。

ln -s $(pwd)/broker $(pwd)/dagorama-broker/broker

然后将其安装到当前环境中。

poetry run pip install -e ./dagorama-broker

项目详情


下载文件

下载适用于您的平台文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分发

dagorama-0.1.10.tar.gz (28.3 kB 查看哈希值)

上传时间

构建分发

dagorama-0.1.10-py3-none-any.whl (32.9 kB 查看哈希值)

上传时间 Python 3

由以下支持