跳转到主要内容

==================

项目描述

这是SQS的一个小型包装器,提供了一些测试支持和boto SQS API的一些抽象。

有两个基本部分,生产者API和工作者API。

注意,这些API不允许您传递AWS凭证。这意味着您必须通过 ~/.boto 配置、环境变量或通过EC2实例角色提供的临时凭证传递凭证。

生产作业

要将工作发送到工作者,实例化一个队列

>>> import zc.sqs
>>> queue = zc.sqs.Queue("myqueue")
Connected to region us-east-1.

SQS队列必须已经存在。创建队列超出了这些API的范围。尝试使用不存在的队列名称创建队列实例将导致引发异常。

>>> import mock
>>> with mock.patch("boto.sqs.connect_to_region") as conn:
...     conn().get_queue.return_value = None
...     zc.sqs.Queue("nonexistent") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
NonExistentQueue: nonexistent

要将数据放入队列,您可以调用它。您可以传递位置参数和/或关键字参数。

>>> queue(1, 2, x=3)
[[1, 2], {'x': 3}]

在这个例子中,我们在测试模式下运行。在测试模式下,数据只是简单地回显(除非我们设置了一个工作者,如以下所述)。

参数必须是可json编码的。

工作者

工作者由接受配置数据的工厂提供,并返回用队列消息调用的可调用对象。工作者工厂可以使用具有 __init__ 和 __call__ 方法的类实现,也可以使用接受配置数据并返回处理消息的嵌套函数的函数实现。

通常,工作者不会返回任何内容。如果输入不良,工作者应该抛出异常。异常和输入数据都会被记录。如果输入良好,但工作者无法执行请求,它应该抛出zc.sqs.TransientError以指示稍后重试工作。

容器

要将您的工作者连接到队列,您需要使用一个容器,这是一个轮询SQS队列并调用您的程序的应用程序。目前有2个容器

顺序

顺序容器从SQS队列中拉取请求并将它们逐个交给工作者。

这是一个脚本入口点,接受一个包含ini文件路径的参数列表。它使用“长轮询”来高效循环。

测试

测试容器用于编写测试。它支持生产者和工作者代码的集成测试。在测试模式下运行时,它将替换(部分)顺序容器。

顺序入口点接受一个包含2个部分的ini文件名

容器

容器部分使用选项配置容器

工作者 MODULE:expr

工作者构造函数

队列

要监听的SQS队列的名称。

记录器

基于ZConfig的记录器配置字符串。

工作者(可选)

工作者选项,作为字典传递给工作者构造函数。

如果没有提供,将传递一个空字典。

以下是一个简单的(无意义的)示例,说明如何连接这些。首先,我们将定义一个工作者工厂

def scaled_addr(config):
    scale = float(config.get('scale', 1))

    def add(a, b, x):
        if x == 'later':
            print ("not now")
            raise zc.sqs.TransientError # Not very imaginative, I know
        print (scale * (a + b + x))

    return add

现在,我们将定义一个容器配置

[container]
worker = zc.sqs.tests:scaled_addr
queue = adder
loggers =
   <logger>
     level INFO
     <logfile>
       path STDOUT
       format %(levelname)s %(name)s %(message)s
     </logfile>
   </logger>
   <logger>
     level INFO
     propagate false
     name zc.sqs.messages
     <logfile>
       path messages.log
       format %(message)s
     </logfile>
   </logger>

[worker]
scale = 2

现在,我们将运行容器。

>>> import zc.thread
>>> @zc.thread.Thread
... def thread():
...     zc.sqs.sequential(['ini'])

我们因为在线程中运行容器,因为它会一直运行而不会返回。

通常,入口点会一直运行,但由于我们在测试模式下运行,容器只是将工作者连接到测试环境。

现在,如果我们创建一个队列(在测试模式下)

>>> adder = zc.sqs.Queue("adder")
Connected to region us-east-1.

并发送它工作

>>> adder(1, 2, 3)
12.0
deleted '[[1, 2, 3], {}]'

我们看到工作者已经运行。

我们还看到了一个测试消息,表明测试成功。

如果工作者无法立即执行操作,它将指示通过抛出上例中的TransientError来延迟消息

>>> adder(1, 2, 'later')
not now

在这种情况下,由于工作者抛出了TransientError,消息没有被从队列中删除。这意味着它将在作业超时时稍后处理。

如果工作者抛出异常,异常和消息将被记录

>>> adder(1, 2, '') # doctest: +ELLIPSIS
ERROR zc.sqs Handling a message
Traceback (most recent call last):
...
TypeError: unsupported operand type(s) for +: 'int' and '...'
deleted '[[1, 2, ""], {}]'
>>> with open("messages.log") as f:
...     print(f.read())
[[1, 2, ""], {}]
<BLANKLINE>

静默测试

有时,您不希望在发送消息时测试基础设施输出信息。测试setUp方法将一个sqs_queues属性添加到全局。您可以调用be_silent使其停止输出信息

>>> sqs_queues.be_silent()

调用此方法后,任何后续队列都将保持安静

>>> queue = zc.sqs.Queue("quiet")
>>> queue(1)

您可以获取队列数据

>>> [m.get_body() for m in sqs_queues.get_queue("quiet").get_messages()]
['[[1], {}]']

您可以切换回嘈杂模式

>>> sqs_queues.be_silent()
>>> queue = zc.sqs.Queue("loud")
>>> queue(1)

变更

1.0.0

  • Python 3支持。

0.3.0 (2014-10-17)

  • 使用长轮询而不是可配置的轮询间隔。

0.2.1 (2013-05-15)

  • 当SQS队列不存在时,更好的错误处理。

0.2.0 (2013-05-15)

  • 为测试队列添加了新的静默模式。

0.1.0 (2013-04-23)

初始发布。

项目详情


下载文件

为您的平台下载文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

zc.sqs-1.0.0.tar.gz (8.1 kB 查看哈希)

上传时间

由以下支持