跳转到主要内容

Zope3的远程处理队列

项目描述

此包为Zope3提供了一种使用mongodb而不是ZODB的远程处理队列。

README

此包提供了一种远程处理器。该远程处理器作为一个简单的对象实现,使用mongodb作为存储。处理器可以在另一个线程中执行预定义的任务。还可以使用不同的调度项在特定时间运行任务。

RemoteProcessor使用了两个不同的处理器。一个处理任务,另一个从调度器中挑选项目并添加任务。这种分离在实现分布式概念时很有用。这意味着一个或多个应用程序可以根据给定的调度项安排任务项。另一个应用程序处理任务,并不知道如何安排下一个任务项。

由于我们使用此远程调度器进行低CPU密集型任务,我们提供了多处理。这是通过在主工作线程中运行多个工作进程来完成的。如果您使用子进程进行任务处理,您将得到一个真正的多进程处理器,它不受当前Python进程的限制。

您可以在远程处理器中配置任务工作进程可以启动的线程数量。请参阅jobWorkerArguments/maxThreads。默认情况下,此数字使用您机器上安装的CPU数量。

实现使用mongodb作为其组件的存储。这意味着任务、任务工厂和调度项都使用m01.mongo提供的ORM概念存储在mongodb中。

请参阅p01.remote查看基于ZODB的远程处理器实现,但请注意,p01.remote实现尚未提供工作进程和调度器处理器的分离。

设置

>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing

现在让我们创建两个远程处理器。我们可以使用我们的远程队列站点实现

>>> from zope.security.proxy import removeSecurityProxy
>>> from m01.remote import interfaces

我们的测试远程处理器应作为应用程序根可用

>>> rp = root
>>> rp
<TestProcessor None>

让我们发现可用的任务

>>> dict(root._jobs)
{}

任务容器最初是空的,因为我们还没有添加任何任务工厂。现在让我们定义一个简单的任务工厂,它只是回显输入字符串

>>> echoJob = testing.EchoJob({})

现在我们可以设置任务输入

>>> echoJob.input = {'foo': u'blah'}

任务唯一的API要求是可以调用。现在我们确保任务能够工作。注意我们用远程处理器实例调用我们的任务,这是我们的初始化应用程序根

>>> echoJob(root)
{'foo': u'blah'}

让我们将任务添加到可用任务列表中

>>> rp.addJobFactory(u'echo', echoJob)

echo任务现在在远程处理器中可用

>>> dict(rp._jobFactories)
{u'echo': <EchoJob u'echo'>}

由于远程处理器不能立即完成任务,因此入站任务由队列管理。首先,我们请求执行echo任务

>>> jobid1 = rp.addJob(u'echo', {'foo': 'bar'})
>>> jobid1
u'...'
>>> sorted([job.status for job in rp._jobs.values()])
[u'queued']

addJob()函数将名为“echo”的任务安排为使用指定参数执行。该方法返回一个任务ID,我们可以用它来查询任务。addJob()函数将任务标记为已排队。

>>> rp.getJobStatus(jobid1)
u'queued'

由于任务尚未被处理,状态设置为“已排队”。此外,尚无结果可用

>>> rp.getJobResult(jobid1) is None
True

只要任务没有被处理,就可以取消它

>>> rp.cancelJob(jobid1)
>>> rp.getJobStatus(jobid1)
u'cancelled'
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled']

默认情况下不会启动工作进程处理器

>>> rp.isProcessing
False

为了获得干净的日志环境,让我们清除日志堆栈

>>> logger.clear()

现在我们可以通过调用startProcessor来启动远程处理器

>>> rp.startProcessor()

看吧 - 远程处理器正在处理

>>> rp.isProcessing
True

检查日志将证明已启动远程处理器

>>> print logger
m01.remote INFO
  Processor 'root-worker' started

让我们再次停止处理器

>>> rp.stopProcessor()
>>> rp.isProcessing
False

