跳转到主要内容

Zope3的远程处理队列

项目描述

此包为Zope3提供了一种使用mongodb而不是ZODB的远程处理队列。

README

此包提供了一种远程处理器。该远程处理器作为一个简单的对象实现,使用mongodb作为存储。处理器可以在另一个线程中执行预定义的任务。还可以使用不同的调度项在特定时间运行任务。

RemoteProcessor使用了两个不同的处理器。一个处理任务,另一个从调度器中挑选项目并添加任务。这种分离在实现分布式概念时很有用。这意味着一个或多个应用程序可以根据给定的调度项安排任务项。另一个应用程序处理任务,并不知道如何安排下一个任务项。

由于我们使用此远程调度器进行低CPU密集型任务,我们提供了多处理。这是通过在主工作线程中运行多个工作进程来完成的。如果您使用子进程进行任务处理,您将得到一个真正的多进程处理器,它不受当前Python进程的限制。

您可以在远程处理器中配置任务工作进程可以启动的线程数量。请参阅jobWorkerArguments/maxThreads。默认情况下,此数字使用您机器上安装的CPU数量。

实现使用mongodb作为其组件的存储。这意味着任务、任务工厂和调度项都使用m01.mongo提供的ORM概念存储在mongodb中。

请参阅p01.remote查看基于ZODB的远程处理器实现,但请注意,p01.remote实现尚未提供工作进程和调度器处理器的分离。

设置

>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing

现在让我们创建两个远程处理器。我们可以使用我们的远程队列站点实现

>>> from zope.security.proxy import removeSecurityProxy
>>> from m01.remote import interfaces

我们的测试远程处理器应作为应用程序根可用

>>> rp = root
>>> rp
<TestProcessor None>

让我们发现可用的任务

>>> dict(root._jobs)
{}

任务容器最初是空的,因为我们还没有添加任何任务工厂。现在让我们定义一个简单的任务工厂,它只是回显输入字符串

>>> echoJob = testing.EchoJob({})

现在我们可以设置任务输入

>>> echoJob.input = {'foo': u'blah'}

任务唯一的API要求是可以调用。现在我们确保任务能够工作。注意我们用远程处理器实例调用我们的任务,这是我们的初始化应用程序根

>>> echoJob(root)
{'foo': u'blah'}

让我们将任务添加到可用任务列表中

>>> rp.addJobFactory(u'echo', echoJob)

echo任务现在在远程处理器中可用

>>> dict(rp._jobFactories)
{u'echo': <EchoJob u'echo'>}

由于远程处理器不能立即完成任务,因此入站任务由队列管理。首先,我们请求执行echo任务

>>> jobid1 = rp.addJob(u'echo', {'foo': 'bar'})
>>> jobid1
u'...'
>>> sorted([job.status for job in rp._jobs.values()])
[u'queued']

addJob()函数将名为“echo”的任务安排为使用指定参数执行。该方法返回一个任务ID,我们可以用它来查询任务。addJob()函数将任务标记为已排队。

>>> rp.getJobStatus(jobid1)
u'queued'

由于任务尚未被处理,状态设置为“已排队”。此外,尚无结果可用

>>> rp.getJobResult(jobid1) is None
True

只要任务没有被处理,就可以取消它

>>> rp.cancelJob(jobid1)
>>> rp.getJobStatus(jobid1)
u'cancelled'
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled']

默认情况下不会启动工作进程处理器

>>> rp.isProcessing
False

为了获得干净的日志环境,让我们清除日志堆栈

>>> logger.clear()

现在我们可以通过调用startProcessor来启动远程处理器

>>> rp.startProcessor()

看吧 - 远程处理器正在处理

>>> rp.isProcessing
True

检查日志将证明已启动远程处理器

>>> print logger
m01.remote INFO
  Processor 'root-worker' started

让我们再次停止处理器

>>> rp.stopProcessor()
>>> rp.isProcessing
False

