跳转到主要内容

通过ZODB优化的队列。

项目描述

持久队列

持久队列是针对持久性通过ZODB优化的队列。它们假定ZODB正在使用MVCC来避免读取冲突。它们尝试解决写冲突,以便添加和同时删除对象的交易合并,除非交易试图从队列中删除相同的值。

这些队列的一个重要特征是,它们不会期望在任何给定等效项目上同时持有多个引用。例如,如果您的应用程序有理由在同一队列中同时保留两个“hello”字符串的副本,则某些冲突解决功能可能无法按预期执行[1]

该模块提供两种口味:一种简单的持久队列,其中所有包含的对象都在一个持久对象中保持(队列),以及将内容分割成多个复合元素的持久队列(复合队列)。在API方面,它们应该是等效的,因此在此文档中主要以抽象形式进行探讨:我们将使用代表性的队列工厂生成实例,它可以是任何类。它们仅在写冲突解决行为方面有所不同,下面将讨论这一点。

队列可以无参数实例化。

>>> q = Queue()
>>> from zc.queue.interfaces import IQueue
>>> from zope.interface.verify import verifyObject
>>> verifyObject(IQueue, q)
True

基本API很简单:使用put向队列末尾添加项目,使用pull从队列中拉取项目,默认为队列的前端。请注意,Item可以是持久对象或非持久对象。

>>> q.put(Item(1))
>>> q.put(Item(2))
>>> q.pull()
1
>>> q.put(Item(3))
>>> q.pull()
2
>>> q.pull()
3

pull方法接受一个可选的基于零的索引参数,并可以接受负值。

>>> q.put(Item(4))
>>> q.put(Item(5))
>>> q.put(Item(6))
>>> q.pull(-1)
6
>>> q.pull(1)
5
>>> q.pull(0)
4

从空队列请求项目会引发IndexError。

>>> q.pull() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
IndexError: ...

请求无效的索引值会产生相同的结果。

>>> q.put(Item(7))
>>> q.put(Item(8))
>>> q.pull(2) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
IndexError: ...

除了这些核心队列操作之外,队列支持len…

>>> len(q)
2
>>> q.pull()
7
>>> len(q)
1
>>> q.pull()
8
>>> len(q)
0

…iter(它不会清空队列)…

>>> next(iter(q))
Traceback (most recent call last):
...
StopIteration
>>> q.put(Item(9))
>>> q.put(Item(10))
>>> q.put(Item(11))
>>> next(iter(q))
9
>>> [i for i in q]
[9, 10, 11]
>>> q.pull()
9
>>> [i for i in q]
[10, 11]

…bool…

>>> bool(q)
True
>>> q.pull()
10
>>> q.pull()
11
>>> bool(q)
False

…以及类似列表的括号访问(它再次不会清空队列)。

>>> q.put(Item(12))
>>> q[0]
12
>>> q.pull()
12
>>> q[0] # doctest: +ELLIPSIS
Traceback (most recent call last):
...
IndexError: ...
>>> for i in range (13, 23):
...     q.put(Item(i))
...
>>> q[0]
13
>>> q[1]
14
>>> q[2]
15
>>> q[-1]
22
>>> q[-10]
13

就是这样——除了put之外没有其他添加任何内容的方法,除了pull之外没有其他移除任何内容的方法。

唯一的其他问题是冲突解决代码。ZODB中的冲突解决有一些一般性的注意事项,您应该了解[2]

除了这些一般性的注意事项之外,我们现在将检查一些zc.queue冲突解决的示例。为了展示这一点,我们必须有两个相同队列的副本,来自两个不同的连接。

注意:这种测试方法有已知的弱点。请参阅tests.py中的讨论。

>>> import transaction
>>> from zc.queue.tests import ConflictResolvingMappingStorage
>>> from ZODB import DB
>>> db = DB(ConflictResolvingMappingStorage('test'))
>>> transactionmanager_1 = transaction.TransactionManager()
>>> transactionmanager_2 = transaction.TransactionManager()
>>> connection_1 = db.open(transaction_manager=transactionmanager_1)
>>> root_1 = connection_1.root()
>>> q_1 = root_1["queue"] = Queue()
>>> transactionmanager_1.commit()
>>> transactionmanager_2 = transaction.TransactionManager()
>>> connection_2 = db.open(transaction_manager=transactionmanager_2)
>>> root_2 = connection_2.root()
>>> q_2 = root_2['queue']

现在我们有相同队列的两个副本,分别具有单独的事务管理器和连接,就像两个线程会有的一样。' _1 '后缀标识用户1的对象,在第一个线程中;而' _2 '后缀标识用户2的对象,在并发线程2中。

