跳转到主要内容

为RQ(Redis Queue)提供作业调度功能

项目描述

RQ调度器

RQ调度器是一个小型包,为基于RQ(一个Redis的Python队列库)的作业调度功能。

https://travis-ci.org/rq/rq-scheduler.svg?branch=master

支持RQ调度器

如果您发现rq-scheduler很有用,请考虑通过Tidelift支持其开发。

需求

安装

您可以通过pip安装RQ调度器

pip install rq-scheduler

或者您可以从PyPI下载最新稳定包。

使用方法

安排一个作业涉及做两件不同的事情

  1. 将作业放入调度器

  2. 运行一个调度器,它将在时间到来时将安排好的作业移动到队列中

安排作业

您可以通过两种方式安排一个作业。第一种是使用RQ调度器的enqueue_at

from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler
from datetime import datetime

scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue
scheduler = Scheduler('foo', connection=Redis()) # Get a scheduler for the "foo" queue

# You can also instantiate a Scheduler using an RQ Queue
queue = Queue('bar', connection=Redis())
scheduler = Scheduler(queue=queue, connection=queue.connection)

# Puts a job into the scheduler. The API is similar to RQ except that it
# takes a datetime object as first argument. So for example to schedule a
# job to run on Jan 1st 2020 we do:
scheduler.enqueue_at(datetime(2020, 1, 1), func) # Date time should be in UTC

# Here's another example scheduling a job to run at a specific date and time (in UTC),
# complete with args and kwargs.
scheduler.enqueue_at(datetime(2020, 1, 1, 3, 4), func, foo, bar=baz)

# You can choose the queue type where jobs will be enqueued by passing the name of the type to the scheduler
# used to enqueue
scheduler = Scheduler('foo', queue_class="rq.Queue")
scheduler.enqueue_at(datetime(2020, 1, 1), func) # The job will be enqueued at the queue named "foo" using the queue type "rq.Queue"

第二种方法是使用 enqueue_in。该方法不接收一个 datetime 对象,而是需要一个 timedelta,并将作业安排在 X 秒/分钟/小时/天/周后运行。例如,如果我们想在一天中的几个时段监测一条推文的受欢迎程度,我们可以这样做:

from datetime import timedelta

# Schedule a job to run 10 minutes, 1 hour and 1 day later
scheduler.enqueue_in(timedelta(minutes=10), count_retweets, tweet_id)
scheduler.enqueue_in(timedelta(hours=1), count_retweets, tweet_id)
scheduler.enqueue_in(timedelta(days=1), count_retweets, tweet_id)

重要:您在处理 RQ Scheduler 时,应始终使用 UTC 日期时间。

周期性重复作业

从版本 0.3 开始,RQ Scheduler 也支持创建周期性和重复作业。您可以通过 schedule 方法实现这一点。请注意,此功能需要 RQ >= 0.3.1。

以下是实现方法:

scheduler.schedule(
    scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone
    func=func,                     # Function to be queued
    args=[arg1, arg2],             # Arguments passed into function when executed
    kwargs={'foo': 'bar'},         # Keyword arguments passed into function when executed
    interval=60,                   # Time before the function is called again, in seconds
    repeat=10,                     # Repeat this number of times (None means repeat forever)
    meta={'foo': 'bar'}            # Arbitrary pickleable data on the job itself
)

重要提示:如果您设置了重复作业,必须确保您要么没有设置 result_ttl 值,要么设置一个大于间隔的值。否则,作业详情条目将过期,作业不会重新安排。

Cron 作业

从版本 0.6.0 开始,RQ Scheduler 也支持创建 Cron 作业,您可以使用它来创建周期性作业,这些作业可以定期在固定时间、日期或间隔运行。有关更多信息,请查看 https://en.wikipedia.org/wiki/Cron。您可以通过 cron 方法实现这一点。

以下是实现方法:

scheduler.cron(
    cron_string,                # A cron string (e.g. "0 0 * * 0")
    func=func,                  # Function to be queued
    args=[arg1, arg2],          # Arguments passed into function when executed
    kwargs={'foo': 'bar'},      # Keyword arguments passed into function when executed
    repeat=10,                  # Repeat this number of times (None means repeat forever)
    result_ttl=300              # Specify how long (in seconds) successful jobs and their results are kept. Defaults to -1 (forever)
    ttl=200                     # Specifies the maximum queued time (in seconds) before it's discarded. Defaults to None (infinite TTL).
    queue_name=queue_name,      # In which queue the job should be put in
    meta={'foo': 'bar'},        # Arbitrary pickleable data on the job itself
    use_local_timezone=False    # Interpret hours in the local timezone
)

检索已安排的作业

