跳转到主要内容

zope 3的远程任务客户端工具

项目描述

远程任务执行

本软件包提供了一个远程任务执行Web服务的实现,允许在另一台服务器上执行预定义的任务。还可以在特定时间运行定时任务。这些服务以两种方式有用

  1. 它们使我们能够完成特定机器上不可用原生的任务。例如,使用Linux(我们Web服务器可能运行的操作系统),无法将AVI文件转换为Flash(R)电影。

  2. 它们还允许将昂贵的操作移动到其他服务器。这在处理高流量网站上的视频转换时非常有价值。

安装

在 zope.conf 中定义应在启动时启动的 remotetasks,如下所示

<product-config lovely.remotetask>
  autostart site1@TestTaskService1, site2@TestTaskService2, @RootTaskService
</product-config>

请注意,直接在根文件夹中注册的服务可以通过仅用 @ 符号作为前缀来引用。可以省略站点名称。例如,上面引用的 RootTaskService

这将在Zope启动时启动Remotetasks。

用法

>>> STOP_SLEEP_TIME = 0.02

现在让我们创建一个单一的服务

>>> from lovely import remotetask
>>> service = remotetask.TaskService()

对象应位于,以便它有一个名称

>>> from zope.app.folder import Folder
>>> site1 = Folder()
>>> root['site1'] = site1
>>> from zope.app.component.site import LocalSiteManager
>>> from zope.security.proxy import removeSecurityProxy
>>> sm = LocalSiteManager(removeSecurityProxy(site1))
>>> site1.setSiteManager(sm)
>>> sm['default']['testTaskService1'] = service
>>> service = sm['default']['testTaskService1'] # caution! proxy
>>> service.__name__
u'testTaskService1'
>>> service.__parent__ is sm['default']
True

让我们以 TestTaskService1 的名称注册它

>>> from zope import component
>>> from lovely.remotetask import interfaces
>>> sm = site1.getSiteManager()
>>> sm.registerUtility(service, interfaces.ITaskService,
...                          name='TestTaskService1')

我们可以发现可用的任务

>>> service.getAvailableTasks()
{}

此列表最初为空,因为我们尚未注册任何任务。现在让我们定义一个简单的任务,该任务简单地回显输入字符串

>>> def echo(input):
...     return input
>>> import lovely.remotetask.task
>>> echoTask = remotetask.task.SimpleTask(echo)

转换器唯一的API要求是可调用。现在我们确保任务工作正常

>>> echoTask(service, 1, input={'foo': 'blah'})
{'foo': 'blah'}

现在让我们将任务注册为实用工具

>>> import zope.component
>>> zope.component.provideUtility(echoTask, name='echo')

echo任务现在在服务中可用

>>> service.getAvailableTasks()
{u'echo': <SimpleTask <function echo ...>>}

由于服务不能立即完成一个任务,因此由队列管理传入的工作。首先,我们要求执行echo任务

>>> jobid = service.add(u'echo', {'foo': 'bar'})
>>> jobid
1392637175

add() 函数安排执行带有指定参数的“echo”任务。该方法返回一个作业ID,我们可以用它来查询作业。默认情况下,add() 函数立即添加并启动作业。有时我们需要作业id,但不想立即启动作业。请参阅 startlater.txt 了解详情。

>>> service.getStatus(jobid)
'queued'

由于作业尚未处理,状态设置为“队列”。此外,尚未提供任何结果

>>> service.getResult(jobid) is None
True

只要作业未被处理,就可以取消它

>>> service.cancel(jobid)
>>> service.getStatus(jobid)
'cancelled'

默认情况下不会启动服务

>>> service.isProcessing()
False

TaskService 会在 IDatabaseOpenedEvent 触发时自动启动(如果已在 zope.conf 中指定)- 让我们模拟 zope.conf 设置

>>> class Config(object):
...     mapping = {}
...     def getSectionName(self):
...         return 'lovely.remotetask'
>>> config = Config()
>>> config.mapping['autostart'] = (
...     'site1@TestTaskService1, site2@TestTaskService2,@RootTaskService')
>>> from zope.app.appsetup.product import setProductConfigurations
>>> setProductConfigurations([config])
>>> from lovely.remotetask.service import getAutostartServiceNames
>>> getAutostartServiceNames()
['site1@TestTaskService1', 'site2@TestTaskService2', '@RootTaskService']

