未提供项目描述
项目描述
dagorama (WIP)
Dagorama是一个用于后台处理的具有偏见的计算库。您可以将它看作是Celery、Airflow和Dask之间的混合体。
其主要设计目标是让用户能够在纯Python中编写链式数据处理逻辑,并轻松扩展到外部专用机器。此外,它还力求提供
- 在纯Python中易于理解的流程控制,无需学习新的语言ORM
- 为现代Python编写,支持IDE建议和mypy类型提示
- 支持动态逻辑,如分支if语句,它根据某些运行时条件决定子树执行
- 在测试时易于集成到一台机器上的单元测试
- 为后台处理提供合理的默认值
快速入门
让我们以这个简单的示例为例,它代表了我们想要执行一系列长时间运行的阻塞任务。这些任务可能是ML推理、图像渲染或数据聚合。在任何情况下,它都需要处理几秒钟才能返回结果。特别是我们关注的是充分利用当前进程计算资源的逻辑。这与网络请求不同,网络请求有网络引入的延迟,可以通过asyncio轻松并行化。
from collections import Counter
from time import sleep, time
class MyTask:
def entrypoint(self):
results = [
self.perform_work(i)
for i in range(4)
]
return self.rollup_statuses(results)
def perform_work(self, identifier: int) -> int:
sleep(2)
return identifier // 2
def rollup_statuses(self, responses: list[int]):
return Counter([status for status in responses])
def main():
start = time()
task = MyTask()
response = task.entrypoint()
end = time()
print(f"Response: {response} in {round(end-start, 2)}s")
assert response == Counter({0: 2, 1: 2})
if __name__ == "__main__":
main()
如果您运行此代码,您应该在处理一段时间后看到回显计数器。
$ python samples/test1.py
Response: Counter({0: 2, 1: 2}) in 8.01s
此代码还具有有效的类型提示,因此mypy在类型提示期间很满意。
$ mypy samples/test1.py
Success: no issues found in 1 source file
自然地,我们希望尽快获得结果,同时扩展到我们计算集群中机器(或机器)的资源。这几乎适合像Celery队列这样的东西 - 我们将为每个perform_work
函数启动四个单独的工作。但我们如何处理聚合阶段?我们是否在主进程中阻塞,直到它完成?状态是如何管理的?如果主进程在我们完成之前退出,我们如何从我们之前的位置继续?
这种依赖链对于队列来说实际上并不好。相反,你可能需要一个更接近计算图或DAG的东西,它可以基于之前函数的成功完成来条件化后续函数。以下是你在dagorama中如何编写相同内容的示例。
from dagorama.decorators import dagorama
from dagorama.definition import DAGDefinition, resolve
from dagorama.runner import launch_workers
from dagorama_broker.launch import launch_broker
from collections import Counter
from time import sleep, time
class MyTask(DAGDefinition):
@dagorama().syncfn
def entrypoint(self):
results = [
self.perform_work(i)
for i in range(4)
]
return self.rollup_statuses(results)
@dagorama().syncfn
def perform_work(self, identifier: int) -> int:
sleep(2)
return identifier // 2
@dagorama().syncfn
def rollup_statuses(self, responses: list[int]):
return Counter([status for status in responses])
def main():
with launch_broker():
start = time()
task_definition = MyTask()
instance, promise = task_definition()
with launch_workers(4, exit_on_completion=True) as workers:
# Wait for each to quit
for worker in workers:
worker.join()
response = resolve(instance, promise)
end = time()
print(f"Response: {response} in {round(end-start, 2)}s")
assert response == Counter({0: 2, 1: 2})
if __name__ == "__main__":
main()
为了将此逻辑放入一个脚本中,这里有一些不同的事情在进行。
- 我们用
@dagorama
装饰器包装了每个函数。这个装饰器表示应该在一个单独的worker节点上执行像self.perform_work()
或self.rollup_statuses()
这样的函数执行。这类似于在传统队列中启动一个新的任务。 - 我们用
with launch_broker()
启动后台代理。这将启动一个单独的代理进程,它会在多个worker之间进行协调。 - 我们用launch_workers启动worker本身。在这种情况下,我们在单独的进程中执行工作。这也可以轻松地在不更改API或代码的情况下在单独的机器上执行。
与之前不同,我们现在完成的时间大约是主要工作的时间。
$ python run samples/test2.py
Response: Counter({0: 2, 1: 2}) in 3.03s
Mypy对我们的DAG定义也很满意。
$ mypy samples/test2.py
Success: no issues found in 1 source file
你会注意到核心MyTask类的差异非常小。
$ diff samples/test1.py samples/test2.py
1c1,2
< class MyTask:
---
> class MyTask(DAGDefinition):
> @dagorama().syncfn
9a11
> @dagorama().syncfn
13a16
> @dagorama().syncfn
这是dagorama的核心设计目标:编写纯Python代码并轻松扩展。
API说明
本节旨在成为你日常开发中需要了解的唯一部分,而无需偏离主题。
你希望相互流动的逻辑代码组应包含在一个继承自DAGDefinition
的类中。你希望这段代码在每个worker节点上部署,以便它们能够访问相同的核心定义文件。Docker是首选机制,以确保相同的代码在每个设备上镜像,计算将按预期进行。当你用入口参数调用定义,如dag(1)
,它将为执行创建一个新的DAG实例。这个实例将包含DAG内的函数。例如,如果你正在处理5个单独的输入图像,你将希望启动5个DAGInstances。
dagorama代理将确保在调用较晚的DAG实例之前完成较早的DAG实例。优先级方案是按DAG实例化的顺序FIFO。这在需要从处理开始到DAG完成以用于近实时逻辑的情况中很有用。
你希望在每个单独的机器上执行的每个函数都应使用@dagorama
装饰器包装。对这个函数的调用将被添加到计算图中,并适当分配。未装饰的类函数将在当前执行器内联运行。对于同步worker函数,使用@dagorama().syncfn
装饰你的代码,对于异步函数,使用@dagorama().asyncfn
。
使用@dagorama
装饰的函数将看起来像它返回类型提示的值给静态分析器,如mypy。这允许你通过传递逻辑值来编写更可解释的代码。然而,实际上,@dagorama函数在运行时将返回一个DAGPromise
。这个DAGPromise没有意义——它还没有值,因为它还没有被传递给运行器。这些响应值应该只作为函数参数传递给其他使用@dagorama
装饰的函数。当这样做时,worker将只执行该函数,一旦所有依赖项都已实现。
在某些情况下,你可能希望限制在特定硬件上运行的函数。一个常见的情况是,对于只应在GPU加速设备上运行的ML函数。我们遵循Kubernetes模型,为不应默认部署的每个函数添加一个污点,例如@dagorama(taint_name="GPU")
。要从这个队列中拉取,worker必须明确指定这个污点,否则它们不会从支持队列中拉取。
要启动工作函数,请在您的虚拟环境中安装python包并运行
worker [--include-queue {queue}] [--exclude-queue {queue}] [--toleration {toleration}]
此工作应用程序支持以下环境变量
DAGORAMA_HOST
- (可选)dagorama代理的主机 DAGORAMA_PORT
- (可选)dagorama代理的端口
生产部署(进行中)
在本地测试之外,您可能不想在自己的机器上运行工作进程。相反,您可能希望将它们分布到多台机器上。在这种配置中,我们建议
- 1个代理:在虚拟机、kubernetes pod或docker内部运行的Go可执行文件。由持久数据库支持,以便在运行时中断后恢复状态。
- N个工人。执行计算的任务设备。可以是相同的物理硬件配置,也可以根据用例不同。
- M个孵化器。第一个实例化DAG的服务。这通常是后端服务器等端应用程序。
Docker
为了方便起见,我们将代理打包为Docker镜像。此镜像基于alpine发行版,仅包含代理可执行文件,因此大小不到10MB。
docker pull piercefreeman/dagorama-broker:latest
类型提示
Dagorama应像普通函数一样满足类型提示。换句话说,您可以在将其传递到主DAG下游的其他函数之前将返回的DAGPromise视为已解决的值。
开发
鼓励对dagorama进行黑客攻击。以下是快速入门的一些步骤。
安装Python依赖项
我们使用Poetry管理依赖项。虽然这并不是严格必要的(pyproject.toml应在标准虚拟环境中通过pip安装),但是。
如果您还没有Poetry,请在这里安装它 这里。
poetry install
安装gRPC
客户端通过gRPC与代理通信。您需要支持在Golang和Python中生成protobuf文件。
Golang快速入门: https://grpc.org.cn/docs/languages/go/quickstart/
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
export PATH="$PATH:$(go env GOPATH)/bin"
当您更新grpc文件时,通过以下方式重新生成客户端和服务器定义文件
./build_protobuf.sh
单元测试
如果您想运行单元测试,还需要安装dagorama-broker
。此便利包允许测试通过pytest固定点动态生成和销毁代理。
首先,在Python目录内创建golang代理的符号链接。这允许pip拥有可执行文件的构建逻辑。
ln -s $(pwd)/broker $(pwd)/dagorama-broker/broker
然后将其安装到当前环境中。
poetry run pip install -e ./dagorama-broker
项目详情
下载文件
下载适用于您的平台文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分发
构建分发
dagorama-0.1.10.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c450724fe73a32d5463b964ce5801842c2b3402173e44e2f730bc19c7a83bd9f |
|
MD5 | 5692cbb7d9b6be83d8065c702ce4e9ec |
|
BLAKE2b-256 | b608274e2815c4d29337c3c5ecfc81ef5ce77226898f6bdacab11b3529d47a2a |