跳转到主要内容

gocept.amqprun 帮助您编写和运行AMQP消费者,并发送AMQP消息。它目前仅支持AMQP 0-9-1,并与Zope工具包(ZTK)集成,因此您可以使用适配器、实用工具以及所有相关功能。

项目描述

https://img.shields.io/codecov/c/github/NativeInstruments/gocept.amqprun:alt:Codecov

gocept.amqprun 帮助您编写和运行 AMQP 消费者,以及发送 AMQP 消息。目前它仅支持 AMQP 0-9-1,并与 Zope 工具包(ZTK)集成,因此您可以使用适配器、工具和所有这些功能。

基本概念和术语

  • 消息处理器 是一个与一个路由键绑定到恰好一个队列的函数。它为该队列上的每个消息调用一次,并可能返回一个消息列表作为结果。

  • 一个处理的消息的结果消息与处理的消息的 ACK 一起在一个事务中发送。

  • 在消息处理过程中抛出异常时,事务被中止。(如果 RabbitMQ 支持,接收到的消息会被 NACK。)

  • 消息处理器一次处理一条消息。可以使用多个进程同时处理多个消息。(每个进程是单线程的。)

您无需关注的事情

  • 处理与 AMQP 代理通信的套接字和锁定

  • 事务处理

  • 消息 ID

    • 每个发出的消息都获得一个类似电子邮件的消息 ID。

    • 每个发出的消息的相关 ID 被设置为接收到的消息的消息 ID。

    • 每个发出的消息都获得一个自定义引用头,该头设置为接收到的消息的引用头加上接收到的消息的消息 ID。

入门:接收消息

要开始,定义一个执行工作的函数。在这个例子中,我们记录消息正文并发送一个消息。declare 装饰器接受两个参数,即队列名称和路由键(您也可以传递一个列表,将函数绑定到多个路由键)。declare 装饰器还支持一个可选的 arguments 参数,它是一个字典,将被传递给 AMQP queue_declare 调用,例如,支持 RabbitMQ 上的镜像队列。可选参数 principal 指定使用给定的主要 ID 将处理程序调用包装到 zope.security 交互中(您需要使用 [security] setup.py 额外设置来使用此集成功能)。

import logging
import gocept.amqprun.handler
import gocept.amqprun.message

log = logging.getLogger(__name__)

@gocept.amqprun.handler.declare('test.queue', 'test.routing')
def log_message_body(message):
    log.info(message.body)
    msg = gocept.amqprun.message.Message(
        header=dict(content_type='text/plain'),
        body=u'Thank you for your message.',
        routing_key='test.thank.messages')
    return [msg]

处理程序函数需要注册为一个命名实用程序。使用 ZCML 看起来像这样 [1]

<configure xmlns="http://namespaces.zope.org/zope">
  <utility component="your.package.log_message_body" name="basic" />
</configure>

建议设置服务器时创建一个 buildout。以下 buildout 为 gocept.amqprun 创建了一个配置文件,以及一个用于组件配置的 ZCML 文件,并使用 ZDaemon 将进程守护化

[buildout]
parts =
        config
        zcml
        app
        server

[deployment]
name = queue
recipe = gocept.recipe.deploymentsandbox
root = ${buildout:directory}

[config]
recipe = lovely.recipe:mkfile
path = ${deployment:etc-directory}/queue.conf

amqp-hostname = localhost
amqp-username = guest
amqp-password = guest
amqp-virtualhost = /

eventlog =
    <eventlog>
      level DEBUG
      <logfile>
        formatter zope.exceptions.log.Formatter
        path STDOUT
      </logfile>
    </eventlog>
amqp-server =
    <amqp-server>
      hostname ${:amqp-hostname}
      username ${:amqp-username}
      password ${:amqp-password}
      virtual_host ${:amqp-virtualhost}
    </amqp-server>

content =
    ${:eventlog}
    ${:amqp-server}
    <worker>
      component-configuration ${zcml:path}
    </worker>
    <settings>
      your.custom.settings here
    </settings>

[zcml]
recipe = lovely.recipe:mkfile
path = ${deployment:etc-directory}/queue.zcml
content =
    <configure xmlns="http://namespaces.zope.org/zope">
      <include package="your.package" />
    </configure>

[app]
recipe = zc.recipe.egg:script
eggs =
   gocept.amqprun
   your.package
   zope.exceptions
arguments = '${config:path}'
scripts = server=app

[server]
recipe = zc.zdaemonrecipe
deployment = deployment
program = ${buildout:bin-directory}/app

发送消息

