跳转到主要内容

工作流引擎。

项目描述

CI pipeline status PyPI Contributor Covenant Maintainer codecov Checked with mypy Code style: black

刀片

刀片是一个分布式、可扩展、持久和高度可用的编排引擎,以可扩展和弹性的方式执行异步和同步长时间运行的业务逻辑。刀片需要Python 3.7或更高版本以使用新的async/await语法和变量类型注解。

披萨订购和配送工作流示例

以下是如何使用此库构建和运行披萨订购工作流的示例

Pizza Workflow

披萨工作流由2个流程组成

  • 订单:负责与订单服务通信,下订单(CommandTask)并等待订单准备就绪(ListenerTask)
  • 配送:一旦订单准备就绪,此流程将与配送服务通信,开始披萨订单的配送(CommandTask)

步骤1 实例化刀片

workflow_engine = Dagger(
                    broker=KAFKA_ADMIN_CLIENT_URL,
                    store="aerospike://",
                    consumer_auto_offset_reset="latest",
                    task_update_topic="task_update_topic",
                    trigger_interval=600,
                    aerospike_config=aerospike_config,
                    enable_changelog=False,
                    web_port=6066,
                    serializer="raw",
                )

步骤2 定义订单流程的叶命令和监听任务

假设订单微服务通过一个Kafka主题 - pizza_order_topic处理传入的订单,JSON模式如下

{
  "order_id": "id",
  "pizza_type": "1",
  "customer_id": "customer_id"
}

使用这些信息,让我们通过覆盖实现业务逻辑的execute方法来构建OrderCommandTask,该方法将如何将有效载荷发送到订单服务通过Kafka主题

class OrderCommandTask(KafkaCommandTask[str, str]):
    async def execute(
        self,
        runtime_parameters: Dict[str, str],
        workflow_instance: ITemplateDAGInstance,
    ) -> None:
        payload = {
            "order_id": runtime_parameters["order_id"],
            "customer_id": runtime_parameters["customer_id"],
            "pizza_type": runtime_parameters["pizza_type"],
        }
        await workflow_engine.topics[self.topic].send(
            value=json.dumps(payload)
        )

执行OrderCommandTask后,工作流应进入WAIT_STATE,直到它从OrderService收到有关订单状态的消息。假设Order Service在订单准备就绪时在以下JSON格式的消息上发送一个消息到Kafka主题:order_status_topic

{
  "order_id": "id",
  "status": "READY"
}

让我们通过在监听器上实现 get_correlatable_keys_from_payloadon_message 方法来对 OrderListenerTask 进行建模,以处理 order_status_topic 上的消息。它还需要将 correletable_key 指定为 order_id 以查找有效载荷。

class PizzaWaitForReadyListener(KafkaListenerTask[str, str]):
    correlatable_key = "order_id"

    async def get_correlatable_keys_from_payload(
        self, payload: Any
    ) -> List[TaskLookupKey]:
        tpayload = json.loads(payload)
        key = tpayload[self.correlatable_key]
        return [(self.correlatable_key, key)]

    async def on_message(
            self, runtime_parameters: Dict[str, VT], *args: Any, **kwargs: Any
    ) -> bool :
        logger.info(f"Pizza Order is Ready")
        return True

当订单服务在 order_status_topic 上发送状态消息时,Dagger 会调用 get_correlatable_keys_from_payload 来确定该消息属于哪个工作流实例。一旦确定工作流实例,它就会在相应的 ListenerTask 上调用 on_message

现在我们已经对 LEAF 任务进行了建模,让我们将它们附加到父 Order 流程。

def pizza_ordering_process(
    process_name: str = "Order"
) -> IProcessTemplateDAGBuilder:
    dag_builder = DAGBuilderHelper(dagger_app=workflow_engine)
    root_task = dag_builder.build_and_link_tasks(
        [
           dag_builder.generic_command_task_builder(
                topic="pizza_order_topic",
                task_type=OrderCommandTask,
                process_name=process_name,
            ),
            dag_builder.generic_listener_task_builder(
                topic="PizzaWaitForReadyListener",
                task_type=PizzaWaitForReadyListener,
                process_name=process_name,
            ),
        ]
    )
    return dag_builder.generic_process_builder(process_name=process_name, root_task=root_task)

当命令任务和 PizzaWaitForReadyListener 都完成时,订单流程就处于 COMPLETED 状态,然后工作流转换到执行下一个流程 Delivery

步骤 3:为 Delivery 流程定义 Leaf 命令任务

假设配送服务只需要一个具有以下模式的 HTTP POST 请求

{
  "order_id": "id",
  "customer_id": "customer_id"
}