首先,让我们让两个用户同时向队列中添加一些项目。对于仅包含单个新项目的并发提交(每个交易一个),在两种类型的队列中,首先提交的用户将获得队列中较低的位置——也就是说,将使用默认的pull调用最早离开队列的位置。

在此示例中,尽管q_1首先被修改,但q_2的交易首先提交,因此合并后的q_2的添加是第一个。

>>> q_1.put(Item(1001))
>>> q_2.put(Item(1000))
>>> transactionmanager_2.commit()
>>> transactionmanager_1.commit()
>>> connection_1.sync()
>>> connection_2.sync()
>>> list(q_1)
[1000, 1001]
>>> list(q_2)
[1000, 1001]

对于每个连接两次以上的提交,或两个以上的并发添加交易,Queue的行为是相同的:第一次提交的添加将位于第二次提交之前。

>>> from zc import queue
>>> if isinstance(q_1, queue.Queue):
...     for i in range(5):
...         q_1.put(Item(i))
...     for i in range(1002, 1005):
...         q_2.put(Item(i))
...     transactionmanager_2.commit()
...     transactionmanager_1.commit()
...     connection_1.sync()
...     connection_2.sync()
...

如下所示,这将再次可靠地将第一次提交的所有值放在队列中较早的位置,结果为[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]。

对于CompositeQueue,其行为不同。在事务中添加一系列元素时,顺序将得到保持,但两个事务添加的值可能会在两者之间合并。我们将在这里进行补偿,以获得可靠的队列状态。

>>> if isinstance(q_1, queue.CompositeQueue):
...     for i1, i2 in ((1002, 1003), (1004, 0), (1, 2), (3, 4)):
...         q_1.put(Item(i1))
...         q_2.put(Item(i2))
...         transactionmanager_1.commit()
...         transactionmanager_2.commit()
...         connection_1.sync()
...         connection_2.sync()
...

无论我们有什么样的队列,我们现在都有以下值。

>>> list(q_1)
[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]
>>> list(q_2)
[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]

如果有两个用户尝试添加相同的项,则会引发冲突错误。

>>> five = Item(5)
>>> q_1.put(five)
>>> q_2.put(five)
>>> transactionmanager_1.commit()
>>> from ZODB.POSException import ConflictError, InvalidObjectReference
>>> try:
...     transactionmanager_2.commit() # doctest: +ELLIPSIS
... except (ConflictError, InvalidObjectReference):
...     print("Conflict Error")
Conflict Error
>>> transactionmanager_2.abort()
>>> connection_1.sync()
>>> connection_2.sync()
>>> list(q_1)
[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]
>>> list(q_2)
[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]

用户还可以并发地从队列中删除项目...

>>> q_1.pull()
1000
>>> q_1[0]
1001
>>> q_2.pull(5)
0
>>> q_2[5]
1
>>> q_2[0] # 1000 value still there in this connection
1000
>>> q_1[4] # 0 value still there in this connection.
0
>>> transactionmanager_1.commit()
>>> transactionmanager_2.commit()
>>> connection_1.sync()
>>> connection_2.sync()
>>> list(q_1)
[1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]
>>> list(q_2)
[1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]

...只要他们不删除相同的项。

>>> q_1.pull()
1001
>>> q_2.pull()
1001
>>> transactionmanager_1.commit()
>>> transactionmanager_2.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ConflictError: ...
>>> transactionmanager_2.abort()
>>> connection_1.sync()
>>> connection_2.sync()
>>> list(q_1)
[1002, 1003, 1004, 1, 2, 3, 4, 5]
>>> list(q_2)
[1002, 1003, 1004, 1, 2, 3, 4, 5]

实际上有一个特殊情况:如果复合队列的桶在非空状态下开始,并且两个新状态中的一个是空的,则它们将拒绝合并。这是为了防止队列中添加项的丢失。请参阅tests.py以获取示例。

同样重要的是,用户可以并发地从队列中删除和添加项。

>>> q_1.pull()
1002
>>> q_1.pull()
1003
>>> q_1.pull()
1004
>>> q_2.put(Item(6))
>>> q_2.put(Item(7))
>>> transactionmanager_1.commit()
>>> transactionmanager_2.commit()
>>> connection_1.sync()
>>> connection_2.sync()
>>> list(q_1)
[1, 2, 3, 4, 5, 6, 7]
>>> list(q_2)
[1, 2, 3, 4, 5, 6, 7]

作为最后的例子,我们将展示在极端压力下解决冲突的代码,有多个同时的存入和取出操作。