如果您只想发送消息,无需注册任何处理程序,但可以直接使用 gocept.amqprun.server.Server.send()。虽然处理程序通常在由 server 入口点启动的自己的进程中运行(如上所述),但如果您只是发送消息,也可以跳过额外的过程,并在原始进程中运行 gocept.amqprun.server.Server。以下是一些示例代码

def start_server(**kw):
    server = gocept.amqprun.server.Server(kw, setup_handlers=False)
    server.connect()
    return server

(当您使用 ZCA 时,您可能还想在该点将 Server 注册为实用程序,以便客户端可以轻松访问它以发送消息。)

设置

对于特定于应用程序的设置,gocept.amqprun 通过 ISettings 实用程序使配置中的 <settings> 部分可用

settings = zope.component.getUtility(
    gocept.amqprun.interfaces.ISettings)
settings.get('your.settings.key')

限制

  • 目前所有消息都通过 amq.topic 交换发送和接收。目前不支持其他交换。

与文件系统交互

阅读

在 setup.py 中有一个 send_files 入口点。它可以配置三个参数:zconfig 文件的路径、监控路径和路由键。它读取监控路径中名为 new 的目录中的新文件,并以文件内容作为消息体,将文件名作为 X-Filename 标题发送到路由。已发送的文件会被移动到监控路径中名为 cur 的目录。

开发

您可以通过环境变量设置运行测试的 AMQP 服务器参数

AMQP_HOSTNAME:

默认:localhost

AMQP_USERNAME:

默认:guest

AMQP_PASSWORD:

默认:guest

AMQP_VIRTUALHOST:

默认:None,因此会创建并自动删除一个具有临时名称的 vhost(使用 AMQP_RABBITMQCTL 命令)

AMQP_RABBITMQCTL:

默认:‘sudo rabbitmqctl’

源代码可在 Mercurial 仓库中找到,网址为 https://github.com/gocept/gocept.amqprun

请将您发现的任何错误报告到 https://github.com/gocept/gocept.amqprun/issues

bin/test_sender 和 bin/test_server

test_sendertest_server 是提供基本发送和处理能力以进行烟测试当前实现行为的脚本。当启动 test_sender 时,会发出 10 条消息,这些消息被路由到 test.routingtest.server 声明了一个 test.queue,所有来自 test.routing 的消息都会发送到该队列,并且有一个处理程序记录来自 test.queue 的每条消息。

bin/test_send_files

test_send_files 启动一个服务器,该服务器监控 ./testdir/new 目录,并将复制到其中的文件作为 amqp 消息发送到 test.routing。它的入口点是 gocept.amqprun.readfiles:main

变更

3.0 (2020-10-06)

破坏性更改

  • 停止支持 Python 2.7。

  • 删除不再使用的异常 .interfaces.CounterBelowZero

功能

  • 重试在事务提交期间发生的错误,并且已配置为重新抛出。

  • 添加对 Python 3.9 的支持。

2.1 (2020-09-18)

  • 在发生 socket.error 时让服务器退出。我们在 2.0 中已经失去了重新连接功能。

2.1a1(2020-04-17)

  • 添加对 Python 3.7 和 3.8 的支持。

