跳转到主要内容

混合Twisted和ZODB

项目描述

Twist:在Twisted反应器调用中与ZODB通信

twist包包含一些函数和类,但主要是为在可调用的持久对象上或持久对象的某个方法上进行延迟调用而提供的一个辅助工具。这可以让Twisted反应器调用或Twisted延迟回调影响ZODB。所有操作都可以在主线程中完成,因此可以实现完全的Twisted使用,而无需线程。还有一些重要的“陷阱”:请参阅下面的陷阱部分以获取详细信息。

主要的API是Partial。您可以传递一个可调用的持久对象、持久对象的某个方法或一个普通非持久可调用对象,以及相同类型的任何参数或关键字参数。不要使用持久对象的非持久数据结构(如列表)作为数据库连接的参数。这是您的责任。

如果没有持久对象,partial将不会麻烦获取连接,并将按正常方式运行。

>>> from zc.twist import Partial
>>> def demo():
...     return 42
...
>>> Partial(demo)()
42

现在让我们想象一个持久并属于数据库连接的一部分的演示对象。它有一个从0开始的count属性,一个增加计数(默认为1)的__call__方法,以及一个减少计数(默认为1)的decrement方法 [1]。一切都会返回计数的当前值。

>>> demo.count
0
>>> demo()
1
>>> demo(2)
3
>>> demo.decrement()
2
>>> demo.decrement(2)
0
>>> import transaction
>>> transaction.commit()

现在我们可以使用这些示例进行一些延迟调用。我们将使用 transaction.begin() 来同步我们的连接与延迟调用中发生的情况。请注意,我们需要为这些适配器设置一些适配器才能使其工作。Twist模块包括它们的实现,我们假设它们已经被安装了。[2]

>>> call = Partial(demo)
>>> demo.count # hasn't been called yet
0
>>> deferred = call()
>>> demo.count # we haven't synced yet
0
>>> t = transaction.begin() # sync the connection
>>> demo.count # ah-ha!
1

我们可以使用从调用返回的延迟来做一些与返回值相关的事情。在这种情况下,延迟已经完成,所以添加回调会立即执行。

>>> def show_value(res):
...     print res
...
>>> ignore = deferred.addCallback(show_value)
1

我们也可以传递方法。

>>> call = Partial(demo.decrement)
>>> deferred = call()
>>> demo.count
1
>>> t = transaction.begin()
>>> demo.count
0

这同样适用于槽方法。

>>> import BTrees
>>> tree = root['tree'] = BTrees.family32.OO.BTree()
>>> transaction.commit()
>>> call = Partial(tree.__setitem__, 'foo', 'bar')
>>> deferred = call()
>>> len(tree)
0
>>> t = transaction.begin()
>>> tree['foo']
'bar'

参数会传递过去。

>>> call = Partial(demo)
>>> deferred = call(2)
>>> t = transaction.begin()
>>> demo.count
2
>>> call = Partial(demo.decrement)
>>> deferred = call(amount=2)
>>> t = transaction.begin()
>>> demo.count
0

它们也可以在实例化期间设置。

>>> call = Partial(demo, 3)
>>> deferred = call()
>>> t = transaction.begin()
>>> demo.count
3
>>> call = Partial(demo.decrement, amount=3)
>>> deferred = call()
>>> t = transaction.begin()
>>> demo.count
0

参数本身可以是持久对象。让我们假设一个新的demo2对象。

>>> demo2.count
0
>>> def mass_increment(d1, d2, value=1):
...     d1(value)
...     d2(value)
...
>>> call = Partial(mass_increment, demo, demo2, value=4)
>>> deferred = call()
>>> t = transaction.begin()
>>> demo.count
4
>>> demo2.count
4
>>> demo.count = demo2.count = 0 # cleanup
>>> transaction.commit()

ConflictErrors会导致重试。

为了有模拟ConflictError的机会,这次想象我们有一个可以切换执行从调用到我们代码的运行器,使用pauseretryresume(这只是为了测试–记住,在非线程Twisted中使用的调用应该是非阻塞的!)[3]

>>> import sys
>>> demo.count
0
>>> call = Partial(demo)
>>> runner = Runner(call) # it starts paused in the middle of an attempt
>>> call.attempt_count
1
>>> demo.count = 5 # now we will make a conflicting transaction...
>>> transaction.commit()
>>> runner.retry()
>>> call.attempt_count # so it has to retry
2
>>> t = transaction.begin()
>>> demo.count # our value hasn't changed...
5
>>> runner.resume() # but now call will be successful on the second attempt
>>> call.attempt_count
2
>>> t = transaction.begin()
>>> demo.count
6