>>> res_1 = []
>>> for i in range(6, -1, -2):
...     res_1.append(q_1.pull(i))
...
>>> res_1
[7, 5, 3, 1]
>>> res_2 = []
>>> for i in range(5, 0, -2):
...     res_2.append(q_2.pull(i))
...
>>> res_2
[6, 4, 2]
>>> for i in range(8, 12):
...     q_1.put(Item(i))
...
>>> for i in range(12, 16):
...     q_2.put(Item(i))
...
>>> list(q_1)
[2, 4, 6, 8, 9, 10, 11]
>>> list(q_2)
[1, 3, 5, 7, 12, 13, 14, 15]
>>> transactionmanager_1.commit()
>>> transactionmanager_2.commit()
>>> connection_1.sync()
>>> connection_2.sync()

在这些提交之后,如果队列是Queue,新值将按照它们的提交顺序排列。然而,如上所述,如果队列是CompositeQueue,则行为不同。虽然顺序将相对保持不变–所以在提交时,如果对象A在队列中位于对象B之前,则在冲突事务的合并之后,A仍然位于B之前–值可能会在两个事务之间交错。

例如,如果我们的示例队列是Queue,值将是[8, 9, 10, 11, 12, 13, 14, 15]。然而,如果它是CompositeQueue,值可能相同,或者可能是任何组合,其中[8, 9, 10, 11]和[12, 13, 14, 15]从两个事务中仍然保持顺序。一个可能的顺序可能是[8, 9, 12, 13, 10, 11, 14, 15],例如。

>>> if isinstance(q_1, queue.Queue):
...     res_1 = list(q_1)
...     res_2 = list(q_2)
... elif isinstance(q_1, queue.CompositeQueue):
...     firstsrc_1 = list(q_1)
...     firstsrc_2 = list(q_2)
...     secondsrc_1 = firstsrc_1[:]
...     secondsrc_2 = firstsrc_2[:]
...     for val in [12, 13, 14, 15]:
...         firstsrc_1.remove(Item(val))
...         firstsrc_2.remove(Item(val))
...     for val in [8, 9, 10, 11]:
...         secondsrc_1.remove(Item(val))
...         secondsrc_2.remove(Item(val))
...     res_1 = firstsrc_1 + secondsrc_1
...     res_2 = firstsrc_2 + secondsrc_2
...
>>> res_1
[8, 9, 10, 11, 12, 13, 14, 15]
>>> res_2
[8, 9, 10, 11, 12, 13, 14, 15]
>>> db.close() # cleanup

持久引用代理

由于缺少__hash__方法,ZODB.ConflictResolution.PersistentReferenceset中得不到适当的处理,因此我们定义了一个利用包含项的__cmp__方法的类[3]

让我们创建一些持久引用对象。还要创建一些具有相同oid的对象以模拟不同的事务状态。

>>> from zc.queue.tests import StubPersistentReference
>>> pr1 = StubPersistentReference(1)
>>> pr2 = StubPersistentReference(2)
>>> pr3 = StubPersistentReference(3)
>>> pr_c1 = StubPersistentReference(1)
>>> pr_c2 = StubPersistentReference(2)
>>> pr_c3 = StubPersistentReference(3)
>>> pr1 == pr_c1
True
>>> pr2 == pr_c2
True
>>> pr3 == pr_c3
True
>>> id(pr1) == id(pr_c1)
False
>>> id(pr2) == id(pr_c2)
False
>>> id(pr3) == id(pr_c3)
False
>>> set1 = set((pr1, pr2))
>>> set1
set([SPR (1), SPR (2)])
>>> len(set1)
2
>>> set2 = set((pr_c1, pr_c3))
>>> set2
set([SPR (1), SPR (3)])
>>> len(set2)
2
>>> set_c1 = set((pr_c1, pr_c2))
>>> set_c1
set([SPR (1), SPR (2)])
>>> len(set_c1)
2

set不能正确处理持久引用对象。所有随后的集合操作都会产生错误结果。

去重(请注意,对于长度超过两个的项,我们只检查长度和内容,而不是表示的顺序,因为这在不同的Python版本中是不同的)

>>> set((pr1, pr_c1))
set([SPR (1), SPR (1)])
>>> set((pr2, pr_c2))
set([SPR (2), SPR (2)])
>>> set4 = set((pr1, pr_c1, pr2))
>>> len(set4)
3
>>> pr1 in set4 and pr_c1 in set4 and pr2 in set4
True
>>> set4 = set((pr1, pr2, pr3, pr_c1, pr_c2, pr_c3))
>>> len(set4)
6

减法操作

>>> set3 = set1 - set2
>>> len(set3)
2
>>> set3
set([SPR (1), SPR (2)])

包含

>>> pr3 in set2
False

交集

>>> set1 & set2
set([])

比较

>>> set1 == set_c1
False

因此,我们创建了一个包装PersistentReferencePersistentReferenceProxy,以便与集合一起使用。

