为RQ(Redis Queue)提供作业调度功能
项目描述
RQ调度器
RQ调度器是一个小型包,为基于RQ(一个Redis的Python队列库)的作业调度功能。
支持RQ调度器
如果您发现rq-scheduler很有用,请考虑通过Tidelift支持其开发。
需求
安装
您可以通过pip安装RQ调度器。
pip install rq-scheduler
或者您可以从PyPI下载最新稳定包。
使用方法
安排一个作业涉及做两件不同的事情
将作业放入调度器
运行一个调度器,它将在时间到来时将安排好的作业移动到队列中
安排作业
您可以通过两种方式安排一个作业。第一种是使用RQ调度器的enqueue_at
from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler
from datetime import datetime
scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue
scheduler = Scheduler('foo', connection=Redis()) # Get a scheduler for the "foo" queue
# You can also instantiate a Scheduler using an RQ Queue
queue = Queue('bar', connection=Redis())
scheduler = Scheduler(queue=queue, connection=queue.connection)
# Puts a job into the scheduler. The API is similar to RQ except that it
# takes a datetime object as first argument. So for example to schedule a
# job to run on Jan 1st 2020 we do:
scheduler.enqueue_at(datetime(2020, 1, 1), func) # Date time should be in UTC
# Here's another example scheduling a job to run at a specific date and time (in UTC),
# complete with args and kwargs.
scheduler.enqueue_at(datetime(2020, 1, 1, 3, 4), func, foo, bar=baz)
# You can choose the queue type where jobs will be enqueued by passing the name of the type to the scheduler
# used to enqueue
scheduler = Scheduler('foo', queue_class="rq.Queue")
scheduler.enqueue_at(datetime(2020, 1, 1), func) # The job will be enqueued at the queue named "foo" using the queue type "rq.Queue"
第二种方法是使用 enqueue_in。该方法不接收一个 datetime 对象,而是需要一个 timedelta,并将作业安排在 X 秒/分钟/小时/天/周后运行。例如,如果我们想在一天中的几个时段监测一条推文的受欢迎程度,我们可以这样做:
from datetime import timedelta
# Schedule a job to run 10 minutes, 1 hour and 1 day later
scheduler.enqueue_in(timedelta(minutes=10), count_retweets, tweet_id)
scheduler.enqueue_in(timedelta(hours=1), count_retweets, tweet_id)
scheduler.enqueue_in(timedelta(days=1), count_retweets, tweet_id)
重要:您在处理 RQ Scheduler 时,应始终使用 UTC 日期时间。
周期性重复作业
从版本 0.3 开始,RQ Scheduler 也支持创建周期性和重复作业。您可以通过 schedule 方法实现这一点。请注意,此功能需要 RQ >= 0.3.1。
以下是实现方法:
scheduler.schedule(
scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone
func=func, # Function to be queued
args=[arg1, arg2], # Arguments passed into function when executed
kwargs={'foo': 'bar'}, # Keyword arguments passed into function when executed
interval=60, # Time before the function is called again, in seconds
repeat=10, # Repeat this number of times (None means repeat forever)
meta={'foo': 'bar'} # Arbitrary pickleable data on the job itself
)
重要提示:如果您设置了重复作业,必须确保您要么没有设置 result_ttl 值,要么设置一个大于间隔的值。否则,作业详情条目将过期,作业不会重新安排。
Cron 作业
从版本 0.6.0 开始,RQ Scheduler 也支持创建 Cron 作业,您可以使用它来创建周期性作业,这些作业可以定期在固定时间、日期或间隔运行。有关更多信息,请查看 https://en.wikipedia.org/wiki/Cron。您可以通过 cron 方法实现这一点。
以下是实现方法:
scheduler.cron(
cron_string, # A cron string (e.g. "0 0 * * 0")
func=func, # Function to be queued
args=[arg1, arg2], # Arguments passed into function when executed
kwargs={'foo': 'bar'}, # Keyword arguments passed into function when executed
repeat=10, # Repeat this number of times (None means repeat forever)
result_ttl=300 # Specify how long (in seconds) successful jobs and their results are kept. Defaults to -1 (forever)
ttl=200 # Specifies the maximum queued time (in seconds) before it's discarded. Defaults to None (infinite TTL).
queue_name=queue_name, # In which queue the job should be put in
meta={'foo': 'bar'}, # Arbitrary pickleable data on the job itself
use_local_timezone=False # Interpret hours in the local timezone
)
检索已安排的作业
有时您需要知道哪些作业已经被安排。您可以使用 get_jobs 方法获取已排队作业的列表
list_of_job_instances = scheduler.get_jobs()
在其最简单的形式中(如上例所示),此方法返回当前已安排执行的所有作业实例的列表。
此外,此方法还接受两个可选的关键字参数 until 和 with_times。第一个参数指定应返回哪些时间点的安排作业。它可以是一个 datetime / timedelta 实例,也可以是一个表示自纪元(1970-01-01 00:00:00)以来秒数的整数。第二个参数是一个布尔值,它确定是否应返回与作业实例一起的安排执行时间。
示例
# get all jobs until 2012-11-30 10:00:00
list_of_job_instances = scheduler.get_jobs(until=datetime(2012, 10, 30, 10))
# get all jobs for the next hour
list_of_job_instances = scheduler.get_jobs(until=timedelta(hours=1))
# get all jobs with execution times
jobs_and_times = scheduler.get_jobs(with_times=True)
# returns a list of tuples:
# [(<rq.job.Job object at 0x123456789>, datetime.datetime(2012, 11, 25, 12, 30)), ...]
检查作业是否已安排
您可以使用熟悉的 python in 操作符检查特定作业实例或作业 ID 是否已安排执行。
if job_instance in scheduler:
# Do something
# or
if job_id in scheduler:
# Do something
取消作业
要取消作业,只需将 Job 或作业 ID 传递给 scheduler.cancel。
scheduler.cancel(job)
请注意,无论指定的作业是否找到,此方法都返回 None。
运行调度程序
RQ Scheduler 附带一个名为 rqscheduler 的脚本,该脚本运行一个调度程序进程,该进程每分钟轮询 Redis 一次,并在需要执行时将已安排的作业移动到相关的队列中。
# This runs a scheduler process using the default Redis connection
rqscheduler
如果您想使用不同的 Redis 服务器,也可以这样做:
rqscheduler --host localhost --port 6379 --db 0
此脚本接受以下参数:
-H 或 --host:连接到的 Redis 服务器
-p 或 --port:连接到的端口
-d 或 --db:要使用的 Redis db
-P 或 --password:连接到 Redis 的密码
-b 或 --burst:以突发模式运行(将已过执行时间的计划任务加入队列并退出)
-i INTERVAL 或 --interval INTERVAL:调度器检查并添加到队列的新任务频率(秒,可以是浮点数以提高精度)。
-j 或 --job-class:指定rq使用的自定义任务类(python模块.Class)
-q 或 --queue-class:指定rq使用的自定义队列类(python模块.Class)
参数将从具有相同名称且以RQ_REDIS_为前缀的环境变量中获取默认值。
在Ubuntu上以服务方式运行调度器
sudo /etc/systemd/system/rqscheduler.service
[Unit]
Description=RQScheduler
After=network.target
[Service]
ExecStart=/home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/bin/python \
/home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/lib/<<YourPythonVersion>>/site-packages/rq_scheduler/scripts/rqscheduler.py
[Install]
WantedBy=multi-user.target
如果您配置的不是localhost或未设置在环境变量中,则还需要添加任何命令行参数。
启动、检查状态和启用服务
sudo systemctl start rqscheduler.service
sudo systemctl status rqscheduler.service
sudo systemctl enable rqscheduler.service
运行多个调度器
可以同时运行多个rq-scheduler实例。它允许
可靠性(没有单点故障)
故障转移(调度器实例自动尝试获取锁并安排任务)
在多个服务器实例上运行调度器以使部署一致且更容易
可以根据您的要求以任何方式运行多个调度器。通常,您只想在每个服务器/实例上运行一个调度器。
rqscheduler -i 5
# another shell/systemd service or ideally another server
rqscheduler -i 5
# different parameters can be provided to different schedulers
rqscheduler -i 10
实际示例:
scheduler_a 在 ec2_instance_a 上运行
如果 scheduler_a 崩溃或 ec2_instance_a 崩溃,那么我们的任务将根本不会安排
我们可以简单地运行2个调度器。另一个名为 scheduler_b 的调度器可以在 ec2_instance_b 上运行
现在 scheduler_a 和 scheduler_b 都会定期检查并安排任务
如果一个失败了,另一个仍然可以工作
项目详情
下载文件
下载适用于您平台的应用程序。如果您不确定选择哪个,请了解有关安装包的更多信息。