现在让我们从一个处理过的任务中获取结果,但首先提交新添加的任务

>>> jobid2 = rp.addJob(u'echo', {'foo': u'bar'})
>>> transaction.commit()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'queued']

现在创建一个工作进程并调用我们的简单工作进程来处理新任务

>>> class FakeWorker(object):
...
...     def __init__(self, rp):
...         self.rp = rp
...
...     def __call__(self):
...         try:
...             result = self.rp.processNextJob()
...             transaction.commit()
...         except Exception, error:
...             transaction.commit()
>>> worker = FakeWorker(rp)
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed']

首先检查任务是否已处理

>>> rp.getJobStatus(jobid2)
u'completed'
>>> rp.getJobResult(jobid2)
{u'foo': u'bar'}

错误处理

现在,让我们定义一个会引发错误的新的任务

>>> errorJob = testing.RemoteExceptionJob()
>>> rp.addJobFactory(u'error', errorJob)

现在添加并执行它

>>> jobid3 = rp.addJob(u'error')
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error']

现在让我们看看发生了什么

>>> rp.getJobStatus(jobid3)
u'error'
>>> errors = rp.getJobErrors(jobid3)
>>> errors
[<JobError u'...'>]

这样的JobError项提供了以下数据

>>> error = tuple(errors)[0]
>>> data = error.dump()
>>> data = m01.mongo.dictify(data)
>>> pprint(data)
{'_id': ObjectId('...'),
 '_type': u'JobError',
 'created': datetime.datetime(..., ..., ..., ..., ..., ..., ..., tzinfo=<bson.tz_util.FixedOffset object at ...>),
 'tb': u"<p>Traceback (most recent call last):..."}

如您所见,存储为tb的跟踪信息是最重要的信息

>>> print data['tb']
<p>Traceback (most recent call last):</p>
<ul>
<li>  Module m01.remote.processor, line 297, in _processJob<br />
    job.output = job(self)</li>
<li>  Module m01.remote.testing, line 86, in __call__<br />
    raise exceptions.RemoteException('An error occurred.')</li>
</ul><p>RemoteException: An error occurred.<br />
</p>

尝试用不太好的错误也试试

>>> fatalJob = testing.FatalExceptionJob()
>>> rp.addJobFactory(u'fatal', fatalJob)

现在添加并执行它

>>> jobid4 = rp.addJob(u'fatal')
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'queued']
>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
1
>>> job4.status == u'queued'
True
>>> job4.errors
[<JobError u'...'>]

再次处理这个任务,但首先将我们的retryTime设置为过时的值,以模拟自上次调用以来时间已经过去

>>> import datetime
>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'queued']
>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
2
>>> job4.errors
[<JobError u'...'>,
 <JobError u'...'>]

再次处理任务的第3次。现在它不会重新抛出异常,但错误消息会附加到错误列表中。

>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'error']

现在让我们看看发生了什么

>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
3
>>> job4.status
u'error'
>>> rp.getJobStatus(jobid4)
u'error'
>>> job4.errors
[<JobError u'...'>,
 <JobError u'...'>,
 <JobError u'...'>]
>>> rp.getJobErrors(jobid4)
[<JobError u'...'>,
 <JobError u'...'>,
 <JobError u'...'>]

出于管理目的,远程处理器还允许您检查所有任务

>>> pprint(dict(rp._jobs))
{u'...': <EchoJob u'...' ...>,
 u'...': <EchoJob u'...' ...>,
 u'...': <RemoteExceptionJob u'...' ...>,
 u'...': <FatalExceptionJob u'...' ...>}

要删除不再需要的任务,我们可以使用removeJobs方法。

>>> jobid8 = rp.addJob(u'echo', {'blah': 'blah'})
>>> transaction.commit()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'error', u'queued']
>>> rp.removeJobs()
{u'cancelled': 1, u'completed': 1, u'error': 2}
>>> sorted([job.status for job in rp._jobs.values()])
[u'queued']

