跳转到主要内容

由Redis支持的Python后台任务队列,一个极简的Celery。

项目描述

logo WakaQ

wakatime

由Redis支持的Python后台任务队列,一个超级极简的Celery。有关此项目的动机,请参阅这篇博客文章和相关的Hacker News讨论。WakaQ目前正在WakaTime.com的生产环境中使用。WakaQ还可在TypeScript中使用。

特性

  • 队列优先级
  • 延迟任务(在timedelta eta之后运行任务)
  • 计划周期性任务
  • 广播任务到所有工作者
  • 任务软超时硬超时限制
  • 可选地在软超时后重试任务
  • 使用max_mem_percentmax_tasks_per_worker防止内存泄漏
  • 超级极简

想要更多特性,如速率限制、任务去重等?很遗憾,不接受功能PR。最大特性属于您的应用程序工作进程任务。

安装

pip install wakaq

使用

import logging
from datetime import timedelta
from wakaq import WakaQ, Queue, CronTask


# use constants to prevent misspelling queue names
Q_HIGH = 'a-high-priority-queue'
Q_MED = 'a-medium-priority-queue'
Q_LOW = 'a-low-priority-queue'
Q_OTHER = 'another-queue'
Q_DEFAULT = 'default-lowest-priority-queue'


wakaq = WakaQ(

    # List your queues and their priorities.
    # Queues can be defined as Queue instances, tuples, or just a str.
    queues=[
        (0, Q_HIGH),
        (1, Q_MED),
        (2, Q_LOW),
        Queue(Q_OTHER, priority=3, max_retries=5, soft_timeout=300, hard_timeout=360),
        Q_DEFAULT,
    ],

    # Number of worker processes. Must be an int or str which evaluates to an
    # int. The variable "cores" is replaced with the number of processors on
    # the current machine.
    concurrency="cores*4",

    # Raise SoftTimeout in a task if it runs longer than 30 seconds. Can also be set per
    # task or queue. If no soft timeout set, tasks can run forever.
    soft_timeout=30,  # seconds

    # SIGKILL a task if it runs longer than 1 minute. Can be set per task or queue.
    hard_timeout=timedelta(minutes=1),

    # If the task soft timeouts, retry up to 3 times. Max retries comes first
    # from the task decorator if set, next from the Queue's max_retries,
    # lastly from the option below. If No max_retries is found, the task
    # is not retried on a soft timeout.
    max_retries=3,

    # Combat memory leaks by reloading a worker (the one using the most RAM),
    # when the total machine RAM usage is at or greater than 98%.
    max_mem_percent=98,

    # Combat memory leaks by reloading a worker after it's processed 5000 tasks.
    max_tasks_per_worker=5000,

    # Schedule two tasks, the first runs every minute, the second once every ten minutes.
    # Scheduled tasks can be passed as CronTask instances or tuples. To run scheduled
    # tasks you must keep a wakaq scheduler running as a daemon.
    schedules=[

        # Runs mytask on the queue with priority 1.
        CronTask('* * * * *', 'mytask', queue=Q_MED, args=[2, 2], kwargs={}),

        # Runs mytask once every 5 minutes.
        ('*/5 * * * *', 'mytask', [1, 1], {}),

        # Runs anothertask on the default lowest priority queue.
        ('*/10 * * * *', 'anothertask'),
    ],
)


# timeouts can be customized per task with a timedelta or integer seconds
@wakaq.task(queue=Q_MED, max_retries=7, soft_timeout=420, hard_timeout=480)
def mytask(x, y):
    print(x + y)


@wakaq.task
def anothertask():
    print("hello world")


@wakaq.wrap_tasks_with
def custom_task_decorator(fn):
    def inner(*args, **kwargs):
        # do something before each task runs
        fn(*args, **kwargs)
        # do something after each task runs
    return inner


if __name__ == '__main__':

    # add 1 plus 1 on a worker somewhere
    mytask.delay(1, 1)

    # add 1 plus 1 on a worker somewhere, overwriting the task's queue from medium to high
    mytask.delay(1, 1, queue=Q_HIGH)

    # print hello world on a worker somewhere, running on the default lowest priority queue
    anothertask.delay()

    # print hello world on a worker somewhere, after 10 seconds from now
    anothertask.delay(eta=timedelta(minutes=10))

部署

优化

查看WakaQ 初始化参数以获取完整选项列表,例如 Redis 主机和 Redis 套接字超时值。

在生产环境中使用时,请确保为您的 Redis 服务器进程增加允许的最大开放端口

当使用 eta 任务时,会使用 Redis 排序集合,因此 eta 任务将根据任务名称、参数和关键字参数自动去重。如果您想有多个具有相同参数的挂起 eta 任务,只需将废弃的随机字符串添加到任务的 kwargs 中即可,例如:str(uuid.uuid1())

作为守护进程运行

以下是一个示例 systemd 配置,用于将 wakaq-worker 作为守护进程运行

[Unit]
Description=WakaQ Worker Service

[Service]
WorkingDirectory=/opt/yourapp
ExecStart=/opt/yourapp/venv/bin/python /opt/yourapp/venv/bin/wakaq-worker --app=yourapp.wakaq
RemainAfterExit=no
Restart=always
RestartSec=30s
KillSignal=SIGINT
LimitNOFILE=99999

[Install]
WantedBy=multi-user.target

/etc/systemd/system/wakaqworker.service 创建一个包含上述内容的文件,然后运行

systemctl daemon-reload && systemctl enable wakaqworker

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

WakaQ-2.1.24.tar.gz (19.7 kB 查看散列)

上传时间:

由以下组织支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面