跳转到主要内容

用于处理外部服务任务的Camunda Python客户端。

项目描述

avikom-camunda-client

Version Build Status PyPI GitHub commits License

这个yogeshrnaik/camunda-external-task-client-python3的分支利用了async/await,并以异步方式运行工作者,而不是线程。此外,它还包含了一些封装Camunda REST API的便利函数,可以加快开发速度。

安装

安装此客户端最简单的方法是通过pip

pip install git+https://github.com/OpenAvikom/camunda-external-task-client-python3

如果您想运行示例,首先克隆项目并在本地运行一切可能更容易

git clone https://github.com/OpenAvikom/camunda-external-task-client-python3.git
cd camunda-external-task-client-python3
pip install -e .   # install module editable 

Hello World 示例

此示例将向您展示如何让Python客户端与Camunda通信。

首先,我们假设Camunda正在运行。如果不是这种情况,您可能想看看Camunda平台Docker镜像。其次,我们假设Camunda运行在localhost:8080。如果不是,您需要相应地调整下面的示例。第三,您需要一个部署和运行BPMN流程的方法。Camunda Modeler可能是一个好的起点。

模型

文件 bpmn_process/hello_world.bpmn 包含一个非常简单的模型

我们有一个开始事件(左侧圆圈),一个执行任务的服务任务和一个结束事件(右侧圆圈)。当你点击“启动当前流程图”(模型器中的“播放”按钮)时,不会发生任何事情。但流程正在运行。你可以检查 localhost:8080 来确保。

现在让我们更仔细地看看“Hello World”外部任务(右上角带有齿轮的方形符号)。属性面板中有趣的字段是 ImplementationTopic

Implementation 告诉 Camunda,该任务应由一个 External 工作者解决,而 Topic 是一个字符串,大致描述了要执行的任务。我们看到 Camunda 现在正在等待一个能够执行带有 HelloWorldTask 主题的任务的外部工作者。Camunda Cockpit 将显示一个正在运行的流程,等待 HelloWorld 被外部流程完成。现在让我们创建一个 Python 客户端,该客户端将订阅该主题,并通过仅返回一个成功事件来执行任务。文件可以在 examples/hello_world.py 中找到

import aiohttp
import asyncio

from camunda.external_task.external_task import ExternalTask
from camunda.external_task.external_task_worker import ExternalTaskWorker
from camunda.external_task.external_task_result import ExternalTaskResult


async def main():
    # let's create an async http context with aiohttp
    # aiohttp will close the connection when the worker returns (it won't though)
    async with aiohttp.ClientSession() as session:
        # We create a worker with a task id and pass the http session as well as the REST endpoint of Camunda.
        # You need to change 'base_url' in case your Camunda engine is configured differently.
        worker = ExternalTaskWorker(
            worker_id=1, base_url="http://localhost:8080/engine-rest", session=session
        )
        print("waiting for a task ...")
        # Subscribe is an async function which will block until the worker is cancelled with `worker.cancel()`,
        # In this example, no one will do this. We will stop the program with Ctrl+C instead
        # When the worker detects a new task for the topic assigned to `topic_name` it will trigger the
        # function/method passed to `action`.
        await worker.subscribe(topic_names="HelloWorldTask", action=process)


# this will be called when a task for the subscribed topic is available
async def process(task: ExternalTask) -> ExternalTaskResult:
    print("I got a task!")
    # To communicate the successfull processing of a task, we return an ExternalTaskResult created by `task.complete` .
    # If we call `task.failure` instead, Camunda will publish the task again until
    # some client finally completes it or the maximum amount of retries is reached.
    return task.complete()


# run the main task
asyncio.run(main())

您可以从项目文件夹中运行该示例

python examples/hello_world.py

您应该在您的终端中看到类似以下内容

python ./examples/hello_world.py
waiting for a task ...
I got a task!

如果您没有看到第二行,您可能需要启动 Camunda 流程(在模型器中)。

处理数据

上面的示例非常简单,因为我们的客户端只是返回一个成功事件/结果,但实际上没有发生任何事情。在下一个示例中,我们将让 Python 工作者决定传入的数字是奇数还是偶数。首先,让我们看一下 BPMN bpmn_process/odd_number.bpmn

我们有两个外部 ServiceTasks,分别标记为 Number is even!Number is odd!。它们都使用相同的主题 EchoTask,我们将使用它来打印文本到终端。查看属性面板并点击其中一个任务的 Input/Output 选项卡。您应该看到类似以下内容

