跳转到主要内容

使用完整的Zope3运行环境创建独立程序

项目描述

gocept.runner

创建具有完整Zope3运行时环境的独立程序。这些程序可以作为一次性运行脚本使用,也可以在定义的时间后自动调用。

创建运行者

gocept.runner 包允许您轻松创建小型、长时间运行的脚本,这些脚本可以与ZODB交互。当脚本运行时,它们已设置完整的组件架构。

使用appmain装饰器定义运行者

>>> import logging
>>> import gocept.runner
>>> work_count = 0
>>> @gocept.runner.appmain(ticks=0.1)
... def worker():
...     import zope.app.appsetup.product
...     log = logging.getLogger('test')
...     log.info("Working")
...     log.info(sorted(
...         zope.app.appsetup.product.getProductConfiguration('test').items()))
...     global work_count
...     work_count += 1
...     if work_count >= 3:
...         return gocept.runner.Exit

被装饰的工作者现在有两个参数

  1. 根中一个对象的名称,将其设置为站点或为根设置None。

  2. 配置文件(zope.conf)的路径

创建一个简单的zope.conf

>>> import os.path
>>> import tempfile
>>> zodb_path = tempfile.mkdtemp()
>>> site_zcml = os.path.join(
...     os.path.dirname(__file__), 'ftesting.zcml')
>>> fd, zope_conf = tempfile.mkstemp()
>>> zope_conf_file = os.fdopen(fd, 'w')
>>> _ = zope_conf_file.write('''\
... site-definition %s
... <zodb>
...   <filestorage>
...     path %s/Data.fs
...   </filestorage>
... </zodb>
... <product-config test>
...     foo bar
...     test-principal zope.mgr
... </product-config>
... <eventlog>
...   <logfile>
...     formatter zope.exceptions.log.Formatter
...     path STDOUT
...   </logfile>
... </eventlog>
... ''' % (site_zcml, zodb_path))
>>> zope_conf_file.close()

所以调用工作者

>>> worker(None, zope_conf)
------
... INFO test Working
------
... INFO test [('foo', 'bar'), ('test-principal', 'zope.mgr')]
------
... INFO test Working
------
... INFO test [('foo', 'bar'), ('test-principal', 'zope.mgr')]
------
... INFO test Working
------
... INFO test [('foo', 'bar'), ('test-principal', 'zope.mgr')]

信号

工作者进程可以通过SIGTERM和SIGHUP以合理的方式终止。将脚本写入临时文件

>>> import sys
>>> runner_path = os.path.abspath(
...     os.path.join(os.path.dirname(__file__), '..', '..'))
>>> fd, script_name = tempfile.mkstemp(suffix='.py')
>>> exchange_fd, exchange_file_name = tempfile.mkstemp()
>>> script = os.fdopen(fd, 'w')
>>> _ = script.write("""\
... import sys
... sys.path[0:0] = %s
... sys.path.insert(0, '%s')
... import gocept.runner
...
... f = open('%s', 'w')
...
... @gocept.runner.appmain(ticks=0.1)
... def worker():
...     f.write("Working.\\n")
...     f.flush()
...
... worker(None, '%s')
... """ % (sys.path, runner_path, exchange_file_name, zope_conf))
>>> script.close()

调用脚本并等待它生成一些输出

>>> import signal
>>> import subprocess
>>> import time
>>> exchange = os.fdopen(exchange_fd, 'r+')
>>> proc = subprocess.Popen(
...     [sys.executable, script_name],
...     stdout=subprocess.PIPE,
...     text=True)
>>> while not exchange.read():
...     time.sleep(0.1)
...     _ = exchange.seek(0, 0)
>>> _ = exchange.seek(0, 0)
>>> print(exchange.read())
Working...

好吧,现在杀掉它

>>> os.kill(proc.pid, signal.SIGTERM)

等待进程真正完成并获取输出。运行者记录了它被终止的情况

>>> stdout, stderr = proc.communicate()
>>> print(stdout)
------...INFO gocept.runner.runner Received signal 15, terminating...

这也适用于SIGHUP

>>> _ = exchange.truncate(0)
>>> proc = subprocess.Popen(
...     [sys.executable, script_name],
...     stdout=subprocess.PIPE,
...     text=True)
>>> while not exchange.read():
...     time.sleep(0.1)
...     _ = exchange.seek(0, 0)
>>> _ = exchange.seek(0, 0)
>>> print(exchange.read())
Working...

好吧,现在杀掉它

>>> os.kill(proc.pid, signal.SIGHUP)
>>> stdout, stderr = proc.communicate()
>>> print(stdout)
------...INFO gocept.runner.runner Received signal 1, terminating...

清理:>> exchange.close() >> os.remove(script_name) >> os.remove(exchange_file_name)

设置主体

也可以创建一个在交互中运行的循环

>>> def get_principal():
...     return 'zope.mgr'
>>> import zope.security.management
>>> work_count = 0
>>> def interaction_worker():
...     global work_count
...     work_count += 1
...     if work_count >= 3:
...         raise SystemExit(1)
...     log = logging.getLogger('test')
...     interaction = zope.security.management.getInteraction()
...     principal = interaction.participations[0].principal
...     log.info("Working as %s" % principal.id)
>>> worker = gocept.runner.appmain(ticks=0.1, principal=get_principal)(
...     interaction_worker)

