跳转到主要内容

Socket.IO服务器,用于从客户端实时安排Celery任务。

项目描述

PyPI version Docker Image Version (latest semver)

Stirfried 🥡

Stirfried是一个ASGI HTTP/Socket.IO服务器,为基于浏览器的客户端和常规客户端提供对Celery任务的实时控制。

任务通过名称进行安排,这意味着当更改服务器可用的工人和任务时,服务器不一定需要更新。

Stirfried实现了一个简单可扩展的三层架构:客户端、服务器和工作者。任何一层都可以通过添加更多实例来扩展。

Stirfried提供了Socket.IO和HTTP API,具有三个核心功能

  • 安排任务
  • 撤销任务
  • 查询任务信息

在深入阅读README之前想先看看Stirfried的实际应用?请尝试运行示例

基于

工作者

使用pip/pipenv/poetry在您的Celery工作者中安装Stirfried

pip install stirfried

导入StirfriedTask

from stirfried.celery import StirfriedTask

全局配置基类

app = Celery(..., task_cls=StirfriedTask)

...或者按任务配置

@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y

服务器

可以通过运行korijn/stirfried Docker容器并公开端口8000来运行服务器,或者通过克隆此存储库,使用poetry安装依赖项,并按示例代码中所示启动uvicorn服务器来运行服务器。

您可以通过settings.py文件以及标准的Celery配置机制来配置服务器和工作者。在docker部署场景中,您可以将在/app/settings.py路径挂载设置文件。

客户端

客户端可以使用标准的Socket.IO库通过Socket.IO API连接,并使用纯window.fetch通过HTTP API连接。

任务对象模式

通过将以下任务对象提交给任一API来安排任务

{
    "task_name": "",  // (required) task name
    "args": [],       // (optional) task arguments
    "kwargs": {},     // (optional) task keyword arguments
    "room": "",       // (optional) custom room override, only processed if
                      //            `custom_rooms` is enabled
                      //            NOTE: can also be used to disable server events
                      //                  for this task by passing the sentinel room
                      //                  ("NO_EMIT" by default)
    "chain": []       // (optional) array of task objects to chain onto the main task
                      //            task objects use the same schema, except for
                      //            the `chain` property which cannot be nested further
                      //            NOTE: chained tasks are applied in reverse order
}

Socket.IO API

事件以以下格式描述:name(args[, optional]) -> callback_args

客户端可以触发以下服务器正在监听的事件

事件 描述
send_task({task_name[, args][, kwargs][, room][, chain]}) -> {status, data} 安排一个任务。使用回调在客户端接收回复。 status 表示安排是否成功,data 包含任务 id 或失败时的错误消息。客户端可以使用任务 id 作为参考来处理后续服务器触发的事件。请参考 任务对象架构 获取更多信息。
revoke_task(task_id) 撤销任务。如果任务不存在,则不会失败,如果任务正在运行,则不会执行任何操作。
task_info(task_id) -> {id, state, result} 查询任务信息。使用回调在客户端接收回复。只有配置了 Celery 结果后端时才有效。

客户端可以监听以下服务器触发的事件,这些事件直接与 Celery 任务类的回调相关联,但 on_progress 除外,它是 Stirfried 的扩展,可能由任务实现以支持进度事件

事件 描述
on_progress({current, total, info, task_id, task_name}) 在任务进度更新时触发。只有当任务调用 emit_progress 时,才会触发此事件。
on_retry({task_id, task_name[, einfo]}) 在任务重试时自动触发。如果 stirfried_error_info=True,则 einfo 将被包含。
on_failure({task_id, task_name[, einfo]}) 在任务失败时自动触发。如果 stirfried_error_info=True,则 einfo 将被包含。
on_success({retval, task_id, task_name}) 在任务成功时自动触发。
on_return({status, retval, task_id, task_name}) 在任务成功和失败时自动触发。

HTTP API

端点 描述
POST /task 安排一个任务。将任务对象作为请求体的 JSON 提交。请参考 任务对象架构 获取更多信息。
DELETE /task/{id} 撤销任务。如果任务不存在,则不会失败,如果任务正在运行,则不会执行任何操作。
GET /task/{id} 查询任务信息。只有配置了 Celery 结果后端时才有效。

设置

您可以从相同的 settings.py 文件配置 Celery、Socket.IO 和 Stirfried。Stirfried 设置以 stirfried_ 前缀开头,Socket.IO 设置以 socketio_ 前缀开头,Celery 设置按原样使用(不带前缀)。