现在处理最后一个待处理任务,并确保我们不会得到更多任务

>>> rp.pullNextJob()
<EchoJob u'...' ...>

线程行为

每个远程处理器都在一个单独的线程中运行,允许它们独立操作。任务应该设计成避免冲突错误。

让我们启动我们在此点定义的远程处理器,并看看因此运行了哪些线程

>>> rp.startProcessor()

>>> import pprint
>>> import threading

>>> def show_threads():
...     threads = [t for t in threading.enumerate()
...                if t.getName().startswith('root')]
...     threads.sort(key=lambda t: t.getName())
...     pprint.pprint(threads)

>>> show_threads()
[<Thread(root-worker, started daemon ...)>]

让我们停止远程处理器,并给后台线程一个机会来接收消息

>>> rp.stopProcessor()

>>> import time
>>> time.sleep(2)

线程已经退出了

>>> print [t for t in threading.enumerate()
...        if t.getName().startswith('root')]
[]

任务工作者

队列中任务的真正处理由一个称为任务工作者的单独组件来处理。此组件通常在自己的线程中运行,并提供了自己的主循环。

>>> import time
>>> import transaction

worker模块提供了一个任务工作者,它一次执行一个任务。另一个工作者根据调度项设置安排新任务。让我们创建必要的组件来测试任务工作者

  1. 创建远程处理器

>>> from m01.remote import testing
>>> rp = root
>>> rp.isProcessing
False
>>> rp.isScheduling
False
  1. 注册一个简单地睡眠并写入消息的任务

>>> data = {'retryDelay': 1}
>>> sleepJob = testing.SleepJob(data)
>>> rp.addJobFactory(u'sleep', sleepJob)

SimpleJobWorker

此工作者一次执行一个任务。它被设计用于耗时较长且占用计算机大部分处理能力的任务。

让我们首先注册几个任务

