跳转到主要内容

一个提供RQ(Redis Queue)与Django集成的应用

项目描述

Django-RQ

Build Status

Django与RQ(一个基于Redis的Python队列库)的集成。 Django-RQ是一个简单的应用,允许你在django的settings.py中配置队列,并在你的项目中轻松使用它们。

支持Django-RQ

如果你觉得django-rq很有用,请考虑通过Tidelift来支持其开发。

要求

安装

  • 安装django-rq(或从PyPI下载

pip install django-rq
  • django_rq添加到settings.py中的INSTALLED_APPS

INSTALLED_APPS = (
    # other apps
    "django_rq",
)
  • 在django的settings.py中配置你的队列

RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'USERNAME': 'some-user',
        'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
        'REDIS_CLIENT_KWARGS': {    # Eventual additional Redis connection arguments
            'ssl_cert_reqs': None,
        },
    },
    'with-sentinel': {
        'SENTINELS': [('localhost', 26736), ('localhost', 26737)],
        'MASTER_NAME': 'redismaster',
        'DB': 0,
        # Redis username/password
        'USERNAME': 'redis-user',
        'PASSWORD': 'secret',
        'SOCKET_TIMEOUT': 0.3,
        'CONNECTION_KWARGS': {  # Eventual additional Redis connection arguments
            'ssl': True
        },
        'SENTINEL_KWARGS': {    # Eventual Sentinel connection arguments
            # If Sentinel also has auth, username/password can be passed here
            'username': 'sentinel-user',
            'password': 'secret',
        },
    },
    'high': {
        'URL': os.getenv('REDISTOGO_URL', 'redis://localhost:6379/0'), # If you're on Heroku
        'DEFAULT_TIMEOUT': 500,
    },
    'low': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    }
}

RQ_EXCEPTION_HANDLERS = ['path.to.my.handler'] # If you need custom exception handlers
  • 在你的urls.py中包含django_rq.urls

urlpatterns += [
    path('django-rq/', include('django_rq.urls'))
]

用法

将任务放入队列

Django-RQ允许你轻松地将任务放入在settings.py中定义的任何队列。它提供了一些实用函数

  • enqueue - 将任务推送到默认队列

import django_rq
django_rq.enqueue(func, foo, bar=baz)
  • get_queue - 返回一个Queue实例。

import django_rq
queue = django_rq.get_queue('high')
queue.enqueue(func, foo, bar=baz)

除了name参数外,get_queue还接受default_timeoutis_asyncautocommitconnectionqueue_class参数。例如

queue = django_rq.get_queue('default', autocommit=True, is_async=True, default_timeout=360)
queue.enqueue(func, foo, bar=baz)

您可以向此函数提供自己的单例Redis连接对象,这样它就不会为每个队列定义创建新的连接对象。这可以帮助您限制与Redis服务器的连接数。例如

import django_rq
import redis
redis_cursor = redis.StrictRedis(host='', port='', db='', password='')
high_queue = django_rq.get_queue('high', connection=redis_cursor)
low_queue = django_rq.get_queue('low', connection=redis_cursor)
  • get_connection - 接受单个队列名称参数(默认为“default”),并返回队列的Redis服务器连接

import django_rq
redis_conn = django_rq.get_connection('high')
  • get_worker - 接受可选的队列名称,并返回指定队列的新RQ Worker实例(或默认队列)

import django_rq
worker = django_rq.get_worker() # Returns a worker for "default" queue
worker.work()
worker = django_rq.get_worker('low', 'high') # Returns a worker for "low" and "high"

@job装饰器

要将可调用对象轻松转换为RQ任务,您还可以使用随django_rq提供的@job装饰器

from django_rq import job

@job
def long_running_func():
    pass
long_running_func.delay() # Enqueue function in "default" queue

@job('high')
def long_running_func():
    pass
long_running_func.delay() # Enqueue function in "high" queue

您可以传递RQ作业装饰器接受的任何参数

@job('default', timeout=3600)
def long_running_func():
    pass
long_running_func.delay() # Enqueue function with a timeout of 3600 seconds.

您可以通过DEFAULT_RESULT_TTL设置指定默认值给result_ttl装饰器关键字参数

RQ = {
    'DEFAULT_RESULT_TTL': 5000,
}

使用此设置,作业装饰器将result_ttl设置为5000,除非明确指定。

运行工作进程

