==================
项目描述
这是SQS的一个小型包装器,提供了一些测试支持和boto SQS API的一些抽象。
有两个基本部分,生产者API和工作者API。
注意,这些API不允许您传递AWS凭证。这意味着您必须通过 ~/.boto 配置、环境变量或通过EC2实例角色提供的临时凭证传递凭证。
生产作业
要将工作发送到工作者,实例化一个队列
>>> import zc.sqs >>> queue = zc.sqs.Queue("myqueue") Connected to region us-east-1.
SQS队列必须已经存在。创建队列超出了这些API的范围。尝试使用不存在的队列名称创建队列实例将导致引发异常。
>>> import mock >>> with mock.patch("boto.sqs.connect_to_region") as conn: ... conn().get_queue.return_value = None ... zc.sqs.Queue("nonexistent") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... NonExistentQueue: nonexistent
要将数据放入队列,您可以调用它。您可以传递位置参数和/或关键字参数。
>>> queue(1, 2, x=3) [[1, 2], {'x': 3}]
在这个例子中,我们在测试模式下运行。在测试模式下,数据只是简单地回显(除非我们设置了一个工作者,如以下所述)。
参数必须是可json编码的。
工作者
工作者由接受配置数据的工厂提供,并返回用队列消息调用的可调用对象。工作者工厂可以使用具有 __init__ 和 __call__ 方法的类实现,也可以使用接受配置数据并返回处理消息的嵌套函数的函数实现。
通常,工作者不会返回任何内容。如果输入不良,工作者应该抛出异常。异常和输入数据都会被记录。如果输入良好,但工作者无法执行请求,它应该抛出zc.sqs.TransientError以指示稍后重试工作。
容器
要将您的工作者连接到队列,您需要使用一个容器,这是一个轮询SQS队列并调用您的程序的应用程序。目前有2个容器
- 顺序
顺序容器从SQS队列中拉取请求并将它们逐个交给工作者。
这是一个脚本入口点,接受一个包含ini文件路径的参数列表。它使用“长轮询”来高效循环。
- 测试
测试容器用于编写测试。它支持生产者和工作者代码的集成测试。在测试模式下运行时,它将替换(部分)顺序容器。
顺序入口点接受一个包含2个部分的ini文件名
- 容器
容器部分使用选项配置容器
- 工作者 MODULE:expr
工作者构造函数
- 队列
要监听的SQS队列的名称。
- 记录器
基于ZConfig的记录器配置字符串。
- 工作者(可选)
工作者选项,作为字典传递给工作者构造函数。
如果没有提供,将传递一个空字典。
以下是一个简单的(无意义的)示例,说明如何连接这些。首先,我们将定义一个工作者工厂
def scaled_addr(config): scale = float(config.get('scale', 1)) def add(a, b, x): if x == 'later': print ("not now") raise zc.sqs.TransientError # Not very imaginative, I know print (scale * (a + b + x)) return add
现在,我们将定义一个容器配置
[container] worker = zc.sqs.tests:scaled_addr queue = adder loggers = <logger> level INFO <logfile> path STDOUT format %(levelname)s %(name)s %(message)s </logfile> </logger> <logger> level INFO propagate false name zc.sqs.messages <logfile> path messages.log format %(message)s </logfile> </logger> [worker] scale = 2
现在,我们将运行容器。
>>> import zc.thread >>> @zc.thread.Thread ... def thread(): ... zc.sqs.sequential(['ini'])
我们因为在线程中运行容器,因为它会一直运行而不会返回。
通常,入口点会一直运行,但由于我们在测试模式下运行,容器只是将工作者连接到测试环境。
现在,如果我们创建一个队列(在测试模式下)
>>> adder = zc.sqs.Queue("adder") Connected to region us-east-1.
并发送它工作
>>> adder(1, 2, 3) 12.0 deleted '[[1, 2, 3], {}]'
我们看到工作者已经运行。
我们还看到了一个测试消息,表明测试成功。
如果工作者无法立即执行操作,它将指示通过抛出上例中的TransientError来延迟消息
>>> adder(1, 2, 'later') not now
在这种情况下,由于工作者抛出了TransientError,消息没有被从队列中删除。这意味着它将在作业超时时稍后处理。
如果工作者抛出异常,异常和消息将被记录
>>> adder(1, 2, '') # doctest: +ELLIPSIS ERROR zc.sqs Handling a message Traceback (most recent call last): ... TypeError: unsupported operand type(s) for +: 'int' and '...' deleted '[[1, 2, ""], {}]'>>> with open("messages.log") as f: ... print(f.read()) [[1, 2, ""], {}] <BLANKLINE>
静默测试
有时,您不希望在发送消息时测试基础设施输出信息。测试setUp方法将一个sqs_queues属性添加到全局。您可以调用be_silent使其停止输出信息
>>> sqs_queues.be_silent()
调用此方法后,任何后续队列都将保持安静
>>> queue = zc.sqs.Queue("quiet") >>> queue(1)
您可以获取队列数据
>>> [m.get_body() for m in sqs_queues.get_queue("quiet").get_messages()] ['[[1], {}]']
您可以切换回嘈杂模式
>>> sqs_queues.be_silent()>>> queue = zc.sqs.Queue("loud") >>> queue(1)
变更
1.0.0
Python 3支持。
0.3.0 (2014-10-17)
使用长轮询而不是可配置的轮询间隔。
0.2.1 (2013-05-15)
当SQS队列不存在时,更好的错误处理。
0.2.0 (2013-05-15)
为测试队列添加了新的静默模式。
0.1.0 (2013-04-23)
初始发布。