有时您需要知道哪些作业已经被安排。您可以使用 get_jobs 方法获取已排队作业的列表

list_of_job_instances = scheduler.get_jobs()

在其最简单的形式中(如上例所示),此方法返回当前已安排执行的所有作业实例的列表。

此外,此方法还接受两个可选的关键字参数 untilwith_times。第一个参数指定应返回哪些时间点的安排作业。它可以是一个 datetime / timedelta 实例,也可以是一个表示自纪元(1970-01-01 00:00:00)以来秒数的整数。第二个参数是一个布尔值,它确定是否应返回与作业实例一起的安排执行时间。

示例

# get all jobs until 2012-11-30 10:00:00
list_of_job_instances = scheduler.get_jobs(until=datetime(2012, 10, 30, 10))

# get all jobs for the next hour
list_of_job_instances = scheduler.get_jobs(until=timedelta(hours=1))

# get all jobs with execution times
jobs_and_times = scheduler.get_jobs(with_times=True)
# returns a list of tuples:
# [(<rq.job.Job object at 0x123456789>, datetime.datetime(2012, 11, 25, 12, 30)), ...]

检查作业是否已安排

您可以使用熟悉的 python in 操作符检查特定作业实例或作业 ID 是否已安排执行。

if job_instance in scheduler:
    # Do something
# or
if job_id in scheduler:
    # Do something

取消作业

要取消作业,只需将 Job 或作业 ID 传递给 scheduler.cancel

scheduler.cancel(job)

请注意,无论指定的作业是否找到,此方法都返回 None

运行调度程序

RQ Scheduler 附带一个名为 rqscheduler 的脚本,该脚本运行一个调度程序进程,该进程每分钟轮询 Redis 一次,并在需要执行时将已安排的作业移动到相关的队列中。

# This runs a scheduler process using the default Redis connection
rqscheduler

如果您想使用不同的 Redis 服务器,也可以这样做:

rqscheduler --host localhost --port 6379 --db 0

此脚本接受以下参数:

  • -H--host:连接到的 Redis 服务器

  • -p--port:连接到的端口

  • -d--db:要使用的 Redis db

  • -P--password:连接到 Redis 的密码

  • -b--burst:以突发模式运行(将已过执行时间的计划任务加入队列并退出)

  • -i INTERVAL--interval INTERVAL:调度器检查并添加到队列的新任务频率(秒,可以是浮点数以提高精度)。

  • -j--job-class:指定rq使用的自定义任务类(python模块.Class)

  • -q--queue-class:指定rq使用的自定义队列类(python模块.Class)

参数将从具有相同名称且以RQ_REDIS_为前缀的环境变量中获取默认值。

在Ubuntu上以服务方式运行调度器

sudo /etc/systemd/system/rqscheduler.service

[Unit]
Description=RQScheduler
After=network.target

[Service]
ExecStart=/home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/bin/python \
    /home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/lib/<<YourPythonVersion>>/site-packages/rq_scheduler/scripts/rqscheduler.py

[Install]
WantedBy=multi-user.target

如果您配置的不是localhost或未设置在环境变量中,则还需要添加任何命令行参数。

启动、检查状态和启用服务

sudo systemctl start rqscheduler.service
sudo systemctl status rqscheduler.service
sudo systemctl enable rqscheduler.service

运行多个调度器

可以同时运行多个rq-scheduler实例。它允许

  • 可靠性(没有单点故障)

  • 故障转移(调度器实例自动尝试获取锁并安排任务)

  • 在多个服务器实例上运行调度器以使部署一致且更容易

可以根据您的要求以任何方式运行多个调度器。通常,您只想在每个服务器/实例上运行一个调度器。

rqscheduler -i 5

# another shell/systemd service or ideally another server
rqscheduler -i 5

# different parameters can be provided to different schedulers
rqscheduler -i 10

实际示例:

  • scheduler_aec2_instance_a 上运行

  • 如果 scheduler_a 崩溃或 ec2_instance_a 崩溃,那么我们的任务将根本不会安排

  • 我们可以简单地运行2个调度器。另一个名为 scheduler_b 的调度器可以在 ec2_instance_b 上运行

  • 现在 scheduler_ascheduler_b 都会定期检查并安排任务

  • 如果一个失败了,另一个仍然可以工作

您可以在 #212#195 中了解更多关于多个调度器的信息

项目详情


下载文件

下载适用于您平台的应用程序。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

rq-scheduler-0.13.1.tar.gz (16.6 kB 查看哈希)

上传时间 源代码

构建分发

rq_scheduler-0.13.1-py2.py3-none-any.whl (13.9 kB 查看哈希)

上传于 Python 2 Python 3

支持者