基于redis-limpyd的队列/作业系统,redis-limpyd是一个Python中的redis ORM(类似)
项目描述
redis-limpyd-jobs
基于redis-limpyd(redis ORM(类似)在Python中)的队列/作业系统
在哪里找到它
安装
支持Python版本2.7和3.5到3.8(CPython和PyPy)。
支持Redis-server版本>=3。
支持Redis-py版本>=3。
支持Redis-limpyd版本>=2。
如果您需要低于上述要求的旧版本,您仍然可以使用limpyd-extensions版本<2。
pip install redis-limpyd-jobs
请注意,除了redis-limpyd(最小版本1.2)(两者都通过pypi自动安装)外,您实际上还需要redis-limpyd-extensions(最小版本v1.0)
如何工作
redis-limpyd-jobs 提供了三个 limpyd 模型(Queue、Job、Error),以及一个 Worker 类。
这些模型实现了运行异步任务所需的最小功能。
使用 Job 模型来存储待执行的任务。
Queue 模型将存储一个包含任务的列表,并具有优先级系统。
Error 模型将存储所有错误。
Worker 类用于在进程中遍历队列并运行任务。
简单示例
from limpyd_jobs import STATUSES, Queue, Job, Worker
# The function to run when a job is called by the worker
def do_stuff(job, queue):
# here do stuff with your job
pass
# Create a first job, name 'job:1', in a queue named 'myqueue', with a
# priority of 1. The higher the priority, the sooner the job will run
job1 = Job.add_job(identifier='job:1', queue_name='myqueue', priority=1)
# Add another job in the same queue, with a higher priority, and a different
# identifier (if the same was used, no new job would be added, but the
# existing job's priority would have been updated)
job2 = Job.add_job(identifier='job:2', queue_name='myqueue', priority=2)
# Create a worker for the queue used previously, asking to call the
# "do_stuff" function for each job, and to stop after 2 jobs
worker = Worker(queues='myqueue', callback=do_stuff, max_loops=2)
# Now really run the jobs
worker.run()
# Here our jobs are done, our queue is empty
queue1 = Queue.get_queue('myqueue', priority=1)
queue2 = Queue.get_queue('myqueue', priority=2)
# nothing waiting
print queue1.waiting.lmembers(), queue2.waiting.lmembers()
>> [] []
# two jobs in success (show PKs of jobs)
print queue1.success.lmembers(), queue2.success.lmembers()
>> ['limpyd_jobs.models.Job:1', 'limpyd_jobs.models.Job:2']
# Check our jobs statuses
print job1.status.hget() == STATUSES.SUCCESS
>> True
print job2.status.hget() == STATUSES.SUCCESS
>> True
您可以看到它是如何工作的。
使用 Job.add_job 创建一个任务。
使用 Worker() 创建一个工作者,其中 callback 参数用于设置每个任务运行的函数。
调用 worker.run 启动工作者。
请注意,您可以运行任意数量的工作者,甚至可以在同一队列名称下运行。内部,我们使用 blpop redis 命令以原子方式获取任务。
但是,您也可以只运行一个工作者,只有一个队列,在回调函数中根据任务的 idenfitier 属性执行不同的操作。
工作者能够捕获 SIGINT/SIGTERM 信号,在退出前完成当前任务的执行。如果与 supervisord 一起使用,非常有用。
如果您想在任务、队列或错误中存储更多信息,或者想要在工作者中实现不同的行为,这很容易,因为您可以在 limpyd-jobs 中创建所有内容的子类,包括 limpyd 模型或 Worker 类。
模型
Job
任务存储了运行任务所需的所有信息。
注意:如果您想通过子类化 Job 模型来添加自己的字段、run 方法等,请注意,该类必须是 python 模块的第一级(即不在父类或函数中)才能正常工作。
Job 字段
identifier
一个字符串(InstanceHashField,索引),用于标识任务。
当使用(推荐)add_job 类方法时,您不能在等待队列中有许多具有相同标识符的任务。如果您在另一个具有相同标识符的任务仍在同一等待队列中时创建一个新任务,所执行的操作取决于两个任务的优先级:- 如果新任务的优先级较低(或相等),则将其丢弃 - 如果新任务的优先级较高,则现有任务的优先级将更新为较高。
在两种情况下,add_job 类方法都返回现有任务,并丢弃新任务。
使用标识符的一个常见方法是至少存储一个可以标识我们想要应用任务的对象的方式:- 您可以为唯一的任务有一个或多个队列,并且只将对象的 id 存储在 identifier 字段中 - 您可以为每个执行许多任务的一个或多个队列,然后您可能还想将任务也存储在 identifier 字段中:“task:id”
注意,通过子类化 Job 模型,您可以为 Job 添加新字段以存储任务和其他所需参数,作为参数(例如,照片的大小用于调整大小,要发送的消息等)
status
一个字符串(InstanceHashField,已索引),用于存储作业的实际状态。
它是一个单字母,但我们提供了一个类来帮助更详细地使用它:STATUSES
from limpyd_jobs import STATUSES
print STATUSES.SUCCESS
>> "s"
当通过 add_job 类方法创建作业时,其状态设置为 STATUSES.WAITING,或者如果设置了 delayed_until,则为 STATUSES.DELAYED。当工作者选择执行它时,状态变为 STATUSES.RUNNING。完成时,状态为 STATUSES.SUCCESS 或 STATUSES.ERROR。另一个可用的状态是 STATUSES.CANCELED,如果您想在队列中取消作业而不删除它,则很有用。
您也可以显示状态的全字符串
print STATUSES.by_value(my_job.status.hget())
>> "SUCCESS"
优先级
一个字符串(InstanceHashField,已索引,默认 = 0),用于存储作业的优先级。
作业的优先级决定了它将在哪个队列对象中存储。工作者监听具有某些名称和不同优先级的所有队列,但尊重优先级(反向)顺序:优先级越高,作业将越早执行。
我们选择使用“`更高优先级更好”的方法来始终有机会将作业添加到比其他任何作业都高的优先级。
直接更新作业的优先级不会改变其存储的队列。但通过(推荐的)add_job 类方法添加作业时,如果存在具有相同标识符的作业,其优先级将更新(仅当新的优先级更高时)并将作业移动到更高优先级的队列。
添加
一个字符串(InstanceHashField),用于存储作业添加到其队列的时间和日期(datetime.utcnow() 的字符串表示形式)。
当与 end 字段结合使用时很有用,可以计算作业持续时间。
开始
一个字符串(InstanceHashField),用于存储作业从队列中获取的时间(在回调函数被调用之前)的时间和日期(datetime.utcnow() 的字符串表示形式)。
当与 end 字段结合使用时很有用,可以计算作业持续时间。
结束
一个字符串(InstanceHashField),用于存储作业完成或发生错误时的时间和日期(完成后的 datetime.utcnow() 的字符串表示形式)。
当与 start 字段结合使用时很有用,可以计算作业持续时间。
尝试次数
一个整数(以字符串形式存储为 InstanceHashField),用于存储作业执行的次数。如果在发生错误后重新排队,则可能超过一次。
delayed_until
表示 datetime 对象的字符串表示形式,直到作业可以位于队列的 delayed 列表(redis 有序集合)中。
在调用 add_job 时可以设置,通过传递一个 delayed_until 参数,它必须是一个 datetime,或者一个 delayed_for 参数,它必须是一个秒数(整数或浮点数)或一个 timedelta 对象。将 delayed_for 参数添加到当前时间(datetime.utcnow())以计算 delayed_until。
如果一个工作在执行后出错,并且工作器具有正的 requeue_delay_delta 属性,则相应的将设置 delayed_until 字段,这对于在一段时间后重试错误的工作很有用。
排队
当该字段设置为 '1' 时,表示当前由队列管理:等待、延迟、运行。在调用 enqueue_or_delay 时设置此标志,当工作被取消、成功完成或出错且未重新排队时,由工作器移除。当调用 add_job 时,将检查此字段以测试是否已存在相同的作业。
错误取消
如果您不希望在出错时重新排队作业,则必须将该字段设置为 True 值(请注意,Redis 存储字符串,因此 0 将保存为 "0",因此它将是 True……因此不要将其设置为 False 或 0 以获得 False 值:您可以将其留空)。
请注意,如果您想为一个类中的所有作业执行此操作,您可能想将此类中的 always_cancel_on_error 属性设置为 True。
作业属性
queue_model
通过 add_job 方法添加作业时,将使用此属性中定义的模型来获取或创建队列。默认设置为 Queue,但如果要将其更新为您自己的模型,则还必须子类化 Job 模型,并更新此属性。
queue_name
默认为 None,可以在覆盖 Job 类时设置,以避免将 queue_name 参数传递给作业的方法(尤其是 add_job 方法)
请注意,如果您没有子类化 Job 模型,则可以将 queue_model 参数传递给 add_job 方法。
always_cancel_on_error
如果您想在出错时不重新排队此类中的所有作业,请将此属性设置为 True。如果将其保留为其默认值 False,您仍然可以通过将它们的 cancel_on_error 字段设置为 True 值来逐个执行。
作业属性和方法
ident(属性)
ident 属性是模型 + 作业主键的字符串表示,存储在队列中,允许检索作业。
must_be_cancelled_on_error(属性)
must_be_cancelled_on_error 属性返回一个布尔值,指示在执行过程中发生错误时,作业是否不得重新排队。
默认情况下它将是 False,但有两种方法可以更改此行为
将作业类的 always_cancel_on_error 设置为 True。
将作业的 cancel_on_error 字段设置为 True 值
duration(属性)
duration 属性简单地返回计算作业所用的时间。如果设置了 start 和 end 字段,则返回值是 datetime.timedelta 对象,否则返回 None。
run(方法)
这是工作的主要方法,你必须重写的方法,当工作由工作者执行时进行一些繁重的工作。
此方法的返回值将传递给工作者的job_success,然后,如果已定义,将传递给工作的on_success方法。
默认情况下会引发一个NotImplemented错误。
参数
queue:从其中检索工作的队列。
requeue(方法)
requeue方法允许当工作执行失败时将其放回等待(或延迟)队列。
参数
queue_name=None 要保存工作的队列名称。如果没有定义,将使用工作类的名称。如果两者都没有定义,将引发异常。
priority=None 新工作的优先级。如果没有定义,工作将保持其实际优先级。
delayed_until=None 将此设置为datetime对象以设置工作真正放回队列的日期。实际的delayed_until也可以通过传递delayed_for参数来设置。
delayed_for=None 工作真正放回队列前等待的秒数(作为int、float或timedelta对象)。它将计算工作的delayed_until字段。
queue_model=None 用于存储队列的模型。默认情况下,它设置为在Job模型的queue_model属性中定义的Queue。如果没有设置此参数,将使用该属性。请注意,在您的子类中将它作为属性设置,或者在requeue或默认的Queue模型中作为参数设置,否则将使用默认的Queue模型,并且工作将不会保存到预期的队列模型中。
enqueue_or_delay(方法)
这是在add_job和requeue中调用的方法,它将根据delayed_until将工作放入等待或延迟队列,具体取决于参数的定义。如果此参数定义且在未来,则工作将延迟,否则它将简单地排队。
此方法还将工作的queued标志设置为'1'。
参数
queue_name=None 要保存工作的队列名称。如果没有定义,将使用工作类的名称。如果两者都没有定义,将引发异常。
priority=None 新工作的优先级。如果没有定义,使用工作实际的优先级。
delayed_until=None 工作将在延迟队列中保持的日期(必须是datetime对象或其字符串表示形式)。它将在此日期之前不被处理。
prepend=False 设置为True以将工作添加到等待列表的开始处,以便首先执行(只有在不延迟的情况下)
queue_model=None 用于存储队列的模型。请参阅add_job和requeue。
on_started(幽灵方法)
如果您的作业模型(默认情况下不存在,即“幽灵”)定义了此方法,则当工作被工作者检索并且即将执行(“等待”状态)时调用此方法。
参数
queue:从其中检索工作的队列。
on_success(幽灵方法)
如果您的作业模型(默认情况下不存在,即“幽灵”)定义了此方法,则工作者将在工作执行成功时调用此方法(没有引发任何异常)。
参数
queue:从其中检索工作的队列。
结果 工作者执行方法返回的数据,该方法调用并返回作业(或提供给工作者的回调)的run方法的结果。
on_error(幽灵方法)
如果您的作业模型上定义了此方法(默认情况下不存在,即“幽灵”),则当作业执行失败(抛出异常)时,工作者会调用该方法。
参数
queue:从其中检索工作的队列。
exception:执行期间抛出的异常。
traceback:异常发生时的跟踪信息,如果工作者的save_tracebacks属性设置为True。
on_skipped(幽灵方法)
如果您的作业模型上定义了此方法(默认情况下不存在,即“幽灵”),则当工作者刚刚取出的作业无法执行(因为其状态不是“等待”)时,将调用该方法。另一个可能的原因是作业在执行过程中被取消(通过将其状态设置为STATUSES.CANCELED)。
queue:从其中检索工作的队列。
on_requeued(幽灵方法)
如果您的作业模型上定义了此方法(默认情况下不存在,即“幽灵”),则当作业失败并被工作者重新排队时,工作者会调用该方法。
queue:从其中检索工作的队列。
on_delayed(幽灵方法)
如果您的作业模型上定义了此方法(默认情况下不存在,即“幽灵”),则当作业在执行过程中被延迟(通过将其状态设置为STATUSES.DELAYED)时,工作者会调用该方法。请注意,您还可以将作业的delayed_until值设置为正确的日期时间(UTC日期时间的字符串表示),否则工作者会将其延迟60秒。
如果作业的状态在队列的等待列表中仍然为STATUSES.DELAYED,它也可以被调用。
queue:从其中检索工作的队列。
作业类方法
add_job
类方法add_job是创建作业的主要(也是推荐)方式。它会检查队列中是否已存在具有相同标识符的作业(未完成)并更新其优先级(如果找到,则移动到正确的队列)。如果没有找到现有作业,则将创建一个新的作业并将其添加到队列中。
参数
identifier:标识字段值。
queue_name=None:保存作业的队列名称。如果没有定义,将使用类名称。如果两者都未定义,则抛出异常。
priority=0:新作业的优先级,或已存在作业的新优先级,如果此优先级高于现有优先级。
queue_model:用于存储队列的模型。默认情况下,它设置为Queue,在Job模型的queue_model属性中定义。如果未设置参数,则使用该属性。请小心地将它作为属性设置在子类中,或作为add_job或默认Queue模型的参数,否则将使用默认的Queue模型,并且作业将不会保存到预期的队列模型中。
prepend=False:默认情况下,所有新作业都添加到等待列表的末尾(并从开始处取出,这是一个先进先出列表),但您可以将作业强制添加到等待列表的起始位置,以便首先执行,只需将prepend参数设置为True。如果作业已存在,则将其移动到列表的起始位置。
delayed_until=None 将此设置为 datetime 对象,以将作业设置为由将来执行。如果已定义且在未来,则作业将被添加到延迟列表(Redis有序集合)而不是等待列表。也可以通过传递 delayed_for 参数来设置实际的 delayed_until。
delayed_for=None 在将作业添加到等待列表之前等待的秒数(整数、浮点数或 timedelta 对象)。它将计算作业的 delayed_until 字段。
如果您使用 Job 模型的子类,可以通过将它们作为命名参数传递给 add_job 方法来传递额外的参数,如果创建新的作业(但不会在等待队列中找到现有作业),则将保存它们。
get_model_repr
返回模型的字符串表示形式,用于计算作业的 ident 属性。
get_from_ident
返回通过作业的 ident 属性之前获取的字符串的作业。
参数
ident 包含作业的模型表示及其主键的字符串,由 ident 属性返回。
队列
队列存储具有给定优先级的等待作业列表,并保留成功作业和错误作业的列表。
队列字段
name
一个字符串(InstanceHashField,已索引),由 add_job 方法用于查找存储该队列的位置。许多队列可以具有相同的名称,但不同的优先级。
此名称还用于工人查找它需要等待哪些队列。
优先级
一个字符串(InstanceHashField,已索引,默认值为 0),用于存储队列作业的优先级。队列中的所有作业都视为具有此优先级。这就是为什么,如 Job 模型的属性字段所述,更改作业的属性不会更改其实际属性。但是,通过 Job 模型的 add_job 类方法添加具有相同标识符的新作业可以更新作业的优先级,将其移动到另一个具有正确优先级的队列。
如前所述,优先级越高,队列中的作业执行得越早。如果一个队列的优先级为 2,而另一个队列的优先级为 0 或 1,则 所有 具有优先级 2 的作业都将执行(至少是获取),而不考虑工人的数量。
等待
一个列表(ListField),用于存储等待状态作业的主键。它是一个先进先出列表:作业追加到右侧(通过 rpush),并通过 blpop 从左侧获取。
当获取时,此列表中的作业将被执行,然后根据回调是否引发异常推送到 success 或 error 列表。如果等待列表中的作业不在等待状态,则工人将跳过它。
成功
一个列表(ListField),用于存储从等待列表中获取并成功执行的作业的主键。
错误
一个列表(ListField),用于存储从等待列表中获取但执行失败的作业的主键。
延迟
用于存储延迟任务的有序集合(SortedSetField),其中包含具有未来delayed_until日期时间的任务。使用delayed_until字段的时戳表示作为该有序集合的分数,以简化检索已准备就绪的任务。
队列属性
Queue模型没有特定的属性。
队列属性和方法
first_delayed(属性)
返回一个表示延迟队列中第一个准备就绪任务的元组。它是一个包含任务的pk和它的delayed_until值的时戳表示(它是有序集合的分数)。
如果延迟队列为空,则返回None。
first_delayed_time(属性)
返回第一个准备就绪的延迟任务的时戳表示,如果延迟队列为空,则返回None。
delay_job(方法)
将任务放入延迟队列。
参数
job 要延迟的任务。
delayed_until 一个指定任务应何时返回到等待队列的datetime对象。它将被转换为作为延迟列表分数的时戳,该列表是一个redis有序集合。
enqueue_job(方法)
将任务放入等待列表。
参数
job 要入队的任务。
prepend=False 设置为True将在等待列表的开始处添加任务,使其成为第一个执行的任务。
requeue_delayed_jobs(方法)
此方法将检查延迟队列中所有现在可以执行的任务,并将它们放回等待列表。
此方法将返回失败列表,每个失败都是一个包含任务的ident属性的值和导致失败的异常信息的元组。
注意,只有当任务的状态是STATUSES.DELAYED时,才改变任务的状态。这允许在之前取消延迟任务。
队列类方法
get_queue
get_queue类方法是获取Queue对象的推荐方法。给定一个名称和优先级,它将返回找到的队列或创建一个队列(如果没有找到匹配的队列)。
参数
name 要获取或创建的队列的名称。
priority 要获取或创建的队列的优先级。
如果您使用Queue模型的子类,您可以通过将它们作为命名参数传递来简单地向get_queue方法传递额外的参数,如果创建新队列,则将保存它们(但如果找到现有队列则不会)。
get_waiting_keys
get_waiting_keys类方法返回所有具有给定名称的现有(等待)队列,按优先级排序(逆序:优先级最高的排在前面),然后按名称排序。返回值是每个匹配队列的waiting列表的redis键列表。它由工作者作为blpopredis命令的参数内部使用。
参数
names 要考虑的队列名称(如果只有一个名称,则可以是字符串,或字符串列表)
count_waiting_jobs
count_waiting_jobs类方法返回给定队列名称中仍等待的任务数量,结合所有优先级。
参数
names 要考虑的队列名称(如果只有一个名称,则可以是字符串,或字符串列表)
count_delayed_jobs
《count_delayed_jobs》类方法返回给定队列名称中仍处于延迟状态的任务数量,结合所有优先级。
参数
names 要考虑的队列名称(如果只有一个名称,则可以是字符串,或字符串列表)
get_all
《get_all》类方法返回给定名称的所有队列列表。
参数
name
要考虑的队列名称(如果只有一个名称,则可以是字符串,或者字符串的列表)
get_all_by_priority
参数
name
get_all_by_priority
《get_all_by_priority》类方法返回给定名称的所有队列列表,按照优先级排序(优先级最高者优先),然后按名称排序。
Error
《Error》模型用于存储由工作者未能成功执行的任务产生的错误。
其主要目的是能够按队列名称、任务模型、任务标识符、日期、异常类名称或代码筛选错误。您可以使用自己的《Error》模型子类,然后存储额外的字段,并按这些字段进行筛选。
Error字段
job_model_repr
一个字符串(InstanceHashField,索引),用于存储任务的模型字符串表示。
job_pk
一个字符串(InstanceHashField,索引),用于存储生成错误的任务的键。
queue_name
idenfitier
一个字符串(InstanceHashField,索引),用于存储失败的任务的标识符。
一个字符串(InstanceHashField,索引),用于存储任务失败时所在的队列名称。
date_time
DEPRECATED:此字段已被date_time替代,但为了兼容性而保留
一个字符串(InstanceHashField,索引),用于存储错误的日期和时间(到秒的时间)(datetime.utcnow的字符串表示)。此字段是索引的,因此您可以按日期和时间筛选错误(字符串模式,而不是日期和时间的部分,即date_time__gt='2017-01-01'),这对绘图很有用。
date
DEPRECATED:此字段已被date_time替代,但为了兼容性而保留
一个字符串(InstanceHashField),用于存储错误的日期(只有日期,没有时间)(datetime.utcnow().date的字符串表示)。此字段是索引的,因此您可以按日期筛选错误,这对绘图很有用。
time
一个字符串(InstanceHashField),用于存储错误的时间(只有时间,没有日期)(datetime.utcnow().time的字符串表示)。
type
一个字符串(InstanceHashField,索引),用于存储错误的类型。它是原始抛出异常的类的名称。
code
一个字符串(InstanceHashField,索引),用于存储原始抛出异常的code属性的值。如果没有此类属性,则在此不存储任何内容。
message
一个字符串(InstanceHashField),用于存储原始抛出异常的字符串表示。
traceback
一个字符串(InstanceHashField),用于存储原始抛出异常的堆栈跟踪的字符串表示(工作者可能未填充它)。
Error属性和方法
datetime
此属性返回一个基于《Error》对象《date_time》字段内容的《datetime》对象。
《add_error》类方法是向《Error》模型添加条目的主要(且推荐)方式,它接受简单的参数,这些参数将被拆分(job变为identifier和job_pk,when变为date和time,error变为code和message)。
参数
queue_name:作业来源队列的名称。
job:生成错误的作业,我们将从中提取job_pk和identifier。
error:从中提取代码和消息的异常。
when=None:从中提取日期和时间的datetime对象。
如果没有填写,将使用datetime.utcnow()。
trace=None:要存储的字符串化的跟踪信息。
如果您使用《Error》模型的子类,可以通过传递命名参数简单地向《add_error》方法传递额外的参数,它们将被保存在要创建的对象中。
collection_for_job
《collection_for_job》是一个辅助工具,用于检索与给定作业关联的错误,更确切地说,是具有相同标识符的所有此作业实例的错误。
结果是《limpyd》集合,您可以对其进行filter、instances等操作。
参数
job:我们想要错误的作业。
工作者
《Worker》类
《Worker》类处理所有逻辑,与《Queue》和《Job》模型一起工作。
主要行为包括:- 读取给定名称的队列键 - 等待队列中有可用的作业 - 执行作业 - 管理成功或错误 - 在定义的作业数量或最大持续时间(如果定义)后退出,或者在捕获到SIGINT/SIGTERM信号时退出。
该类被拆分为许多短方法,以便您可以对其进行子类化以更改/添加/删除所需的内容。
构造函数参数和工作者的属性
以下工作者的每个属性都可以通过构造函数中的参数设置,使用完全相同的名称。因此,在这里一起描述了这两个。
queues
要工作的队列名称。它可以是一个字符串列表/元组,或者一个由逗号(不带空格)分隔的字符串,或者一个没有逗号的字符串以表示单个队列。
注意,所有队列必须来自相同的《queue_model》。
默认为None,但如果未设置且未在子类中定义,将引发《LimpydJobsException》。
queue_model
用于队列的模型。默认情况下,它是《limpyd_jobs》中包含的《Queue》模型,但您可以使用默认模型的子类来添加字段、方法等。
error_model
用于保存错误的模型。默认情况下,它是《limpyd_jobs》中包含的《Error》模型,但您可以使用默认模型的子类来添加字段、方法等。
logger_name
《limpyd_jobs》使用python的《logging》模块,因此这是为工作者创建的记录器的名称。默认值为《LOGGER_NAME》,其中《LOGGER_NAME》在《limpyd_jobs.workers》中定义,值为“limpyd-jobs”。
logger_level
这是使用在 logger_name 中定义的名称创建的记录器的等值面,默认为 logging.INFO。
save_errors
一个布尔值,默认为 True,表示当作业执行不成功时,是否需要在 Error 模型(或 error_model 中定义的模型)中保存错误。
save_tracebacks
一个布尔值,默认为 True,表示当作业执行不成功时(并且只有当 save_errors 为 True 时),是否需要在 Error 模型(或 error_model 中定义的模型)中保存异常的堆栈跟踪。
max_loops
在工作者生命周期内要执行的最大循环次数(获取 + 执行一个作业),默认为 1000。注意,在此次数后,工作者结束(run 方法不能再次执行)。
目的是为了避免内存泄漏变得过于严重。
max_duration
如果定义了,工作者将在其 run 方法被调用至少这个秒数时结束。默认设置为 None,表示没有最大持续时间。
terminate_gracefully
为了避免中断作业的执行,如果 terminate_gracefully 设置为 True(默认值),则捕获 SIGINT 和 SIGTERM 信号,要求工作者在当前作业完成后退出。
callback
回调是在获取作业时运行的函数。默认情况下是工作者的 execute 方法(该方法调用作业的 run 方法,如果没有覆盖,则抛出 NotImplemented 错误),但你可以传递任何接受作业和队列作为参数的函数。
使用队列的名称和作业的标识符+模型(通过 job.ident),如果需要,可以根据队列管理许多操作。
如果此回调(或 execute 方法)抛出异常,则作业被视为错误。在其他情况下,它被视为成功,并将返回值传递给 job_success 方法,以便您可以对其进行处理。
timeout
超时被用作我们用于从等待列表中获取作业的 blpop redis 命令的参数。默认为 30 秒,但您可以将它更改为任何正数(以秒为单位)。您可以将它设置为 0,如果您不想将超时应用于 blpop 命令。
最好始终设置超时,以便重新进入主循环并调用 must_stop 方法以查看工作者是否必须退出。请注意,在发生超时的情况下,循环次数不会更新,因此一点点的 timeout 不会改变由 max_loops 定义循环次数。
fetch_priorities_delay
fetch_priorities_delay 是当前工作者获取优先级列表之间的延迟。
如果添加了一个作业,其优先级在工作者运行开始时不存在,则它将在此延迟过期之前不被考虑。
请注意,如果此延迟为 5 秒(默认为 25),而 timeout 参数为 30,则您可能需要等待 30 秒才能获取新的优先级,因为如果工作者实际管理的优先级队列中没有作业,则时间由 redis 掌管。
fetch_delayed_delay
“fetch_delayed_delay”表示工作队列中延迟任务两次抓取之间的延迟。
注意,如果这个延迟是5秒(默认值为25秒),而“timeout”参数是30秒,您可能需要等待30秒才能进行新的延迟抓取,因为没有任务在由工作器实际管理的优先级队列中,时间掌握在Redis手中。
requeue_times
这是任务执行失败时将被重新排队(重新入队)的次数。它将被放回同一个队列。
该属性默认为0,因此默认情况下任务不会被重新排队。
requeue_priority_delta
此数字将被添加到将被重新排队的任务的当前优先级中。默认设置为-1,以便每次重新排队时都降低优先级。
requeue_delay_delta
这是在将错误任务再次添加到等待队列之前等待的秒数,默认设置为30秒:当任务执行失败时,它将被放入延迟队列30秒,然后将被放回等待队列(取决于“fetch_delayed_delay”属性)。
其他工作器属性
在子类化的情况下,您可能需要这些属性,这些属性在工作器使用期间创建和定义。
keys
一个等待列表队列键的列表,工作器正在监听这些键以获取新任务。由“update_keys”方法填充。
status
工作器的当前状态。默认为“None”,直到调用“run”方法,之后将其设置为“starting”,在获取可用队列时。然后设置为“waiting”,在工作器等待新任务时。当抓取到任务时,状态设置为“running”。最后,当循环结束时,将其设置为“terminated”。
如果状态不是“None”,则不能调用“run”方法。
logger
由“set_logger”方法定义的logger(来自Python模块logging)。
num_loops
工作器完成的循环次数,每次从等待列表中抓取任务时增加,即使任务被跳过(状态不良等),或在错误中。当此数字等于“max_loops”属性时,工作器结束。
end_forced
当为“True”时,请求工作器在执行当前任务后自行终止。可以手动设置为“True”,或者在捕获到SIGINT/SIGTERM信号时。
end_signal_caught
当捕获到SIGINT/SIGTERM时(只有当“terminate_gracefully”为“True”时),此布尔值设置为“True”。
start_date
默认为“None”,当“run”方法开始时设置为“datetime.utcnow()”。
end_date
默认为“None”,当“run”方法结束时设置为“datetime.utcnow()”。
wanted_end_date
默认为“None”,它根据“start_date”和“max_duration”计算出来,以知道工作器何时必须停止。如果没有定义“max_duration”,则始终为“None”。
connection
这是一个属性,而不是一个属性,用于获取当前到Redis服务器的连接。
parameters
这是一个元组,包含工作器构造函数接受的全部参数。
parameters = ('queues', 'callback', 'queue_model', 'error_model',
'logger_name', 'logger_level', 'save_errors',
'save_tracebacks', 'max_loops', 'max_duration',
'terminate_gracefuly', 'timeout', 'fetch_priorities_delay',
'fetch_delayed_delay', 'requeue_times',
'requeue_priority_delta', 'requeue_delay_delta')
工作器方法
如前所述,“Worker”类分解成许多小方法,以简化子类化。以下是公共方法的列表
__init__
签名
def __init__(self, queues=None, **kwargs):
返回无。
这是Worker
类的构造函数(猜对了 ;)),期望所有参数(定义在parameters
中),这些参数也可以作为类属性定义。
它验证这些参数,准备日志并初始化其他属性。
你可以重写它来添加、验证、初始化其他参数或属性。
handle_end_signal
签名
def handle_end_signal(self):
返回无。
如果terminate_gracefully
为True
,则在该构造函数中调用。它将SIGINT和SIGTERM信号连接到catch_end_signal
方法。
你可以重写它来捕获更多信号或在进行连接到catch_end_signal
方法之前的检查。
stop_handling_end_signal
签名
def stop_handling_end_signal(self):
返回无。
在run
方法结束时调用,因为我们不再需要捕获SIGINT和SIGTERM信号。当在Python shell中启动worker时很有用,以便最终让shell处理这些信号。在脚本中无意义,因为当run
方法退出时,脚本已结束。
set_logger
签名
def set_logger(self):
返回无。
在构造函数中调用以初始化日志记录器,使用logger_name
和logger_level
,并将其保存到self.logger
中。
must_stop
签名
def must_stop(self):
返回布尔值。
在主循环中被调用,在以下条件之一满足时退出循环:捕获了结束信号,达到了max_loops
数量,或end_forced
被设置为True
。
wait_for_job
签名
def wait_for_job(self):
返回一个包含队列和任务的元组
在循环期间调用此方法,等待等待列表中可用的任务。当检索到任务时,返回找到任务的队列(queue_model
定义的模型的一个实例),以及任务本身。
get_job
签名
def get_job(self, job_ident):
返回一个任务。
在wait_for_job
期间调用,根据从等待列表中检索到的任务的ident
(模型+主键)获取基于任务的真正对象。
get_queue
签名
def get_queue(self, queue_redis_key):
返回一个队列。
在wait_for_job
期间调用,根据redis返回的键获取一个真正的队列对象(queue_model
定义的模型的一个实例),该键指示任务在哪个列表中找到。这个键不是队列的主键,而是其等待字段的redis键。
catch_end_signal
签名
def catch_end_signal(self, signum, frame):
返回无。
当捕获到SIGINT/SIGTERM信号时调用。它简单地将end_signal_caught
和end_forced
设置为True
,以告诉worker尽快终止。
execute
签名
def execute(self, job, queue):
默认返回空。
如果没有提供callback
参数来初始化worker并调用任务的run
方法,则调用此方法,默认情况下该方法会引发一个NotImplementedError
。
如果执行成功,不期望任何返回值,但如果提供了,它将传递给job_success
方法。如果发生错误,必须引发一个异常,该异常将传递给job_error
方法。
update_keys
签名
def update_keys(self):
返回无。
调用此方法更新内部keys
属性,它包含worker所监听的队列的等待列表的redis键。
它实际上在run
方法开始时调用,并在根据fetch_priorities_delay
间隔调用。请注意,如果当此方法被调用时不存在具有特定优先级的队列,但稍后通过添加带有add_job
的任务添加了,除非再次调用此update_keys
方法(通过程序或等待至少fetch_priorities_delay
秒),否则worker会忽略它。
run
签名
def run(self):
返回无。
这是工作者的主要方法,包含了所有逻辑:只要不需要停止(must_stop 方法的返回值),就从Redis中获取一个任务,如果这个任务确实处于等待状态,就执行它,并根据执行状态(成功、错误等)执行一些操作。
除了执行实际操作的(update_keys、wait_for_job)方法外,在执行过程中还会调用一些其他方法:关于运行的 run_started、run_ended 方法,以及关于任务的 job_skipped、job_started、job_success 和 job_error 方法。您可以在子类中重写这些方法,以根据您的需求调整行为。
run_started
签名
def run_started(self):
返回无。
在计算完键(使用 update_keys)后,此方法在 run 方法中被调用,在开始循环之前。默认情况下,它什么也不做,只是进行一次日志记录。
run_ended
签名
def run_ended(self):
返回无。
在即将退出 run 方法之前,将调用此方法。默认情况下,它什么也不做,只是进行一次日志记录。
job_skipped
签名
def job_skipped(self, job, queue):
返回无。
当在 run 方法中获取任务时,会检查其状态。如果它不是 STATUSES.WAITING,就会调用此 job_skipped 方法,并带有两个主要参数:任务和找到它的队列。
当任务在执行过程中被取消时(即如果执行完成后,任务的状态是 STATUSES.CANCELED),也会调用此方法。
此方法会移除任务的 queued 标志,记录由 job_skipped_message 方法返回的消息,然后如果定义了,就调用任务的 on_skipped 方法。
job_skipped_message
签名
def job_skipped_message(self, job, queue):
返回一个字符串,用于在 job_skipped 中记录。
job_started
签名
def job_started(self, job, queue):
返回无。
当获取任务并验证其状态(它必须是 STATUSES.WAITING)时,将调用 job_started 方法,在调用回调(如果没有定义回调,则为 execute 方法)之前,带有任务和找到它的队列。
此方法更新任务的 start 和 status 字段,然后记录由 job_started_message 返回的消息,最后如果定义了,就调用任务的 on_started 方法。
job_started_message
签名
def job_started_message(self, job, queue):
返回一个字符串,用于在 job_started 中记录。
job_success
签名
def job_success(self, job, queue, job_result):
返回无。
当回调(或 execute 方法)完成且未引发任何异常时,任务被视为成功,并调用 job_success 方法,带有任务和找到它的队列,以及回调方法的返回值。
请注意,如果执行完成后,任务的状态是 STATUS.CANCELED 或 STATUS.DELAYED,则不会调用此方法,并且任务不会被视作“成功”。在这些情况下,分别调用 job_skipped 和 job_delayed 方法。
此方法会移除任务的 queued 标志,更新其 end 和 status 字段,将任务移动到队列的成功列表中,然后记录由 job_success_message 返回的消息,最后如果定义了,就调用任务的 on_success 方法。
job_success_message
签名
def job_success_message(self, job, queue, job_result):
返回一个字符串以在 job_success 中记录。
job_delayed
签名
def job_delayed(self, job, queue):
返回无。
当回调(或 execute 方法)执行完毕,没有抛出异常,并且此时作业的状态是 STATUSES.DELAYED 时,作业不成功但也不在错误状态:它将被延迟。
如果作业在 waiting 队列中但其状态被设置为 STATUSES.DELAYED,则可以通过调用此方法来以另一种方式调用此方法。在这种情况下,作业不会被执行,而是通过调用此方法被延迟。
此方法检查作业是否有 delayed_until 值,如果没有或值无效,则将其设置为未来的60秒。您可能需要显式设置此值,或者至少清除该字段,因为如果作业最初被延迟,该值可能已被设置,但日期在之前,作业将被延迟到这个日期,所以,不是延迟,而是排队。
有了这个值,将调用队列的 enqueue_or_delay 方法来真正延迟作业。
然后,记录 job_delayed_message 返回的消息,最后调用(如果已定义),作业的 on_delayed 方法。
job_delayed_message
签名
def job_delayed_message(self, job, queue):
返回一个字符串以在 job_delayed 中记录。
job_error
签名
def job_error(self, job, queue, exception, trace=None):
返回无。
当回调(或 execute 方法)通过抛出异常终止时,将调用 job_error 方法,该方法带有作业及其所在的队列,以及抛出的异常和(如果 save_tracebacks 为 True),则带有回溯。
此方法将作业的 queued 标志移除(如果它不需要重新排队),更新其 end 和 status 字段,将作业移动到队列的 error 列表,如果 save_errors 为 True,则添加一个新错误对象,然后记录 job_error_message 返回的消息,并调用(如果定义),作业的 on_error 方法。
最后,如果作业的 must_be_cancelled_on_error 属性为 False,并且 requeue_times 工作器属性允许它(考虑作业的 tries 属性),则调用 requeue_job 方法。
job_error_message
签名
def job_error_message(self, job, queue, to_be_requeued_exception, trace=None):
返回一个字符串以在 job_error 中记录。
job_requeue_message
签名
def job_requeue_message(self, job, queue):
返回一个字符串以在作业被重新排队时在 job_error 中记录。
additional_error_fields
签名
def additional_error_fields(self, job, queue, exception, trace=None):
返回一个默认为空的字段字典,要添加到错误对象中。
此方法由 job_error 调用,让您定义一个字段/值字典,以添加到创建的错误对象中,如果您使用在 error_model 中定义的 Error 模型的子类。
要传递这些附加字段到错误对象,您必须在您的子类中重写此方法。
requeue_job
def requeue_job(self, job, queue, priority, delayed_for=None):
返回无。
当作业执行失败时,将调用此方法来重新排队作业,然后调用作业的 requeue 方法,然后其 requeued 方法,最后记录 job_requeue_message 返回的消息。
id
它是一个返回当前工作进程标识符的属性,用于在日志中区分每个工作进程的日志条目。
elapsed
这是一个返回属性,表示运行自run方法启动以来的时间。当run方法结束时,它是start_date和end_date之间的时间。
如果未调用run方法,它将被设置为None。
log
签名
def log(self, message, level='info'):
返回无。
log是围绕self.logger的一个简单包装器,它会自动在开头添加工作进程的id。它可以接受一个默认为info的level参数。
set_status
签名
def set_status(self, status):
返回无。
set_status简单地更新工作进程的status字段。
count_waiting_jobs
签名
def count_waiting_jobs(self):
返回此工作进程可以运行的等待状态作业的数量。
count_delayed_jobs
签名
def count_delayed_jobs(self):
返回此工作进程管理的延迟队列中的作业数量。
worker.py 脚本
为了帮助使用 limpyd_jobs,提供了一个可执行的Python脚本:scripts/worker.py(当从包中安装时,可用作limpyd-jobs-worker,添加到您的路径中)
此脚本具有高度的可配置性,可以帮助您启动工作进程,而无需编写脚本或自定义包含的脚本。
使用此脚本,您也不需要编写自定义工作进程,因为所有由工作进程处理的参数都可以作为脚本的参数传递。
此脚本基于在limpyd_jobs.workers中定义的WorkerConfig类,您可以通过继承它来自定义它,并且您可以告诉脚本使用您的类而不是默认类。
您甚至可以传递一个或多个Python路径以添加到sys.path。
此脚本旨在尽可能简化您的使用。
而不是解释所有参数,请参见此脚本--help命令的结果
$ limpyd-jobs-worker --help Usage: worker.py [options] Run a worker using redis-limpyd-jobs Options: --pythonpath=PYTHONPATH A directory to add to the Python path, e.g. --pythonpath=/my/module --worker-config=WORKER_CONFIG The worker config class to use, e.g. --worker- config=my.module.MyWorkerConfig, default to limpyd_jobs.workers.WorkerConfig --print-options Print options used by the worker, e.g. --print-options --dry-run Won't execute any job, just starts the worker and finish it immediatly, e.g. --dry-run --queues=QUEUES Name of the Queues to handle, comma separated e.g. --queues=queue1,queue2 --queue-model=QUEUE_MODEL Name of the Queue model to use, e.g. --queue- model=my.module.QueueModel --error-model=ERROR_MODEL Name of the Error model to use, e.g. --queue- model=my.module.ErrorModel --worker-class=WORKER_CLASS Name of the Worker class to use, e.g. --worker- class=my.module.WorkerClass --callback=CALLBACK The callback to call for each job, e.g. --worker- class=my.module.callback --logger-name=LOGGER_NAME The base name to use for logging, e.g. --logger-base- name="limpyd-jobs.%s" --logger-level=LOGGER_LEVEL The level to use for logging, e.g. --worker-class=ERROR --save-errors Save job errors in the Error model, e.g. --save-errors --no-save-errors Do not save job errors in the Error model, e.g. --no- save-errors --save-tracebacks Save exception tracebacks on job error in the Error model, e.g. --save-tracebacks --no-save-tracebacks Do not save exception tracebacks on job error in the Error model, e.g. --no-save-tracebacks --max-loops=MAX_LOOPS Max number of jobs to run, e.g. --max-loops=100 --max-duration=MAX_DURATION Max duration of the worker, in seconds (None by default), e.g. --max-duration=3600 --terminate-gracefuly Intercept SIGTERM and SIGINT signals to stop gracefuly, e.g. --terminate-gracefuly --no-terminate-gracefuly Do NOT intercept SIGTERM and SIGINT signals, so don't stop gracefuly, e.g. --no-terminate-gracefuly --timeout=TIMEOUT Max delay (seconds) to wait for a redis BLPOP call (0 for no timeout), e.g. --timeout=30 --fetch-priorities-delay=FETCH_PRIORITIES_DELAY Min delay (seconds) to wait before fetching new priority queues, e.g. --fetch-priorities-delay=20 --fetch-delayed-delay=FETCH_DELAYED_DELAY Min delay (seconds) to wait before updating delayed jobs, e.g. --fetch-delayed-delay=20 --requeue-times=REQUEUE_TIMES Number of time to requeue a failing job (default to 0), e.g. --requeue-times=5 --requeue-priority-delta=REQUEUE_PRIORITY_DELTA Delta to add to the actual priority of a failing job to be requeued (default to -1, ie one level lower), e.g. --requeue-priority-delta=-2 --requeue-delay-delta=REQUEUE_DELAY_DELTA How much time (seconds) to delay a job to be requeued (default to 30), e.g. --requeue-delay-delta=15 --database=DATABASE Redis database to use (host:port:db), e.g. --database=localhost:6379:15 --no-title Do not update the title of the worker's process, e.g. --no-title --version show program's version number and exit -h, --help show this help message and exit
除了--pythonpath、--worker-config、--print-options、--dry-run、--worker-class和--no-title之外,所有选项都将传递给工作进程。
因此,如果您使用默认模型,使用默认选项的默认工作进程,并启动一个工作进程来处理队列“queue-name”,您需要做的只是
limpyd-jobs-worker --queues=queue-name --callback=python.path.to.callback
我们使用 setproctitle 模块在进程名称中显示有用的信息,以便有如下内容
limpyd-jobs-worker#1566090 [init] queues=foo,bar limpyd-jobs-worker#1566090 [starting] queues=foo,bar loop=0/1000 waiting=10 delayed=0 limpyd-jobs-worker#1566090 [running] queues=foo,bar loop=1/1000 waiting=9 delayed=2 duration=0:00:15 limpyd-jobs-worker#1566090 [terminated] queues=foo,bar loop=10/1000 waiting=0 delayed=0 duration=0:12:27
您可以通过传递--no-title参数来禁用它。
请注意,如果没有为logger-name设置日志处理器,脚本将自动添加一个StreamHandler格式化程序,以提供如下日志
[19122] 2013-10-02 00:51:24,158 (limpyd-jobs) WARNING [038480] [test|job:1] job skipped (current status: SUCCESS)
(使用的格式为"[%(process)d] %(asctime)s (%(name)s) %(levelname)-8s %(message)s")
在加载工作进程类之前执行代码
有时您可能希望在加载工作进程类之前先做一些初始化工作,例如,使用django,来添加django.setup()
为此,简单地覆盖WorkerConfig类
import django
from limpyd_jobs.workers import WorkerConfig
class MyWorkerConfig(WorkerConfig):
def __init__(self, argv=None):
django.setup()
super(MyWorkerConfig, self).__init__(argv)
并使用--worker-config选项将Python路径传递给此类,使用limpyd-jobs-worker脚本。
测试
redis-limpyd-jobs包已完全测试(覆盖率:100%)。
要运行测试,这些测试不是通过 setup.py 文件安装的,你可以这样做
$ python run_tests.py [...] Ran 136 tests in 19.353s OK
或者如果你已经安装了 nosetests
$ nosetests [...] Ran 136 tests in 20.471s OK
nosetests 的配置在 setup.cfg 文件中,如果安装了 nose-cov,则包括覆盖率。
结语
你可以在 example.py 中看到一个完整的示例(在源代码中,而不是在已安装的包中)
要在你的 Redis 数据库上使用 limpyd_jobs 模型,而不是默认的数据库(localhost:6379:db=0),只需使用主模型的 use_database 方法即可
from limpyd.contrib.database import PipelineDatabase from limpyd_jobs.models import BaseJobsModel database = PipelineDatabase(host='localhost', port=6379, db=15) BaseJobsModel.use_database(database)
或者简单地更改连接设置
from limpyd_jobs.models import BaseJobsModel BaseJobsModel.database.connect(host='localhost', port=6379, db=15)