请注意,RootTaskService 是用于服务直接在根注册的情况。我们在单独的脚注中测试此用例,以免破坏本文档的流程。 [3]

为了获得干净的日志环境,让我们清除日志堆栈

>>> log_info.clear()

在Zope启动时,会触发 IDatabaseOpenedEvent,并调用 bootStrap() 方法

>>> from ZODB.tests import util
>>> import transaction
>>> db = util.DB()
>>> from zope.app.publication.zopepublication import ZopePublication
>>> conn = db.open()
>>> conn.root()[ZopePublication.root_name] = root
>>> transaction.commit()

触发事件

>>> from zope.app.appsetup.interfaces import DatabaseOpenedWithRoot
>>> from lovely.remotetask.service import bootStrapSubscriber
>>> event = DatabaseOpenedWithRoot(db)
>>> bootStrapSubscriber(event)

就这样 - 服务正在处理

>>> service.isProcessing()
True

检查日志将证明已启动的服务

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service TestTaskService1 on site site1 started
lovely.remotetask ERROR
  site site2 not found
lovely.remotetask INFO
  service RootTaskService on site root started

对根级服务中的作业的验证在另一个脚注中进行 [4]

在同一个站点处理大量服务时,可以使用星号(*)来启动服务。使用site@*时,表示启动该站点上的所有服务

但首先停止所有处理服务

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> root_service.stopProcessing()
>>> root_service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

然后重置记录器

>>> log_info.clear()

使用带星号的服务名称重置产品配置

>>> config.mapping['autostart'] = 'site1@*'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['site1@*']

再次触发事件将启动配置站点上的所有服务

>>> bootStrapSubscriber(event)
>>> service.isProcessing()
True
>>> root_service.isProcessing()
False

让我们检查日志记录

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service TestTaskService1 on site site1 started

在许多站点处理大量服务时,可以使用星号(*)来启动服务。使用@时,表示启动所有站点上的所有服务

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

使用带星号的服务名称重置产品配置

>>> config.mapping['autostart'] = '*@*'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['*@*']

…并重置记录器

>>> log_info.clear()

再次触发事件。现在应该启动所有服务

>>> bootStrapSubscriber(event)
>>> service.isProcessing()
True
>>> root_service.isProcessing()
True

让我们检查日志记录

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service RootTaskService on site root started
lovely.remotetask INFO
  service TestTaskService1 on site site1 started

在许多站点处理特定服务时,可以使用星号(*)来启动服务。使用*@service表示启动所有站点上的名为service的服务

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> root_service.stopProcessing()
>>> root_service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

使用带星号的服务名称重置产品配置

>>> config.mapping['autostart'] = '*@TestTaskService1'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['*@TestTaskService1']

…并重置记录器

>>> log_info.clear()

再次触发事件。现在应该启动所有服务

>>> bootStrapSubscriber(event)
>>> service.isProcessing()
True
>>> root_service.isProcessing()
False

让我们检查日志记录

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service TestTaskService1 on site site1 started

如果配置的指令在任何站点上都不匹配任何服务,则日志将显示警告信息

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)
>>> config.mapping['autostart'] = '*@Foo'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['*@Foo']
>>> log_info.clear()
>>> bootStrapSubscriber(event)
>>> service.isProcessing()
False
>>> root_service.isProcessing()
False
>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask WARNING
  no services started by directive *@Foo

最后停止处理并杀死线程。由于测试中没有合适的测试环境,我们将手动调用service.process()

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> root_service.stopProcessing()
>>> root_service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

现在让我们读取一个作业

>>> jobid = service.add(u'echo', {'foo': 'bar'})
>>> service.process()
>>> service.getStatus(jobid)
'completed'
>>> service.getResult(jobid)
{'foo': 'bar'}

现在,让我们定义一个会导致错误的新的任务

>>> def error(input):
...     raise remotetask.task.TaskError('An error occurred.')
>>> zope.component.provideUtility(
...     remotetask.task.SimpleTask(error), name='error')

现在添加并执行它

>>> jobid = service.add(u'error')
>>> service.process()

现在让我们看看发生了什么

