使用mongo作为后端存储的队列。
项目描述
属性
隔离
不同消费者不会处理相同消息。
可靠性
失败的消费者不会消失一个项目。
原子性
对队列的操作是原子的。
用法
可以使用mongo集合和消费者标识符创建队列实例。消费者标识符有助于区分从队列中获取工作的多个队列消费者
>> from pymongo import Connection >> from mongoqueue import MongoQueue >> queue = MongoQueue( ... Connection(TEST_DB).doctest_queue, ... consumer_id="consumer-1", ... timeout=300, ... max_attempts=3)
在MongoQueue类的timeout参数中指定作业在消费者处保持多长时间(以秒为单位),在将其视为失败之前。
超时或错误超过max_attempts参数的作业被视为永久失败,将不再被处理。
可以通过传递一个字典将新作业/项目放入队列
>> queue.put({"foobar": 1})
可以在字典中指定具有整数值的作业priority键,这将导致在处理较低优先级项之前处理作业
>> queue.put({"foobar": 0}, priority=1})
可以通过在队列上调用next方法来检索项目。这返回一个作业对象
>> job = queue.next() >> job.payload {"foobar": 1}
作业类公开了一些作业的控制方法,用于标记进度、完成、错误或释放作业回队列。
complete 将作业标记为完成并从队列中删除。
- error 可选地指定消息,将作业释放回队列,并增加其尝试次数,并将错误消息存储在作业上
上。
- progress 可选地接受一个进度计数整数,记录作业的进度并重置锁超时
。
release 释放作业回池。不修改尝试计数器。
为了方便,该作业支持上下文管理器协议
>> with job as data: ... print data['payload'] {"foobar: 0}
如果上下文封闭退出时作业未标记为完成,如果发生异常,则错误将被存储在作业中。
灵感来源
[0] https://github.com/skiz/mongo_queue/blob/master/lib/mongo_queue.rb
[1] http://blog.boxedice.com/2011/09/28/replacing-rabbitmq-with-mongodb/
[2] http://blog.boxedice.com/2011/04/13/queueing-mongodb-using-mongodb/
[4] http://www.captaincodeman.com/2011/05/28/simple-service-bus-message-queue-mongodb/
运行测试
可以使用以下命令运行单元测试
$ python setup.py nosetests
更改
0.6.0 - 2013年2月4日 - 从作业元数据中隔离传入的数据。0.5.2 - 2012年12月9日 - 修复从pymongo 2.4的排序参数回归问题。0.5.1 - 2012年12月2日 - 修复readme数据文件的打包问题。
致谢
Kapil Thangavelu,作者和维护者Dustin Laurence,pymongo 2.4的排序修复;Jonathan Sackett,作业数据隔离。
项目详情
mongoqueue-0.7.2.tar.gz 的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 218296c7d04fbb090dd68532fbf2b2504efd1fc7b7869d0e73b27720bee721db |
|
MD5 | 81d7ee7f02c7a5952a21adf9f65b7e14 |
|
BLAKE2b-256 | 0dcef1cf86f2bf79884d4dd6bf8bca45bbc59452af7b4678bc3a1faadec75e08 |