zope 3的远程任务客户端工具
项目描述
远程任务执行
本软件包提供了一个远程任务执行Web服务的实现,允许在另一台服务器上执行预定义的任务。还可以在特定时间运行定时任务。这些服务以两种方式有用
它们使我们能够完成特定机器上不可用原生的任务。例如,使用Linux(我们Web服务器可能运行的操作系统),无法将AVI文件转换为Flash(R)电影。
它们还允许将昂贵的操作移动到其他服务器。这在处理高流量网站上的视频转换时非常有价值。
安装
在 zope.conf 中定义应在启动时启动的 remotetasks,如下所示
<product-config lovely.remotetask> autostart site1@TestTaskService1, site2@TestTaskService2, @RootTaskService </product-config>
请注意,直接在根文件夹中注册的服务可以通过仅用 @ 符号作为前缀来引用。可以省略站点名称。例如,上面引用的 RootTaskService。
这将在Zope启动时启动Remotetasks。
用法
>>> STOP_SLEEP_TIME = 0.02
现在让我们创建一个单一的服务
>>> from lovely import remotetask >>> service = remotetask.TaskService()
对象应位于,以便它有一个名称
>>> from zope.app.folder import Folder >>> site1 = Folder() >>> root['site1'] = site1 >>> from zope.app.component.site import LocalSiteManager >>> from zope.security.proxy import removeSecurityProxy >>> sm = LocalSiteManager(removeSecurityProxy(site1)) >>> site1.setSiteManager(sm)>>> sm['default']['testTaskService1'] = service >>> service = sm['default']['testTaskService1'] # caution! proxy >>> service.__name__ u'testTaskService1' >>> service.__parent__ is sm['default'] True
让我们以 TestTaskService1 的名称注册它
>>> from zope import component >>> from lovely.remotetask import interfaces >>> sm = site1.getSiteManager() >>> sm.registerUtility(service, interfaces.ITaskService, ... name='TestTaskService1')
我们可以发现可用的任务
>>> service.getAvailableTasks() {}
此列表最初为空,因为我们尚未注册任何任务。现在让我们定义一个简单的任务,该任务简单地回显输入字符串
>>> def echo(input): ... return input>>> import lovely.remotetask.task >>> echoTask = remotetask.task.SimpleTask(echo)
转换器唯一的API要求是可调用。现在我们确保任务工作正常
>>> echoTask(service, 1, input={'foo': 'blah'}) {'foo': 'blah'}
现在让我们将任务注册为实用工具
>>> import zope.component >>> zope.component.provideUtility(echoTask, name='echo')
echo任务现在在服务中可用
>>> service.getAvailableTasks() {u'echo': <SimpleTask <function echo ...>>}
由于服务不能立即完成一个任务,因此由队列管理传入的工作。首先,我们要求执行echo任务
>>> jobid = service.add(u'echo', {'foo': 'bar'}) >>> jobid 1392637175
add() 函数安排执行带有指定参数的“echo”任务。该方法返回一个作业ID,我们可以用它来查询作业。默认情况下,add() 函数立即添加并启动作业。有时我们需要作业id,但不想立即启动作业。请参阅 startlater.txt 了解详情。
>>> service.getStatus(jobid) 'queued'
由于作业尚未处理,状态设置为“队列”。此外,尚未提供任何结果
>>> service.getResult(jobid) is None True
只要作业未被处理,就可以取消它
>>> service.cancel(jobid) >>> service.getStatus(jobid) 'cancelled'
默认情况下不会启动服务
>>> service.isProcessing() False
TaskService 会在 IDatabaseOpenedEvent 触发时自动启动(如果已在 zope.conf 中指定)- 让我们模拟 zope.conf 设置
>>> class Config(object): ... mapping = {} ... def getSectionName(self): ... return 'lovely.remotetask' >>> config = Config() >>> config.mapping['autostart'] = ( ... 'site1@TestTaskService1, site2@TestTaskService2,@RootTaskService') >>> from zope.app.appsetup.product import setProductConfigurations >>> setProductConfigurations([config]) >>> from lovely.remotetask.service import getAutostartServiceNames >>> getAutostartServiceNames() ['site1@TestTaskService1', 'site2@TestTaskService2', '@RootTaskService']
请注意,RootTaskService 是用于服务直接在根注册的情况。我们在单独的脚注中测试此用例,以免破坏本文档的流程。 [3]
为了获得干净的日志环境,让我们清除日志堆栈
>>> log_info.clear()
在Zope启动时,会触发 IDatabaseOpenedEvent,并调用 bootStrap() 方法
>>> from ZODB.tests import util >>> import transaction >>> db = util.DB() >>> from zope.app.publication.zopepublication import ZopePublication >>> conn = db.open() >>> conn.root()[ZopePublication.root_name] = root >>> transaction.commit()
触发事件
>>> from zope.app.appsetup.interfaces import DatabaseOpenedWithRoot >>> from lovely.remotetask.service import bootStrapSubscriber >>> event = DatabaseOpenedWithRoot(db) >>> bootStrapSubscriber(event)
就这样 - 服务正在处理
>>> service.isProcessing() True
检查日志将证明已启动的服务
>>> print log_info lovely.remotetask INFO handling event IStartRemoteTasksEvent lovely.remotetask INFO service TestTaskService1 on site site1 started lovely.remotetask ERROR site site2 not found lovely.remotetask INFO service RootTaskService on site root started
对根级服务中的作业的验证在另一个脚注中进行 [4]
在同一个站点处理大量服务时,可以使用星号(*)来启动服务。使用site@*时,表示启动该站点上的所有服务
但首先停止所有处理服务
>>> service.stopProcessing() >>> service.isProcessing() False>>> root_service.stopProcessing() >>> root_service.isProcessing() False>>> import time >>> time.sleep(STOP_SLEEP_TIME)
然后重置记录器
>>> log_info.clear()
使用带星号的服务名称重置产品配置
>>> config.mapping['autostart'] = 'site1@*' >>> setProductConfigurations([config]) >>> getAutostartServiceNames() ['site1@*']
再次触发事件将启动配置站点上的所有服务
>>> bootStrapSubscriber(event)>>> service.isProcessing() True>>> root_service.isProcessing() False
让我们检查日志记录
>>> print log_info lovely.remotetask INFO handling event IStartRemoteTasksEvent lovely.remotetask INFO service TestTaskService1 on site site1 started
在许多站点处理大量服务时,可以使用星号(*)来启动服务。使用@时,表示启动所有站点上的所有服务
>>> service.stopProcessing() >>> service.isProcessing() False>>> import time >>> time.sleep(STOP_SLEEP_TIME)
使用带星号的服务名称重置产品配置
>>> config.mapping['autostart'] = '*@*' >>> setProductConfigurations([config]) >>> getAutostartServiceNames() ['*@*']
…并重置记录器
>>> log_info.clear()
再次触发事件。现在应该启动所有服务
>>> bootStrapSubscriber(event)>>> service.isProcessing() True>>> root_service.isProcessing() True
让我们检查日志记录
>>> print log_info lovely.remotetask INFO handling event IStartRemoteTasksEvent lovely.remotetask INFO service RootTaskService on site root started lovely.remotetask INFO service TestTaskService1 on site site1 started
在许多站点处理特定服务时,可以使用星号(*)来启动服务。使用*@service表示启动所有站点上的名为service的服务
>>> service.stopProcessing() >>> service.isProcessing() False>>> root_service.stopProcessing() >>> root_service.isProcessing() False>>> import time >>> time.sleep(STOP_SLEEP_TIME)
使用带星号的服务名称重置产品配置
>>> config.mapping['autostart'] = '*@TestTaskService1' >>> setProductConfigurations([config]) >>> getAutostartServiceNames() ['*@TestTaskService1']
…并重置记录器
>>> log_info.clear()
再次触发事件。现在应该启动所有服务
>>> bootStrapSubscriber(event)>>> service.isProcessing() True>>> root_service.isProcessing() False
让我们检查日志记录
>>> print log_info lovely.remotetask INFO handling event IStartRemoteTasksEvent lovely.remotetask INFO service TestTaskService1 on site site1 started
如果配置的指令在任何站点上都不匹配任何服务,则日志将显示警告信息
>>> service.stopProcessing() >>> service.isProcessing() False>>> import time >>> time.sleep(STOP_SLEEP_TIME)>>> config.mapping['autostart'] = '*@Foo' >>> setProductConfigurations([config]) >>> getAutostartServiceNames() ['*@Foo']>>> log_info.clear()>>> bootStrapSubscriber(event)>>> service.isProcessing() False>>> root_service.isProcessing() False>>> print log_info lovely.remotetask INFO handling event IStartRemoteTasksEvent lovely.remotetask WARNING no services started by directive *@Foo
最后停止处理并杀死线程。由于测试中没有合适的测试环境,我们将手动调用service.process()
>>> service.stopProcessing() >>> service.isProcessing() False>>> root_service.stopProcessing() >>> root_service.isProcessing() False>>> import time >>> time.sleep(STOP_SLEEP_TIME)
现在让我们读取一个作业
>>> jobid = service.add(u'echo', {'foo': 'bar'}) >>> service.process()>>> service.getStatus(jobid) 'completed' >>> service.getResult(jobid) {'foo': 'bar'}
现在,让我们定义一个会导致错误的新的任务
>>> def error(input): ... raise remotetask.task.TaskError('An error occurred.')>>> zope.component.provideUtility( ... remotetask.task.SimpleTask(error), name='error')
现在添加并执行它
>>> jobid = service.add(u'error') >>> service.process()
现在让我们看看发生了什么
>>> service.getStatus(jobid) 'error' >>> service.getError(jobid) 'An error occurred.'
为了管理目的,服务还允许您检查所有作业
>>> dict(service.jobs) {1392637176: <Job 1392637176>, 1392637177: <Job 1392637177>, 1392637175: <Job 1392637175>}
为了删除不再需要的作业,可以使用clean方法。
>>> jobid = service.add(u'echo', {'blah': 'blah'}) >>> sorted([job.status for job in service.jobs.values()]) ['cancelled', 'completed', 'error', 'queued']>>> service.clean()>>> sorted([job.status for job in service.jobs.values()]) ['queued']
定时任务
计划任务在特定时间执行。
>>> import time >>> from lovely.remotetask.job import CronJob >>> now = 0 >>> time.gmtime(now) (1970, 1, 1, 0, 0, 0, 3, 1, 0)
我们设置了一个任务,每当前一分钟执行一次。下一次调用时间是现在的下一个。
分钟
>>> cronJob = CronJob(-1, u'echo', (), minute=(0, 10)) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 1, 1, 0, 10, 0, 3, 1, 0) >>> time.gmtime(cronJob.timeOfNextCall(10*60)) (1970, 1, 1, 1, 0, 0, 3, 1, 0)
小时
>>> cronJob = CronJob(-1, u'echo', (), hour=(2, 13)) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 1, 1, 2, 0, 0, 3, 1, 0) >>> time.gmtime(cronJob.timeOfNextCall(2*60*60)) (1970, 1, 1, 13, 0, 0, 3, 1, 0)
月份
>>> cronJob = CronJob(-1, u'echo', (), month=(1, 5, 12)) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 5, 1, 0, 0, 0, 4, 121, 0) >>> time.gmtime(cronJob.timeOfNextCall(cronJob.timeOfNextCall(0))) (1970, 12, 1, 0, 0, 0, 1, 335, 0)
星期几[0..6],1970年1月1日是星期三。
>>> cronJob = CronJob(-1, u'echo', (), dayOfWeek=(0, 2, 4, 5)) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 1, 2, 0, 0, 0, 4, 2, 0) >>> time.gmtime(cronJob.timeOfNextCall(60*60*24)) (1970, 1, 3, 0, 0, 0, 5, 3, 0) >>> time.gmtime(cronJob.timeOfNextCall(2*60*60*24)) (1970, 1, 5, 0, 0, 0, 0, 5, 0) >>> time.gmtime(cronJob.timeOfNextCall(4*60*60*24)) (1970, 1, 7, 0, 0, 0, 2, 7, 0)
每月的某一天[1..31]
>>> cronJob = CronJob(-1, u'echo', (), dayOfMonth=(1, 12, 21, 30)) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 1, 12, 0, 0, 0, 0, 12, 0) >>> time.gmtime(cronJob.timeOfNextCall(12*24*60*60)) (1970, 1, 21, 0, 0, 0, 2, 21, 0)
组合
>>> cronJob = CronJob(-1, u'echo', (), minute=(10,), ... dayOfMonth=(1, 12, 21, 30)) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 1, 1, 0, 10, 0, 3, 1, 0) >>> time.gmtime(cronJob.timeOfNextCall(10*60)) (1970, 1, 1, 1, 10, 0, 3, 1, 0)>>> cronJob = CronJob(-1, u'echo', (), minute=(10,), ... hour=(4,), ... dayOfMonth=(1, 12, 21, 30)) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 1, 1, 4, 10, 0, 3, 1, 0) >>> time.gmtime(cronJob.timeOfNextCall(10*60)) (1970, 1, 1, 4, 10, 0, 3, 1, 0)
计划任务也可以用来延迟作业的执行。
>>> cronJob = CronJob(-1, u'echo', (), delay=10,) >>> time.gmtime(cronJob.timeOfNextCall(0)) (1970, 1, 1, 0, 0, 10, 3, 1, 0) >>> time.gmtime(cronJob.timeOfNextCall(1)) (1970, 1, 1, 0, 0, 11, 3, 1, 0)
创建延迟任务
延迟作业将在给定的延迟时间(秒)后执行一次。
>>> count = 0 >>> def counting(input): ... global count ... count += 1 ... return count >>> countingTask = remotetask.task.SimpleTask(counting) >>> zope.component.provideUtility(countingTask, name='counter')>>> jobid = service.addCronJob(u'counter', ... {'foo': 'bar'}, ... delay = 10, ... ) >>> service.getStatus(jobid) 'delayed' >>> service.process(0) >>> service.getStatus(jobid) 'delayed' >>> service.process(9) >>> service.getStatus(jobid) 'delayed'
在10秒时,作业被执行并完成。
>>> service.process(10) >>> service.getStatus(jobid) 'completed'
创建定时任务
这里我们创建了一个计划任务,它在小时过后的10分钟和13分钟运行。
>>> count = 0>>> jobid = service.addCronJob(u'counter', ... {'foo': 'bar'}, ... minute = (10, 13), ... ) >>> service.getStatus(jobid) 'cronjob'
我们处理远程任务,但我们的计划任务没有执行,因为我们执行得太早。
>>> service.process(0) >>> service.getStatus(jobid) 'cronjob' >>> service.getResult(jobid) is None True
现在我们晚10分钟运行远程任务并得到结果。
>>> service.process(10*60) >>> service.getStatus(jobid) 'cronjob' >>> service.getResult(jobid) 1
并且1分钟后它没有被调用。
>>> service.process(11*60) >>> service.getResult(jobid) 1
但是3分钟后它再次被调用。
>>> service.process(13*60) >>> service.getResult(jobid) 2
作业可以被重新安排。
>>> job = service.jobs[jobid] >>> job.update(minute = (11, 13))
更新后,必须在服务中重新安排作业。
>>> service.reschedule(jobid)
现在作业不再在旧的注册分钟10分执行。
>>> service.process(10*60+60*60) >>> service.getResult(jobid) 2
但它在新设置的11分钟执行。
>>> service.process(11*60+60*60) >>> service.getResult(jobid) 3
线程行为
每个任务服务都在单独的线程中运行,允许它们独立操作。任务应该设计成避免数据库中的冲突错误。
让我们现在启动我们定义的任务服务,并查看因此运行的线程
>>> service.startProcessing() >>> root_service.startProcessing()>>> import pprint >>> import threading>>> def show_threads(): ... threads = [t for t in threading.enumerate() ... if t.getName().startswith('remotetasks.')] ... threads.sort(key=lambda t: t.getName()) ... pprint.pprint(threads)>>> show_threads() [<Thread(remotetasks.rootTaskService, started daemon)>, <Thread(remotetasks.site1.++etc++site.default.testTaskService1, started daemon)>]
让我们添加第二个站点,该站点包含一个与第一个站点中服务同名任务服务
>>> site2 = Folder() >>> service2 = remotetask.TaskService()>>> root['site2'] = site2 >>> sm = LocalSiteManager(removeSecurityProxy(site2)) >>> site2.setSiteManager(sm)>>> sm['default']['testTaskService1'] = service2 >>> service2 = sm['default']['testTaskService1'] # caution! proxy
让我们以 TestTaskService1 的名称注册它
>>> sm = site2.getSiteManager() >>> sm.registerUtility( ... service2, interfaces.ITaskService, name='TestTaskService1')
服务要求在可以使用之前先提交到数据库
>>> transaction.commit()
新服务目前没有处理
>>> service2.isProcessing() False
如果我们启动新服务,我们可以看到现在有三个后台线程
>>> service2.startProcessing() >>> show_threads() [<Thread(remotetasks.rootTaskService, started daemon)>, <Thread(remotetasks.site1.++etc++site.default.testTaskService1, started daemon)>, <Thread(remotetasks.site2.++etc++site.default.testTaskService1, started daemon)>]
让我们停止服务,并给后台线程一个接收消息的机会
>>> service.stopProcessing() >>> service2.stopProcessing() >>> root_service.stopProcessing()>>> import time >>> time.sleep(STOP_SLEEP_TIME)
线程已经退出
>>> print [t for t in threading.enumerate() ... if t.getName().startswith('remotetasks.')] []
脚注
检查接口和其他内容
>>> from zope.interface.verify import verifyClass, verifyObject >>> verifyClass(interfaces.ITaskService, remotetask.TaskService) True >>> verifyObject(interfaces.ITaskService, service) True >>> interfaces.ITaskService.providedBy(service) True>>> from lovely.remotetask.job import Job >>> fakejob = Job(1, u'echo', {}) >>> verifyClass(interfaces.IJob, Job) True >>> verifyObject(interfaces.IJob, fakejob) True >>> interfaces.IJob.providedBy(fakejob) True>>> fakecronjob = CronJob(1, u'echo', {}) >>> verifyClass(interfaces.ICronJob, CronJob) True >>> verifyObject(interfaces.ICronJob, fakecronjob) True >>> interfaces.IJob.providedBy(fakecronjob) True
对 lovely.remotetask 的修改
0.5.2 (2010-04-30)
删除了对依赖项zope.publisher不必要的版本要求。
0.5.1 (2010-04-14)
将记录的异常转换为str,因为日志消息应该是字符串。
0.5 (2009-09-10)
修复了SimpleProcessor的bug:如果作业被中止了事务,它将永远不会从队列中移除,而是反复重试。
2009/05/20 (0.4):
像intid那样随机化新作业ID的生成:尝试分配连续的ID,以便它们落入同一个BTree桶中,如果遇到已使用的ID则随机化。
2009/04/05 (0.3):
在添加cron作业的表单中使用下拉小部件,而不是文本输入,以显示可用任务。
通过直接使用其包装的API来删除对zope.app.zapi的依赖。
使用zope.location中的ISite而不是zope.app.component。
使用zc.queue.Queue而不是zc.queue.PersistentQueue,因为PersistentQueue仅适用于CompositeQueue。
将URL更改为pypi。
在ITaskService.clean中使用正确的复数形式的状态(status)。
2008/11/07 0.2.15a1
运行可能会引发AttributeError。已添加处理程序。
2008/02/08 0.2.14:
在'clearAll'期间每100个作业后提交,以避免在取消大量作业时浏览器超时。
2008/01/28 (新)
解决了一些bug,并改进了测试。
向TaskService.add添加了startLater。有关更多信息,请参阅startlater.txt。这有助于将作业添加和开始时间点分开。(不是cron类型)
2007/12/?? (新)
将索引切换到Zope 3.4 KGS,以便我们同意使用的包版本。
使处理器的睡眠时间可变;这是测试所需的,以便测试框架不会比处理器关闭得快。
在isProcessing()中添加了小的优化,以在找到具有正确名称的线程后停止遍历线程。
2007/11/12 0.2.13:
添加了“取消所有”按钮。
修复了与任务服务实例关联线程的bug。
2007/10/28 0.2.12:
使启动更健壮。如果通过ZMI删除已注册的任务服务,则其注册不会删除。如果发生这种情况,则使用自动启动时,Zope无法重新启动。
2007/10/28 0.2.11:
允许在cron作业添加/编辑表单中选择所有可能的“*”时间。
允许取消延迟作业。
2007/10/24 0.2.10:
避免了弃用警告。
2007/10/08 0.2.9:
如果作业状态为ERROR,则不会将cron作业推回队列。
2007/10/08 0.2.8:
增强了启动过程中的日志记录。
2007/10/02 0.2.7:
在buildout.cfg中添加了索引。
增强了自动启动行为:服务可以按以下方式启动:site@*,@service和@
2007/08/07 0.2.6:
修复了排序中的bug,导致列标题永远不会可点击。
2007/08/07 0.2.5:
不再需要“作业”ZMI视图的会话支持。
2007/08/06 0.2.4:
修复了导致处理线程不必要地保持进程活跃的bug。
2007/07/26 0.2.3:
现在处理直接在根处注册的任务服务的用例。在产品配置中引用此类服务时,必须以@开头,而不是<sitename>@。
2007/07/02 0.2.2:
在ZMI菜单中添加了将cron作业添加到任务服务的选项。
可以为特定于任务的作业注册命名详细视图。
为cron作业添加了编辑视图。
改进了ZMI视图。
如果为未注册的任务添加了作业,则捕获异常。
修复了所有时区都能正常工作的测试。
2007/06/12 0.2.1:
在使用eggs时,不要因为跟踪回溯的性能问题而引发IndexError。
2007/06/12 0.2.0:
在lovely/__init__.py中添加了命名空间声明。
允许延迟作业。