>>> service.getStatus(jobid)
'error'
>>> service.getError(jobid)
'An error occurred.'

为了管理目的,服务还允许您检查所有作业

>>> dict(service.jobs)
{1392637176: <Job 1392637176>, 1392637177: <Job 1392637177>, 1392637175: <Job 1392637175>}

为了删除不再需要的作业,可以使用clean方法。

>>> jobid = service.add(u'echo', {'blah': 'blah'})
>>> sorted([job.status for job in service.jobs.values()])
['cancelled', 'completed', 'error', 'queued']
>>> service.clean()
>>> sorted([job.status for job in service.jobs.values()])
['queued']

定时任务

计划任务在特定时间执行。

>>> import time
>>> from lovely.remotetask.job import CronJob
>>> now = 0
>>> time.gmtime(now)
(1970, 1, 1, 0, 0, 0, 3, 1, 0)

我们设置了一个任务,每当前一分钟执行一次。下一次调用时间是现在的下一个。

分钟

>>> cronJob = CronJob(-1, u'echo', (), minute=(0, 10))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 0, 10, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(10*60))
(1970, 1, 1, 1, 0, 0, 3, 1, 0)

小时

>>> cronJob = CronJob(-1, u'echo', (), hour=(2, 13))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 2, 0, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(2*60*60))
(1970, 1, 1, 13, 0, 0, 3, 1, 0)

月份

>>> cronJob = CronJob(-1, u'echo', (), month=(1, 5, 12))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 5, 1, 0, 0, 0, 4, 121, 0)
>>> time.gmtime(cronJob.timeOfNextCall(cronJob.timeOfNextCall(0)))
(1970, 12, 1, 0, 0, 0, 1, 335, 0)

星期几[0..6],1970年1月1日是星期三。

>>> cronJob = CronJob(-1, u'echo', (), dayOfWeek=(0, 2, 4, 5))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 2, 0, 0, 0, 4, 2, 0)
>>> time.gmtime(cronJob.timeOfNextCall(60*60*24))
(1970, 1, 3, 0, 0, 0, 5, 3, 0)
>>> time.gmtime(cronJob.timeOfNextCall(2*60*60*24))
(1970, 1, 5, 0, 0, 0, 0, 5, 0)
>>> time.gmtime(cronJob.timeOfNextCall(4*60*60*24))
(1970, 1, 7, 0, 0, 0, 2, 7, 0)

每月的某一天[1..31]

>>> cronJob = CronJob(-1, u'echo', (), dayOfMonth=(1, 12, 21, 30))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 12, 0, 0, 0, 0, 12, 0)
>>> time.gmtime(cronJob.timeOfNextCall(12*24*60*60))
(1970, 1, 21, 0, 0, 0, 2, 21, 0)

组合

>>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
...                                 dayOfMonth=(1, 12, 21, 30))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 0, 10, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(10*60))
(1970, 1, 1, 1, 10, 0, 3, 1, 0)
>>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
...                                 hour=(4,),
...                                 dayOfMonth=(1, 12, 21, 30))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 4, 10, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(10*60))
(1970, 1, 1, 4, 10, 0, 3, 1, 0)

计划任务也可以用来延迟作业的执行。

>>> cronJob = CronJob(-1, u'echo', (), delay=10,)
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 0, 0, 10, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(1))
(1970, 1, 1, 0, 0, 11, 3, 1, 0)

创建延迟任务

延迟作业将在给定的延迟时间(秒)后执行一次。

>>> count = 0
>>> def counting(input):
...     global count
...     count += 1
...     return count
>>> countingTask = remotetask.task.SimpleTask(counting)
>>> zope.component.provideUtility(countingTask, name='counter')
>>> jobid = service.addCronJob(u'counter',
...                            {'foo': 'bar'},
...                            delay = 10,
...                           )
>>> service.getStatus(jobid)
'delayed'
>>> service.process(0)
>>> service.getStatus(jobid)
'delayed'
>>> service.process(9)
>>> service.getStatus(jobid)
'delayed'

在10秒时,作业被执行并完成。

>>> service.process(10)
>>> service.getStatus(jobid)
'completed'

创建定时任务

这里我们创建了一个计划任务,它在小时过后的10分钟和13分钟运行。