django_rq提供了一种管理命令,用于为每个作为参数指定的队列启动一个工作进程

python manage.py rqworker high default low

如果您想以爆发模式运行rqworker,可以传递--burst标志

python manage.py rqworker high default low --burst

如果您需要使用自定义工作进程、作业或队列类,最好使用全局设置(参见自定义队列类自定义作业和工作进程类)。但是,也可以使用以下命令行选项覆盖此类设置。

要使用自定义工作进程类,可以使用--worker-class标志并指定工作进程的路径

python manage.py rqworker high default low --worker-class 'path.to.GeventWorker'

要使用自定义队列类,可以使用--queue-class标志并指定队列类的路径

python manage.py rqworker high default low --queue-class 'path.to.CustomQueue'

要使用自定义作业类,提供--job-class标志。

从版本2.10开始,支持运行RQ的工作进程池

python manage.py rqworker-pool default low medium --num-workers 4

支持计划作业

从RQ 1.2.0版本开始,您可以为作业使用内置调度程序。例如

from django_rq.queues import get_queue
queue = get_queue('default')
job = queue.enqueue_at(datetime(2020, 10, 10), func)

如果您使用内置调度程序,必须以支持调度的形式启动工作进程

python manage.py rqworker --with-scheduler

或者您可以使用RQ Scheduler。安装后,您还可以使用get_scheduler函数返回Scheduler实例,该实例用于settings.py中定义的队列RQ_QUEUES。例如

import django_rq
scheduler = django_rq.get_scheduler('default')
job = scheduler.enqueue_at(datetime(2020, 10, 10), func)

您还可以使用管理命令rqscheduler来启动调度程序

python manage.py rqscheduler

支持django-redis和django-redis-cache

如果您已安装django-redisdjango-redis-cache,您可以指示django_rq使用Redis缓存中的相同连接信息。这有两个优点:它是DRY,并且可以利用您的缓存设置中可能进行的任何优化(如使用连接池或Hiredis。)

要使用配置,请使用一个字典,其中键为 USE_REDIS_CACHE,指向您 RQ_QUEUES 字典中所需缓存的名称。不言而喻,所选缓存必须存在并使用 Redis 后端。有关配置说明,请参阅各自的 Redis 缓存包文档。重要的是要指出,由于 django-redis-cache 的 ShardedClient 将缓存分割到多个 Redis 连接中,因此它不起作用。

以下是 django-redis 的示例设置片段

CACHES = {
    'redis-cache': {
        'BACKEND': 'redis_cache.cache.RedisCache',
        'LOCATION': 'localhost:6379:1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'MAX_ENTRIES': 5000,
        },
    },
}

RQ_QUEUES = {
    'high': {
        'USE_REDIS_CACHE': 'redis-cache',
    },
    'low': {
        'USE_REDIS_CACHE': 'redis-cache',
    },
}

队列统计信息

django_rq 还提供了一个仪表板,可在 /django-rq/(或安装期间在 urls.py 中设置的任何 URL)处监视队列的状态。

您还可以通过在 settings.py 中添加 RQ_SHOW_ADMIN_LINK = True 将仪表板链接添加到 /admin 中。但是,请注意,这将覆盖默认的 admin 模板,因此可能会与其他修改默认 admin 模板的应用程序冲突。

这些统计信息也可以通过 /django-rq/stats.json 以 JSON 格式获得,该 URL 可供工作人员访问。如果您需要通过其他 HTTP 客户端(用于监控目的)访问此视图,可以定义 RQ_API_TOKEN 并通过 /django-rq/stats.json/<API_TOKEN> 访问。

demo-django-rq-json-dashboard.png

注意:计划中的作业统计信息显示来自 RQ 内置调度器 的作业,而不是可选的 RQ 调度器

此外,这些统计信息还可以通过命令行访问。

python manage.py rqstats
python manage.py rqstats --interval=1  # Refreshes every second
python manage.py rqstats --json  # Output as JSON
python manage.py rqstats --yaml  # Output as YAML
demo-django-rq-cli-dashboard.gif

配置 Sentry

Sentry 应在 Django 的 settings.py 中配置,具体说明请参阅 Sentry 文档

在运行 rqworker 命令时,可以通过传递 sentry-dsn 选项来覆盖默认的 Django Sentry 配置

./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222