该任务定义了一个名为 text 的输入参数,类型为 Text,值为 Number is even!(或 odd)。接下来,让我们看看 Number Check

<bpmn:serviceTask id="Activity_NumberCheck" name="Number Check" camunda:type="external" camunda:topic="NumberCheckTask">
    <bpmn:extensionElements>
    <camunda:inputOutput>
        <camunda:inputParameter name="number">42</camunda:inputParameter>
        <camunda:outputParameter name="isOdd">${result}</camunda:outputParameter>
    </camunda:inputOutput>
    </bpmn:extensionElements>
    <bpmn:incoming>Flow_Start</bpmn:incoming>
    <bpmn:outgoing>Flow_OutCome</bpmn:outgoing>
</bpmn:serviceTask>

现在您可能会说“等等!这从哪里来?”。您可以使用模型器 GUI 来编辑值,但您也可以直接编辑 BPMN-XML。在左下角,您将看到两个标签为 DiagramXML 的选项卡。这是您更改视图的地方。

我们看到主题 camunda:topic 已设置为 NumberCheckTask,并且服务任务在 camunda:inputOutput 块中包含两个参数。我们将 42 分配给 number,并将接收到的输出参数 result 分配给环境变量 isOdd。当然,您可以根据需要更改 number 的值。您也不必在 XML 视图中这样做。只需切换回 Diagram,然后在 Number Check 的属性面板中查找 Input/Output 选项卡。

最后,我们来看看网关(带大 X 的菱形框)。网关本身只有一个标签。这没有任何作用,但可以用来澄清网关将评估什么。我们将评估之前分配的 isOdd 变量。为此,我们需要在离开网关的流程上设置条件。如果您点击带有 "true" 标签的底部流程,属性面板将看起来像这样

参数 name 只是为了使你的模型更容易理解。 条件类型表达式 将决定一个流程是否被视为有效。我们选择 表达式,因为我们想检查 isOdd 的值。在 Camunda 中,这就像上图所示一样完成。当 isOdd == "true" 时,将执行服务任务 数字是奇数!,然后流程将结束。网关上方的流程有一条穿过流程的线,这意味着这是没有满足其他条件时的默认流程。因此,我们检查 isOdd 是否等于 "true",如果是,就执行 数字是奇数!,否则执行 数字是偶数!

现在让我们看看 examples/odd_number.py 的样子

import aiohttp
import asyncio

from camunda.external_task.external_task import ExternalTask, Variables
from camunda.external_task.external_task_worker import ExternalTaskWorker
from camunda.external_task.external_task_result import ExternalTaskResult


async def main():
    async with aiohttp.ClientSession() as session:
        # a new parameter! We can pass 'asyncResponseTimeout' to Camunda to configure how long a 'fetch and lock' connection
        # is kept alive. If this value is rather large, clients do not need to reconnect that often but it will take longer for a
        # worker to properly shut down. The default value is 30 seconds (300000)
        worker = ExternalTaskWorker(
            worker_id=4,
            base_url="http://localhost:8080/engine-rest",
            session=session,
            config={"asyncResponseTimeout": 5000},  # wait 5 seconds before timeout
        )
        # Our worker will now subscribe to two topics now
        # We will create a new task with `asyncio.create_task` and await only the second subscribe
        asyncio.create_task(worker.subscribe(topic_names="NumberCheckTask", action=number_check))
        await worker.subscribe(topic_names="EchoTask", action=echo)


async def number_check(task: ExternalTask) -> ExternalTaskResult:
    try:
        number = task.context_variables["number"]
        print(f"We received {number} for checking...")
        # We set a locally scoped variable 'result' to 'true' or 'false'
        task.local_variables.set_variable(
            "result", "true" if int(number) % 2 != 0 else "false", Variables.ValueType.STRING
        )
        # We pass the variables object as LOCAL variables which will only be available in the context of the task
        # that called the external task worker. The result must be assigned in case it should be used somewhere else.
        # Just have a look at the odd_number.bpmn to see how.
        return task.complete()
    # If your input could not be parsed with `int()` the task will fail
    # and another external service could try to do better.
    except Exception as err:
        print(f"Oh no! Something went wrong: {err}")
        return task.failure(
            error_message=err.__class__.__name__,
            error_details=str(err),
            max_retries=3,
            retry_timeout=5000,
        )