>>> jobid1 = rp.addJob(u'sleep', (0.04, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.1,  2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0,    3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.08, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

现在让我们首先检查我们是否可以访问任务

>>> job = rp._jobs.get(jobid1)
>>> job
<SleepJob u'...' ...>

让我们尝试如果任务是否准备好处理

>>> rp.getJobStatus(jobid1)
u'queued'
>>> rp.getJobStatus(jobid2)
u'queued'
>>> rp.getJobStatus(jobid3)
u'queued'
>>> rp.getJobStatus(jobid4)
u'queued'

让我们首先直接执行一个任务。simple worker构造函数的第一个参数是远程处理器实例。所有其他参数都是可选的,可以在RemoteProcessor类中定义为工作者参数,请参阅jobWorkerArguments和schedulerWorkerArguments

>>> from m01.remote.worker import SimpleJobWorker
>>> worker = SimpleJobWorker(rp, waitTime=0.0)

现在让我们处理第一个任务。我们清除日志,我们还必须结束任何现有的交互,以便在这个线程中处理任务

>>> logger.clear()
>>> from zope.security import management
>>> management.endInteraction()
>>> worker.doProcessNextJob()
True
>>> print logger
m01.remote INFO
  Job: 1

现在让我们在远程处理器内部使用工作者。由于工作者构造函数还接受其他参数,因此也指定了这些参数

>>> rp.jobWorkerFactory = SimpleJobWorker
>>> rp.jobWorkerFactory
<class 'm01.remote.worker.SimpleJobWorker'>
>>> rp.jobWorkerArguments
{'waitTime': 0.0}

为了测试目的,等待时间已设置为0。它实际上默认设置为1秒。现在让我们开始处理任务,稍等片刻,以便所有任务都完成,然后再次停止处理

>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)

日志显示所有任务都已处理。但更重要的是,它们都是按照定义的顺序完成的。注意,第一个任务在启动远程处理器之前就被处理了。是的,这意味着远程处理器可以在队列未启动的情况下处理任务。启动远程处理器仅意味着任务被处理,而不必手动执行。

>>> print logger
m01.remote INFO
  Job: 1
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  Job: 2
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped
>>> logger.clear()

任务中的事务

使用SimpleJobWorker,任务_不应该_更改事务状态,因为远程处理器对任务的行政管理和任务本身都在同一事务中运行,所以从任务内部中止它可能会搞乱行政部分。

这是一个回归测试,中止任务内部的事务不会导致无限循环(因为SimpleJobWorker在事务内部拉取任务,所以如果它被中止,任务将保留在队列中)

>>> testing.testCounter
0
>>> counter = 0
>>> data = {'counter': counter}
>>> abortJob = testing.TransactionAbortJob(data)
>>> rp.addJobFactory(u'abortJob', abortJob)
>>> jobid = rp.addJob(u'abortJob', (1))
>>> time.sleep(0.5)
>>> jobid = rp.addJob(u'abortJob', (2))
>>> transaction.commit()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> transaction.abort() # prevent spurious conflict errors
>>> testing.testCounter
2
>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  Job: 1
m01.remote INFO
  Job: 2
m01.remote INFO
  Processor 'root-worker' stopped

重置测试计数器

>>> testing.testCounter = 0

MultiJobProcessor

多线程任务工作者可以同时执行多个任务。它被设计用于耗时较长但使用很少处理能力的任务。

让我们添加一些新的作业来执行

>>> jobid1 = rp.addJob(u'sleep', (0.04, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (1.0,  2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0,    3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.2,  4))
>>> time.sleep(0.2)
>>> transaction.commit()

在测试远程处理器中的工作者之前,我们先单独查看每个方法。因此我们实例化工作者

>>> from m01.remote.worker import MultiJobWorker
>>> worker = MultiJobWorker(rp, waitTime=0, maxThreads=2)

最大线程数也可以设置

>>> worker.maxThreads
2

任何时间都可以审查所有工作线程

>>> worker.threads
[]
>>> from zope.security import management
>>> management.endInteraction()

让我们拉取一个新的作业

>>> job = worker.doPullNextJob()
>>> job
<SleepJob u'...' ...>

在执行之前,我们需要先拉取作业,这样数据库就会标记作业为正在处理,没有新的线程可以获取相同的作业。如您所见,作业被标记为处理状态

>>> job.status
u'processing'

一旦我们拉取了特定的作业,我们就可以处理它

>>> logger.clear()
>>> print logger
>>> worker.doProcessJob(job.__name__)
>>> print logger
m01.remote INFO
  Job: 1

现在让我们看看如何在任务服务中使用处理器。这主要意味着设置处理器工厂

>>> management.newInteraction()
>>> rp.jobWorkerFactory = MultiJobWorker
>>> rp.jobWorkerArguments = {'waitTime': 1.0, 'maxThreads': 2}
>>> transaction.commit()
>>> logger.clear()

现在让我们处理剩余的作业

>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)

如您所见,这次作业不再按顺序完成,因为它们执行所需的时间不同

>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 4
m01.remote INFO
  Job: 2
m01.remote INFO
  Processor 'root-worker' stopped

现在我们将线程限制设置为四,构建一组新的作业以证明所有作业将同时运行

>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 4}
>>> jobid1 = rp.addJob(u'sleep', (0.3, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.4, 2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0.1, 3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.5, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

如果所有任务一次性处理,作业3应该首先完成。您还可以看到,作业4在工作者记录处理之前立即处理

>>> logger.clear()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.0)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 1
m01.remote INFO
  Job: 2
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped

现在将线程限制设置为两个,构建一组新的作业以证明不会同时运行超过两个线程

>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 2}
>>> transaction.commit()
>>> jobid1 = rp.addJob(u'sleep', (0.3, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.4, 2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0.2, 3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.5, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

如果所有任务一次性处理,作业3应该首先完成,但由于作业需要等待可用的线程,它将排在第三位。现在我们可以运行作业并查看结果

>>> logger.clear()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 1
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 2
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped

调度器

调度器概念实现为包含调度项的附加调度容器。

>>> from m01.mongo import UTC
>>> import m01.remote.scheduler
>>> from m01.remote import interfaces
>>> from m01.remote import testing

现在让我们先获取我们的测试远程处理器,该处理器包含我们的调度容器

>>> remoteProcessor = root
>>> remoteProcessor
<TestProcessor None>
>>> scheduler = remoteProcessor._scheduler
>>> tuple(scheduler.values())
()

延迟

我们可以为延迟作业处理添加一个调度项。让我们添加这样一个项

>>> import datetime
>>> def getNextTime(dt, seconds):
...     return dt + datetime.timedelta(seconds=seconds)
>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)
>>> now10 = getNextTime(now, 10)
>>> delay = 10
>>> data = {'jobName': u'echo 1', 'active': True, 'delay': delay,
...         'retryDelay': 5, 'nextCallTime': now10}
>>> firstEcho = m01.remote.scheduler.Delay(data)
>>> interfaces.IDelay.providedBy(firstEcho)
True

延迟设置为10

>>> firstEcho.delay
10

和重试延迟设置为5

>>> firstEcho.retryDelay
5

并将显式的下一次调用时间设置为现在+10

>>> firstEcho.nextCallTime == getNextTime(now, 10)
True

并且我们的重试时间设置为None

>>> firstEcho.retryTime is None
True

现在我们可以将延迟项添加到调度器

>>> scheduler.add(firstEcho)
u'...'

如您所见,调度器包含一个项

>>> sorted(scheduler.values())
[<Delay ... for: u'echo 1'>]

接下来,我们将测试一些调度器API方法。首先检查我们是否可以使用updateRetryTime更新缓存中项的重试时间

>>> scheduler.updateRetryTime(firstEcho.dump(), now)
False

如您所见,我们没有得到新的重试时间。这是因为我们没有使用正确的调用时间。让我们尝试使用正确的下一次调用时间

>>> now10 = getNextTime(now, 10)
>>> now15 = getNextTime(now, 15)
>>> retryTime = scheduler.updateRetryTime(firstEcho.dump(), now10)
>>> retryTime == now15
True

如您所见,新的重试时间是使用5秒的重试延迟。这个重试时间用于锁定项。这意味着项在时间过去之前不会被选中。

现在让我们尝试另一个内部API方法,它可以获取添加缓存中的下一个项

>>> scheduler.getNextCachedItem(now)

如您所见,该方法没有返回项,让我们尝试使用下一个计划调用时间

>>> nextCallTime = firstEcho.nextCallTime
>>> scheduler.getNextCachedItem(now10)
<Delay ... for: u'echo 1'>

如您所见,重试时间基于下一个调用时间和重试延迟进行设置

>>> firstEcho.retryTime == getNextTime(nextCallTime, 5)
True

现在重要的一部分。让我们测试我们的方法,该方法负责获取包括Mongo中的下一个项。此方法使用上述两个方法。当然,使用当前时间我们将不会获取任何项

>>> scheduler.pullNextSchedulerItem(now) is None
True

但现在已经需要另一个下一个调用时间,因为之前的调用更新了项的下一个调用时间。让我们首先检查下一个调用时间

>>> firstEcho.nextCallTime == now10
True

但如您所见,重试时间在我们的上次测试中已经设置。这意味着如果我们至少使用比重试时间更大的时间,我们才会得到一个项

>>> firstEcho.retryTime == now15
True
>>> scheduler.pullNextSchedulerItem(now10)
>>> scheduler.pullNextSchedulerItem(now15)
<Delay ... for: u'echo 1'>

现在,让我们检查我们的计划项时间

>>> now20 = getNextTime(now15, 5)
>>> firstEcho.nextCallTime == now10
True

注意,我们的重试时间是用当前调用时间和重试延迟计算的。如果我们使用调用时间作为重试时间计算的基础,那就没有意义了

>>> firstEcho.retryTime == now20
True

pullNextSchedulerItem方法返回一个挂起的项或None,因为我们没有挂起的项

>>> scheduler.pullNextSchedulerItem(now) is None
True

现在让我们在某个调度器时间内添加第二个调度项

>>> import datetime
>>> delay = 10
>>> data = {'jobName': u'echo 2', 'active': True, 'delay': delay,
...         'retryDelay': 5}
>>> secondEcho = m01.remote.scheduler.Delay(data)
>>> scheduler.add(secondEcho)
u'...'
>>> sorted(scheduler.values(), key=lambda x:(x.__name__, x.__name__))
[<Delay ... for: u'echo 1'>, <Delay ... for: u'echo 2'>]
>>> scheduler.remove(firstEcho)
>>> scheduler.remove(secondEcho)
>>> tuple(scheduler.values())
()

adjustCallTime

在我们测试cron项目之前,让我们测试一下我们的方法,该方法可以将给定的日期时间重置为最小的起始点。例如,如果小时被用作计算基础,我们需要从第一分钟开始计数。

>>> from m01.remote.scheduler import adjustCallTime
>>> now = datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)
>>> now
datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)
>>> item = m01.remote.scheduler.Cron({'jobName': u'bar', 'minute': [5]})
>>> adjustCallTime(item, now)
datetime.datetime(2010, 10, 25, 16, 6, 0, 123, tzinfo=UTC)