2.0 (2020-04-15)

  • 添加对 zope.interface >= 5 的支持。(#13

  • 将 CI 管道从 Travis 更改为 GitHub Actions。

2.0a1(2020-03-20)

破坏性更改

  • 使用 kombu/py-amqp 代替 pika 在 amqp 方法上提供基本抽象。

  • 删除多线程 - 服务器和工作者在当前进程中运行。

  • 删除 gocept.amqprun.main.main_server 作为主循环。

  • 删除 configure.zcmlmeta.zcml

  • 删除将消息写入文件和 amqp:writefile 指令。

  • 删除 setup.py 中的 writefiles 附加内容。

  • 删除 amqp:readfile zcml 指令,并为 readfiles 进程添加 send_files 入口点。它接受三个参数:配置文件路径、监控目录路径和发送文件的路径。(见 bin/test_send_files,详细信息请参阅 README.rst)。

  • 删除 .channel.Channel 并使用 amqp.channel.Channel 代替。

  • 删除 .interfaces.IChannelManager

  • 删除 .interfaces.ProcessStarted

  • 删除 .interfaces.ProcessStopping

  • 删除 .connection.Connection 并使用 kombu.Connection 代替。

  • 在zconfig配置中,将conf键heartbeat_interval重命名为heartbeat

  • 从worker zconfig中移除amount

  • 强制为unicode消息体启用content_encoding头。

  • .message.Message.header.headers重命名为.message.Message.header.application_headers

  • .testing.ZCML_LAYER重命名为.testing.ZCA_LAYER

  • 移除.testing.LoopTestCase,使用unittest.TestCase代替。

  • 移除.testing.MainTestCase.wait_for_response,使用wait_for_message代替。

  • 移除未使用的.testing.SettingsLayer

  • .connection.Parameters移动到.server,它现在返回一个字典。

功能

  • 添加bin/test_senderbin/test_server脚本来进行发送和接收行为的冒烟测试。参见README.rst。

  • .handler.Handler类添加表示。

1.8 (2019-09-11)

  • 更新到当前的bootstrap.py。

  • 提高与Python 3的向前兼容性。

  • 修复弃用警告。

  • 防止使用pytest >= 5以保持Python 2的兼容性。

  • 将仓库迁移到github。

1.7.1 (2016-08-22)

  • 修复当将字符串传递给gocept.amqprun.connection.Parameters作为端口号时的错误。

1.7 (2016-05-31)

  • 修复在worker中发生异常后消息处理器可能出现的无响应问题。异常导致计数器欠载,防止切换通道。计数器不再绑定到事务,而是绑定到消息处理。

  • 添加一个安全措施,如果通道上的引用计数低于零,则终止整个进程。

  • 在将任务放入worker队列之前获取通道,以防止可能的竞态条件。

  • 如果tpc_abort()失败,则释放提交锁。这样,下一次tpc_begin()在尝试获取锁并阻塞整个进程时不会无限期等待。

1.6 (2016-04-04)

  • 允许在.testing.QueueTestCase.send_message()中使用任意关键字参数传递给生成的消息。

1.5 (2016-02-01)

  • 添加一个testing额外功能,可以用于重用此包的测试基础设施来开发测试。

1.4 (2015-11-20)

1.3 (2015-09-11)

  • 更新测试基础设施,使其能够运行比3.1.5版新版本的RabbitMQ。

1.2 (2015-09-11)

  • 添加py.test作为测试运行器。

  • settings属性添加到IHandler中,以简化对ISettings的访问。

1.1 (2015-09-01)

  • .connection.Parameters不再接受任意关键字参数。

1.0 (2015-04-10)

  • 在完全配置了gocept.amqprun之后但在启动工作者之前,通知IConfigFinished事件。此事件可用于打开数据库连接等。

  • 将版本号提升到1.0,因为此包已在生产环境中使用了多年。

0.17.0 (2015-04-09)

0.16.0 (2015-03-25)

  • 如果在连接后连接不活动,则引发RuntimeError(这可能发生在凭据错误的情况下)。

0.15.2 (2015-02-24)

  • 修复.testing.QueueLayer.rabbitmqctl()中的错误报告。

0.15.1 (2015-01-21)

  • 将代码和bug跟踪器移动到Bitbucket

0.15.0 (2014-10-08)

  • 假装支持保存点,以便我们可以与其他支持(并需要)它们的系统协同工作。

0.14.0 (2014-09-09)

  • 引入gocept.amqprun.handler.ErrorHandlingHandler基类。

  • 在单独的ZCA中运行集成测试(MainTestCase),与其他测试可能完成的任何注册隔离。

  • 为每个测试运行在rabbitmq服务器上创建临时虚拟主机。

  • 改进了测试用例方法send_message,以接受标题作为参数。

0.13.0 (2014-04-16)

  • 引入IResponse以实现事务集成异常处理。

  • 修复了时间错误:当远程端关闭套接字同时触发通道切换时,这曾经会崩溃(因为套接字已关闭)。我们现在忽略通道切换尝试,并简单地等待重新连接,无论如何都会打开一个新的通道。

0.12.2 (2014-02-20)

  • xfilename变量添加到<amqp:writefiles pattern="">设置中。

0.12.1 (2014-02-20)

  • 添加保护措施,确保两个处理程序不能绑定到同一个队列。

0.12 (2014-02-13)

  • <amqp:writefiles>包含到事务处理中,即仅在提交时写入文件。以前,当发生错误时,例如在MessageStored事件中,会写入重复的文件(每次错误后重试处理消息时都会如此)。

0.11 (2014-01-21)

  • <amqp:readfiles>发生文件系统错误时终止进程。以前,只有文件读取线程崩溃,但进程仍在运行,没有意识到它现在不再执行其工作,因为线程已死亡。

0.10 (2013-05-28)

  • 支持单个进程中的多个服务器。

  • 为服务器引入setup_handlers参数,以便客户端可以禁用设置处理程序。

  • 修复了<amqp:readfiles>事务实现中的一个错误,该错误在回滚事务时导致崩溃(#12437)。

  • 允许测试服务器在慢速计算机上启动时间更长。

0.9.5 (2013-04-16)

  • 重构Session / DataManager职责(#9988)。

0.9.4 (2012-09-07)

  • 修复了IDataManager实现:无论事务结果如何,abort()可能被多次调用。仅释放一次通道引用计数。

0.9.3 (2012-09-07)

  • 改进了IDataManager的日志记录。

0.9.2 (2012-09-07)

  • 改进了IChannelManager.acquire/release的日志记录。

0.9.1 (2012-09-06)

  • 修复了IDataManager实现:即使没有先前的tpc_begin(),tpc_abort()也可能被调用(例如,在保存点中的错误发生时)。

  • 修复了Connection.close()的方法签名。

0.9 (2012-08-31)

  • 引入了与zope.security的可选集成:处理程序可以声明一个用于创建交互的principal id。

  • 为非响应于接收到的消息的消息使用单独的通道发送。

  • 引入了SETTINGS_LAYER,以便依赖于ISettings的测试。

0.8 (2012-04-04)

  • 修复了导致消息在不同的通道上确认而不是接收的通道的竞争条件(#10635)。

  • 修复了在服务器未正确启动之前发送消息的尝试导致的竞争条件(#10620)。

0.7 (2012-03-22)

  • 修复了在DataManager中获取当前通道和在Server中切换当前通道之间的竞争条件(#10521)。

  • 使AMQP服务器可配置以进行测试(#9232)。

0.6.1 (2012-02-23)

  • 修复了在创建引用标题时,当父消息没有引用时产生的错误(#10478)。

0.6 (2012-02-22)

功能

  • 将FileStoreReader从其自己的进程更改为使用gocep.amqprun发送的线程(以前它使用amqplib)。引入了amqp:readfiles ZCML指令。(#10177)

  • filestore额外更改为readfiles额外。

  • amqp:readfiles传输文件名作为X-Filename标题。

  • 引入了ISender实用程序。

错误

  • 修复了在0.5中引入的消息确认错误(#10030)。

内部

  • 将MainTestCase的API从create_reader更改为start_server

0.5.1 (2012-01-09)

  • 修复了支持Unicode参数用于队列声明的bug,因为pika在这里只支持字节字符串。

  • 修复了使arguments参数在amqp:writefiles中工作的问题 (#10115)。

0.5 (2011-12-08)

通用

  • 添加了writefiles扩展,使ZCML指令amqp:writefiles成为可选。

  • 添加了filestore扩展,使gocept.amqprun.filestore成为可选。

  • amqp:writefiles的声明从configure.zcml移动到meta.zcml

功能

  • gocept.amqprun.server.MessageReader重命名为gocept.amqprun.server.Server,并添加了一个send方法,以便它可以启动消息发送。

  • 添加了对队列声明参数的支持,例如支持RabbitMQ镜像队列部署的x-ha-policy头(#10036)。

内部

  • server.AMQPDataManager.__init__中更改了内部API:现在message参数是可选的,因此它被移动到参数列表的末尾。

  • 使用plone.testing进行层基础设施。

0.4.2 (2011-08-23)

  • 为FileWriter添加了处理头文件的帮助方法(针对#9443)。

0.4.1 (2011-08-22)

  • 记录消息ID。

0.4 (2011-07-25)

  • 设置输出消息的消息ID。

  • 将输出消息的关联ID设置为接收消息的消息ID(如果设置了)。

  • 将自定义头references设置为接收消息的引用头+接收消息的消息ID(类似于RFC5322中的References)。

  • 修复了损坏的测试。

  • 允许设置键中使用大写字母。

  • 扩展了AMQP服务器配置,为FileStoreReader包括凭据和虚拟主机。

  • 允许指定多个路由键(#9326)。

  • 允许指定文件名/路径模式(#9327)。

  • FileWriter除了存储正文外,还存储头信息(#9328)。

  • FileWriter发送IMessageStored事件(#9335)。

0.3 (2011-02-05)

  • 将装饰器从handle重命名为declare

  • 向MainTestCase添加了帮助方法wait_for_response

  • 添加了一个在启动期间发送的IProcessStarting事件。

  • 添加了设置处理程序将接收消息写入文件的<amqp:writefiles/>指令。

  • 添加了对<logger>指令的处理。

0.2 (2010-09-14)

  • 添加了装饰器gocept.amqprun.handler.handle(queue_name, routing_key)

0.1 (2010-08-13)

  • 第一个版本。

项目详情


下载文件

下载适用于您平台的自定义文件。如果您不确定该选择哪个,请了解更多关于安装软件包的信息。

源分发

gocept.amqprun-3.0.tar.gz (43.3 kB 查看哈希值)

上传时间

构建分发

gocept.amqprun-3.0-py3-none-any.whl (46.3 kB 查看哈希值)

上传时间 Python 3

支持者