现在让我们从一个处理过的任务中获取结果,但首先提交新添加的任务

>>> jobid2 = rp.addJob(u'echo', {'foo': u'bar'})
>>> transaction.commit()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'queued']

现在创建一个工作进程并调用我们的简单工作进程来处理新任务

>>> class FakeWorker(object):
...
...     def __init__(self, rp):
...         self.rp = rp
...
...     def __call__(self):
...         try:
...             result = self.rp.processNextJob()
...             transaction.commit()
...         except Exception, error:
...             transaction.commit()
>>> worker = FakeWorker(rp)
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed']

首先检查任务是否已处理

>>> rp.getJobStatus(jobid2)
u'completed'
>>> rp.getJobResult(jobid2)
{u'foo': u'bar'}

错误处理

现在,让我们定义一个会引发错误的新的任务

>>> errorJob = testing.RemoteExceptionJob()
>>> rp.addJobFactory(u'error', errorJob)

现在添加并执行它

>>> jobid3 = rp.addJob(u'error')
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error']

现在让我们看看发生了什么

>>> rp.getJobStatus(jobid3)
u'error'
>>> errors = rp.getJobErrors(jobid3)
>>> errors
[<JobError u'...'>]

这样的JobError项提供了以下数据

>>> error = tuple(errors)[0]
>>> data = error.dump()
>>> data = m01.mongo.dictify(data)
>>> pprint(data)
{'_id': ObjectId('...'),
 '_type': u'JobError',
 'created': datetime.datetime(..., ..., ..., ..., ..., ..., ..., tzinfo=<bson.tz_util.FixedOffset object at ...>),
 'tb': u"<p>Traceback (most recent call last):..."}

如您所见,存储为tb的跟踪信息是最重要的信息

>>> print data['tb']
<p>Traceback (most recent call last):</p>
<ul>
<li>  Module m01.remote.processor, line 297, in _processJob<br />
    job.output = job(self)</li>
<li>  Module m01.remote.testing, line 86, in __call__<br />
    raise exceptions.RemoteException('An error occurred.')</li>
</ul><p>RemoteException: An error occurred.<br />
</p>

尝试用不太好的错误也试试

>>> fatalJob = testing.FatalExceptionJob()
>>> rp.addJobFactory(u'fatal', fatalJob)

现在添加并执行它

>>> jobid4 = rp.addJob(u'fatal')
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'queued']
>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
1
>>> job4.status == u'queued'
True
>>> job4.errors
[<JobError u'...'>]

再次处理这个任务,但首先将我们的retryTime设置为过时的值,以模拟自上次调用以来时间已经过去

>>> import datetime
>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'queued']
>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
2
>>> job4.errors
[<JobError u'...'>,
 <JobError u'...'>]

再次处理任务的第3次。现在它不会重新抛出异常,但错误消息会附加到错误列表中。

>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'error']

现在让我们看看发生了什么

>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
3
>>> job4.status
u'error'
>>> rp.getJobStatus(jobid4)
u'error'
>>> job4.errors
[<JobError u'...'>,
 <JobError u'...'>,
 <JobError u'...'>]
>>> rp.getJobErrors(jobid4)
[<JobError u'...'>,
 <JobError u'...'>,
 <JobError u'...'>]

出于管理目的,远程处理器还允许您检查所有任务

>>> pprint(dict(rp._jobs))
{u'...': <EchoJob u'...' ...>,
 u'...': <EchoJob u'...' ...>,
 u'...': <RemoteExceptionJob u'...' ...>,
 u'...': <FatalExceptionJob u'...' ...>}

要删除不再需要的任务,我们可以使用removeJobs方法。

>>> jobid8 = rp.addJob(u'echo', {'blah': 'blah'})
>>> transaction.commit()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'error', u'queued']
>>> rp.removeJobs()
{u'cancelled': 1, u'completed': 1, u'error': 2}
>>> sorted([job.status for job in rp._jobs.values()])
[u'queued']