现在调用工作者

>>> worker(None, zope_conf)
------
... INFO test Working as zope.mgr
------
... INFO test Working as zope.mgr

工作者运行后没有交互

>>> zope.security.management.queryInteraction() is None
True

通常从zope.conf中读取主体。因此有一个助手使这项任务更容易

>>> work_count = 0
>>> worker = gocept.runner.appmain(
...     ticks=0.1,
...     principal=gocept.runner.from_config('test', 'test-principal'))(
...     interaction_worker)
>>> worker(None, zope_conf)
------
... INFO test Working as zope.mgr
------
... INFO test Working as zope.mgr

子站点

可以直接在根内的站点上工作。当然,站点必须已经存在,否则将出现错误

>>> worker('a-site', zope_conf)
Traceback (most recent call last):
    ...
KeyError: 'a-site'

清理

>>> import shutil
>>> shutil.rmtree(zodb_path)
>>> os.remove(zope_conf)

一次性命令

通常需要运行一次命令(或通过cron)并使用完整的组件架构。通常使用zopectl run进行此操作。

>>> import logging
>>> import gocept.runner
>>> import zope.component.hooks
>>> @gocept.runner.once()
... def worker():
...     log = logging.getLogger('test')
...     log.info("Working.")
...     site = zope.component.hooks.getSite()
...     if hasattr(site, 'store'):
...         log.info("Having attribute store.")
...     site.store = True

定义Zope环境

>>> import os.path
>>> import tempfile
>>> zodb_path = tempfile.mkdtemp()
>>> site_zcml = os.path.join(
...     os.path.dirname(__file__), 'ftesting.zcml')
>>> fd, zope_conf = tempfile.mkstemp()
>>> zope_conf_file = os.fdopen(fd, 'w')
>>> _ = zope_conf_file.write('''\
... site-definition %s
... <zodb>
...   <filestorage>
...     path %s/Data.fs
...   </filestorage>
... </zodb>
... <product-config test>
...     foo bar
... </product-config>
... <eventlog>
...   <logfile>
...     formatter zope.exceptions.log.Formatter
...     path STDOUT
...   </logfile>
... </eventlog>
... ''' % (site_zcml, zodb_path))
>>> zope_conf_file.close()

所以第一次调用工作者。它将在一次调用后终止

>>> worker(None, zope_conf)
------
... INFO test Working.

第二次调用表示在第一次运行中更改了属性

>>> worker(None, zope_conf)
------
... INFO test Working.
------
... INFO test Having attribute store.
...

运行者详情

主循环

主循环会一直循环,直到遇到KeyboardInterrupt或SystemExit异常,并且每秒调用工作者一次。

定义一个工作者函数,它在第三次被调用时退出

>>> work_count = 0
>>> def worker():
...     print("Working")
...     global work_count
...     work_count += 1
...     if work_count >= 3:
...         raise SystemExit(1)

调用主循环

>>> import gocept.runner.runner
>>> gocept.runner.runner.MainLoop(getRootFolder(), 0.1, worker)()
Working
Working
Working
>>> work_count
3

在循环期间设置站点

>>> import zope.component.hooks
>>> zope.component.hooks.getSite() is None
True
>>> def worker():
...     print(zope.component.hooks.getSite())
...     raise SystemExit(1)
>>> gocept.runner.runner.MainLoop(getRootFolder(), 0.1, worker)()
<zope.site.folder.Folder object at 0x...>

循环后,不再设置站点

>>> zope.component.hooks.getSite() is None
True

当工作者无错误地通过时,将提交事务

>>> work_count = 0
>>> def worker():
...     print("Working")
...     global work_count
...     work_count += 1
...     if work_count >= 2:
...         raise SystemExit(1)
...     site = zope.component.hooks.getSite()
...     site.worker_done = 1
>>> gocept.runner.runner.MainLoop(getRootFolder(), 0.1, worker)()
Working
Working

我们现在设置了属性worker_done

>>> getRootFolder().worker_done
1

当工作者产生错误时,将中止事务

>>> work_count = 0
>>> def worker():
...     global work_count
...     work_count += 1
...     print("Working")
...     site = zope.component.hooks.getSite()
...     site.worker_done += 1
...     if work_count < 3:
...         raise ValueError('hurz')
...     raise SystemExit(1)
>>> gocept.runner.runner.MainLoop(getRootFolder(), 0.1, worker)()
Working
Working
Working

我们仍然将属性worker_done设置为1:b

>>> getRootFolder().worker_done
1

控制休眠时间

工作者函数可以控制休眠时间。

注册日志处理程序,以便我们可以观察此操作

