跳转到主要内容

基于asyncio和redis的分布式Python工作队列

项目描述

SAQ

SAQ(简单异步队列)是一个基于asyncio和redis构建的简单且性能优异的工作队列框架。它可以用于使用工作者处理后台工作。例如,您可以使用SAQ来安排电子邮件、执行长查询或进行昂贵的数据分析。

文档

它使用 redis-py >= 4.2。

它与 RQ 类似,并受到 ARQ 的极大启发。与RQ不同,它是异步的,因此如果您的作业是异步的,则速度显著更快。即使它们不是,由于开销较低,SAQ仍然相当快。

SAQ可选带有简单的UI来监控工作者和作业。

SAQ Web UI

安装

# minimal install
pip install saq

# web + hiredis
pip install saq[web,hiredis]

用法

usage: saq [-h] [--workers WORKERS] [--verbose] [--web]
           [--extra-web-settings EXTRA_WEB_SETTINGS]
           [--port PORT] [--check]
           settings

Start Simple Async Queue Worker

positional arguments:
  settings              Namespaced variable containing
                        worker settings eg: eg
                        module_a.settings

options:
  -h, --help            show this help message and exit
  --workers WORKERS     Number of worker processes
  --verbose, -v         Logging level: 0: ERROR, 1: INFO,
                        2: DEBUG
  --web                 Start web app. By default, this
                        only monitors the current
                        worker's queue. To monitor
                        multiple queues, see '--extra-
                        web-settings'
  --extra-web-settings EXTRA_WEB_SETTINGS, -e EXTRA_WEB_SETTINGS
                        Additional worker settings to
                        monitor in the web app
  --port PORT           Web app port, defaults to 8080
  --check               Perform a health check

environment variables:
  AUTH_USER     basic auth user, defaults to admin
  AUTH_PASSWORD basic auth password, if not specified, no auth will be used

示例

import asyncio

from saq import CronJob, Queue

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def cron(ctx):
  print("i am a cron job")

async def startup(ctx):
    ctx["db"] = await create_db()

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

queue = Queue.from_url("redis://127.0.0.1")

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 10,
    "cron_jobs": [CronJob(cron, cron="* * * * * */5")], # run every 5 seconds
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
}

要启动工作者,假设前一个文件在python路径中可用

saq module.file.settings

注意: module.file.settings 也可以是一个返回设置字典的可调用对象。

入队任务

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.results)

# run a job and return the result
print(await queue.apply("test", a=2))

# Run multiple jobs concurrently and collect the results into a list
print(await queue.map("test", [{"a": 3}, {"a": 4}]))

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)

演示

启动工作进程

python -m saq examples.simple.settings --web

导航到 Web UI

入队任务

python examples/simple.py

与ARQ的比较

SAQ深受ARQ的启发,但有几个增强功能。

  1. 通过利用BLMOVERPOPLPUSH和NOTIFY来避免轮询
    1. SAQ的延迟比ARQ低得多,延迟小于5毫秒。ARQ的默认轮询频率为0.5秒
    2. SAQ比ARQ快8倍以上
  2. 用于监控队列和工作进程的Web界面
  3. 心跳监控以避免丢失任务
  4. 更健壮的故障处理
    1. 存储堆栈跟踪
    2. 清除挂起的任务
    3. 取消的任务与失败的任务(机器重新部署)的处理不同
  5. 任务前后的钩子
  6. 轻松运行多个工作进程以利用更多核心

开发

python -m venv env
source env/bin/activate
pip install -e ".[dev,web]"
docker run -d -p 6379:6379 redis
docker run -d -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust postgres
make style test

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

saq-0.17.0.tar.gz (53.7 kB 查看哈希)

上传时间

构建分布

saq-0.17.0-py3-none-any.whl (57.7 kB 查看哈希)

上传时间 Python 3

由以下机构支持