由Redis支持的Python后台任务队列,一个极简的Celery。
项目描述
WakaQ
由Redis支持的Python后台任务队列,一个超级极简的Celery。有关此项目的动机,请参阅这篇博客文章和相关的Hacker News讨论。WakaQ目前正在WakaTime.com的生产环境中使用。WakaQ还可在TypeScript中使用。
特性
- 队列优先级
- 延迟任务(在timedelta eta之后运行任务)
- 计划周期性任务
- 广播任务到所有工作者
- 任务软超时和硬超时限制
- 可选地在软超时后重试任务
- 使用
max_mem_percent
或max_tasks_per_worker
防止内存泄漏 - 超级极简
想要更多特性,如速率限制、任务去重等?很遗憾,不接受功能PR。最大特性属于您的应用程序工作进程任务。
安装
pip install wakaq
使用
import logging
from datetime import timedelta
from wakaq import WakaQ, Queue, CronTask
# use constants to prevent misspelling queue names
Q_HIGH = 'a-high-priority-queue'
Q_MED = 'a-medium-priority-queue'
Q_LOW = 'a-low-priority-queue'
Q_OTHER = 'another-queue'
Q_DEFAULT = 'default-lowest-priority-queue'
wakaq = WakaQ(
# List your queues and their priorities.
# Queues can be defined as Queue instances, tuples, or just a str.
queues=[
(0, Q_HIGH),
(1, Q_MED),
(2, Q_LOW),
Queue(Q_OTHER, priority=3, max_retries=5, soft_timeout=300, hard_timeout=360),
Q_DEFAULT,
],
# Number of worker processes. Must be an int or str which evaluates to an
# int. The variable "cores" is replaced with the number of processors on
# the current machine.
concurrency="cores*4",
# Raise SoftTimeout in a task if it runs longer than 30 seconds. Can also be set per
# task or queue. If no soft timeout set, tasks can run forever.
soft_timeout=30, # seconds
# SIGKILL a task if it runs longer than 1 minute. Can be set per task or queue.
hard_timeout=timedelta(minutes=1),
# If the task soft timeouts, retry up to 3 times. Max retries comes first
# from the task decorator if set, next from the Queue's max_retries,
# lastly from the option below. If No max_retries is found, the task
# is not retried on a soft timeout.
max_retries=3,
# Combat memory leaks by reloading a worker (the one using the most RAM),
# when the total machine RAM usage is at or greater than 98%.
max_mem_percent=98,
# Combat memory leaks by reloading a worker after it's processed 5000 tasks.
max_tasks_per_worker=5000,
# Schedule two tasks, the first runs every minute, the second once every ten minutes.
# Scheduled tasks can be passed as CronTask instances or tuples. To run scheduled
# tasks you must keep a wakaq scheduler running as a daemon.
schedules=[
# Runs mytask on the queue with priority 1.
CronTask('* * * * *', 'mytask', queue=Q_MED, args=[2, 2], kwargs={}),
# Runs mytask once every 5 minutes.
('*/5 * * * *', 'mytask', [1, 1], {}),
# Runs anothertask on the default lowest priority queue.
('*/10 * * * *', 'anothertask'),
],
)
# timeouts can be customized per task with a timedelta or integer seconds
@wakaq.task(queue=Q_MED, max_retries=7, soft_timeout=420, hard_timeout=480)
def mytask(x, y):
print(x + y)
@wakaq.task
def anothertask():
print("hello world")
@wakaq.wrap_tasks_with
def custom_task_decorator(fn):
def inner(*args, **kwargs):
# do something before each task runs
fn(*args, **kwargs)
# do something after each task runs
return inner
if __name__ == '__main__':
# add 1 plus 1 on a worker somewhere
mytask.delay(1, 1)
# add 1 plus 1 on a worker somewhere, overwriting the task's queue from medium to high
mytask.delay(1, 1, queue=Q_HIGH)
# print hello world on a worker somewhere, running on the default lowest priority queue
anothertask.delay()
# print hello world on a worker somewhere, after 10 seconds from now
anothertask.delay(eta=timedelta(minutes=10))
部署
优化
查看WakaQ 初始化参数以获取完整选项列表,例如 Redis 主机和 Redis 套接字超时值。
在生产环境中使用时,请确保为您的 Redis 服务器进程增加允许的最大开放端口。
当使用 eta 任务时,会使用 Redis 排序集合,因此 eta 任务将根据任务名称、参数和关键字参数自动去重。如果您想有多个具有相同参数的挂起 eta 任务,只需将废弃的随机字符串添加到任务的 kwargs 中即可,例如:str(uuid.uuid1())
。
作为守护进程运行
以下是一个示例 systemd 配置,用于将 wakaq-worker
作为守护进程运行
[Unit]
Description=WakaQ Worker Service
[Service]
WorkingDirectory=/opt/yourapp
ExecStart=/opt/yourapp/venv/bin/python /opt/yourapp/venv/bin/wakaq-worker --app=yourapp.wakaq
RemainAfterExit=no
Restart=always
RestartSec=30s
KillSignal=SIGINT
LimitNOFILE=99999
[Install]
WantedBy=multi-user.target
在 /etc/systemd/system/wakaqworker.service
创建一个包含上述内容的文件,然后运行
systemctl daemon-reload && systemctl enable wakaqworker
项目详情
关闭
WakaQ-2.1.24.tar.gz 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | e5241fe322083c15ae4e3f0e633cf29010489a65112ff6eed62cd8c8be9a445e |
|
MD5 | c134582db7b4e72fc71190a005de06b6 |
|
BLAKE2b-256 | 23944334a7c56de7e2dd72a805fde206a009eea4c8dcd5af85e63a8bca5ee59e |