Socket.IO服务器,用于从客户端实时安排Celery任务。
项目描述
Stirfried 🥡
Stirfried是一个ASGI HTTP/Socket.IO服务器,为基于浏览器的客户端和常规客户端提供对Celery任务的实时控制。
任务通过名称进行安排,这意味着当更改服务器可用的工人和任务时,服务器不一定需要更新。
Stirfried实现了一个简单可扩展的三层架构:客户端、服务器和工作者。任何一层都可以通过添加更多实例来扩展。
Stirfried提供了Socket.IO和HTTP API,具有三个核心功能
- 安排任务
- 撤销任务
- 查询任务信息
在深入阅读README之前想先看看Stirfried的实际应用?请尝试运行示例。
基于
工作者
使用pip/pipenv/poetry在您的Celery工作者中安装Stirfried
pip install stirfried
导入StirfriedTask
from stirfried.celery import StirfriedTask
全局配置基类
app = Celery(..., task_cls=StirfriedTask)
...或者按任务配置
@app.task(base=StirfriedTask)
def add(x, y, room=None):
return x + y
服务器
可以通过运行korijn/stirfried
Docker容器并公开端口8000来运行服务器,或者通过克隆此存储库,使用poetry安装依赖项,并按示例代码中所示启动uvicorn
服务器来运行服务器。
您可以通过settings.py
文件以及标准的Celery配置机制来配置服务器和工作者。在docker部署场景中,您可以将在/app/settings.py
路径挂载设置文件。
客户端
客户端可以使用标准的Socket.IO库通过Socket.IO API连接,并使用纯window.fetch
通过HTTP API连接。
任务对象模式
通过将以下任务对象提交给任一API来安排任务
{
"task_name": "", // (required) task name
"args": [], // (optional) task arguments
"kwargs": {}, // (optional) task keyword arguments
"room": "", // (optional) custom room override, only processed if
// `custom_rooms` is enabled
// NOTE: can also be used to disable server events
// for this task by passing the sentinel room
// ("NO_EMIT" by default)
"chain": [] // (optional) array of task objects to chain onto the main task
// task objects use the same schema, except for
// the `chain` property which cannot be nested further
// NOTE: chained tasks are applied in reverse order
}
Socket.IO API
事件以以下格式描述:name(args[, optional]) -> callback_args
客户端可以触发以下服务器正在监听的事件
事件 | 描述 |
---|---|
send_task({task_name[, args][, kwargs][, room][, chain]}) -> {status, data} |
安排一个任务。使用回调在客户端接收回复。 status 表示安排是否成功,data 包含任务 id 或失败时的错误消息。客户端可以使用任务 id 作为参考来处理后续服务器触发的事件。请参考 任务对象架构 获取更多信息。 |
revoke_task(task_id) |
撤销任务。如果任务不存在,则不会失败,如果任务正在运行,则不会执行任何操作。 |
task_info(task_id) -> {id, state, result} |
查询任务信息。使用回调在客户端接收回复。只有配置了 Celery 结果后端时才有效。 |
客户端可以监听以下服务器触发的事件,这些事件直接与 Celery 任务类的回调相关联,但 on_progress
除外,它是 Stirfried 的扩展,可能由任务实现以支持进度事件
事件 | 描述 |
---|---|
on_progress({current, total, info, task_id, task_name}) |
在任务进度更新时触发。只有当任务调用 emit_progress 时,才会触发此事件。 |
on_retry({task_id, task_name[, einfo]}) |
在任务重试时自动触发。如果 stirfried_error_info=True ,则 einfo 将被包含。 |
on_failure({task_id, task_name[, einfo]}) |
在任务失败时自动触发。如果 stirfried_error_info=True ,则 einfo 将被包含。 |
on_success({retval, task_id, task_name}) |
在任务成功时自动触发。 |
on_return({status, retval, task_id, task_name}) |
在任务成功和失败时自动触发。 |
HTTP API
端点 | 描述 |
---|---|
POST /task |
安排一个任务。将任务对象作为请求体的 JSON 提交。请参考 任务对象架构 获取更多信息。 |
DELETE /task/{id} |
撤销任务。如果任务不存在,则不会失败,如果任务正在运行,则不会执行任何操作。 |
GET /task/{id} |
查询任务信息。只有配置了 Celery 结果后端时才有效。 |
设置
您可以从相同的 settings.py
文件配置 Celery、Socket.IO 和 Stirfried。Stirfried 设置以 stirfried_
前缀开头,Socket.IO 设置以 socketio_
前缀开头,Celery 设置按原样使用(不带前缀)。
Socket.IO 服务器设置直接传递(但不带前缀)到 python-socketio 库的 AsyncServer
构造函数,有关可用选项,请参阅他们的 文档。有关选项,请参阅 Celery 文档。
以下选项可用于配置 Stirfried 服务器和工作进程
键 | 类型 | 默认值 | 描述 |
---|---|---|---|
stirfried_enable_http |
bool |
True |
设置为 False 以禁用 HTTP API。 |
stirfried_enable_socketio |
bool |
True |
设置为 False 以禁用 Socket.IO API。 |
stirfried_enable_task_info |
bool |
True |
设置为 False 以禁用 task_info 事件和 GET /task/{id} 端点。 |
stirfried_enable_revoke_task |
bool |
True |
设置为 False 以禁用 revoke_task 事件和 DELETE /task/{id} 端点。 |
stirfried_redis_url |
str |
"" |
Redis pubsub 上 Socket.IO API 服务器到服务器通信的连接字符串。如果您想使工作进程能够触发事件,则必须设置。 |
stirfried_available_tasks |
List[str] |
[] |
如果非空,则 send_task 和 POST /task 如果任务名称不在列表中,将失败。 |
stirfried_error_info |
bool |
False |
设置为 True 以在事件、事件回调和 HTTP 响应中包含错误消息和跟踪信息。 |
stirfried_sentinel_room |
str |
"NO_EMIT" |
一个可以传递给room 参数以防止工作者为任务发出事件的魔法字符串值。这是发送到HTTP API的任务的默认room 值,因为没有Socket.IO客户端。 |
stirfried_custom_rooms |
bool |
False |
设置为True 以允许客户端覆盖服务器发出事件的默认room 。 |
stirfried_header_task_map |
Dict[str, Dict[str, str] |
{} |
配置以将标题映射到特定任务的键值参数。例如,{"send_email": {"Date": "date"}} ,当send_email 任务被安排时,会导致Date 标题的值注入到键值参数date 中。这可以与Socket.IO的extraHeaders 功能一起使用以实现授权和验证。 |
房间
对于Socket.IO API安排的任务,默认情况下,服务器发出的事件是发送给安排任务的客户端。对于HTTP API安排的任务,默认情况下不会发出服务器发出的事件。服务器通过将值注入Stirfried Celery任务的room
关键字参数中来实现这一点。
StirfriedTask
基类依赖于该关键字参数的存在。
这意味着您必须在任务定义中添加关键字参数room=None
才能接收它。
如果启用custom_rooms
,客户端可以通过发送自定义的room
值(尽管不是通过任务的键值参数,请参阅API架构文档)来覆盖值。
进度事件
您可以通过在任务中调用self.emit_progress(current, total, info=None)
从工作者中发出进度事件。
您可以使用可选的info
关键字参数发送任意元数据,例如进度消息或早期结果。
请注意,您必须将bind=True
传递给celery.task
装饰器才能访问self
实例变量。
@celery.task(bind=True)
def add(self, x, y, room=None):
s = x
self.emit_progress(50, 100) # 50%
s += y
return s
二进制/大数据
Socket.IO客户端、服务器和Celery工作者支持msgpack
传输,允许您直接使用二进制数据(无需手动转换为和从base64编码的字符串,并承受相应的性能损失)。
您还应该了解Redis客户端输出缓冲区限制。这意味着您不能发出超过一定值的事件(默认为32mb)。您可以通过各种方式覆盖此设置,以下是通过Redis服务器CLI执行此操作的方法
# client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
redis-server --client-output-buffer-limit pubsub 256mb 128mb 30
测试
当对Stirfried Celery工作者进行单元测试时,建议的方法是简单地从设置中省略stirfried_redis_url
以禁用Redis连接,并在单元测试中直接调用任务函数。缺少Redis连接将切断通常会发出的事件。这种设置将允许您将任务视为常规函数,并按常规进行单元测试。
可选地,您可以使用标准的Python测试工具来修补/模拟对self.emit_progress
的任何调用以测试这些。
示例代码
仓库包含一个示例,演示了Stirfried提供的所有功能。
您可以通过以下方式运行示例
- 克隆仓库
- 进入
example
目录 - 运行
docker-compose build
- 然后运行
docker-compose up
- 打开您的浏览器并转到
http://localhost:8080/
- 您应该看到以下界面并准备好尝试Stirfried。
项目详情
下载文件
下载您平台所需的文件。如果您不确定选择哪个,请了解有关安装软件包的更多信息。