Cron

可能更有趣的实现是cron计划任务项。这个cron项目可以在特定给定的时间安排任务。让我们设置这样的cron项目。

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5}
>>> cronItem = m01.remote.scheduler.Cron(data)

cronItem提供了ISchedulerItem和ICron接口。

>>> interfaces.ISchedulerItem.providedBy(cronItem)
True
>>> interfaces.ICron.providedBy(cronItem)
True

如您所见,cron项目还提供了一个retryDelay。

>>> cronItem.retryDelay
5

让我们首先解释一下它是如何工作的。cron计划程序提供了一个下一次调用时间戳。如果计算出的下一次调用时间小于上一次调用时间,cron计划任务项将计算新的下一次调用时间,并将其存储为nextCallTime,同时将上一个nextCallTime返回。这将确保我们至少进行一次时间计算调用,因为每次cron计划任务项被询问下一次调用时间时,都会使用存储的nextCallTime。只有当现有的下一次调用时间小于给定的调用时间时,cron计划任务项才会计算下一次调用时间。

现在让我们测试cron作为计划任务项。设置一个简单的cron项目,周期为5分钟。

>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)
>>> now
datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)
>>> data = {'jobName': u'echo cron', 'active': True, 'retryDelay': 5,
...         'minute': [5], 'nextCallTime': now}
>>> cronEcho = m01.remote.scheduler.Cron(data)