默认情况下,在五次ConflictError重试后,部分失败,引发最后的ConflictError。这被返回到延迟中。延迟上的失败将有一个清洗过的堆栈跟踪。在这里,想象我们有一个从这样一个事件创建的延迟(命名为 deferred[4]

>>> res = None
>>> def get_result(r):
...     global res
...     res = r # we return None to quiet Twisted down on the command line
...
>>> d = deferred.addErrback(get_result)
>>> print res.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
ZODB.POSException.ConflictError: database conflict error...

你可以通过设置 max_transaction_errors 属性来控制应该执行多少次ConflictError(以及其他事务错误)重试[5]

ZEO ClientDisconnected错误始终重试,默认情况下,退避策略从5秒开始,永远不会超过60秒[6] [7] [8]

其他错误以清洗过的失败形式返回到延迟中,例如事务错误超过了其可用的重试次数。

>>> call = Partial(demo)
>>> d = call('I do not add well with integers')
>>> d = d.addErrback(get_result)
>>> print res.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
...TypeError: unsupported operand type(s) for +=: 'int' and 'str'

失败被清洗,即堆栈跟踪消失,帧值被转换为repr。如果你序列化失败,那么repr将被截断为最多20个字符加上“[…]”来指示截断[9]

调用试图成为一个好的连接市民,等待连接如果池的大小达到最大。此代码依赖于twisted反应器;我们将使用一个 time_flies 函数,该函数以秒为单位前进,来模拟反应器中的时间流逝。

我们使用2的幂(例如,0.125)作为浮点数,以避免测试中发生的浮点数累加错误,当时使用了0.1等值。

>>> db.setPoolSize(1)
>>> db.getPoolSize()
1
>>> demo.count = 0
>>> transaction.commit()
>>> call = Partial(demo)
>>> res = None
>>> deferred = call()
>>> d = deferred.addCallback(get_result)
>>> call.attempt_count
0
>>> time_flies(.125) >= 1 # returns number of connection attempts
True
>>> call.attempt_count
0
>>> res # None
>>> db.setPoolSize(2)
>>> db.getPoolSize()
2
>>> time_flies(.25) >= 1
True
>>> call.attempt_count > 0
True
>>> res
1
>>> t = transaction.begin()
>>> demo.count
1

如果需要超过一秒或两秒,它最终会决定抓取一个。这种行为可能会改变。

>>> db.setPoolSize(1)
>>> db.getPoolSize()
1
>>> call = Partial(demo)
>>> res = None
>>> deferred = call()
>>> d = deferred.addCallback(get_result)
>>> call.attempt_count
0
>>> time_flies(.125) >= 1
True
>>> call.attempt_count
0
>>> res # None
>>> time_flies(2) >= 2 # for a total of at least 3
True
>>> res
2
>>> t = transaction.begin()
>>> demo.count
2

如果没有运行的反应器,这个功能将无法工作[10]

你也可以使用 setReactor 为部分指定一个反应器,如果你不想使用twisted在 twisted.internet.reactor 中安装的标准反应器[11]

注意事项

对于某些类别的任务,您不需要过多考虑使用twist的twist Partial。例如,如果您将deferred工作收集的结果放入ZODB中,那么一切都应该相当简单。然而,不幸的是,对于其他常见用例,您需要稍微思考一下。

  • 如前所述,请勿使用包含任何持久化对象的非持久化集合(或没有连接的持久化对象)作为参数。

  • 使用尚未提交到数据库的带有连接的持久化对象会在使用(作为可调用的或参数)时引起问题,可能是不定期的(如果提交发生在调用部分之前,它将工作)。不要这样做。

  • 不要返回与连接关联的持久化对象。

  • 如果您打算根据可调用的工作启动另一个reactor调用,请记住,直到您提交事务,这项工作才真正“发生”。部分通常为您处理提交,如果您返回任何结果,则提交;如果您抛出错误,则中止。但如果您想在成功的事务基础上发送reactor调用,您将需要(a)完成工作,然后(b)提交,然后(c)发送reactor调用。如果提交失败,您将得到标准的中止和重试。

  • 如果您想处理自己的事务,不要使用从导入transaction获得的线程事务管理器。这将导致间歇性、难以调试的意外问题。相反,将您获得的任何持久化对象适配到transaction.interfaces.ITransactionManager,并使用该管理器进行提交和中止。

脚注

如果需要超过一秒或两秒,它最终会决定抓取一个。这种行为可能会改变。

>>> db.setPoolSize(1)
>>> db.getPoolSize()
1
>>> call = Partial(demo).setReactor(faux)
>>> res = None
>>> deferred = call()
>>> d = deferred.addCallback(get_result)
>>> call.attempt_count
0
>>> time_flies(.125) >= 1
True
>>> call.attempt_count
0
>>> res # None
>>> time_flies(2) >= 2 # for a total of at least 3
True
>>> res
2
>>> t = transaction.begin()
>>> demo.count
2

更改

1.3.1 (2009-11-15)

  • 添加了缺少的 twisted.python.failure 导入。

  • 使用 ZODB >= 3.9 时的 db.pool 和 ZODB < 3.9 时的 db._pools。现在测试通过 ZODB 3.9。

  • 测试在 Python 2.6 中通过。

1.3 (2008-06-19)

  • 处理 ZEO.Exceptions.ClientDisconnected 错误:无限重试,带有退避,默认最大退避时间为 60 秒。

  • 使重试 ConflictErrors 的次数可配置。

1.2 (2008-04-09)

  • 新的 twisted.python.failure.Failure 子类最初仅包含 repr,并且它序列化时排除了堆栈,排除了框架中的全局变量,并且截断了框架中局部变量的 repr。目标是保持 Failure 的 pickle 大小在一个可管理的范围内。现在 sanitize 使用这个类。

1.1 (2008-03-27)

  • 现在依赖于 twisted 8.0.1 或更高版本,它是新的 setuptools 兼容的。twisted 构建在 Py2.4 上有些令人恐惧,有多个警告和错误,但可以工作。依赖关系更改是升级到 1.1 的原因;这个版本没有新功能。

  • setup.py 使用 os.path

  • C 扩展使用旧的注释风格并且没有多余的代码。

1.0.1 (2008-03-14)

  • 错误修复:如果你传递了一个槽方法,如 BTree.__setitem__,会发生糟糕的事情。

1.0.0 (2008-03-13)

  • 添加指定备用反应器的功能

  • 使用外部引导

0.1.1 (?)

  • 在 setup.py 中使用 zc.twisted,而不是 twisted,直到 twisted 友好地兼容 setuptools

0.1 (?)

  • 初始发布版本

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分发

zc.twist-1.3.1.tar.gz (29.5 kB 查看哈希值)

上传

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面