>>> from io import StringIO
>>> import logging
>>> log = StringIO()
>>> log_handler = logging.StreamHandler(log)
>>> logging.root.addHandler(log_handler)
>>> old_log_level = logging.root.level
>>> logging.root.setLevel(logging.DEBUG)
>>> work_count = 0
>>> def worker():
...     global work_count
...     work_count += 1
...     new_sleep = work_count * 0.1
...     if work_count == 3:
...         print("Will sleep default")
...         return None
...     if work_count > 3:
...         raise SystemExit(1)
...     print("Will sleep %s" % new_sleep)
...     return new_sleep
>>> gocept.runner.runner.MainLoop(getRootFolder(), 0.15, worker)()
Will sleep 0.1
Will sleep 0.2
Will sleep default

实际的休眠值在日志中

>>> print(log.getvalue())
new transaction
commit
Sleeping 0.1 seconds
new transaction
commit
Sleeping 0.2 seconds
new transaction
commit
Sleeping 0.15 seconds
new transaction
abort...

当工作者内部发生错误时,将使用默认的休眠时间

>>> _ = log.seek(0)
>>> _ = log.truncate()
>>> work_count = 0
>>> def worker():
...     global work_count
...     work_count += 1
...     if work_count == 1:
...         new_sleep = 0.1
...     elif work_count == 2:
...         print("Failing")
...         raise Exception("Fail!")
...     elif work_count == 3:
...         print("Will sleep default")
...         return None
...     elif work_count > 3:
...         return gocept.runner.Exit
...     print("Will sleep %s" % new_sleep)
...     return new_sleep
>>> gocept.runner.runner.MainLoop(getRootFolder(), 0.15, worker)()
Will sleep 0.1
Failing
Will sleep default

实际的休眠值在日志中

>>> print(log.getvalue())
new transaction
commit
Sleeping 0.1 seconds
new transaction
Error in worker: Exception('Fail!'...)
Traceback (most recent call last):
  ...
Exception: Fail!
abort
Sleeping 0.15 seconds
new transaction
commit
Sleeping 0.15 seconds
new transaction
commit...

恢复旧的日志处理程序

>>> logging.root.removeHandler(log_handler)
>>> log.close()
>>> logging.root.setLevel(old_log_level)

多次提交助手

有时您有多个操作,并希望在每一步之间提交。gocept.runner 提供了一个装饰器 transaction_per_item,这使得编写它变得简单,例如

@gocept.runner.transaction_per_item
def multiple_steps():
    for item in source:
        yield lambda: process(item)

这将按顺序调用每个 process(item),并在每个之后提交。如果步骤引发异常,则中止事务,循环继续进行下一个步骤。ZODB ConflictErrors也会被忽略。

变更

3.0 (2023-07-20)

  • 停止支持Python 2.7。

  • 添加对Python 3.9、3.10、3.11的支持。

2.2 (2020-09-14)

  • 去除对 ZODB3 的依赖。

2.1 (2019-12-02)

  • 添加对Python 3.7和3.8的支持。

  • 迁移到Github。

2.0 (2018-03-14)

  • 不兼容的更改:不再依赖于 zope.app.server,也不再不必要地使用其 ZConfig 架构,该架构不必要地需要一个 <accesslog> 部分,而在这个上下文中这部分没有任何意义。因此,在由本包使用的 ZConfing 配置文件中不再允许使用 <accesslog>

1.0 (2016-01-12)

  • 不再依赖于 zope.app.component

0.7.1 (2015-01-28)

  • 修复了 0.7.0 版本的错误。

0.7.0 (2015-01-28)

  • 当工作进程产生无法用 us-ascii 表示的异常时,会正确记录错误信息。这曾经导致日志模块抛出异常。这些异常的根源很难追踪。

  • 移除了未声明且不必要的 zope.app.security 依赖。

  • 将项目主页迁移到 <https://bitbucket.org/gocept/gocept.runner/>。

0.6.0 (2011-12-01)

  • 添加了 transaction_per_item 装饰器。

  • 使用 stdlib 的 doctest 而不是 zope.testing。

0.5.3 (2010-04-14)

  • 使用 log.error/log.warning 而不是 log.exception。冲突错误现在作为警告记录,因为它们并不是真正的问题。

0.5.2 (2010-04-14)

  • 将记录的异常转换为 str。

0.5.1 (2009-10-13)

  • 正确声明了依赖关系。

0.5 (2009-09-21)

  • 不再使用 zope.app.twisted,而是使用 zope.app.server

0.4 (2009-09-03)

  • 现在可以通过一个函数计算由 appmain/once 设置的主。

0.3.2 (2009-05-21)

  • 修复了 appmain 中的子站处理。

0.3.1 (2009-05-21)

  • 声明了命名空间包。

0.3 (2009-04-15)

  • 当工作进程失败时,将使用默认的睡眠时间(而不是最后一个时间)。

0.2 (2009-04-09)

  • 添加了一种优雅的方式退出运行器(通过返回 gocept.runner.Exit)。

0.1 (2009-04-07)

  • 第一个公开版本

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪一个,请了解有关 安装包 的更多信息。

源分布

gocept.runner-3.0.tar.gz (18.1 kB 查看哈希)

上传时间

构建分布

gocept.runner-3.0-py2.py3-none-any.whl (18.2 kB 查看哈希)

上传时间 Python 2 Python 3

支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面