现在将项目添加到计划程序。

>>> scheduler.add(cronEcho)
u'...'

如您所见,我们的cron项目是基于给定的nextCallTime进行计划的。

>>> cronEcho.nextCallTime
datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)

retrytime为空。

>>> cronEcho.retryTime is None
True

分钟列表包含我们的5分钟。

>>> cronEcho.minute
[5]
>>> cronEcho.hour
[]
>>> cronEcho.dayOfMonth
[]
>>> cronEcho.month
[]
>>> cronEcho.dayOfWeek
[]

计划程序包含一个cron项目。

>>> tuple(scheduler.values())
(<Cron ... for: u'echo cron'>,)

现在我们可以根据cron计划任务项定义的jobName“echo”获取工作项,如果调用pullNextSchedulerItem。

>>> scheduler.pullNextSchedulerItem(now)
<Cron ... for: u'echo cron'>

在这次调用中,retryTime根据retryDelay设置。

>>> cronEcho.retryTime
datetime.datetime(2010, 10, 1, 0, 0, 5, tzinfo=UTC)

现在让我们测试不同的cron设置。注意,我们为分钟、小时、月份、星期几和每月的日期提供了值列表。这意味着你可以将任务安排为每15分钟执行一次,如果你将分钟设置为(0, 15, 30, 45),或者如果你想在每个小时后的第15分钟设置任务,你可以将分钟设置为(15,)。如果你设置多个参数,例如分钟、小时或天数等,所有参数都必须符合给定的时间。