>>> from zc.queue._queue import PersistentReferenceProxy
>>> prp1 = PersistentReferenceProxy(pr1)
>>> prp2 = PersistentReferenceProxy(pr2)
>>> prp3 = PersistentReferenceProxy(pr3)
>>> prp_c1 = PersistentReferenceProxy(pr_c1)
>>> prp_c2 = PersistentReferenceProxy(pr_c2)
>>> prp_c3 = PersistentReferenceProxy(pr_c3)
>>> prp1 == prp_c1
True
>>> prp2 == prp_c2
True
>>> prp3 == prp_c3
True
>>> id(prp1) == id(prp_c1)
False
>>> id(prp2) == id(prp_c2)
False
>>> id(prp3) == id(prp_c3)
False
>>> set1 = set((prp1, prp2))
>>> set1
set([SPR (1), SPR (2)])
>>> len(set1)
2
>>> set2 = set((prp_c1, prp_c3))
>>> set2
set([SPR (1), SPR (3)])
>>> len(set2)
2
>>> set_c1 = set((prp_c1, prp_c2))
>>> set_c1
set([SPR (1), SPR (2)])
>>> len(set_c1)
2

set现在可以正确处理持久引用。所有随后的集合操作都将产生正确的结果。

去重

>>> set4 = set((prp1, prp2, prp3, prp_c1, prp_c2, prp_c3))
>>> len(set4)
3
>>> set((prp1, prp_c1))
set([SPR (1)])
>>> set((prp2, prp_c2))
set([SPR (2)])
>>> set((prp1, prp_c1, prp2))
set([SPR (1), SPR (2)])

减法操作

>>> set3 = set1 - set2
>>> len(set3)
1
>>> set3
set([SPR (2)])
>>> set1 - set1
set([])
>>> set2 - set3
set([SPR (1), SPR (3)])
>>> set3 - set2
set([SPR (2)])

包含

>>> prp3 in set2
True
>>> prp1 in set2
True
>>> prp_c1 in set2
True
>>> prp2 not in set2
True

交集

>>> set1 & set2
set([SPR (1)])
>>> set1 & set_c1
set([SPR (1), SPR (2)])
>>> set2 & set3
set([])

比较

>>> set1 == set_c1
True
>>> set1 == set2
False
>>> set1 == set4
False

CHANGES

2.0.1(未发布)

  • 尚未有任何更改。

2.0.0 (2017-05-11)

  • 停止支持 Python 2.6 和 3.3。

  • 添加了对 Python 3.4、3.5、3.6 和 PyPy 的支持。

  • 修复了在 CompositeQueue 中使用复杂数据切片(例如,负步长)时的内存使用增加问题。

2.0.0a1(2013-03-01)

  • 添加了对 Python 3.3 的支持。

  • 将已弃用的 zope.interface.implements 使用替换为等效的 zope.interface.implementer 装饰器。

  • 停止支持 Python 2.4 和 2.5。

  • 修复了由于编程错误而导致切片复合队列失败的问题。[malthe]

1.3 (2012-01-11)

  • 修复了一个未正确处理 ZODB.ConflictResolution.PersistentReference 的冲突解决错误。请注意,由于语法,我们现在需要 Python 2.5 或更高版本。

1.2.1 (2011-12-17)

  • 修复了 setup.py 中的 ImportError。[maurits]

1.2 (2011-12-17)

  • 修复了测试中未定义的 ZODB.POSException.StorageTransactionError。[maurits]

  • 让测试通过 ZODB 3.8 和 ZODB 3.9。[maurits]

  • 添加了测试额外声明,以声明对 zope.testing 的测试依赖项。

  • 使用 Python 的 doctest 模块而不是已弃用的 zope.testing.doctest

  • 清理了 reST 文档的生成。

1.1

  • 修复了 CompositeQueue 中的冲突解决错误。

  • 将 PersistentQueue 重命名为 Queue,将 CompositePersistentQueue 重命名为 CompositeQueue。旧名称名义上已弃用,尽管没有生成警告,也没有计划消除它们。PersistentQueue 类的冲突解决比以前更保守。(Queue 类具有与 PersistentQueue 以前相同的冲突解决。)

1.0.1

  • 构建系统的一些小变化。

  • 首次发布到 PyPI。

1.0

  • 首次发布到 zope.org。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定该选择哪一个,请了解有关 安装包 的更多信息。

源代码分发

zc.queue-2.0.0.tar.gz (29.7 kB 查看哈希值)

上传时间 源代码

构建分发

zc.queue-2.0.0-py2.py3-none-any.whl (29.1 kB 查看哈希值)

上传时间 Python 2 Python 3

支持