基于asyncio和redis的分布式Python工作队列
项目描述
SAQ
SAQ(简单异步队列)是一个基于asyncio和redis构建的简单且性能优异的工作队列框架。它可以用于使用工作者处理后台工作。例如,您可以使用SAQ来安排电子邮件、执行长查询或进行昂贵的数据分析。
它使用 redis-py >= 4.2。
它与 RQ 类似,并受到 ARQ 的极大启发。与RQ不同,它是异步的,因此如果您的作业是异步的,则速度显著更快。即使它们不是,由于开销较低,SAQ仍然相当快。
SAQ可选带有简单的UI来监控工作者和作业。
安装
# minimal install
pip install saq
# web + hiredis
pip install saq[web,hiredis]
用法
usage: saq [-h] [--workers WORKERS] [--verbose] [--web]
[--extra-web-settings EXTRA_WEB_SETTINGS]
[--port PORT] [--check]
settings
Start Simple Async Queue Worker
positional arguments:
settings Namespaced variable containing
worker settings eg: eg
module_a.settings
options:
-h, --help show this help message and exit
--workers WORKERS Number of worker processes
--verbose, -v Logging level: 0: ERROR, 1: INFO,
2: DEBUG
--web Start web app. By default, this
only monitors the current
worker's queue. To monitor
multiple queues, see '--extra-
web-settings'
--extra-web-settings EXTRA_WEB_SETTINGS, -e EXTRA_WEB_SETTINGS
Additional worker settings to
monitor in the web app
--port PORT Web app port, defaults to 8080
--check Perform a health check
environment variables:
AUTH_USER basic auth user, defaults to admin
AUTH_PASSWORD basic auth password, if not specified, no auth will be used
示例
import asyncio
from saq import CronJob, Queue
# all functions take in context dict and kwargs
async def test(ctx, *, a):
await asyncio.sleep(0.5)
# result should be json serializable
# custom serializers and deserializers can be used through Queue(dump=,load=)
return {"x": a}
async def cron(ctx):
print("i am a cron job")
async def startup(ctx):
ctx["db"] = await create_db()
async def shutdown(ctx):
await ctx["db"].disconnect()
async def before_process(ctx):
print(ctx["job"], ctx["db"])
async def after_process(ctx):
pass
queue = Queue.from_url("redis://127.0.0.1")
settings = {
"queue": queue,
"functions": [test],
"concurrency": 10,
"cron_jobs": [CronJob(cron, cron="* * * * * */5")], # run every 5 seconds
"startup": startup,
"shutdown": shutdown,
"before_process": before_process,
"after_process": after_process,
}
要启动工作者,假设前一个文件在python路径中可用
saq module.file.settings
注意:
module.file.settings
也可以是一个返回设置字典的可调用对象。
入队任务
# schedule a job normally
job = await queue.enqueue("test", a=1)
# wait 1 second for the job to complete
await job.refresh(1)
print(job.results)
# run a job and return the result
print(await queue.apply("test", a=2))
# Run multiple jobs concurrently and collect the results into a list
print(await queue.map("test", [{"a": 3}, {"a": 4}]))
# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)
演示
启动工作进程
python -m saq examples.simple.settings --web
导航到 Web UI
入队任务
python examples/simple.py
与ARQ的比较
SAQ深受ARQ的启发,但有几个增强功能。
- 通过利用BLMOVE或RPOPLPUSH和NOTIFY来避免轮询
- SAQ的延迟比ARQ低得多,延迟小于5毫秒。ARQ的默认轮询频率为0.5秒
- SAQ比ARQ快8倍以上
- 用于监控队列和工作进程的Web界面
- 心跳监控以避免丢失任务
- 更健壮的故障处理
- 存储堆栈跟踪
- 清除挂起的任务
- 取消的任务与失败的任务(机器重新部署)的处理不同
- 任务前后的钩子
- 轻松运行多个工作进程以利用更多核心
开发
python -m venv env
source env/bin/activate
pip install -e ".[dev,web]"
docker run -d -p 6379:6379 redis
docker run -d -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust postgres
make style test
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
saq-0.17.0.tar.gz (53.7 kB 查看哈希)
构建分布
saq-0.17.0-py3-none-any.whl (57.7 kB 查看哈希)