让我们从一个每小时的第1分钟和第2分钟的cron计划程序开始。通常,cron计划任务项会将int(time.time())作为nextCallTime值。为了测试我们的cron计划任务项,我们使用显式的startTime值为0(零)。

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [0, 1]}
>>> cronItem = m01.remote.scheduler.Cron(data)

下一次调用时间是基于给定的startTime值设置的。这意味着第一次调用将在0(零)分钟。

>>> cronItem.nextCallTime is None
True

现在让我们调用getNextCallTime,如您所见,我们将得到None作为nextCallTime,因为我们没有在cron初始化期间设置nextCallTime,并且nextCallTime被设置为下一分钟。

>>> cronItem.getNextCallTime(now) is None
True
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)

现在让我们再次调用getNextCallTime,如您所见,我们将得到我们在对象初始化期间计算的nextCallTime,并且nextCallTime被设置为下一分钟。

如果我们使用加上5秒的调用时间,我们仍然会得到缓存的1分钟下一次调用时间,并且不会生成新的下一次调用时间,因为这个时间已经是在未来的。

>>> cronItem.getNextCallTime(getNextTime(now, 5))
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)

如果我们使用等于或大于从缓存的下一次调用时间开始计算1分钟延迟的调用时间,我们将得到缓存的调用时间作为值,就像我们在较小的调用时间中得到的那样(见上面的示例)。

>>> cronItem.getNextCallTime(getNextTime(now, 65))
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)

所有未来的调用,如果时间小于下一次调用时间,都将返回当前的下一次调用时间,而不会计算任何新的时间。

>>> cronItem.getNextCallTime(getNextTime(now, 125))
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)
>>> cronItem.getNextCallTime(getNextTime(now, 1*60*60))
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)

记住,getNextCallTime返回之前计算的下一次调用时间,而新的计算的下一次调用时间将被存储为nextCallTime。为了更简单的测试输出,我们定义了一个显示时间计算的方法。

分钟

让我们开始测试时间表。

>>> def getNextCallTime(cron, dt, seconds=None):
...     """Return stored and new calculated nextCallTime"""
...     if seconds is None:
...         callTime = dt
...     else:
...         callTime = getNextTime(dt, seconds)
...     nextCallTime = cron.getNextCallTime(callTime)
...     return '%s --> %s' % (nextCallTime, cron.nextCallTime)
>>> now = datetime.datetime(1970, 1, 1, 0, 3, 0, tzinfo=UTC)
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [0, 10], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> str(now)
'1970-01-01 00:03:00+00:00'
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 1)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 2*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 51*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 01:00:00+00:00'
>>> getNextCallTime(item, now, 55*60)
'1970-01-01 01:00:00+00:00 --> 1970-01-01 01:00:00+00:00'

小时

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'hour': [2, 13], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 02:00:00+00:00'
>>> getNextCallTime(item, now, 2*60*60)
'1970-01-01 02:00:00+00:00 --> 1970-01-01 13:00:00+00:00'
>>> getNextCallTime(item, now, 4*60*60)
'1970-01-01 13:00:00+00:00 --> 1970-01-01 13:00:00+00:00'
>>> getNextCallTime(item, now, 13*60*60)
'1970-01-01 13:00:00+00:00 --> 1970-01-02 02:00:00+00:00'
>>> getNextCallTime(item, now, 15*60*60)
'1970-01-02 02:00:00+00:00 --> 1970-01-02 02:00:00+00:00'

