通过ZODB优化的队列。
项目描述
持久队列
持久队列是针对持久性通过ZODB优化的队列。它们假定ZODB正在使用MVCC来避免读取冲突。它们尝试解决写冲突,以便添加和同时删除对象的交易合并,除非交易试图从队列中删除相同的值。
这些队列的一个重要特征是,它们不会期望在任何给定等效项目上同时持有多个引用。例如,如果您的应用程序有理由在同一队列中同时保留两个“hello”字符串的副本,则某些冲突解决功能可能无法按预期执行[1]。
该模块提供两种口味:一种简单的持久队列,其中所有包含的对象都在一个持久对象中保持(队列),以及将内容分割成多个复合元素的持久队列(复合队列)。在API方面,它们应该是等效的,因此在此文档中主要以抽象形式进行探讨:我们将使用代表性的队列工厂生成实例,它可以是任何类。它们仅在写冲突解决行为方面有所不同,下面将讨论这一点。
队列可以无参数实例化。
>>> q = Queue() >>> from zc.queue.interfaces import IQueue >>> from zope.interface.verify import verifyObject >>> verifyObject(IQueue, q) True
基本API很简单:使用put向队列末尾添加项目,使用pull从队列中拉取项目,默认为队列的前端。请注意,Item可以是持久对象或非持久对象。
>>> q.put(Item(1)) >>> q.put(Item(2)) >>> q.pull() 1 >>> q.put(Item(3)) >>> q.pull() 2 >>> q.pull() 3
pull方法接受一个可选的基于零的索引参数,并可以接受负值。
>>> q.put(Item(4)) >>> q.put(Item(5)) >>> q.put(Item(6)) >>> q.pull(-1) 6 >>> q.pull(1) 5 >>> q.pull(0) 4
从空队列请求项目会引发IndexError。
>>> q.pull() # doctest: +ELLIPSIS Traceback (most recent call last): ... IndexError: ...
请求无效的索引值会产生相同的结果。
>>> q.put(Item(7)) >>> q.put(Item(8)) >>> q.pull(2) # doctest: +ELLIPSIS Traceback (most recent call last): ... IndexError: ...
除了这些核心队列操作之外,队列支持len…
>>> len(q) 2 >>> q.pull() 7 >>> len(q) 1 >>> q.pull() 8 >>> len(q) 0
…iter(它不会清空队列)…
>>> next(iter(q)) Traceback (most recent call last): ... StopIteration >>> q.put(Item(9)) >>> q.put(Item(10)) >>> q.put(Item(11)) >>> next(iter(q)) 9 >>> [i for i in q] [9, 10, 11] >>> q.pull() 9 >>> [i for i in q] [10, 11]
…bool…
>>> bool(q) True >>> q.pull() 10 >>> q.pull() 11 >>> bool(q) False
…以及类似列表的括号访问(它再次不会清空队列)。
>>> q.put(Item(12)) >>> q[0] 12 >>> q.pull() 12 >>> q[0] # doctest: +ELLIPSIS Traceback (most recent call last): ... IndexError: ... >>> for i in range (13, 23): ... q.put(Item(i)) ... >>> q[0] 13 >>> q[1] 14 >>> q[2] 15 >>> q[-1] 22 >>> q[-10] 13
就是这样——除了put之外没有其他添加任何内容的方法,除了pull之外没有其他移除任何内容的方法。
唯一的其他问题是冲突解决代码。ZODB中的冲突解决有一些一般性的注意事项,您应该了解[2]。
除了这些一般性的注意事项之外,我们现在将检查一些zc.queue冲突解决的示例。为了展示这一点,我们必须有两个相同队列的副本,来自两个不同的连接。
注意:这种测试方法有已知的弱点。请参阅tests.py中的讨论。
>>> import transaction >>> from zc.queue.tests import ConflictResolvingMappingStorage >>> from ZODB import DB >>> db = DB(ConflictResolvingMappingStorage('test')) >>> transactionmanager_1 = transaction.TransactionManager() >>> transactionmanager_2 = transaction.TransactionManager() >>> connection_1 = db.open(transaction_manager=transactionmanager_1) >>> root_1 = connection_1.root()>>> q_1 = root_1["queue"] = Queue() >>> transactionmanager_1.commit()>>> transactionmanager_2 = transaction.TransactionManager() >>> connection_2 = db.open(transaction_manager=transactionmanager_2) >>> root_2 = connection_2.root() >>> q_2 = root_2['queue']
现在我们有相同队列的两个副本,分别具有单独的事务管理器和连接,就像两个线程会有的一样。' _1 '后缀标识用户1的对象,在第一个线程中;而' _2 '后缀标识用户2的对象,在并发线程2中。
首先,让我们让两个用户同时向队列中添加一些项目。对于仅包含单个新项目的并发提交(每个交易一个),在两种类型的队列中,首先提交的用户将获得队列中较低的位置——也就是说,将使用默认的pull调用最早离开队列的位置。
在此示例中,尽管q_1首先被修改,但q_2的交易首先提交,因此合并后的q_2的添加是第一个。
>>> q_1.put(Item(1001)) >>> q_2.put(Item(1000)) >>> transactionmanager_2.commit() >>> transactionmanager_1.commit() >>> connection_1.sync() >>> connection_2.sync() >>> list(q_1) [1000, 1001] >>> list(q_2) [1000, 1001]
对于每个连接两次以上的提交,或两个以上的并发添加交易,Queue的行为是相同的:第一次提交的添加将位于第二次提交之前。
>>> from zc import queue >>> if isinstance(q_1, queue.Queue): ... for i in range(5): ... q_1.put(Item(i)) ... for i in range(1002, 1005): ... q_2.put(Item(i)) ... transactionmanager_2.commit() ... transactionmanager_1.commit() ... connection_1.sync() ... connection_2.sync() ...
如下所示,这将再次可靠地将第一次提交的所有值放在队列中较早的位置,结果为[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]。
对于CompositeQueue,其行为不同。在事务中添加一系列元素时,顺序将得到保持,但两个事务添加的值可能会在两者之间合并。我们将在这里进行补偿,以获得可靠的队列状态。
>>> if isinstance(q_1, queue.CompositeQueue): ... for i1, i2 in ((1002, 1003), (1004, 0), (1, 2), (3, 4)): ... q_1.put(Item(i1)) ... q_2.put(Item(i2)) ... transactionmanager_1.commit() ... transactionmanager_2.commit() ... connection_1.sync() ... connection_2.sync() ...
无论我们有什么样的队列,我们现在都有以下值。
>>> list(q_1) [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4] >>> list(q_2) [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]
如果有两个用户尝试添加相同的项,则会引发冲突错误。
>>> five = Item(5) >>> q_1.put(five) >>> q_2.put(five) >>> transactionmanager_1.commit() >>> from ZODB.POSException import ConflictError, InvalidObjectReference >>> try: ... transactionmanager_2.commit() # doctest: +ELLIPSIS ... except (ConflictError, InvalidObjectReference): ... print("Conflict Error") Conflict Error >>> transactionmanager_2.abort() >>> connection_1.sync() >>> connection_2.sync() >>> list(q_1) [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5] >>> list(q_2) [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]
用户还可以并发地从队列中删除项目...
>>> q_1.pull() 1000 >>> q_1[0] 1001>>> q_2.pull(5) 0 >>> q_2[5] 1>>> q_2[0] # 1000 value still there in this connection 1000>>> q_1[4] # 0 value still there in this connection. 0>>> transactionmanager_1.commit() >>> transactionmanager_2.commit() >>> connection_1.sync() >>> connection_2.sync() >>> list(q_1) [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5] >>> list(q_2) [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]
...只要他们不删除相同的项。
>>> q_1.pull() 1001 >>> q_2.pull() 1001 >>> transactionmanager_1.commit() >>> transactionmanager_2.commit() # doctest: +ELLIPSIS Traceback (most recent call last): ... ConflictError: ... >>> transactionmanager_2.abort() >>> connection_1.sync() >>> connection_2.sync() >>> list(q_1) [1002, 1003, 1004, 1, 2, 3, 4, 5] >>> list(q_2) [1002, 1003, 1004, 1, 2, 3, 4, 5]
实际上有一个特殊情况:如果复合队列的桶在非空状态下开始,并且两个新状态中的一个是空的,则它们将拒绝合并。这是为了防止队列中添加项的丢失。请参阅tests.py以获取示例。
同样重要的是,用户可以并发地从队列中删除和添加项。
>>> q_1.pull() 1002 >>> q_1.pull() 1003 >>> q_1.pull() 1004 >>> q_2.put(Item(6)) >>> q_2.put(Item(7)) >>> transactionmanager_1.commit() >>> transactionmanager_2.commit() >>> connection_1.sync() >>> connection_2.sync() >>> list(q_1) [1, 2, 3, 4, 5, 6, 7] >>> list(q_2) [1, 2, 3, 4, 5, 6, 7]
作为最后的例子,我们将展示在极端压力下解决冲突的代码,有多个同时的存入和取出操作。
>>> res_1 = [] >>> for i in range(6, -1, -2): ... res_1.append(q_1.pull(i)) ... >>> res_1 [7, 5, 3, 1] >>> res_2 = [] >>> for i in range(5, 0, -2): ... res_2.append(q_2.pull(i)) ... >>> res_2 [6, 4, 2] >>> for i in range(8, 12): ... q_1.put(Item(i)) ... >>> for i in range(12, 16): ... q_2.put(Item(i)) ... >>> list(q_1) [2, 4, 6, 8, 9, 10, 11] >>> list(q_2) [1, 3, 5, 7, 12, 13, 14, 15] >>> transactionmanager_1.commit() >>> transactionmanager_2.commit() >>> connection_1.sync() >>> connection_2.sync()
在这些提交之后,如果队列是Queue,新值将按照它们的提交顺序排列。然而,如上所述,如果队列是CompositeQueue,则行为不同。虽然顺序将相对保持不变–所以在提交时,如果对象A在队列中位于对象B之前,则在冲突事务的合并之后,A仍然位于B之前–值可能会在两个事务之间交错。
例如,如果我们的示例队列是Queue,值将是[8, 9, 10, 11, 12, 13, 14, 15]。然而,如果它是CompositeQueue,值可能相同,或者可能是任何组合,其中[8, 9, 10, 11]和[12, 13, 14, 15]从两个事务中仍然保持顺序。一个可能的顺序可能是[8, 9, 12, 13, 10, 11, 14, 15],例如。
>>> if isinstance(q_1, queue.Queue): ... res_1 = list(q_1) ... res_2 = list(q_2) ... elif isinstance(q_1, queue.CompositeQueue): ... firstsrc_1 = list(q_1) ... firstsrc_2 = list(q_2) ... secondsrc_1 = firstsrc_1[:] ... secondsrc_2 = firstsrc_2[:] ... for val in [12, 13, 14, 15]: ... firstsrc_1.remove(Item(val)) ... firstsrc_2.remove(Item(val)) ... for val in [8, 9, 10, 11]: ... secondsrc_1.remove(Item(val)) ... secondsrc_2.remove(Item(val)) ... res_1 = firstsrc_1 + secondsrc_1 ... res_2 = firstsrc_2 + secondsrc_2 ... >>> res_1 [8, 9, 10, 11, 12, 13, 14, 15] >>> res_2 [8, 9, 10, 11, 12, 13, 14, 15]>>> db.close() # cleanup
持久引用代理
由于缺少__hash__
方法,ZODB.ConflictResolution.PersistentReference
在set
中得不到适当的处理,因此我们定义了一个利用包含项的__cmp__
方法的类[3]。
让我们创建一些持久引用对象。还要创建一些具有相同oid的对象以模拟不同的事务状态。
>>> from zc.queue.tests import StubPersistentReference >>> pr1 = StubPersistentReference(1) >>> pr2 = StubPersistentReference(2) >>> pr3 = StubPersistentReference(3) >>> pr_c1 = StubPersistentReference(1) >>> pr_c2 = StubPersistentReference(2) >>> pr_c3 = StubPersistentReference(3)>>> pr1 == pr_c1 True >>> pr2 == pr_c2 True >>> pr3 == pr_c3 True >>> id(pr1) == id(pr_c1) False >>> id(pr2) == id(pr_c2) False >>> id(pr3) == id(pr_c3) False>>> set1 = set((pr1, pr2)) >>> set1 set([SPR (1), SPR (2)]) >>> len(set1) 2 >>> set2 = set((pr_c1, pr_c3)) >>> set2 set([SPR (1), SPR (3)]) >>> len(set2) 2 >>> set_c1 = set((pr_c1, pr_c2)) >>> set_c1 set([SPR (1), SPR (2)]) >>> len(set_c1) 2
set
不能正确处理持久引用对象。所有随后的集合操作都会产生错误结果。
去重(请注意,对于长度超过两个的项,我们只检查长度和内容,而不是表示的顺序,因为这在不同的Python版本中是不同的)
>>> set((pr1, pr_c1)) set([SPR (1), SPR (1)]) >>> set((pr2, pr_c2)) set([SPR (2), SPR (2)]) >>> set4 = set((pr1, pr_c1, pr2)) >>> len(set4) 3 >>> pr1 in set4 and pr_c1 in set4 and pr2 in set4 True >>> set4 = set((pr1, pr2, pr3, pr_c1, pr_c2, pr_c3)) >>> len(set4) 6
减法操作
>>> set3 = set1 - set2 >>> len(set3) 2 >>> set3 set([SPR (1), SPR (2)])
包含
>>> pr3 in set2 False
交集
>>> set1 & set2 set([])
比较
>>> set1 == set_c1 False
因此,我们创建了一个包装PersistentReference
的PersistentReferenceProxy
,以便与集合一起使用。
>>> from zc.queue._queue import PersistentReferenceProxy >>> prp1 = PersistentReferenceProxy(pr1) >>> prp2 = PersistentReferenceProxy(pr2) >>> prp3 = PersistentReferenceProxy(pr3) >>> prp_c1 = PersistentReferenceProxy(pr_c1) >>> prp_c2 = PersistentReferenceProxy(pr_c2) >>> prp_c3 = PersistentReferenceProxy(pr_c3) >>> prp1 == prp_c1 True >>> prp2 == prp_c2 True >>> prp3 == prp_c3 True >>> id(prp1) == id(prp_c1) False >>> id(prp2) == id(prp_c2) False >>> id(prp3) == id(prp_c3) False>>> set1 = set((prp1, prp2)) >>> set1 set([SPR (1), SPR (2)]) >>> len(set1) 2 >>> set2 = set((prp_c1, prp_c3)) >>> set2 set([SPR (1), SPR (3)]) >>> len(set2) 2 >>> set_c1 = set((prp_c1, prp_c2)) >>> set_c1 set([SPR (1), SPR (2)]) >>> len(set_c1) 2
set
现在可以正确处理持久引用。所有随后的集合操作都将产生正确的结果。
去重
>>> set4 = set((prp1, prp2, prp3, prp_c1, prp_c2, prp_c3)) >>> len(set4) 3 >>> set((prp1, prp_c1)) set([SPR (1)]) >>> set((prp2, prp_c2)) set([SPR (2)]) >>> set((prp1, prp_c1, prp2)) set([SPR (1), SPR (2)])
减法操作
>>> set3 = set1 - set2 >>> len(set3) 1 >>> set3 set([SPR (2)]) >>> set1 - set1 set([]) >>> set2 - set3 set([SPR (1), SPR (3)]) >>> set3 - set2 set([SPR (2)])
包含
>>> prp3 in set2 True >>> prp1 in set2 True >>> prp_c1 in set2 True >>> prp2 not in set2 True
交集
>>> set1 & set2 set([SPR (1)]) >>> set1 & set_c1 set([SPR (1), SPR (2)]) >>> set2 & set3 set([])
比较
>>> set1 == set_c1 True >>> set1 == set2 False >>> set1 == set4 False
CHANGES
2.0.1(未发布)
尚未有任何更改。
2.0.0 (2017-05-11)
停止支持 Python 2.6 和 3.3。
添加了对 Python 3.4、3.5、3.6 和 PyPy 的支持。
修复了在 CompositeQueue 中使用复杂数据切片(例如,负步长)时的内存使用增加问题。
2.0.0a1(2013-03-01)
添加了对 Python 3.3 的支持。
将已弃用的 zope.interface.implements 使用替换为等效的 zope.interface.implementer 装饰器。
停止支持 Python 2.4 和 2.5。
修复了由于编程错误而导致切片复合队列失败的问题。[malthe]
1.3 (2012-01-11)
修复了一个未正确处理 ZODB.ConflictResolution.PersistentReference 的冲突解决错误。请注意,由于语法,我们现在需要 Python 2.5 或更高版本。
1.2.1 (2011-12-17)
修复了 setup.py 中的 ImportError。[maurits]
1.2 (2011-12-17)
修复了测试中未定义的 ZODB.POSException.StorageTransactionError。[maurits]
让测试通过 ZODB 3.8 和 ZODB 3.9。[maurits]
添加了测试额外声明,以声明对 zope.testing 的测试依赖项。
使用 Python 的 doctest 模块而不是已弃用的 zope.testing.doctest。
清理了 reST 文档的生成。
1.1
修复了 CompositeQueue 中的冲突解决错误。
将 PersistentQueue 重命名为 Queue,将 CompositePersistentQueue 重命名为 CompositeQueue。旧名称名义上已弃用,尽管没有生成警告,也没有计划消除它们。PersistentQueue 类的冲突解决比以前更保守。(Queue 类具有与 PersistentQueue 以前相同的冲突解决。)
1.0.1
构建系统的一些小变化。
首次发布到 PyPI。
1.0
首次发布到 zope.org。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定该选择哪一个,请了解有关 安装包 的更多信息。