Zope3的远程处理队列
项目描述
此包为Zope3提供了一种使用mongodb而不是ZODB的远程处理队列。
README
此包提供了一种远程处理器。该远程处理器作为一个简单的对象实现,使用mongodb作为存储。处理器可以在另一个线程中执行预定义的任务。还可以使用不同的调度项在特定时间运行任务。
RemoteProcessor使用了两个不同的处理器。一个处理任务,另一个从调度器中挑选项目并添加任务。这种分离在实现分布式概念时很有用。这意味着一个或多个应用程序可以根据给定的调度项安排任务项。另一个应用程序处理任务,并不知道如何安排下一个任务项。
由于我们使用此远程调度器进行低CPU密集型任务,我们提供了多处理。这是通过在主工作线程中运行多个工作进程来完成的。如果您使用子进程进行任务处理,您将得到一个真正的多进程处理器,它不受当前Python进程的限制。
您可以在远程处理器中配置任务工作进程可以启动的线程数量。请参阅jobWorkerArguments/maxThreads。默认情况下,此数字使用您机器上安装的CPU数量。
实现使用mongodb作为其组件的存储。这意味着任务、任务工厂和调度项都使用m01.mongo提供的ORM概念存储在mongodb中。
请参阅p01.remote查看基于ZODB的远程处理器实现,但请注意,p01.remote实现尚未提供工作进程和调度器处理器的分离。
设置
>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing
现在让我们创建两个远程处理器。我们可以使用我们的远程队列站点实现
>>> from zope.security.proxy import removeSecurityProxy >>> from m01.remote import interfaces
我们的测试远程处理器应作为应用程序根可用
>>> rp = root >>> rp <TestProcessor None>
让我们发现可用的任务
>>> dict(root._jobs) {}
任务容器最初是空的,因为我们还没有添加任何任务工厂。现在让我们定义一个简单的任务工厂,它只是回显输入字符串
>>> echoJob = testing.EchoJob({})
现在我们可以设置任务输入
>>> echoJob.input = {'foo': u'blah'}
任务唯一的API要求是可以调用。现在我们确保任务能够工作。注意我们用远程处理器实例调用我们的任务,这是我们的初始化应用程序根
>>> echoJob(root) {'foo': u'blah'}
让我们将任务添加到可用任务列表中
>>> rp.addJobFactory(u'echo', echoJob)
echo任务现在在远程处理器中可用
>>> dict(rp._jobFactories) {u'echo': <EchoJob u'echo'>}
由于远程处理器不能立即完成任务,因此入站任务由队列管理。首先,我们请求执行echo任务
>>> jobid1 = rp.addJob(u'echo', {'foo': 'bar'}) >>> jobid1 u'...'>>> sorted([job.status for job in rp._jobs.values()]) [u'queued']
addJob()函数将名为“echo”的任务安排为使用指定参数执行。该方法返回一个任务ID,我们可以用它来查询任务。addJob()函数将任务标记为已排队。
>>> rp.getJobStatus(jobid1) u'queued'
由于任务尚未被处理,状态设置为“已排队”。此外,尚无结果可用
>>> rp.getJobResult(jobid1) is None True
只要任务没有被处理,就可以取消它
>>> rp.cancelJob(jobid1) >>> rp.getJobStatus(jobid1) u'cancelled'>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled']
默认情况下不会启动工作进程处理器
>>> rp.isProcessing False
为了获得干净的日志环境,让我们清除日志堆栈
>>> logger.clear()
现在我们可以通过调用startProcessor来启动远程处理器
>>> rp.startProcessor()
看吧 - 远程处理器正在处理
>>> rp.isProcessing True
检查日志将证明已启动远程处理器
>>> print logger m01.remote INFO Processor 'root-worker' started
让我们再次停止处理器
>>> rp.stopProcessor() >>> rp.isProcessing False
现在让我们从一个处理过的任务中获取结果,但首先提交新添加的任务
>>> jobid2 = rp.addJob(u'echo', {'foo': u'bar'}) >>> transaction.commit()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'queued']
现在创建一个工作进程并调用我们的简单工作进程来处理新任务
>>> class FakeWorker(object): ... ... def __init__(self, rp): ... self.rp = rp ... ... def __call__(self): ... try: ... result = self.rp.processNextJob() ... transaction.commit() ... except Exception, error: ... transaction.commit()>>> worker = FakeWorker(rp) >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed']
首先检查任务是否已处理
>>> rp.getJobStatus(jobid2) u'completed'>>> rp.getJobResult(jobid2) {u'foo': u'bar'}
错误处理
现在,让我们定义一个会引发错误的新的任务
>>> errorJob = testing.RemoteExceptionJob() >>> rp.addJobFactory(u'error', errorJob)
现在添加并执行它
>>> jobid3 = rp.addJob(u'error') >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error']
现在让我们看看发生了什么
>>> rp.getJobStatus(jobid3) u'error' >>> errors = rp.getJobErrors(jobid3) >>> errors [<JobError u'...'>]
这样的JobError项提供了以下数据
>>> error = tuple(errors)[0] >>> data = error.dump() >>> data = m01.mongo.dictify(data) >>> pprint(data) {'_id': ObjectId('...'), '_type': u'JobError', 'created': datetime.datetime(..., ..., ..., ..., ..., ..., ..., tzinfo=<bson.tz_util.FixedOffset object at ...>), 'tb': u"<p>Traceback (most recent call last):..."}
如您所见,存储为tb的跟踪信息是最重要的信息
>>> print data['tb'] <p>Traceback (most recent call last):</p> <ul> <li> Module m01.remote.processor, line 297, in _processJob<br /> job.output = job(self)</li> <li> Module m01.remote.testing, line 86, in __call__<br /> raise exceptions.RemoteException('An error occurred.')</li> </ul><p>RemoteException: An error occurred.<br /> </p>
尝试用不太好的错误也试试
>>> fatalJob = testing.FatalExceptionJob() >>> rp.addJobFactory(u'fatal', fatalJob)
现在添加并执行它
>>> jobid4 = rp.addJob(u'fatal') >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'queued']>>> job4 = rp._jobs[jobid4] >>> job4.retryCounter 1 >>> job4.status == u'queued' True>>> job4.errors [<JobError u'...'>]
再次处理这个任务,但首先将我们的retryTime设置为过时的值,以模拟自上次调用以来时间已经过去
>>> import datetime >>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC) >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'queued']>>> job4 = rp._jobs[jobid4] >>> job4.retryCounter 2>>> job4.errors [<JobError u'...'>, <JobError u'...'>]
再次处理任务的第3次。现在它不会重新抛出异常,但错误消息会附加到错误列表中。
>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC) >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'error']
现在让我们看看发生了什么
>>> job4 = rp._jobs[jobid4] >>> job4.retryCounter 3>>> job4.status u'error'>>> rp.getJobStatus(jobid4) u'error'>>> job4.errors [<JobError u'...'>, <JobError u'...'>, <JobError u'...'>]>>> rp.getJobErrors(jobid4) [<JobError u'...'>, <JobError u'...'>, <JobError u'...'>]
出于管理目的,远程处理器还允许您检查所有任务
>>> pprint(dict(rp._jobs)) {u'...': <EchoJob u'...' ...>, u'...': <EchoJob u'...' ...>, u'...': <RemoteExceptionJob u'...' ...>, u'...': <FatalExceptionJob u'...' ...>}
要删除不再需要的任务,我们可以使用removeJobs方法。
>>> jobid8 = rp.addJob(u'echo', {'blah': 'blah'}) >>> transaction.commit()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'error', u'queued']>>> rp.removeJobs() {u'cancelled': 1, u'completed': 1, u'error': 2}>>> sorted([job.status for job in rp._jobs.values()]) [u'queued']
现在处理最后一个待处理任务,并确保我们不会得到更多任务
>>> rp.pullNextJob() <EchoJob u'...' ...>
线程行为
每个远程处理器都在一个单独的线程中运行,允许它们独立操作。任务应该设计成避免冲突错误。
让我们启动我们在此点定义的远程处理器,并看看因此运行了哪些线程
>>> rp.startProcessor() >>> import pprint >>> import threading >>> def show_threads(): ... threads = [t for t in threading.enumerate() ... if t.getName().startswith('root')] ... threads.sort(key=lambda t: t.getName()) ... pprint.pprint(threads) >>> show_threads() [<Thread(root-worker, started daemon ...)>]
让我们停止远程处理器,并给后台线程一个机会来接收消息
>>> rp.stopProcessor() >>> import time >>> time.sleep(2)
线程已经退出了
>>> print [t for t in threading.enumerate() ... if t.getName().startswith('root')] []
任务工作者
队列中任务的真正处理由一个称为任务工作者的单独组件来处理。此组件通常在自己的线程中运行,并提供了自己的主循环。
>>> import time >>> import transaction
worker模块提供了一个任务工作者,它一次执行一个任务。另一个工作者根据调度项设置安排新任务。让我们创建必要的组件来测试任务工作者
创建远程处理器
>>> from m01.remote import testing >>> rp = root >>> rp.isProcessing False>>> rp.isScheduling False
注册一个简单地睡眠并写入消息的任务
>>> data = {'retryDelay': 1} >>> sleepJob = testing.SleepJob(data) >>> rp.addJobFactory(u'sleep', sleepJob)
SimpleJobWorker
此工作者一次执行一个任务。它被设计用于耗时较长且占用计算机大部分处理能力的任务。
让我们首先注册几个任务
>>> jobid1 = rp.addJob(u'sleep', (0.04, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (0.1, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.08, 4)) >>> time.sleep(0.2) >>> transaction.commit()
现在让我们首先检查我们是否可以访问任务
>>> job = rp._jobs.get(jobid1) >>> job <SleepJob u'...' ...>
让我们尝试如果任务是否准备好处理
>>> rp.getJobStatus(jobid1) u'queued'>>> rp.getJobStatus(jobid2) u'queued'>>> rp.getJobStatus(jobid3) u'queued'>>> rp.getJobStatus(jobid4) u'queued'
让我们首先直接执行一个任务。simple worker构造函数的第一个参数是远程处理器实例。所有其他参数都是可选的,可以在RemoteProcessor类中定义为工作者参数,请参阅jobWorkerArguments和schedulerWorkerArguments
>>> from m01.remote.worker import SimpleJobWorker >>> worker = SimpleJobWorker(rp, waitTime=0.0)
现在让我们处理第一个任务。我们清除日志,我们还必须结束任何现有的交互,以便在这个线程中处理任务
>>> logger.clear()>>> from zope.security import management >>> management.endInteraction()>>> worker.doProcessNextJob() True>>> print logger m01.remote INFO Job: 1
现在让我们在远程处理器内部使用工作者。由于工作者构造函数还接受其他参数,因此也指定了这些参数
>>> rp.jobWorkerFactory = SimpleJobWorker >>> rp.jobWorkerFactory <class 'm01.remote.worker.SimpleJobWorker'>>>> rp.jobWorkerArguments {'waitTime': 0.0}
为了测试目的,等待时间已设置为0。它实际上默认设置为1秒。现在让我们开始处理任务,稍等片刻,以便所有任务都完成,然后再次停止处理
>>> rp.startProcessor() >>> transaction.commit()>>> time.sleep(0.5)>>> rp.stopProcessor() >>> transaction.commit()>>> time.sleep(0.5)
日志显示所有任务都已处理。但更重要的是,它们都是按照定义的顺序完成的。注意,第一个任务在启动远程处理器之前就被处理了。是的,这意味着远程处理器可以在队列未启动的情况下处理任务。启动远程处理器仅意味着任务被处理,而不必手动执行。
>>> print logger m01.remote INFO Job: 1 m01.remote INFO Processor 'root-worker' started m01.remote INFO Job: 2 m01.remote INFO Job: 3 m01.remote INFO Job: 4 m01.remote INFO Processor 'root-worker' stopped>>> logger.clear()
任务中的事务
使用SimpleJobWorker,任务_不应该_更改事务状态,因为远程处理器对任务的行政管理和任务本身都在同一事务中运行,所以从任务内部中止它可能会搞乱行政部分。
这是一个回归测试,中止任务内部的事务不会导致无限循环(因为SimpleJobWorker在事务内部拉取任务,所以如果它被中止,任务将保留在队列中)
>>> testing.testCounter 0>>> counter = 0 >>> data = {'counter': counter} >>> abortJob = testing.TransactionAbortJob(data) >>> rp.addJobFactory(u'abortJob', abortJob) >>> jobid = rp.addJob(u'abortJob', (1)) >>> time.sleep(0.5) >>> jobid = rp.addJob(u'abortJob', (2)) >>> transaction.commit()>>> rp.startProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> transaction.abort() # prevent spurious conflict errors >>> testing.testCounter 2>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO Job: 1 m01.remote INFO Job: 2 m01.remote INFO Processor 'root-worker' stopped
重置测试计数器
>>> testing.testCounter = 0
MultiJobProcessor
多线程任务工作者可以同时执行多个任务。它被设计用于耗时较长但使用很少处理能力的任务。
让我们添加一些新的作业来执行
>>> jobid1 = rp.addJob(u'sleep', (0.04, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (1.0, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.2, 4)) >>> time.sleep(0.2) >>> transaction.commit()
在测试远程处理器中的工作者之前,我们先单独查看每个方法。因此我们实例化工作者
>>> from m01.remote.worker import MultiJobWorker >>> worker = MultiJobWorker(rp, waitTime=0, maxThreads=2)
最大线程数也可以设置
>>> worker.maxThreads 2
任何时间都可以审查所有工作线程
>>> worker.threads []>>> from zope.security import management >>> management.endInteraction()
让我们拉取一个新的作业
>>> job = worker.doPullNextJob() >>> job <SleepJob u'...' ...>
在执行之前,我们需要先拉取作业,这样数据库就会标记作业为正在处理,没有新的线程可以获取相同的作业。如您所见,作业被标记为处理状态
>>> job.status u'processing'
一旦我们拉取了特定的作业,我们就可以处理它
>>> logger.clear() >>> print logger>>> worker.doProcessJob(job.__name__)>>> print logger m01.remote INFO Job: 1
现在让我们看看如何在任务服务中使用处理器。这主要意味着设置处理器工厂
>>> management.newInteraction()>>> rp.jobWorkerFactory = MultiJobWorker >>> rp.jobWorkerArguments = {'waitTime': 1.0, 'maxThreads': 2} >>> transaction.commit()>>> logger.clear()
现在让我们处理剩余的作业
>>> rp.startProcessor() >>> transaction.commit() >>> time.sleep(1.5)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)
如您所见,这次作业不再按顺序完成,因为它们执行所需的时间不同
>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 3 m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 4 m01.remote INFO Job: 2 m01.remote INFO Processor 'root-worker' stopped
现在我们将线程限制设置为四,构建一组新的作业以证明所有作业将同时运行
>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 4}>>> jobid1 = rp.addJob(u'sleep', (0.3, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (0.4, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0.1, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.5, 4)) >>> time.sleep(0.2) >>> transaction.commit()
如果所有任务一次性处理,作业3应该首先完成。您还可以看到,作业4在工作者记录处理之前立即处理
>>> logger.clear()>>> rp.startProcessor() >>> transaction.commit()>>> time.sleep(1.0)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 3 m01.remote INFO Job: 1 m01.remote INFO Job: 2 m01.remote INFO Job: 4 m01.remote INFO Processor 'root-worker' stopped
现在将线程限制设置为两个,构建一组新的作业以证明不会同时运行超过两个线程
>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 2} >>> transaction.commit()>>> jobid1 = rp.addJob(u'sleep', (0.3, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (0.4, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0.2, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.5, 4)) >>> time.sleep(0.2) >>> transaction.commit()
如果所有任务一次性处理,作业3应该首先完成,但由于作业需要等待可用的线程,它将排在第三位。现在我们可以运行作业并查看结果
>>> logger.clear()>>> rp.startProcessor() >>> transaction.commit()>>> time.sleep(1.5)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 1 m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 2 m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 3 m01.remote INFO Job: 4 m01.remote INFO Processor 'root-worker' stopped
调度器
调度器概念实现为包含调度项的附加调度容器。
>>> from m01.mongo import UTC >>> import m01.remote.scheduler >>> from m01.remote import interfaces >>> from m01.remote import testing
现在让我们先获取我们的测试远程处理器,该处理器包含我们的调度容器
>>> remoteProcessor = root >>> remoteProcessor <TestProcessor None>>>> scheduler = remoteProcessor._scheduler>>> tuple(scheduler.values()) ()
延迟
我们可以为延迟作业处理添加一个调度项。让我们添加这样一个项
>>> import datetime >>> def getNextTime(dt, seconds): ... return dt + datetime.timedelta(seconds=seconds)>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC) >>> now10 = getNextTime(now, 10) >>> delay = 10 >>> data = {'jobName': u'echo 1', 'active': True, 'delay': delay, ... 'retryDelay': 5, 'nextCallTime': now10} >>> firstEcho = m01.remote.scheduler.Delay(data) >>> interfaces.IDelay.providedBy(firstEcho) True
延迟设置为10
>>> firstEcho.delay 10
和重试延迟设置为5
>>> firstEcho.retryDelay 5
并将显式的下一次调用时间设置为现在+10
>>> firstEcho.nextCallTime == getNextTime(now, 10) True
并且我们的重试时间设置为None
>>> firstEcho.retryTime is None True
现在我们可以将延迟项添加到调度器
>>> scheduler.add(firstEcho) u'...'
如您所见,调度器包含一个项
>>> sorted(scheduler.values()) [<Delay ... for: u'echo 1'>]
接下来,我们将测试一些调度器API方法。首先检查我们是否可以使用updateRetryTime更新缓存中项的重试时间
>>> scheduler.updateRetryTime(firstEcho.dump(), now) False
如您所见,我们没有得到新的重试时间。这是因为我们没有使用正确的调用时间。让我们尝试使用正确的下一次调用时间
>>> now10 = getNextTime(now, 10) >>> now15 = getNextTime(now, 15) >>> retryTime = scheduler.updateRetryTime(firstEcho.dump(), now10) >>> retryTime == now15 True
如您所见,新的重试时间是使用5秒的重试延迟。这个重试时间用于锁定项。这意味着项在时间过去之前不会被选中。
现在让我们尝试另一个内部API方法,它可以获取添加缓存中的下一个项
>>> scheduler.getNextCachedItem(now)
如您所见,该方法没有返回项,让我们尝试使用下一个计划调用时间
>>> nextCallTime = firstEcho.nextCallTime >>> scheduler.getNextCachedItem(now10) <Delay ... for: u'echo 1'>
如您所见,重试时间基于下一个调用时间和重试延迟进行设置
>>> firstEcho.retryTime == getNextTime(nextCallTime, 5) True
现在重要的一部分。让我们测试我们的方法,该方法负责获取包括Mongo中的下一个项。此方法使用上述两个方法。当然,使用当前时间我们将不会获取任何项
>>> scheduler.pullNextSchedulerItem(now) is None True
但现在已经需要另一个下一个调用时间,因为之前的调用更新了项的下一个调用时间。让我们首先检查下一个调用时间
>>> firstEcho.nextCallTime == now10 True
但如您所见,重试时间在我们的上次测试中已经设置。这意味着如果我们至少使用比重试时间更大的时间,我们才会得到一个项
>>> firstEcho.retryTime == now15 True>>> scheduler.pullNextSchedulerItem(now10)>>> scheduler.pullNextSchedulerItem(now15) <Delay ... for: u'echo 1'>
现在,让我们检查我们的计划项时间
>>> now20 = getNextTime(now15, 5) >>> firstEcho.nextCallTime == now10 True
注意,我们的重试时间是用当前调用时间和重试延迟计算的。如果我们使用调用时间作为重试时间计算的基础,那就没有意义了
>>> firstEcho.retryTime == now20 True
pullNextSchedulerItem方法返回一个挂起的项或None,因为我们没有挂起的项
>>> scheduler.pullNextSchedulerItem(now) is None True
现在让我们在某个调度器时间内添加第二个调度项
>>> import datetime >>> delay = 10 >>> data = {'jobName': u'echo 2', 'active': True, 'delay': delay, ... 'retryDelay': 5} >>> secondEcho = m01.remote.scheduler.Delay(data)>>> scheduler.add(secondEcho) u'...'>>> sorted(scheduler.values(), key=lambda x:(x.__name__, x.__name__)) [<Delay ... for: u'echo 1'>, <Delay ... for: u'echo 2'>]>>> scheduler.remove(firstEcho) >>> scheduler.remove(secondEcho) >>> tuple(scheduler.values()) ()
adjustCallTime
在我们测试cron项目之前,让我们测试一下我们的方法,该方法可以将给定的日期时间重置为最小的起始点。例如,如果小时被用作计算基础,我们需要从第一分钟开始计数。
>>> from m01.remote.scheduler import adjustCallTime>>> now = datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC) >>> now datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)>>> item = m01.remote.scheduler.Cron({'jobName': u'bar', 'minute': [5]}) >>> adjustCallTime(item, now) datetime.datetime(2010, 10, 25, 16, 6, 0, 123, tzinfo=UTC)
Cron
可能更有趣的实现是cron计划任务项。这个cron项目可以在特定给定的时间安排任务。让我们设置这样的cron项目。
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5} >>> cronItem = m01.remote.scheduler.Cron(data)
cronItem提供了ISchedulerItem和ICron接口。
>>> interfaces.ISchedulerItem.providedBy(cronItem) True>>> interfaces.ICron.providedBy(cronItem) True
如您所见,cron项目还提供了一个retryDelay。
>>> cronItem.retryDelay 5
让我们首先解释一下它是如何工作的。cron计划程序提供了一个下一次调用时间戳。如果计算出的下一次调用时间小于上一次调用时间,cron计划任务项将计算新的下一次调用时间,并将其存储为nextCallTime,同时将上一个nextCallTime返回。这将确保我们至少进行一次时间计算调用,因为每次cron计划任务项被询问下一次调用时间时,都会使用存储的nextCallTime。只有当现有的下一次调用时间小于给定的调用时间时,cron计划任务项才会计算下一次调用时间。
现在让我们测试cron作为计划任务项。设置一个简单的cron项目,周期为5分钟。
>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC) >>> now datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)>>> data = {'jobName': u'echo cron', 'active': True, 'retryDelay': 5, ... 'minute': [5], 'nextCallTime': now} >>> cronEcho = m01.remote.scheduler.Cron(data)
现在将项目添加到计划程序。
>>> scheduler.add(cronEcho) u'...'
如您所见,我们的cron项目是基于给定的nextCallTime进行计划的。
>>> cronEcho.nextCallTime datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)
retrytime为空。
>>> cronEcho.retryTime is None True
分钟列表包含我们的5分钟。
>>> cronEcho.minute [5]>>> cronEcho.hour []>>> cronEcho.dayOfMonth []>>> cronEcho.month []>>> cronEcho.dayOfWeek []
计划程序包含一个cron项目。
>>> tuple(scheduler.values()) (<Cron ... for: u'echo cron'>,)
现在我们可以根据cron计划任务项定义的jobName“echo”获取工作项,如果调用pullNextSchedulerItem。
>>> scheduler.pullNextSchedulerItem(now) <Cron ... for: u'echo cron'>
在这次调用中,retryTime根据retryDelay设置。
>>> cronEcho.retryTime datetime.datetime(2010, 10, 1, 0, 0, 5, tzinfo=UTC)
现在让我们测试不同的cron设置。注意,我们为分钟、小时、月份、星期几和每月的日期提供了值列表。这意味着你可以将任务安排为每15分钟执行一次,如果你将分钟设置为(0, 15, 30, 45),或者如果你想在每个小时后的第15分钟设置任务,你可以将分钟设置为(15,)。如果你设置多个参数,例如分钟、小时或天数等,所有参数都必须符合给定的时间。
让我们从一个每小时的第1分钟和第2分钟的cron计划程序开始。通常,cron计划任务项会将int(time.time())作为nextCallTime值。为了测试我们的cron计划任务项,我们使用显式的startTime值为0(零)。
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [0, 1]} >>> cronItem = m01.remote.scheduler.Cron(data)
下一次调用时间是基于给定的startTime值设置的。这意味着第一次调用将在0(零)分钟。
>>> cronItem.nextCallTime is None True
现在让我们调用getNextCallTime,如您所见,我们将得到None作为nextCallTime,因为我们没有在cron初始化期间设置nextCallTime,并且nextCallTime被设置为下一分钟。
>>> cronItem.getNextCallTime(now) is None True>>> cronItem.nextCallTime datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
现在让我们再次调用getNextCallTime,如您所见,我们将得到我们在对象初始化期间计算的nextCallTime,并且nextCallTime被设置为下一分钟。
如果我们使用加上5秒的调用时间,我们仍然会得到缓存的1分钟下一次调用时间,并且不会生成新的下一次调用时间,因为这个时间已经是在未来的。
>>> cronItem.getNextCallTime(getNextTime(now, 5)) datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)>>> cronItem.nextCallTime datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
如果我们使用等于或大于从缓存的下一次调用时间开始计算1分钟延迟的调用时间,我们将得到缓存的调用时间作为值,就像我们在较小的调用时间中得到的那样(见上面的示例)。
>>> cronItem.getNextCallTime(getNextTime(now, 65)) datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)>>> cronItem.nextCallTime datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)
所有未来的调用,如果时间小于下一次调用时间,都将返回当前的下一次调用时间,而不会计算任何新的时间。
>>> cronItem.getNextCallTime(getNextTime(now, 125)) datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)>>> cronItem.getNextCallTime(getNextTime(now, 1*60*60)) datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)
记住,getNextCallTime返回之前计算的下一次调用时间,而新的计算的下一次调用时间将被存储为nextCallTime。为了更简单的测试输出,我们定义了一个显示时间计算的方法。
分钟
让我们开始测试时间表。
>>> def getNextCallTime(cron, dt, seconds=None): ... """Return stored and new calculated nextCallTime""" ... if seconds is None: ... callTime = dt ... else: ... callTime = getNextTime(dt, seconds) ... nextCallTime = cron.getNextCallTime(callTime) ... return '%s --> %s' % (nextCallTime, cron.nextCallTime)>>> now = datetime.datetime(1970, 1, 1, 0, 3, 0, tzinfo=UTC) >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [0, 10], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)>>> str(now) '1970-01-01 00:03:00+00:00'>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 1) '1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 2*60) '1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 51*60) '1970-01-01 00:10:00+00:00 --> 1970-01-01 01:00:00+00:00'>>> getNextCallTime(item, now, 55*60) '1970-01-01 01:00:00+00:00 --> 1970-01-01 01:00:00+00:00'
小时
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'hour': [2, 13], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 02:00:00+00:00'>>> getNextCallTime(item, now, 2*60*60) '1970-01-01 02:00:00+00:00 --> 1970-01-01 13:00:00+00:00'>>> getNextCallTime(item, now, 4*60*60) '1970-01-01 13:00:00+00:00 --> 1970-01-01 13:00:00+00:00'>>> getNextCallTime(item, now, 13*60*60) '1970-01-01 13:00:00+00:00 --> 1970-01-02 02:00:00+00:00'>>> getNextCallTime(item, now, 15*60*60) '1970-01-02 02:00:00+00:00 --> 1970-01-02 02:00:00+00:00'
月份
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'month': [1, 2, 5, 12], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-02-01 00:03:00+00:00'>>> getNextCallTime(item, now, 90*24*60*60) '1970-02-01 00:03:00+00:00 --> 1970-05-01 00:03:00+00:00'>>> getNextCallTime(item, now, 120*24*60*60) '1970-05-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'>>> getNextCallTime(item, now, 130*24*60*60) '1970-12-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'>>> getNextCallTime(item, now, 360*24*60*60) '1970-12-01 00:03:00+00:00 --> 1971-01-01 00:03:00+00:00'
dayOfWeek [0..6]
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'dayOfWeek': [0, 2, 4, 5], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)
当前工作日的now是
>>> now.weekday() 3
这意味着我们的nextCallTime应该使用第4天作为nextCallTime,如果我们用now调用它们。
>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-02 00:03:00+00:00'
再过一天,我们将得到工作日4(跳过)
>>> getNextCallTime(item, now, 24*60*60) '1970-01-02 00:03:00+00:00 --> 1970-01-03 00:03:00+00:00'
再过一天,我们将得到工作日5(增加)
>>> getNextCallTime(item, now, 2*24*60*60) '1970-01-03 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'
再过一天,我们将得到工作日6(跳过)
>>> getNextCallTime(item, now, 3*24*60*60) '1970-01-05 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'
再过一天,我们将得到工作日0(增加)
>>> getNextCallTime(item, now, 4*24*60*60) '1970-01-05 00:03:00+00:00 --> 1970-01-07 00:03:00+00:00'
dayOfMonth [1..31]
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'dayOfMonth': [2, 12, 21, 30], 'nextCallTime': now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-02 00:00:00+00:00'>>> getNextCallTime(item, now, 12*24*60*60) '1970-01-02 00:00:00+00:00 --> 1970-01-21 00:00:00+00:00'>>> getNextCallTime(item, now, 31*24*60*60) '1970-01-21 00:00:00+00:00 --> 1970-02-02 00:00:00+00:00'
组合
组合一些属性
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [10], 'dayOfMonth': [1, 10, 20, 30], ... 'nextCallTime': now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 10*60) '1970-01-01 00:10:00+00:00 --> 1970-01-01 01:10:00+00:00'>>> getNextCallTime(item, now, 10*24*60*60) '1970-01-01 01:10:00+00:00 --> 1970-01-20 00:10:00+00:00'>>> getNextCallTime(item, now, 20*24*60*60) '1970-01-20 00:10:00+00:00 --> 1970-01-30 00:10:00+00:00'
另一个示例
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [10], 'hour': [4], 'dayOfMonth': [1, 12, 21, 30], ... 'nextCallTime': now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 04:10:00+00:00'>>> getNextCallTime(item, now, 10*60) '1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'>>> getNextCallTime(item, now, 4*60*60) '1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'>>> getNextCallTime(item, now, 5*60*60) '1970-01-01 04:10:00+00:00 --> 1970-01-12 04:10:00+00:00'
变更
3.0.0 (2015-11-10)
支持 pymongo >= 3.0.0,并将版本号设置为3.0.0,以反映 pymongo >= 3.0.0 的兼容性
0.6.0 (2013-06-28)
特性:实现了 JobError 作为 Job 的子项。并将之前的 JobError 重命名为 RemoteException。此更改需要您在更新之前删除作业列表中的所有以前的 JobError 作业。同时,在您的代码中使用 RemoteException 代替 JobError。新的 JobError 子项提供了更好的错误回溯信息和创建日期。
特性:实现了更好的错误处理,保存格式化的回溯字符串
0.5.1 (2012-11-18)
添加了 MANIFEST.in 文件
删除了 p01.i18n 包依赖项
允许删除具有所有状态的作业
拆分调度器和容器,并将调度器部分移动到混合类中
切换到 bson 导入
在 getBatchData 签名中反映更改
修复日期时间比较,四舍五入毫秒
调整不同的架构描述,使用标题中使用的相同消息 ID
删除了未使用的 ID
0.5.0 (2011-08-19)
初始发布
项目详情
m01.remote-3.0.0.zip 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d7fa107b72a17164ed53d6ea7558718ff40ec44d8e4f0d29bffae7eb8af64267 |
|
MD5 | 86d4c3febb7762b467a6c0561fa52538 |
|
BLAKE2b-256 | b87e79d9c86a988017d46e1bfd5bd615bad6e9e1dd00eb2b084406cf8fd5819e |