月份

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'month': [1, 2, 5, 12], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-02-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 90*24*60*60)
'1970-02-01 00:03:00+00:00 --> 1970-05-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 120*24*60*60)
'1970-05-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 130*24*60*60)
'1970-12-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 360*24*60*60)
'1970-12-01 00:03:00+00:00 --> 1971-01-01 00:03:00+00:00'

dayOfWeek [0..6]

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'dayOfWeek': [0, 2, 4, 5], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)

当前工作日的now是

>>> now.weekday()
3

这意味着我们的nextCallTime应该使用第4天作为nextCallTime,如果我们用now调用它们。

>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-02 00:03:00+00:00'

再过一天,我们将得到工作日4(跳过)

>>> getNextCallTime(item, now, 24*60*60)
'1970-01-02 00:03:00+00:00 --> 1970-01-03 00:03:00+00:00'

再过一天,我们将得到工作日5(增加)

>>> getNextCallTime(item, now, 2*24*60*60)
'1970-01-03 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

再过一天,我们将得到工作日6(跳过)

>>> getNextCallTime(item, now, 3*24*60*60)
'1970-01-05 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

再过一天,我们将得到工作日0(增加)

>>> getNextCallTime(item, now, 4*24*60*60)
'1970-01-05 00:03:00+00:00 --> 1970-01-07 00:03:00+00:00'

dayOfMonth [1..31]

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'dayOfMonth': [2, 12, 21, 30], 'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-02 00:00:00+00:00'
>>> getNextCallTime(item, now, 12*24*60*60)
'1970-01-02 00:00:00+00:00 --> 1970-01-21 00:00:00+00:00'
>>> getNextCallTime(item, now, 31*24*60*60)
'1970-01-21 00:00:00+00:00 --> 1970-02-02 00:00:00+00:00'

组合

组合一些属性

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [10], 'dayOfMonth': [1, 10, 20, 30],
...         'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 10*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 01:10:00+00:00'
>>> getNextCallTime(item, now, 10*24*60*60)
'1970-01-01 01:10:00+00:00 --> 1970-01-20 00:10:00+00:00'
>>> getNextCallTime(item, now, 20*24*60*60)
'1970-01-20 00:10:00+00:00 --> 1970-01-30 00:10:00+00:00'

另一个示例

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [10], 'hour': [4], 'dayOfMonth': [1, 12, 21, 30],
...         'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 10*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 4*60*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 5*60*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-12 04:10:00+00:00'

变更

3.0.0 (2015-11-10)

  • 支持 pymongo >= 3.0.0,并将版本号设置为3.0.0,以反映 pymongo >= 3.0.0 的兼容性

0.6.0 (2013-06-28)

  • 特性:实现了 JobError 作为 Job 的子项。并将之前的 JobError 重命名为 RemoteException。此更改需要您在更新之前删除作业列表中的所有以前的 JobError 作业。同时,在您的代码中使用 RemoteException 代替 JobError。新的 JobError 子项提供了更好的错误回溯信息和创建日期。

  • 特性:实现了更好的错误处理,保存格式化的回溯字符串

0.5.1 (2012-11-18)

  • 添加了 MANIFEST.in 文件

  • 删除了 p01.i18n 包依赖项

  • 允许删除具有所有状态的作业

  • 拆分调度器和容器,并将调度器部分移动到混合类中

  • 切换到 bson 导入

  • 在 getBatchData 签名中反映更改

  • 修复日期时间比较,四舍五入毫秒

  • 调整不同的架构描述,使用标题中使用的相同消息 ID

  • 删除了未使用的 ID

0.5.0 (2011-08-19)

  • 初始发布

由以下组织支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面