一个提供RQ(Redis Queue)与Django集成的应用
项目描述
Django-RQ
Django与RQ(一个基于Redis的Python队列库)的集成。 Django-RQ是一个简单的应用,允许你在django的settings.py中配置队列,并在你的项目中轻松使用它们。
支持Django-RQ
如果你觉得django-rq很有用,请考虑通过Tidelift来支持其开发。
要求
安装
安装django-rq(或从PyPI下载)
pip install django-rq
将django_rq添加到settings.py中的INSTALLED_APPS
INSTALLED_APPS = (
# other apps
"django_rq",
)
在django的settings.py中配置你的队列
RQ_QUEUES = {
'default': {
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
'USERNAME': 'some-user',
'PASSWORD': 'some-password',
'DEFAULT_TIMEOUT': 360,
'REDIS_CLIENT_KWARGS': { # Eventual additional Redis connection arguments
'ssl_cert_reqs': None,
},
},
'with-sentinel': {
'SENTINELS': [('localhost', 26736), ('localhost', 26737)],
'MASTER_NAME': 'redismaster',
'DB': 0,
# Redis username/password
'USERNAME': 'redis-user',
'PASSWORD': 'secret',
'SOCKET_TIMEOUT': 0.3,
'CONNECTION_KWARGS': { # Eventual additional Redis connection arguments
'ssl': True
},
'SENTINEL_KWARGS': { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username': 'sentinel-user',
'password': 'secret',
},
},
'high': {
'URL': os.getenv('REDISTOGO_URL', 'redis://localhost:6379/0'), # If you're on Heroku
'DEFAULT_TIMEOUT': 500,
},
'low': {
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
}
}
RQ_EXCEPTION_HANDLERS = ['path.to.my.handler'] # If you need custom exception handlers
在你的urls.py中包含django_rq.urls
urlpatterns += [
path('django-rq/', include('django_rq.urls'))
]
用法
将任务放入队列
Django-RQ允许你轻松地将任务放入在settings.py中定义的任何队列。它提供了一些实用函数
enqueue - 将任务推送到默认队列
import django_rq
django_rq.enqueue(func, foo, bar=baz)
get_queue - 返回一个Queue实例。
import django_rq
queue = django_rq.get_queue('high')
queue.enqueue(func, foo, bar=baz)
除了name参数外,get_queue还接受default_timeout、is_async、autocommit、connection和queue_class参数。例如
queue = django_rq.get_queue('default', autocommit=True, is_async=True, default_timeout=360)
queue.enqueue(func, foo, bar=baz)
您可以向此函数提供自己的单例Redis连接对象,这样它就不会为每个队列定义创建新的连接对象。这可以帮助您限制与Redis服务器的连接数。例如
import django_rq
import redis
redis_cursor = redis.StrictRedis(host='', port='', db='', password='')
high_queue = django_rq.get_queue('high', connection=redis_cursor)
low_queue = django_rq.get_queue('low', connection=redis_cursor)
get_connection - 接受单个队列名称参数(默认为“default”),并返回队列的Redis服务器连接
import django_rq
redis_conn = django_rq.get_connection('high')
get_worker - 接受可选的队列名称,并返回指定队列的新RQ Worker实例(或默认队列)
import django_rq
worker = django_rq.get_worker() # Returns a worker for "default" queue
worker.work()
worker = django_rq.get_worker('low', 'high') # Returns a worker for "low" and "high"
@job装饰器
要将可调用对象轻松转换为RQ任务,您还可以使用随django_rq提供的@job装饰器
from django_rq import job
@job
def long_running_func():
pass
long_running_func.delay() # Enqueue function in "default" queue
@job('high')
def long_running_func():
pass
long_running_func.delay() # Enqueue function in "high" queue
您可以传递RQ作业装饰器接受的任何参数
@job('default', timeout=3600)
def long_running_func():
pass
long_running_func.delay() # Enqueue function with a timeout of 3600 seconds.
您可以通过DEFAULT_RESULT_TTL设置指定默认值给result_ttl装饰器关键字参数
RQ = {
'DEFAULT_RESULT_TTL': 5000,
}
使用此设置,作业装饰器将result_ttl设置为5000,除非明确指定。
运行工作进程
django_rq提供了一种管理命令,用于为每个作为参数指定的队列启动一个工作进程
python manage.py rqworker high default low
如果您想以爆发模式运行rqworker,可以传递--burst标志
python manage.py rqworker high default low --burst
如果您需要使用自定义工作进程、作业或队列类,最好使用全局设置(参见自定义队列类和自定义作业和工作进程类)。但是,也可以使用以下命令行选项覆盖此类设置。
要使用自定义工作进程类,可以使用--worker-class标志并指定工作进程的路径
python manage.py rqworker high default low --worker-class 'path.to.GeventWorker'
要使用自定义队列类,可以使用--queue-class标志并指定队列类的路径
python manage.py rqworker high default low --queue-class 'path.to.CustomQueue'
要使用自定义作业类,提供--job-class标志。
从版本2.10开始,支持运行RQ的工作进程池
python manage.py rqworker-pool default low medium --num-workers 4
支持计划作业
从RQ 1.2.0版本开始,您可以为作业使用内置调度程序。例如
from django_rq.queues import get_queue
queue = get_queue('default')
job = queue.enqueue_at(datetime(2020, 10, 10), func)
如果您使用内置调度程序,必须以支持调度的形式启动工作进程
python manage.py rqworker --with-scheduler
或者您可以使用RQ Scheduler。安装后,您还可以使用get_scheduler函数返回Scheduler实例,该实例用于settings.py中定义的队列RQ_QUEUES。例如
import django_rq
scheduler = django_rq.get_scheduler('default')
job = scheduler.enqueue_at(datetime(2020, 10, 10), func)
您还可以使用管理命令rqscheduler来启动调度程序
python manage.py rqscheduler
支持django-redis和django-redis-cache
如果您已安装django-redis或django-redis-cache,您可以指示django_rq使用Redis缓存中的相同连接信息。这有两个优点:它是DRY,并且可以利用您的缓存设置中可能进行的任何优化(如使用连接池或Hiredis。)
要使用配置,请使用一个字典,其中键为 USE_REDIS_CACHE,指向您 RQ_QUEUES 字典中所需缓存的名称。不言而喻,所选缓存必须存在并使用 Redis 后端。有关配置说明,请参阅各自的 Redis 缓存包文档。重要的是要指出,由于 django-redis-cache 的 ShardedClient 将缓存分割到多个 Redis 连接中,因此它不起作用。
以下是 django-redis 的示例设置片段
CACHES = {
'redis-cache': {
'BACKEND': 'redis_cache.cache.RedisCache',
'LOCATION': 'localhost:6379:1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'MAX_ENTRIES': 5000,
},
},
}
RQ_QUEUES = {
'high': {
'USE_REDIS_CACHE': 'redis-cache',
},
'low': {
'USE_REDIS_CACHE': 'redis-cache',
},
}
队列统计信息
django_rq 还提供了一个仪表板,可在 /django-rq/(或安装期间在 urls.py 中设置的任何 URL)处监视队列的状态。
您还可以通过在 settings.py 中添加 RQ_SHOW_ADMIN_LINK = True 将仪表板链接添加到 /admin 中。但是,请注意,这将覆盖默认的 admin 模板,因此可能会与其他修改默认 admin 模板的应用程序冲突。
这些统计信息也可以通过 /django-rq/stats.json 以 JSON 格式获得,该 URL 可供工作人员访问。如果您需要通过其他 HTTP 客户端(用于监控目的)访问此视图,可以定义 RQ_API_TOKEN 并通过 /django-rq/stats.json/<API_TOKEN> 访问。
注意:计划中的作业统计信息显示来自 RQ 内置调度器 的作业,而不是可选的 RQ 调度器。
此外,这些统计信息还可以通过命令行访问。
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
配置 Sentry
Sentry 应在 Django 的 settings.py 中配置,具体说明请参阅 Sentry 文档。
在运行 rqworker 命令时,可以通过传递 sentry-dsn 选项来覆盖默认的 Django Sentry 配置
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
这将覆盖任何现有的 Django 配置并重新初始化 Sentry,设置以下 Sentry 选项
{
'debug': options.get('sentry_debug'),
'ca_certs': options.get('sentry_ca_certs'),
'integrations': [RedisIntegration(), RqIntegration(), DjangoIntegration()]
}
配置日志记录
RQ 使用 Python 的 logging,这意味着您可以在 django 的 settings.py 中轻松配置 rqworker 的日志记录机制。例如
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"rq_console": {
"format": "%(asctime)s %(message)s",
"datefmt": "%H:%M:%S",
},
},
"handlers": {
"rq_console": {
"level": "DEBUG",
"class": "rq.logutils.ColorizingStreamHandler",
"formatter": "rq_console",
"exclude": ["%(asctime)s"],
},
},
'loggers': {
"rq.worker": {
"handlers": ["rq_console", "sentry"],
"level": "DEBUG"
},
}
}
自定义队列类
默认情况下,每个队列都将使用 DjangoRQ 类。如果您想使用自定义队列类,可以通过在 RQ_QUEUES 中为每个队列添加 QUEUE_CLASS 选项来做到这一点
RQ_QUEUES = {
'default': {
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
'QUEUE_CLASS': 'module.path.CustomClass',
}
}
或者您可以在 RQ 设置中将 DjangoRQ 指定为所有队列的自定义类
RQ = {
'QUEUE_CLASS': 'module.path.CustomClass',
}
自定义队列类应继承自 django_rq.queues.DjangoRQ。
如果您正在使用多个队列类(不推荐),请确保只为具有相同队列类的队列运行工作者。例如,如果您在 RQ_QUEUES 中定义了两个队列,其中一个指定了自定义类,则您必须为每个队列至少运行两个单独的工作者。
自定义作业和工作者类
类似于自定义队列类,可以通过使用 JOB_CLASS 和 WORKER_CLASS 设置来配置全局自定义作业和工作者类
RQ = {
'JOB_CLASS': 'module.path.CustomJobClass',
'WORKER_CLASS': 'module.path.CustomWorkerClass',
}
自定义作业类应继承自 rq.job.Job。如果配置,它将用于所有作业。
自定义工作者类应继承自 rq.worker.Worker。除非由 rqworker 管理命令的 worker-class 选项覆盖,否则它将用于运行所有工作者。
测试提示
为了简化测试过程,您可以通过这种方式同步运行一个工作者
from django.test import TestCase
from django_rq import get_worker
class MyTest(TestCase):
def test_something_that_creates_jobs(self):
... # Stuff that init jobs.
get_worker().work(burst=True) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
同步模式
您可以将选项 ASYNC 设置为 False,以将同步操作设置为特定队列的默认选项。这将导致任务立即执行,并在它们被派遣的同一线程上执行,这对于测试和调试很有用。例如,您可能需要在设置文件中的队列配置之后添加以下内容
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING:
for queueConfig in RQ_QUEUES.values():
queueConfig['ASYNC'] = False
注意,在调用 get_queue 时显式设置 is_async 参数将覆盖此设置。
运行测试
要运行 django_rq 的测试套件
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.
在 Ubuntu 上部署
创建一个运行高、默认和低队列的 rqworker 服务。
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory=<<path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python \
<<path_to_your_project_folder>>/manage.py \
rqworker high default low
[Install]
WantedBy=multi-user.target
启用并启动服务
sudo systemctl enable rqworker
sudo systemctl start rqworker
在 Heroku 上部署
使用以下内容将 django-rq 添加到您的 requirements.txt 文件中
pip freeze > requirements.txt
更新您的 Procfile
web: gunicorn --pythonpath="$PWD/your_app_name" config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
提交并重新部署。然后添加您的新的工作进程
heroku scale worker=1
变更日志
请参阅 CHANGELOG.md。
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。