现在处理最后一个待处理任务,并确保我们不会得到更多任务

>>> rp.pullNextJob()
<EchoJob u'...' ...>

线程行为

每个远程处理器都在一个单独的线程中运行,允许它们独立操作。任务应该设计成避免冲突错误。

让我们启动我们在此点定义的远程处理器,并看看因此运行了哪些线程

>>> rp.startProcessor()

>>> import pprint
>>> import threading

>>> def show_threads():
...     threads = [t for t in threading.enumerate()
...                if t.getName().startswith('root')]
...     threads.sort(key=lambda t: t.getName())
...     pprint.pprint(threads)

>>> show_threads()
[<Thread(root-worker, started daemon ...)>]

让我们停止远程处理器,并给后台线程一个机会来接收消息

>>> rp.stopProcessor()

>>> import time
>>> time.sleep(2)

线程已经退出了

>>> print [t for t in threading.enumerate()
...        if t.getName().startswith('root')]
[]

任务工作者

队列中任务的真正处理由一个称为任务工作者的单独组件来处理。此组件通常在自己的线程中运行,并提供了自己的主循环。

>>> import time
>>> import transaction

worker模块提供了一个任务工作者,它一次执行一个任务。另一个工作者根据调度项设置安排新任务。让我们创建必要的组件来测试任务工作者

  1. 创建远程处理器

>>> from m01.remote import testing
>>> rp = root
>>> rp.isProcessing
False
>>> rp.isScheduling
False
  1. 注册一个简单地睡眠并写入消息的任务

>>> data = {'retryDelay': 1}
>>> sleepJob = testing.SleepJob(data)
>>> rp.addJobFactory(u'sleep', sleepJob)

SimpleJobWorker

此工作者一次执行一个任务。它被设计用于耗时较长且占用计算机大部分处理能力的任务。

让我们首先注册几个任务