>>> count = 0
>>> jobid = service.addCronJob(u'counter',
...                            {'foo': 'bar'},
...                            minute = (10, 13),
...                           )
>>> service.getStatus(jobid)
'cronjob'

我们处理远程任务,但我们的计划任务没有执行,因为我们执行得太早。

>>> service.process(0)
>>> service.getStatus(jobid)
'cronjob'
>>> service.getResult(jobid) is None
True

现在我们晚10分钟运行远程任务并得到结果。

>>> service.process(10*60)
>>> service.getStatus(jobid)
'cronjob'
>>> service.getResult(jobid)
1

并且1分钟后它没有被调用。

>>> service.process(11*60)
>>> service.getResult(jobid)
1

但是3分钟后它再次被调用。

>>> service.process(13*60)
>>> service.getResult(jobid)
2

作业可以被重新安排。

>>> job = service.jobs[jobid]
>>> job.update(minute = (11, 13))

更新后,必须在服务中重新安排作业。

>>> service.reschedule(jobid)

现在作业不再在旧的注册分钟10分执行。

>>> service.process(10*60+60*60)
>>> service.getResult(jobid)
2

但它在新设置的11分钟执行。

>>> service.process(11*60+60*60)
>>> service.getResult(jobid)
3

线程行为

每个任务服务都在单独的线程中运行,允许它们独立操作。任务应该设计成避免数据库中的冲突错误。

让我们现在启动我们定义的任务服务,并查看因此运行的线程

>>> service.startProcessing()
>>> root_service.startProcessing()
>>> import pprint
>>> import threading
>>> def show_threads():
...     threads = [t for t in threading.enumerate()
...                if t.getName().startswith('remotetasks.')]
...     threads.sort(key=lambda t: t.getName())
...     pprint.pprint(threads)
>>> show_threads()
[<Thread(remotetasks.rootTaskService, started daemon)>,
 <Thread(remotetasks.site1.++etc++site.default.testTaskService1, started daemon)>]

让我们添加第二个站点,该站点包含一个与第一个站点中服务同名任务服务

>>> site2 = Folder()
>>> service2 = remotetask.TaskService()
>>> root['site2'] = site2
>>> sm = LocalSiteManager(removeSecurityProxy(site2))
>>> site2.setSiteManager(sm)
>>> sm['default']['testTaskService1'] = service2
>>> service2 = sm['default']['testTaskService1'] # caution! proxy

让我们以 TestTaskService1 的名称注册它

>>> sm = site2.getSiteManager()
>>> sm.registerUtility(
...     service2, interfaces.ITaskService, name='TestTaskService1')

服务要求在可以使用之前先提交到数据库

>>> transaction.commit()

新服务目前没有处理

>>> service2.isProcessing()
False

如果我们启动新服务,我们可以看到现在有三个后台线程

>>> service2.startProcessing()
>>> show_threads()
[<Thread(remotetasks.rootTaskService, started daemon)>,
 <Thread(remotetasks.site1.++etc++site.default.testTaskService1, started daemon)>,
 <Thread(remotetasks.site2.++etc++site.default.testTaskService1, started daemon)>]

让我们停止服务,并给后台线程一个接收消息的机会

>>> service.stopProcessing()
>>> service2.stopProcessing()
>>> root_service.stopProcessing()
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

线程已经退出

>>> print [t for t in threading.enumerate()
...        if t.getName().startswith('remotetasks.')]
[]

脚注

检查接口和其他内容

>>> from zope.interface.verify import verifyClass, verifyObject
>>> verifyClass(interfaces.ITaskService, remotetask.TaskService)
True
>>> verifyObject(interfaces.ITaskService, service)
True
>>> interfaces.ITaskService.providedBy(service)
True
>>> from lovely.remotetask.job import Job
>>> fakejob = Job(1, u'echo', {})
>>> verifyClass(interfaces.IJob, Job)
True
>>> verifyObject(interfaces.IJob, fakejob)
True
>>> interfaces.IJob.providedBy(fakejob)
True
>>> fakecronjob = CronJob(1, u'echo', {})
>>> verifyClass(interfaces.ICronJob, CronJob)
True
>>> verifyObject(interfaces.ICronJob, fakecronjob)
True
>>> interfaces.IJob.providedBy(fakecronjob)
True