Socket.IO 服务器设置直接传递(但不带前缀)到 python-socketio 库的 AsyncServer 构造函数,有关可用选项,请参阅他们的 文档。有关选项,请参阅 Celery 文档

以下选项可用于配置 Stirfried 服务器和工作进程

类型 默认值 描述
stirfried_enable_http bool True 设置为 False 以禁用 HTTP API。
stirfried_enable_socketio bool True 设置为 False 以禁用 Socket.IO API。
stirfried_enable_task_info bool True 设置为 False 以禁用 task_info 事件和 GET /task/{id} 端点。
stirfried_enable_revoke_task bool True 设置为 False 以禁用 revoke_task 事件和 DELETE /task/{id} 端点。
stirfried_redis_url str "" Redis pubsub 上 Socket.IO API 服务器到服务器通信的连接字符串。如果您想使工作进程能够触发事件,则必须设置。
stirfried_available_tasks List[str] [] 如果非空,则 send_taskPOST /task 如果任务名称不在列表中,将失败。
stirfried_error_info bool False 设置为 True 以在事件、事件回调和 HTTP 响应中包含错误消息和跟踪信息。
stirfried_sentinel_room str "NO_EMIT" 一个可以传递给room参数以防止工作者为任务发出事件的魔法字符串值。这是发送到HTTP API的任务的默认room值,因为没有Socket.IO客户端。
stirfried_custom_rooms bool False 设置为True以允许客户端覆盖服务器发出事件的默认room
stirfried_header_task_map Dict[str, Dict[str, str] {} 配置以将标题映射到特定任务的键值参数。例如,{"send_email": {"Date": "date"}},当send_email任务被安排时,会导致Date标题的值注入到键值参数date中。这可以与Socket.IO的extraHeaders功能一起使用以实现授权和验证。

房间

对于Socket.IO API安排的任务,默认情况下,服务器发出的事件是发送给安排任务的客户端。对于HTTP API安排的任务,默认情况下不会发出服务器发出的事件。服务器通过将值注入Stirfried Celery任务的room关键字参数中来实现这一点。

StirfriedTask基类依赖于该关键字参数的存在。

这意味着您必须在任务定义中添加关键字参数room=None才能接收它。

如果启用custom_rooms,客户端可以通过发送自定义的room值(尽管不是通过任务的键值参数,请参阅API架构文档)来覆盖值。

进度事件

您可以通过在任务中调用self.emit_progress(current, total, info=None)从工作者中发出进度事件。

您可以使用可选的info关键字参数发送任意元数据,例如进度消息或早期结果。

请注意,您必须将bind=True传递给celery.task装饰器才能访问self实例变量。

@celery.task(bind=True)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    return s

二进制/大数据

Socket.IO客户端、服务器和Celery工作者支持msgpack传输,允许您直接使用二进制数据(无需手动转换为和从base64编码的字符串,并承受相应的性能损失)。

您还应该了解Redis客户端输出缓冲区限制。这意味着您不能发出超过一定值的事件(默认为32mb)。您可以通过各种方式覆盖此设置,以下是通过Redis服务器CLI执行此操作的方法

# client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
redis-server --client-output-buffer-limit pubsub 256mb 128mb 30

测试

当对Stirfried Celery工作者进行单元测试时,建议的方法是简单地从设置中省略stirfried_redis_url以禁用Redis连接,并在单元测试中直接调用任务函数。缺少Redis连接将切断通常会发出的事件。这种设置将允许您将任务视为常规函数,并按常规进行单元测试。

可选地,您可以使用标准的Python测试工具来修补/模拟对self.emit_progress的任何调用以测试这些。

示例代码

仓库包含一个示例,演示了Stirfried提供的所有功能。

您可以通过以下方式运行示例

  • 克隆仓库
  • 进入example目录
  • 运行docker-compose build
  • 然后运行docker-compose up
  • 打开您的浏览器并转到http://localhost:8080/
  • 您应该看到以下界面并准备好尝试Stirfried。

Stirfried 🥡 test client

项目详情


下载文件

下载您平台所需的文件。如果您不确定选择哪个,请了解有关安装软件包的更多信息。

源分布

stirfried-0.7.0.tar.gz (9.5 kB 查看哈希值)

上传时间

构建分布

stirfried-0.7.0-py3-none-any.whl (8.4 kB 查看哈希值)

上传时间 Python 3

由以下支持