>>> jobid1 = rp.addJob(u'sleep', (0.04, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.1,  2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0,    3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.08, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

现在让我们首先检查我们是否可以访问任务

>>> job = rp._jobs.get(jobid1)
>>> job
<SleepJob u'...' ...>

让我们尝试如果任务是否准备好处理

>>> rp.getJobStatus(jobid1)
u'queued'
>>> rp.getJobStatus(jobid2)
u'queued'
>>> rp.getJobStatus(jobid3)
u'queued'
>>> rp.getJobStatus(jobid4)
u'queued'

让我们首先直接执行一个任务。simple worker构造函数的第一个参数是远程处理器实例。所有其他参数都是可选的,可以在RemoteProcessor类中定义为工作者参数,请参阅jobWorkerArguments和schedulerWorkerArguments

>>> from m01.remote.worker import SimpleJobWorker
>>> worker = SimpleJobWorker(rp, waitTime=0.0)

现在让我们处理第一个任务。我们清除日志,我们还必须结束任何现有的交互,以便在这个线程中处理任务

>>> logger.clear()
>>> from zope.security import management
>>> management.endInteraction()
>>> worker.doProcessNextJob()
True
>>> print logger
m01.remote INFO
  Job: 1

现在让我们在远程处理器内部使用工作者。由于工作者构造函数还接受其他参数,因此也指定了这些参数

>>> rp.jobWorkerFactory = SimpleJobWorker
>>> rp.jobWorkerFactory
<class 'm01.remote.worker.SimpleJobWorker'>
>>> rp.jobWorkerArguments
{'waitTime': 0.0}

为了测试目的,等待时间已设置为0。它实际上默认设置为1秒。现在让我们开始处理任务,稍等片刻,以便所有任务都完成,然后再次停止处理

>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)

日志显示所有任务都已处理。但更重要的是,它们都是按照定义的顺序完成的。注意,第一个任务在启动远程处理器之前就被处理了。是的,这意味着远程处理器可以在队列未启动的情况下处理任务。启动远程处理器仅意味着任务被处理,而不必手动执行。

>>> print logger
m01.remote INFO
  Job: 1
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  Job: 2
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped
>>> logger.clear()

任务中的事务

使用SimpleJobWorker,任务_不应该_更改事务状态,因为远程处理器对任务的行政管理和任务本身都在同一事务中运行,所以从任务内部中止它可能会搞乱行政部分。

这是一个回归测试,中止任务内部的事务不会导致无限循环(因为SimpleJobWorker在事务内部拉取任务,所以如果它被中止,任务将保留在队列中)

>>> testing.testCounter
0
>>> counter = 0
>>> data = {'counter': counter}
>>> abortJob = testing.TransactionAbortJob(data)
>>> rp.addJobFactory(u'abortJob', abortJob)
>>> jobid = rp.addJob(u'abortJob', (1))
>>> time.sleep(0.5)
>>> jobid = rp.addJob(u'abortJob', (2))
>>> transaction.commit()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> transaction.abort() # prevent spurious conflict errors
>>> testing.testCounter
2
>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  Job: 1
m01.remote INFO
  Job: 2
m01.remote INFO
  Processor 'root-worker' stopped

重置测试计数器

>>> testing.testCounter = 0

MultiJobProcessor

多线程任务工作者可以同时执行多个任务。它被设计用于耗时较长但使用很少处理能力的任务。

让我们添加一些新的作业来执行

>>> jobid1 = rp.addJob(u'sleep', (0.04, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (1.0,  2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0,    3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.2,  4))
>>> time.sleep(0.2)
>>> transaction.commit()

在测试远程处理器中的工作者之前,我们先单独查看每个方法。因此我们实例化工作者

>>> from m01.remote.worker import MultiJobWorker
>>> worker = MultiJobWorker(rp, waitTime=0, maxThreads=2)

最大线程数也可以设置

>>> worker.maxThreads
2

任何时间都可以审查所有工作线程

>>> worker.threads
[]
>>> from zope.security import management
>>> management.endInteraction()

让我们拉取一个新的作业

>>> job = worker.doPullNextJob()
>>> job
<SleepJob u'...' ...>

在执行之前,我们需要先拉取作业,这样数据库就会标记作业为正在处理,没有新的线程可以获取相同的作业。如您所见,作业被标记为处理状态

>>> job.status
u'processing'

一旦我们拉取了特定的作业,我们就可以处理它

>>> logger.clear()
>>> print logger
>>> worker.doProcessJob(job.__name__)
>>> print logger
m01.remote INFO
  Job: 1

现在让我们看看如何在任务服务中使用处理器。这主要意味着设置处理器工厂

>>> management.newInteraction()
>>> rp.jobWorkerFactory = MultiJobWorker
>>> rp.jobWorkerArguments = {'waitTime': 1.0, 'maxThreads': 2}
>>> transaction.commit()
>>> logger.clear()

现在让我们处理剩余的作业

>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)

如您所见,这次作业不再按顺序完成,因为它们执行所需的时间不同

>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 4
m01.remote INFO
  Job: 2
m01.remote INFO
  Processor 'root-worker' stopped

现在我们将线程限制设置为四,构建一组新的作业以证明所有作业将同时运行

>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 4}
>>> jobid1 = rp.addJob(u'sleep', (0.3, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.4, 2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0.1, 3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.5, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

如果所有任务一次性处理,作业3应该首先完成。您还可以看到,作业4在工作者记录处理之前立即处理

>>> logger.clear()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.0)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 1
m01.remote INFO
  Job: 2
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped

现在将线程限制设置为两个,构建一组新的作业以证明不会同时运行超过两个线程

>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 2}
>>> transaction.commit()
>>> jobid1 = rp.addJob(u'sleep', (0.3, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.4, 2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0.2, 3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.5, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

如果所有任务一次性处理,作业3应该首先完成,但由于作业需要等待可用的线程,它将排在第三位。现在我们可以运行作业并查看结果

>>> logger.clear()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> print logger
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 1
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 2
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped

调度器

调度器概念实现为包含调度项的附加调度容器。

>>> from m01.mongo import UTC
>>> import m01.remote.scheduler
>>> from m01.remote import interfaces
>>> from m01.remote import testing

现在让我们先获取我们的测试远程处理器,该处理器包含我们的调度容器

>>> remoteProcessor = root
>>> remoteProcessor
<TestProcessor None>
>>> scheduler = remoteProcessor._scheduler
>>> tuple(scheduler.values())
()

延迟

我们可以为延迟作业处理添加一个调度项。让我们添加这样一个项

>>> import datetime
>>> def getNextTime(dt, seconds):
...     return dt + datetime.timedelta(seconds=seconds)
>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)
>>> now10 = getNextTime(now, 10)
>>> delay = 10
>>> data = {'jobName': u'echo 1', 'active': True, 'delay': delay,
...         'retryDelay': 5, 'nextCallTime': now10}
>>> firstEcho = m01.remote.scheduler.Delay(data)
>>> interfaces.IDelay.providedBy(firstEcho)
True

延迟设置为10

>>> firstEcho.delay
10

和重试延迟设置为5

>>> firstEcho.retryDelay
5

并将显式的下一次调用时间设置为现在+10

>>> firstEcho.nextCallTime == getNextTime(now, 10)
True

并且我们的重试时间设置为None

>>> firstEcho.retryTime is None
True

现在我们可以将延迟项添加到调度器

>>> scheduler.add(firstEcho)
u'...'

如您所见,调度器包含一个项

>>> sorted(scheduler.values())
[<Delay ... for: u'echo 1'>]

接下来,我们将测试一些调度器API方法。首先检查我们是否可以使用updateRetryTime更新缓存中项的重试时间

>>> scheduler.updateRetryTime(firstEcho.dump(), now)
False

如您所见,我们没有得到新的重试时间。这是因为我们没有使用正确的调用时间。让我们尝试使用正确的下一次调用时间

>>> now10 = getNextTime(now, 10)
>>> now15 = getNextTime(now, 15)
>>> retryTime = scheduler.updateRetryTime(firstEcho.dump(), now10)
>>> retryTime == now15
True

如您所见,新的重试时间是使用5秒的重试延迟。这个重试时间用于锁定项。这意味着项在时间过去之前不会被选中。

现在让我们尝试另一个内部API方法,它可以获取添加缓存中的下一个项

>>> scheduler.getNextCachedItem(now)

如您所见,该方法没有返回项,让我们尝试使用下一个计划调用时间

>>> nextCallTime = firstEcho.nextCallTime
>>> scheduler.getNextCachedItem(now10)
<Delay ... for: u'echo 1'>

如您所见,重试时间基于下一个调用时间和重试延迟进行设置

>>> firstEcho.retryTime == getNextTime(nextCallTime, 5)
True

现在重要的一部分。让我们测试我们的方法,该方法负责获取包括Mongo中的下一个项。此方法使用上述两个方法。当然,使用当前时间我们将不会获取任何项

>>> scheduler.pullNextSchedulerItem(now) is None
True

但现在已经需要另一个下一个调用时间,因为之前的调用更新了项的下一个调用时间。让我们首先检查下一个调用时间

>>> firstEcho.nextCallTime == now10
True

但如您所见,重试时间在我们的上次测试中已经设置。这意味着如果我们至少使用比重试时间更大的时间,我们才会得到一个项

>>> firstEcho.retryTime == now15
True
>>> scheduler.pullNextSchedulerItem(now10)
>>> scheduler.pullNextSchedulerItem(now15)
<Delay ... for: u'echo 1'>

现在,让我们检查我们的计划项时间

>>> now20 = getNextTime(now15, 5)
>>> firstEcho.nextCallTime == now10
True

注意,我们的重试时间是用当前调用时间和重试延迟计算的。如果我们使用调用时间作为重试时间计算的基础,那就没有意义了

>>> firstEcho.retryTime == now20
True

pullNextSchedulerItem方法返回一个挂起的项或None,因为我们没有挂起的项

>>> scheduler.pullNextSchedulerItem(now) is None
True

现在让我们在某个调度器时间内添加第二个调度项

>>> import datetime
>>> delay = 10
>>> data = {'jobName': u'echo 2', 'active': True, 'delay': delay,
...         'retryDelay': 5}
>>> secondEcho = m01.remote.scheduler.Delay(data)
>>> scheduler.add(secondEcho)
u'...'
>>> sorted(scheduler.values(), key=lambda x:(x.__name__, x.__name__))
[<Delay ... for: u'echo 1'>, <Delay ... for: u'echo 2'>]
>>> scheduler.remove(firstEcho)
>>> scheduler.remove(secondEcho)
>>> tuple(scheduler.values())
()

adjustCallTime

在我们测试cron项目之前,让我们测试一下我们的方法,该方法可以将给定的日期时间重置为最小的起始点。例如,如果小时被用作计算基础,我们需要从第一分钟开始计数。

>>> from m01.remote.scheduler import adjustCallTime
>>> now = datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)
>>> now
datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)
>>> item = m01.remote.scheduler.Cron({'jobName': u'bar', 'minute': [5]})
>>> adjustCallTime(item, now)
datetime.datetime(2010, 10, 25, 16, 6, 0, 123, tzinfo=UTC)