对 lovely.remotetask 的修改

0.5.2 (2010-04-30)

  • 删除了对依赖项zope.publisher不必要的版本要求。

0.5.1 (2010-04-14)

  • 将记录的异常转换为str,因为日志消息应该是字符串。

0.5 (2009-09-10)

  • 修复了SimpleProcessor的bug:如果作业被中止了事务,它将永远不会从队列中移除,而是反复重试。

2009/05/20 (0.4):

  • 像intid那样随机化新作业ID的生成:尝试分配连续的ID,以便它们落入同一个BTree桶中,如果遇到已使用的ID则随机化。

2009/04/05 (0.3):

  • 在添加cron作业的表单中使用下拉小部件,而不是文本输入,以显示可用任务。

  • 通过直接使用其包装的API来删除对zope.app.zapi的依赖。

  • 使用zope.location中的ISite而不是zope.app.component。

  • 使用zc.queue.Queue而不是zc.queue.PersistentQueue,因为PersistentQueue仅适用于CompositeQueue。

  • 将URL更改为pypi。

  • 在ITaskService.clean中使用正确的复数形式的状态(status)。

2008/11/07 0.2.15a1

  • 运行可能会引发AttributeError。已添加处理程序。

2008/02/08 0.2.14:

  • 在'clearAll'期间每100个作业后提交,以避免在取消大量作业时浏览器超时。

2008/01/28 (新)

  • 解决了一些bug,并改进了测试。

  • 向TaskService.add添加了startLater。有关更多信息,请参阅startlater.txt。这有助于将作业添加和开始时间点分开。(不是cron类型)

2007/12/?? (新)

  • 将索引切换到Zope 3.4 KGS,以便我们同意使用的包版本。

  • 使处理器的睡眠时间可变;这是测试所需的,以便测试框架不会比处理器关闭得快。

  • isProcessing()中添加了小的优化,以在找到具有正确名称的线程后停止遍历线程。

2007/11/12 0.2.13:

  • 添加了“取消所有”按钮。

  • 修复了与任务服务实例关联线程的bug。

2007/10/28 0.2.12:

  • 使启动更健壮。如果通过ZMI删除已注册的任务服务,则其注册不会删除。如果发生这种情况,则使用自动启动时,Zope无法重新启动。

2007/10/28 0.2.11:

  • 允许在cron作业添加/编辑表单中选择所有可能的“*”时间。

  • 允许取消延迟作业。

2007/10/24 0.2.10:

  • 避免了弃用警告。

2007/10/08 0.2.9:

  • 如果作业状态为ERROR,则不会将cron作业推回队列。

2007/10/08 0.2.8:

  • 增强了启动过程中的日志记录。

2007/10/02 0.2.7:

  • 在buildout.cfg中添加了索引。

  • 增强了自动启动行为:服务可以按以下方式启动:site@*,@service和@

2007/08/07 0.2.6:

  • 修复了排序中的bug,导致列标题永远不会可点击。

2007/08/07 0.2.5:

  • 不再需要“作业”ZMI视图的会话支持。

2007/08/06 0.2.4:

  • 修复了导致处理线程不必要地保持进程活跃的bug。

2007/07/26 0.2.3:

  • 现在处理直接在根处注册的任务服务的用例。在产品配置中引用此类服务时,必须以@开头,而不是<sitename>@

2007/07/02 0.2.2:

  • 在ZMI菜单中添加了将cron作业添加到任务服务的选项。

  • 可以为特定于任务的作业注册命名详细视图。

  • 为cron作业添加了编辑视图。

  • 改进了ZMI视图。

  • 如果为未注册的任务添加了作业,则捕获异常。

  • 修复了所有时区都能正常工作的测试。

2007/06/12 0.2.1:

  • 在使用eggs时,不要因为跟踪回溯的性能问题而引发IndexError。

2007/06/12 0.2.0:

  • 在lovely/__init__.py中添加了命名空间声明。

  • 允许延迟作业。

项目详细信息


下载文件

为您的平台下载文件。如果您不确定要选择哪个,请了解更多关于安装包的信息。

源分布

lovely.remotetask-0.5.2.tar.gz (48.8 kB 查看哈希值)

上传时间: 源码

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页