用于处理外部服务任务的Camunda Python客户端。
项目描述
avikom-camunda-client
这个yogeshrnaik/camunda-external-task-client-python3的分支利用了async/await
,并以异步方式运行工作者,而不是线程。此外,它还包含了一些封装Camunda REST API的便利函数,可以加快开发速度。
安装
安装此客户端最简单的方法是通过pip
pip install git+https://github.com/OpenAvikom/camunda-external-task-client-python3
如果您想运行示例,首先克隆项目并在本地运行一切可能更容易
git clone https://github.com/OpenAvikom/camunda-external-task-client-python3.git
cd camunda-external-task-client-python3
pip install -e . # install module editable
Hello World 示例
此示例将向您展示如何让Python客户端与Camunda通信。
首先,我们假设Camunda正在运行。如果不是这种情况,您可能想看看Camunda平台Docker镜像。其次,我们假设Camunda运行在localhost:8080。如果不是,您需要相应地调整下面的示例。第三,您需要一个部署和运行BPMN流程的方法。Camunda Modeler可能是一个好的起点。
模型
文件 bpmn_process/hello_world.bpmn
包含一个非常简单的模型
我们有一个开始事件(左侧圆圈),一个执行任务的服务任务和一个结束事件(右侧圆圈)。当你点击“启动当前流程图”(模型器中的“播放”按钮)时,不会发生任何事情。但流程正在运行。你可以检查 localhost:8080 来确保。
现在让我们更仔细地看看“Hello World”外部任务(右上角带有齿轮的方形符号)。属性面板中有趣的字段是 Implementation
和 Topic
Implementation
告诉 Camunda,该任务应由一个 External
工作者解决,而 Topic
是一个字符串,大致描述了要执行的任务。我们看到 Camunda 现在正在等待一个能够执行带有 HelloWorldTask
主题的任务的外部工作者。Camunda Cockpit 将显示一个正在运行的流程,等待 HelloWorld
被外部流程完成。现在让我们创建一个 Python 客户端,该客户端将订阅该主题,并通过仅返回一个成功事件来执行任务。文件可以在 examples/hello_world.py
中找到
import aiohttp
import asyncio
from camunda.external_task.external_task import ExternalTask
from camunda.external_task.external_task_worker import ExternalTaskWorker
from camunda.external_task.external_task_result import ExternalTaskResult
async def main():
# let's create an async http context with aiohttp
# aiohttp will close the connection when the worker returns (it won't though)
async with aiohttp.ClientSession() as session:
# We create a worker with a task id and pass the http session as well as the REST endpoint of Camunda.
# You need to change 'base_url' in case your Camunda engine is configured differently.
worker = ExternalTaskWorker(
worker_id=1, base_url="http://localhost:8080/engine-rest", session=session
)
print("waiting for a task ...")
# Subscribe is an async function which will block until the worker is cancelled with `worker.cancel()`,
# In this example, no one will do this. We will stop the program with Ctrl+C instead
# When the worker detects a new task for the topic assigned to `topic_name` it will trigger the
# function/method passed to `action`.
await worker.subscribe(topic_names="HelloWorldTask", action=process)
# this will be called when a task for the subscribed topic is available
async def process(task: ExternalTask) -> ExternalTaskResult:
print("I got a task!")
# To communicate the successfull processing of a task, we return an ExternalTaskResult created by `task.complete` .
# If we call `task.failure` instead, Camunda will publish the task again until
# some client finally completes it or the maximum amount of retries is reached.
return task.complete()
# run the main task
asyncio.run(main())
您可以从项目文件夹中运行该示例
python examples/hello_world.py
您应该在您的终端中看到类似以下内容
python ./examples/hello_world.py
waiting for a task ...
I got a task!
如果您没有看到第二行,您可能需要启动 Camunda 流程(在模型器中)。
处理数据
上面的示例非常简单,因为我们的客户端只是返回一个成功事件/结果,但实际上没有发生任何事情。在下一个示例中,我们将让 Python 工作者决定传入的数字是奇数还是偶数。首先,让我们看一下 BPMN bpmn_process/odd_number.bpmn
我们有两个外部 ServiceTasks
,分别标记为 Number is even!
和 Number is odd!
。它们都使用相同的主题 EchoTask
,我们将使用它来打印文本到终端。查看属性面板并点击其中一个任务的 Input/Output
选项卡。您应该看到类似以下内容
该任务定义了一个名为 text
的输入参数,类型为 Text
,值为 Number is even!
(或 odd)。接下来,让我们看看 Number Check
<bpmn:serviceTask id="Activity_NumberCheck" name="Number Check" camunda:type="external" camunda:topic="NumberCheckTask">
<bpmn:extensionElements>
<camunda:inputOutput>
<camunda:inputParameter name="number">42</camunda:inputParameter>
<camunda:outputParameter name="isOdd">${result}</camunda:outputParameter>
</camunda:inputOutput>
</bpmn:extensionElements>
<bpmn:incoming>Flow_Start</bpmn:incoming>
<bpmn:outgoing>Flow_OutCome</bpmn:outgoing>
</bpmn:serviceTask>
现在您可能会说“等等!这从哪里来?”。您可以使用模型器 GUI 来编辑值,但您也可以直接编辑 BPMN-XML。在左下角,您将看到两个标签为 Diagram
和 XML
的选项卡。这是您更改视图的地方。
我们看到主题 camunda:topic
已设置为 NumberCheckTask
,并且服务任务在 camunda:inputOutput
块中包含两个参数。我们将 42
分配给 number
,并将接收到的输出参数 result
分配给环境变量 isOdd
。当然,您可以根据需要更改 number
的值。您也不必在 XML 视图中这样做。只需切换回 Diagram
,然后在 Number Check
的属性面板中查找 Input/Output
选项卡。
最后,我们来看看网关(带大 X 的菱形框)。网关本身只有一个标签。这没有任何作用,但可以用来澄清网关将评估什么。我们将评估之前分配的 isOdd
变量。为此,我们需要在离开网关的流程上设置条件。如果您点击带有 "true"
标签的底部流程,属性面板将看起来像这样
参数 name
只是为了使你的模型更容易理解。 条件类型
和 表达式
将决定一个流程是否被视为有效。我们选择 表达式
,因为我们想检查 isOdd
的值。在 Camunda 中,这就像上图所示一样完成。当 isOdd == "true"
时,将执行服务任务 数字是奇数!
,然后流程将结束。网关上方的流程有一条穿过流程的线,这意味着这是没有满足其他条件时的默认流程。因此,我们检查 isOdd
是否等于 "true"
,如果是,就执行 数字是奇数!
,否则执行 数字是偶数!
。
现在让我们看看 examples/odd_number.py 的样子
import aiohttp
import asyncio
from camunda.external_task.external_task import ExternalTask, Variables
from camunda.external_task.external_task_worker import ExternalTaskWorker
from camunda.external_task.external_task_result import ExternalTaskResult
async def main():
async with aiohttp.ClientSession() as session:
# a new parameter! We can pass 'asyncResponseTimeout' to Camunda to configure how long a 'fetch and lock' connection
# is kept alive. If this value is rather large, clients do not need to reconnect that often but it will take longer for a
# worker to properly shut down. The default value is 30 seconds (300000)
worker = ExternalTaskWorker(
worker_id=4,
base_url="http://localhost:8080/engine-rest",
session=session,
config={"asyncResponseTimeout": 5000}, # wait 5 seconds before timeout
)
# Our worker will now subscribe to two topics now
# We will create a new task with `asyncio.create_task` and await only the second subscribe
asyncio.create_task(worker.subscribe(topic_names="NumberCheckTask", action=number_check))
await worker.subscribe(topic_names="EchoTask", action=echo)
async def number_check(task: ExternalTask) -> ExternalTaskResult:
try:
number = task.context_variables["number"]
print(f"We received {number} for checking...")
# We set a locally scoped variable 'result' to 'true' or 'false'
task.local_variables.set_variable(
"result", "true" if int(number) % 2 != 0 else "false", Variables.ValueType.STRING
)
# We pass the variables object as LOCAL variables which will only be available in the context of the task
# that called the external task worker. The result must be assigned in case it should be used somewhere else.
# Just have a look at the odd_number.bpmn to see how.
return task.complete()
# If your input could not be parsed with `int()` the task will fail
# and another external service could try to do better.
except Exception as err:
print(f"Oh no! Something went wrong: {err}")
return task.failure(
error_message=err.__class__.__name__,
error_details=str(err),
max_retries=3,
retry_timeout=5000,
)
async def echo(task: ExternalTask) -> ExternalTaskResult:
print(f"Camunda wants to say: {task.context_variables['text']}")
return task.complete()
# run the main task
asyncio.run(main())
我们现在有两个回调,我们的工作者将订阅它们适当的主题,如 BPMN 模型所示。我们还减少了工作者的 asyncResponseTimeout
,以防止已经关闭的 Python 客户端意外获取任务。这只是为了简单起见的一个权宜之计。建议在实际关闭整个过程之前等待工作者订阅返回(通过等待 worker.cancel()
)。
在 number_check
和 echo
中,我们看到了如何从 ExternalTask
对象 task
中检索变量。请注意,使用 task.context_variables["key"]
检索变量将在 key
不存在时引发 KeyError
。为了处理可选变量,可以使用 task.context_variables.get_variable("key")
,如果找不到 key
,它将返回 None
。在 number_check
中,还展示了如何创建一个 Variables
对象,分配一个值,以及如何将此对象作为 局部 变量对象传递。局部变量只能在服务任务的作用域内使用。这就是为什么我们必须将 result
分配给 Camunda 中的输出参数。我们也可以将 variables
传递给 global_variables
。这样,result
就可以在整个流程中使用。然而,在更复杂的场景中,这可能会使环境变得混乱,甚至可能导致变量定义冲突。保持局部变量可以提高通过保持变量作用域狭窄和在 BPMN 模型中 可见 的控制。
当你运行示例时,你应该在启动 Camunda 流程后看到如下输出
python ./examples/odd_number.py
We received 42 for checking...
Camunda wants to say: Number is even!
管理多个工作者/订阅
上面提到了当客户端停止和重新启动时的一些问题。Camunda 连接可能在下一次 Camunda 调度任务时没有正确释放。这可能导致已停止/不活动的实例锁定任务。处理这个问题的一种方法是在捕获 KeyboardInterrupt
或任何其他表示关闭的事件时等待 worker.cancel()
,如 examples/manage_tasks.py 所示
import aiohttp
import asyncio
from camunda.external_task.external_task import ExternalTask, Variables
from camunda.external_task.external_task_worker import ExternalTaskWorker
from camunda.external_task.external_task_result import ExternalTaskResult
class Worker:
def __init__(self):
self.worker = None
self.loop = None
def start(self):
"""Run the worker and block forever"""
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self._run())
async def _run(self):
async with aiohttp.ClientSession() as session:
self.worker = ExternalTaskWorker(
worker_id=4, base_url="http://localhost:8080/engine-rest", session=session
)
# dispatch the first subscription
self.loop.create_task(
self.worker.subscribe(topic_names="NumberCheckTask", action=number_check)
)
# and block the current task with the second subscription again
await self.worker.subscribe(topic_names="EchoTask", action=echo)
def stop(self):
self.loop.run_until_complete(self.worker.cancel())
async def number_check(task: ExternalTask) -> ExternalTaskResult:
try:
number = task.context_variables["number"]
print(f"We received {number} for checking...")
task.local_variables.set_variable(
"result", "true" if int(number) % 2 != 0 else "false", Variables.ValueType.STRING
)
return task.complete()
except Exception as err:
print(f"Oh no! Something went wrong: {err}")
return task.failure()
async def echo(task: ExternalTask) -> ExternalTaskResult:
print(f"Camunda wants to say: {task.context_variables['text']}")
return task.complete()
# run the main task
try:
worker = Worker()
worker.start()
except KeyboardInterrupt:
# Stopping workers might take a while.
# How long it will take depends on the chosen asyncResponseTimeout (default is 30000 ms)
print(f"Stopping workers...")
worker.stop()
print(f"All done!")
上面的代码基本上与之前的 odd_number
示例相同,但我们将这些异步部分包装在一个 Worker
类中,并添加了启动和停止工作者及其订阅的方法。根据你愿意等待多长时间来关闭,你可能需要调整 asyncResponseTimeout
。
项目详情
下载文件
下载适用于您的平台的文件。如果您不确定要选择哪一个,请了解更多关于 安装包 的信息。
源代码分发
构建分发
avikom_camunda_client-0.10.0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | da70d993afe5fd9260300ba7566b9d30b6575a8119796f6837c859176688544a |
|
MD5 | e6366bd3ec5c39951b2a18f7affc2b0c |
|
BLAKE2b-256 | 3448cbe124226eedc8af9bcbbeef43987e44fd4963d5a76bda178ce2e8a67b81 |
avikom_camunda_client-0.10.0-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d79895034f6bb5db53c07d70d412c64d562396b2343d25bb8f29e1e8e156fc58 |
|
MD5 | a650e16bc21020f1c0e1370978ff3d39 |
|
BLAKE2b-256 | 3fd14707aa25e944e6701bcfa9fa4d03bcdf968a5c1976f1f2dd3d3ce47a1e18 |