Cron

可能更有趣的实现是cron计划任务项。这个cron项目可以在特定给定的时间安排任务。让我们设置这样的cron项目。

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5}
>>> cronItem = m01.remote.scheduler.Cron(data)

cronItem提供了ISchedulerItem和ICron接口。

>>> interfaces.ISchedulerItem.providedBy(cronItem)
True
>>> interfaces.ICron.providedBy(cronItem)
True

如您所见,cron项目还提供了一个retryDelay。

>>> cronItem.retryDelay
5

让我们首先解释一下它是如何工作的。cron计划程序提供了一个下一次调用时间戳。如果计算出的下一次调用时间小于上一次调用时间,cron计划任务项将计算新的下一次调用时间,并将其存储为nextCallTime,同时将上一个nextCallTime返回。这将确保我们至少进行一次时间计算调用,因为每次cron计划任务项被询问下一次调用时间时,都会使用存储的nextCallTime。只有当现有的下一次调用时间小于给定的调用时间时,cron计划任务项才会计算下一次调用时间。

现在让我们测试cron作为计划任务项。设置一个简单的cron项目,周期为5分钟。

>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)
>>> now
datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)
>>> data = {'jobName': u'echo cron', 'active': True, 'retryDelay': 5,
...         'minute': [5], 'nextCallTime': now}
>>> cronEcho = m01.remote.scheduler.Cron(data)