这将覆盖任何现有的 Django 配置并重新初始化 Sentry,设置以下 Sentry 选项

{
    'debug': options.get('sentry_debug'),
    'ca_certs': options.get('sentry_ca_certs'),
    'integrations': [RedisIntegration(), RqIntegration(), DjangoIntegration()]
}

配置日志记录

RQ 使用 Python 的 logging,这意味着您可以在 django 的 settings.py 中轻松配置 rqworker 的日志记录机制。例如

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "rq_console": {
            "format": "%(asctime)s %(message)s",
            "datefmt": "%H:%M:%S",
        },
    },
    "handlers": {
        "rq_console": {
            "level": "DEBUG",
            "class": "rq.logutils.ColorizingStreamHandler",
            "formatter": "rq_console",
            "exclude": ["%(asctime)s"],
        },
    },
    'loggers': {
        "rq.worker": {
            "handlers": ["rq_console", "sentry"],
            "level": "DEBUG"
        },
    }
}

自定义队列类

默认情况下,每个队列都将使用 DjangoRQ 类。如果您想使用自定义队列类,可以通过在 RQ_QUEUES 中为每个队列添加 QUEUE_CLASS 选项来做到这一点

RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'QUEUE_CLASS': 'module.path.CustomClass',
    }
}

或者您可以在 RQ 设置中将 DjangoRQ 指定为所有队列的自定义类

RQ = {
    'QUEUE_CLASS': 'module.path.CustomClass',
}

自定义队列类应继承自 django_rq.queues.DjangoRQ

如果您正在使用多个队列类(不推荐),请确保只为具有相同队列类的队列运行工作者。例如,如果您在 RQ_QUEUES 中定义了两个队列,其中一个指定了自定义类,则您必须为每个队列至少运行两个单独的工作者。

自定义作业和工作者类

类似于自定义队列类,可以通过使用 JOB_CLASSWORKER_CLASS 设置来配置全局自定义作业和工作者类

RQ = {
    'JOB_CLASS': 'module.path.CustomJobClass',
    'WORKER_CLASS': 'module.path.CustomWorkerClass',
}

自定义作业类应继承自 rq.job.Job。如果配置,它将用于所有作业。

自定义工作者类应继承自 rq.worker.Worker。除非由 rqworker 管理命令的 worker-class 选项覆盖,否则它将用于运行所有工作者。

测试提示

为了简化测试过程,您可以通过这种方式同步运行一个工作者

from django.test import TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...                      # Stuff that init jobs.
        get_worker().work(burst=True)  # Processes all jobs then stop.
        ...                      # Asserts that the job stuff is done.

同步模式

您可以将选项 ASYNC 设置为 False,以将同步操作设置为特定队列的默认选项。这将导致任务立即执行,并在它们被派遣的同一线程上执行,这对于测试和调试很有用。例如,您可能需要在设置文件中的队列配置之后添加以下内容

# ... Logic to set DEBUG and TESTING settings to True or False ...

# ... Regular RQ_QUEUES setup code ...

if DEBUG or TESTING:
    for queueConfig in RQ_QUEUES.values():
        queueConfig['ASYNC'] = False

注意,在调用 get_queue 时显式设置 is_async 参数将覆盖此设置。

运行测试

要运行 django_rq 的测试套件

`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.

在 Ubuntu 上部署

创建一个运行高、默认和低队列的 rqworker 服务。

sudo vi /etc/systemd/system/rqworker.service

[Unit]
Description=Django-RQ Worker
After=network.target

[Service]
WorkingDirectory=<<path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python \
    <<path_to_your_project_folder>>/manage.py \
    rqworker high default low

[Install]
WantedBy=multi-user.target

启用并启动服务

sudo systemctl enable rqworker
sudo systemctl start rqworker

在 Heroku 上部署

使用以下内容将 django-rq 添加到您的 requirements.txt 文件中

pip freeze > requirements.txt

更新您的 Procfile

web: gunicorn --pythonpath="$PWD/your_app_name" config.wsgi:application

worker: python your_app_name/manage.py rqworker high default low

提交并重新部署。然后添加您的新的工作进程

heroku scale worker=1

变更日志

请参阅 CHANGELOG.md

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分发

django-rq-2.10.2.tar.gz (49.7 kB 查看哈希值)

上传时间 源代码

构建分发

django_rq-2.10.2-py2.py3-none-any.whl (56.5 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面