async def echo(task: ExternalTask) -> ExternalTaskResult:
    print(f"Camunda wants to say: {task.context_variables['text']}")
    return task.complete()


# run the main task
asyncio.run(main())

我们现在有两个回调,我们的工作者将订阅它们适当的主题,如 BPMN 模型所示。我们还减少了工作者的 asyncResponseTimeout,以防止已经关闭的 Python 客户端意外获取任务。这只是为了简单起见的一个权宜之计。建议在实际关闭整个过程之前等待工作者订阅返回(通过等待 worker.cancel())。

number_checkecho 中,我们看到了如何从 ExternalTask 对象 task 中检索变量。请注意,使用 task.context_variables["key"] 检索变量将在 key 不存在时引发 KeyError。为了处理可选变量,可以使用 task.context_variables.get_variable("key"),如果找不到 key,它将返回 None。在 number_check 中,还展示了如何创建一个 Variables 对象,分配一个值,以及如何将此对象作为 局部 变量对象传递。局部变量只能在服务任务的作用域内使用。这就是为什么我们必须将 result 分配给 Camunda 中的输出参数。我们也可以将 variables 传递给 global_variables。这样,result 就可以在整个流程中使用。然而,在更复杂的场景中,这可能会使环境变得混乱,甚至可能导致变量定义冲突。保持局部变量可以提高通过保持变量作用域狭窄和在 BPMN 模型中 可见 的控制。

当你运行示例时,你应该在启动 Camunda 流程后看到如下输出

python ./examples/odd_number.py
We received 42 for checking...
Camunda wants to say: Number is even!

管理多个工作者/订阅

上面提到了当客户端停止和重新启动时的一些问题。Camunda 连接可能在下一次 Camunda 调度任务时没有正确释放。这可能导致已停止/不活动的实例锁定任务。处理这个问题的一种方法是在捕获 KeyboardInterrupt 或任何其他表示关闭的事件时等待 worker.cancel(),如 examples/manage_tasks.py 所示

import aiohttp
import asyncio

from camunda.external_task.external_task import ExternalTask, Variables
from camunda.external_task.external_task_worker import ExternalTaskWorker
from camunda.external_task.external_task_result import ExternalTaskResult


class Worker:
    def __init__(self):
        self.worker = None
        self.loop = None

    def start(self):
        """Run the worker and block forever"""
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self._run())

    async def _run(self):
        async with aiohttp.ClientSession() as session:
            self.worker = ExternalTaskWorker(
                worker_id=4, base_url="http://localhost:8080/engine-rest", session=session
            )
            # dispatch the first subscription
            self.loop.create_task(
                self.worker.subscribe(topic_names="NumberCheckTask", action=number_check)
            )
            # and block the current task with the second subscription again
            await self.worker.subscribe(topic_names="EchoTask", action=echo)

    def stop(self):
        self.loop.run_until_complete(self.worker.cancel())


async def number_check(task: ExternalTask) -> ExternalTaskResult:
    try:
        number = task.context_variables["number"]
        print(f"We received {number} for checking...")
        task.local_variables.set_variable(
            "result", "true" if int(number) % 2 != 0 else "false", Variables.ValueType.STRING
        )
        return task.complete()
    except Exception as err:
        print(f"Oh no! Something went wrong: {err}")
        return task.failure()


async def echo(task: ExternalTask) -> ExternalTaskResult:
    print(f"Camunda wants to say: {task.context_variables['text']}")
    return task.complete()


# run the main task
try:
    worker = Worker()
    worker.start()
except KeyboardInterrupt:
    # Stopping workers might take a while.
    # How long it will take depends on the chosen asyncResponseTimeout (default is 30000 ms)
    print(f"Stopping workers...")
    worker.stop()
print(f"All done!")

上面的代码基本上与之前的 odd_number 示例相同,但我们将这些异步部分包装在一个 Worker 类中,并添加了启动和停止工作者及其订阅的方法。根据你愿意等待多长时间来关闭,你可能需要调整 asyncResponseTimeout

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定要选择哪一个,请了解更多关于 安装包 的信息。

源代码分发

avikom_camunda_client-0.10.0.tar.gz (21.8 kB 查看哈希值)

上传时间 源代码

构建分发

avikom_camunda_client-0.10.0-py3-none-any.whl (21.4 kB 查看哈希值)

上传时间 Python 3

支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面