现在将项目添加到计划程序。

>>> scheduler.add(cronEcho)
u'...'

如您所见,我们的cron项目是基于给定的nextCallTime进行计划的。

>>> cronEcho.nextCallTime
datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)

retrytime为空。

>>> cronEcho.retryTime is None
True

分钟列表包含我们的5分钟。

>>> cronEcho.minute
[5]
>>> cronEcho.hour
[]
>>> cronEcho.dayOfMonth
[]
>>> cronEcho.month
[]
>>> cronEcho.dayOfWeek
[]

计划程序包含一个cron项目。

>>> tuple(scheduler.values())
(<Cron ... for: u'echo cron'>,)

现在我们可以根据cron计划任务项定义的jobName“echo”获取工作项,如果调用pullNextSchedulerItem。

>>> scheduler.pullNextSchedulerItem(now)
<Cron ... for: u'echo cron'>

在这次调用中,retryTime根据retryDelay设置。

>>> cronEcho.retryTime
datetime.datetime(2010, 10, 1, 0, 0, 5, tzinfo=UTC)

现在让我们测试不同的cron设置。注意,我们为分钟、小时、月份、星期几和每月的日期提供了值列表。这意味着你可以将任务安排为每15分钟执行一次,如果你将分钟设置为(0, 15, 30, 45),或者如果你想在每个小时后的第15分钟设置任务,你可以将分钟设置为(15,)。如果你设置多个参数,例如分钟、小时或天数等,所有参数都必须符合给定的时间。

