工作流引擎。
项目描述
刀片
刀片是一个分布式、可扩展、持久和高度可用的编排引擎,以可扩展和弹性的方式执行异步和同步长时间运行的业务逻辑。刀片需要Python 3.7或更高版本以使用新的async/await
语法和变量类型注解。
披萨订购和配送工作流示例
以下是如何使用此库构建和运行披萨订购工作流的示例
披萨工作流由2个流程组成
- 订单:负责与订单服务通信,下订单(CommandTask)并等待订单准备就绪(ListenerTask)
- 配送:一旦订单准备就绪,此流程将与配送服务通信,开始披萨订单的配送(CommandTask)
步骤1 实例化刀片
workflow_engine = Dagger(
broker=KAFKA_ADMIN_CLIENT_URL,
store="aerospike://",
consumer_auto_offset_reset="latest",
task_update_topic="task_update_topic",
trigger_interval=600,
aerospike_config=aerospike_config,
enable_changelog=False,
web_port=6066,
serializer="raw",
)
步骤2 定义订单流程的叶命令和监听任务
假设订单微服务通过一个Kafka主题 - pizza_order_topic
处理传入的订单,JSON模式如下
{
"order_id": "id",
"pizza_type": "1",
"customer_id": "customer_id"
}
使用这些信息,让我们通过覆盖实现业务逻辑的execute
方法来构建OrderCommandTask,该方法将如何将有效载荷发送到订单服务通过Kafka主题
class OrderCommandTask(KafkaCommandTask[str, str]):
async def execute(
self,
runtime_parameters: Dict[str, str],
workflow_instance: ITemplateDAGInstance,
) -> None:
payload = {
"order_id": runtime_parameters["order_id"],
"customer_id": runtime_parameters["customer_id"],
"pizza_type": runtime_parameters["pizza_type"],
}
await workflow_engine.topics[self.topic].send(
value=json.dumps(payload)
)
执行OrderCommandTask
后,工作流应进入WAIT_STATE
,直到它从OrderService收到有关订单状态的消息。假设Order Service在订单准备就绪时在以下JSON格式的消息上发送一个消息到Kafka主题:order_status_topic
{
"order_id": "id",
"status": "READY"
}
让我们通过在监听器上实现 get_correlatable_keys_from_payload
和 on_message
方法来对 OrderListenerTask
进行建模,以处理 order_status_topic
上的消息。它还需要将 correletable_key
指定为 order_id
以查找有效载荷。
class PizzaWaitForReadyListener(KafkaListenerTask[str, str]):
correlatable_key = "order_id"
async def get_correlatable_keys_from_payload(
self, payload: Any
) -> List[TaskLookupKey]:
tpayload = json.loads(payload)
key = tpayload[self.correlatable_key]
return [(self.correlatable_key, key)]
async def on_message(
self, runtime_parameters: Dict[str, VT], *args: Any, **kwargs: Any
) -> bool :
logger.info(f"Pizza Order is Ready")
return True
当订单服务在 order_status_topic
上发送状态消息时,Dagger 会调用 get_correlatable_keys_from_payload
来确定该消息属于哪个工作流实例。一旦确定工作流实例,它就会在相应的 ListenerTask 上调用 on_message
。
现在我们已经对 LEAF 任务进行了建模,让我们将它们附加到父 Order
流程。
def pizza_ordering_process(
process_name: str = "Order"
) -> IProcessTemplateDAGBuilder:
dag_builder = DAGBuilderHelper(dagger_app=workflow_engine)
root_task = dag_builder.build_and_link_tasks(
[
dag_builder.generic_command_task_builder(
topic="pizza_order_topic",
task_type=OrderCommandTask,
process_name=process_name,
),
dag_builder.generic_listener_task_builder(
topic="PizzaWaitForReadyListener",
task_type=PizzaWaitForReadyListener,
process_name=process_name,
),
]
)
return dag_builder.generic_process_builder(process_name=process_name, root_task=root_task)
当命令任务和 PizzaWaitForReadyListener
都完成时,订单流程就处于 COMPLETED
状态,然后工作流转换到执行下一个流程 Delivery
。
步骤 3:为 Delivery 流程定义 Leaf 命令任务
假设配送服务只需要一个具有以下模式的 HTTP POST 请求
{
"order_id": "id",
"customer_id": "customer_id"
}
我们可以通过以下方式实现 execute
方法来对 DeliveryCommandTask 进行建模,以 POST 这个有效载荷
class DeliveryCommandTask(ExecutorTask[str, str]):
async def execute(
self, runtime_parameters: Dict[str, VT], workflow_instance: ITask = None
) -> None:
payload = {
"order_id": runtime_parameters["order_id"],
"customer_id": runtime_parameters["customer_id"],
}
async with aiohttp.ClientSession() as session:
async with session.post(url="http://www.deliverysvc.com", json=payload):
pass
让我们将其附加到父 Delivery
流程。
def pizza_delivery_process(
process_name: str = "Delivery",
) -> IProcessTemplateDAGBuilder:
dag_builder = DAGBuilderHelper(dagger_app=workflow_engine)
root_task = dag_builder.build_and_link_tasks(
[
dag_builder.generic_executor_task_builder(
task_type=DeliveryCommandTask,
name=process_name,
)
]
)
return dag_builder.generic_process_builder(
process_name=process_name, root_task=root_task
)
步骤 4:定义流程执行顺序并使用 register_template
注册工作流定义
根据工作流,我们希望首先执行 Order
流程,然后才是 Delivery
流程。工作流确保只有在 Order
流程中的任务都处于终止状态后,才执行 Delivery
任务。
@Dagger.register_template("PizzaWorkflow")
def register_pizza_workflow(template_name: str) -> ITemplateDAG:
dag_builder_helper = DAGBuilderHelper(workflow_engine)
order_process = dag_builder_helper.build_and_link_processes(
[
pizza_ordering_process(process_name="Order"),
pizza_delivery_process(process_name="Delivery"),
]
)
return dag_builder_helper.generic_template(
template_name=template_name, root_process=order_process
)
步骤 5:定义一个 API 来实例化和执行披萨订单工作流
async def create_and_submit_pizza_delivery_workflow(
order_id: str, customer_id: str, pizza_type: int
):
pizza_workflow_template = workflow_engine.template_dags["PizzaWorkflow"]
pizza_workflow_instance = await pizza_workflow_template.create_instance(
uuid.uuid1(),
repartition=False, # Create this instance on the current worker
order_id=order_id,
customer_id=customer_id,
pizza_type=pizza_type,
)
await workflow_engine.submit(pizza_workflow_instance, repartition=False)
步骤 6:启动工作进程
workflow_engine.main()
Dagger 是
简单
Dagger 非常易于使用。为了开始,应用程序需要安装这个库,使用默认模板定义 DAG 或根据用例扩展它们,创建这些 DAG 的实例并将它们调度执行。该库隐藏了从 Kafka 生产和消费的所有复杂性,维护 Kafka Streams 拓扑处理以及创建任务的持久化和恢复
高可用性
Dagger 具有高可用性,并且可以应对网络问题和服务器崩溃。在节点失败的情况下,它可以自动恢复状态存储(表示任务数据)或切换到备用节点。
分布式
根据需要启动更多应用程序实例以在系统上分配负载
快速
单个核心工作进程实例每秒可以处理数万个任务。Dagger 使用基于 rocksDB 的快速键值查找存储,并复制到 Kafka 主题以实现容错。
安装
您可以通过 Wayfair 工艺品库或从源代码安装 dagger。
使用 pip
安装
pip install wf-dagger
dagger 依赖于 faust-streaming
用于 Kafka 流处理
常见问题解答
支持哪个版本的 Python?
dagger 支持 Python 版本 >= 3.7
支持哪些 Kafka 版本?
dagger 支持 Kafka 版本 >= 0.10。
路线图
请参阅 开放问题 以获取功能建议(和已知问题)列表。
贡献
贡献使得开源社区成为一个如此美妙的学习、灵感和创造的地方。您所做的任何贡献都将受到 高度赞赏。有关详细的贡献指南,请参阅 CONTRIBUTING.md
许可协议
在 MIT LICENSE
许可下分发。有关更多信息,请参阅 LICENSE
。
联系方式
vikram patki - vpatki@wayfair.com
项目链接: https://github.com/wayfair-incubator/dagger
致谢
此模板借鉴自 https://github.com/othneildrew/Best-README-Template。
文档
请查看 项目文档。
关于存储库结构和如何与代码库合作的概述,请阅读开发指南。
项目详情
下载文件
下载适用于您平台文件的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。