我们可以通过以下方式实现 execute 方法来对 DeliveryCommandTask 进行建模,以 POST 这个有效载荷

class DeliveryCommandTask(ExecutorTask[str, str]):
    async def execute(
        self, runtime_parameters: Dict[str, VT], workflow_instance: ITask = None
    ) -> None:
        payload = {
            "order_id": runtime_parameters["order_id"],
            "customer_id": runtime_parameters["customer_id"],
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url="http://www.deliverysvc.com", json=payload):
                pass

让我们将其附加到父 Delivery 流程。

def pizza_delivery_process(
    process_name: str = "Delivery",
) -> IProcessTemplateDAGBuilder:
    dag_builder = DAGBuilderHelper(dagger_app=workflow_engine)
    root_task = dag_builder.build_and_link_tasks(
        [
            dag_builder.generic_executor_task_builder(
                task_type=DeliveryCommandTask,
                name=process_name,
            )
        ]
    )
    return dag_builder.generic_process_builder(
        process_name=process_name, root_task=root_task
    )

步骤 4:定义流程执行顺序并使用 register_template 注册工作流定义

根据工作流,我们希望首先执行 Order 流程,然后才是 Delivery 流程。工作流确保只有在 Order 流程中的任务都处于终止状态后,才执行 Delivery 任务。

@Dagger.register_template("PizzaWorkflow")
def register_pizza_workflow(template_name: str) -> ITemplateDAG:
    dag_builder_helper = DAGBuilderHelper(workflow_engine)
    order_process = dag_builder_helper.build_and_link_processes(
        [
            pizza_ordering_process(process_name="Order"),
            pizza_delivery_process(process_name="Delivery"),
        ]
    )
    return dag_builder_helper.generic_template(
        template_name=template_name, root_process=order_process
    )

步骤 5:定义一个 API 来实例化和执行披萨订单工作流

async def create_and_submit_pizza_delivery_workflow(
    order_id: str, customer_id: str, pizza_type: int
):
    pizza_workflow_template = workflow_engine.template_dags["PizzaWorkflow"]
    pizza_workflow_instance = await pizza_workflow_template.create_instance(
        uuid.uuid1(),
        repartition=False,  # Create this instance on the current worker
        order_id=order_id,
        customer_id=customer_id,
        pizza_type=pizza_type,
    )
    await workflow_engine.submit(pizza_workflow_instance, repartition=False)

步骤 6:启动工作进程

workflow_engine.main()

Dagger 是

简单

Dagger 非常易于使用。为了开始,应用程序需要安装这个库,使用默认模板定义 DAG 或根据用例扩展它们,创建这些 DAG 的实例并将它们调度执行。该库隐藏了从 Kafka 生产和消费的所有复杂性,维护 Kafka Streams 拓扑处理以及创建任务的持久化和恢复

高可用性

Dagger 具有高可用性,并且可以应对网络问题和服务器崩溃。在节点失败的情况下,它可以自动恢复状态存储(表示任务数据)或切换到备用节点。

分布式

根据需要启动更多应用程序实例以在系统上分配负载

快速

单个核心工作进程实例每秒可以处理数万个任务。Dagger 使用基于 rocksDB 的快速键值查找存储,并复制到 Kafka 主题以实现容错。

安装

您可以通过 Wayfair 工艺品库或从源代码安装 dagger。

使用 pip 安装

pip install wf-dagger

dagger 依赖于 faust-streaming 用于 Kafka 流处理

常见问题解答

支持哪个版本的 Python?

dagger 支持 Python 版本 >= 3.7

支持哪些 Kafka 版本?

dagger 支持 Kafka 版本 >= 0.10。

路线图

请参阅 开放问题 以获取功能建议(和已知问题)列表。

贡献

贡献使得开源社区成为一个如此美妙的学习、灵感和创造的地方。您所做的任何贡献都将受到 高度赞赏。有关详细的贡献指南,请参阅 CONTRIBUTING.md

许可协议

MIT LICENSE 许可下分发。有关更多信息,请参阅 LICENSE

联系方式

vikram patki - vpatki@wayfair.com

项目链接: https://github.com/wayfair-incubator/dagger

致谢

此模板借鉴自 https://github.com/othneildrew/Best-README-Template

文档

请查看 项目文档

关于存储库结构和如何与代码库合作的概述,请阅读开发指南

项目详情


下载文件

下载适用于您平台文件的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

wf-dagger-0.2.6.tar.gz (61.6 kB 查看哈希值)

上传时间 源代码

构建分发

wf_dagger-0.2.6-py3-none-any.whl (66.4 kB 查看哈希值)

上传时间 Python 3

由以下支持