让我们从一个每小时的第1分钟和第2分钟的cron计划程序开始。通常,cron计划任务项会将int(time.time())作为nextCallTime值。为了测试我们的cron计划任务项,我们使用显式的startTime值为0(零)。

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [0, 1]}
>>> cronItem = m01.remote.scheduler.Cron(data)

下一次调用时间是基于给定的startTime值设置的。这意味着第一次调用将在0(零)分钟。

>>> cronItem.nextCallTime is None
True

现在让我们调用getNextCallTime,如您所见,我们将得到None作为nextCallTime,因为我们没有在cron初始化期间设置nextCallTime,并且nextCallTime被设置为下一分钟。

>>> cronItem.getNextCallTime(now) is None
True
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)

现在让我们再次调用getNextCallTime,如您所见,我们将得到我们在对象初始化期间计算的nextCallTime,并且nextCallTime被设置为下一分钟。

如果我们使用加上5秒的调用时间,我们仍然会得到缓存的1分钟下一次调用时间,并且不会生成新的下一次调用时间,因为这个时间已经是在未来的。

>>> cronItem.getNextCallTime(getNextTime(now, 5))
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)

如果我们使用等于或大于从缓存的下一次调用时间开始计算1分钟延迟的调用时间,我们将得到缓存的调用时间作为值,就像我们在较小的调用时间中得到的那样(见上面的示例)。

>>> cronItem.getNextCallTime(getNextTime(now, 65))
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)

所有未来的调用,如果时间小于下一次调用时间,都将返回当前的下一次调用时间,而不会计算任何新的时间。

>>> cronItem.getNextCallTime(getNextTime(now, 125))
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)
>>> cronItem.getNextCallTime(getNextTime(now, 1*60*60))
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)

记住,getNextCallTime返回之前计算的下一次调用时间,而新的计算的下一次调用时间将被存储为nextCallTime。为了更简单的测试输出,我们定义了一个显示时间计算的方法。

分钟

让我们开始测试时间表。

>>> def getNextCallTime(cron, dt, seconds=None):
...     """Return stored and new calculated nextCallTime"""
...     if seconds is None:
...         callTime = dt
...     else:
...         callTime = getNextTime(dt, seconds)
...     nextCallTime = cron.getNextCallTime(callTime)
...     return '%s --> %s' % (nextCallTime, cron.nextCallTime)
>>> now = datetime.datetime(1970, 1, 1, 0, 3, 0, tzinfo=UTC)
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [0, 10], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> str(now)
'1970-01-01 00:03:00+00:00'
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 1)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 2*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 51*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 01:00:00+00:00'
>>> getNextCallTime(item, now, 55*60)
'1970-01-01 01:00:00+00:00 --> 1970-01-01 01:00:00+00:00'

小时

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'hour': [2, 13], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 02:00:00+00:00'
>>> getNextCallTime(item, now, 2*60*60)
'1970-01-01 02:00:00+00:00 --> 1970-01-01 13:00:00+00:00'
>>> getNextCallTime(item, now, 4*60*60)
'1970-01-01 13:00:00+00:00 --> 1970-01-01 13:00:00+00:00'
>>> getNextCallTime(item, now, 13*60*60)
'1970-01-01 13:00:00+00:00 --> 1970-01-02 02:00:00+00:00'
>>> getNextCallTime(item, now, 15*60*60)
'1970-01-02 02:00:00+00:00 --> 1970-01-02 02:00:00+00:00'

月份

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'month': [1, 2, 5, 12], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-02-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 90*24*60*60)
'1970-02-01 00:03:00+00:00 --> 1970-05-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 120*24*60*60)
'1970-05-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 130*24*60*60)
'1970-12-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 360*24*60*60)
'1970-12-01 00:03:00+00:00 --> 1971-01-01 00:03:00+00:00'

dayOfWeek [0..6]

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'dayOfWeek': [0, 2, 4, 5], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)

当前工作日的now是

>>> now.weekday()
3

这意味着我们的nextCallTime应该使用第4天作为nextCallTime,如果我们用now调用它们。

>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-02 00:03:00+00:00'

再过一天,我们将得到工作日4(跳过)

>>> getNextCallTime(item, now, 24*60*60)
'1970-01-02 00:03:00+00:00 --> 1970-01-03 00:03:00+00:00'

再过一天,我们将得到工作日5(增加)

>>> getNextCallTime(item, now, 2*24*60*60)
'1970-01-03 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

再过一天,我们将得到工作日6(跳过)

>>> getNextCallTime(item, now, 3*24*60*60)
'1970-01-05 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

再过一天,我们将得到工作日0(增加)

>>> getNextCallTime(item, now, 4*24*60*60)
'1970-01-05 00:03:00+00:00 --> 1970-01-07 00:03:00+00:00'

dayOfMonth [1..31]

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'dayOfMonth': [2, 12, 21, 30], 'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-02 00:00:00+00:00'
>>> getNextCallTime(item, now, 12*24*60*60)
'1970-01-02 00:00:00+00:00 --> 1970-01-21 00:00:00+00:00'
>>> getNextCallTime(item, now, 31*24*60*60)
'1970-01-21 00:00:00+00:00 --> 1970-02-02 00:00:00+00:00'

组合

组合一些属性

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [10], 'dayOfMonth': [1, 10, 20, 30],
...         'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 10*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 01:10:00+00:00'
>>> getNextCallTime(item, now, 10*24*60*60)
'1970-01-01 01:10:00+00:00 --> 1970-01-20 00:10:00+00:00'
>>> getNextCallTime(item, now, 20*24*60*60)
'1970-01-20 00:10:00+00:00 --> 1970-01-30 00:10:00+00:00'

另一个示例

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [10], 'hour': [4], 'dayOfMonth': [1, 12, 21, 30],
...         'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 10*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 4*60*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 5*60*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-12 04:10:00+00:00'

变更

3.0.0 (2015-11-10)

  • 支持 pymongo >= 3.0.0,并将版本号设置为3.0.0,以反映 pymongo >= 3.0.0 的兼容性

0.6.0 (2013-06-28)

  • 特性:实现了 JobError 作为 Job 的子项。并将之前的 JobError 重命名为 RemoteException。此更改需要您在更新之前删除作业列表中的所有以前的 JobError 作业。同时,在您的代码中使用 RemoteException 代替 JobError。新的 JobError 子项提供了更好的错误回溯信息和创建日期。

  • 特性:实现了更好的错误处理,保存格式化的回溯字符串

0.5.1 (2012-11-18)

  • 添加了 MANIFEST.in 文件

  • 删除了 p01.i18n 包依赖项

  • 允许删除具有所有状态的作业

  • 拆分调度器和容器,并将调度器部分移动到混合类中

  • 切换到 bson 导入

  • 在 getBatchData 签名中反映更改

  • 修复日期时间比较,四舍五入毫秒

  • 调整不同的架构描述,使用标题中使用的相同消息 ID

  • 删除了未使用的 ID

0.5.0 (2011-08-19)

  • 初始发布

项目详情


下载文件

下载适合您平台文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

m01.remote-3.0.0.zip (68.5 kB 查看哈希值)

上传时间

由以下组织支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面