gocept.amqprun 帮助您编写和运行AMQP消费者,并发送AMQP消息。它目前仅支持AMQP 0-9-1,并与Zope工具包(ZTK)集成,因此您可以使用适配器、实用工具以及所有相关功能。
项目描述
gocept.amqprun 帮助您编写和运行 AMQP 消费者,以及发送 AMQP 消息。目前它仅支持 AMQP 0-9-1,并与 Zope 工具包(ZTK)集成,因此您可以使用适配器、工具和所有这些功能。
基本概念和术语
消息处理器 是一个与一个路由键绑定到恰好一个队列的函数。它为该队列上的每个消息调用一次,并可能返回一个消息列表作为结果。
一个处理的消息的结果消息与处理的消息的 ACK 一起在一个事务中发送。
在消息处理过程中抛出异常时,事务被中止。(如果 RabbitMQ 支持,接收到的消息会被 NACK。)
消息处理器一次处理一条消息。可以使用多个进程同时处理多个消息。(每个进程是单线程的。)
您无需关注的事情
处理与 AMQP 代理通信的套接字和锁定
事务处理
消息 ID
每个发出的消息都获得一个类似电子邮件的消息 ID。
每个发出的消息的相关 ID 被设置为接收到的消息的消息 ID。
每个发出的消息都获得一个自定义引用头,该头设置为接收到的消息的引用头加上接收到的消息的消息 ID。
入门:接收消息
要开始,定义一个执行工作的函数。在这个例子中,我们记录消息正文并发送一个消息。declare 装饰器接受两个参数,即队列名称和路由键(您也可以传递一个列表,将函数绑定到多个路由键)。declare 装饰器还支持一个可选的 arguments 参数,它是一个字典,将被传递给 AMQP queue_declare 调用,例如,支持 RabbitMQ 上的镜像队列。可选参数 principal 指定使用给定的主要 ID 将处理程序调用包装到 zope.security 交互中(您需要使用 [security] setup.py 额外设置来使用此集成功能)。
import logging import gocept.amqprun.handler import gocept.amqprun.message log = logging.getLogger(__name__) @gocept.amqprun.handler.declare('test.queue', 'test.routing') def log_message_body(message): log.info(message.body) msg = gocept.amqprun.message.Message( header=dict(content_type='text/plain'), body=u'Thank you for your message.', routing_key='test.thank.messages') return [msg]
处理程序函数需要注册为一个命名实用程序。使用 ZCML 看起来像这样 [1]
<configure xmlns="http://namespaces.zope.org/zope"> <utility component="your.package.log_message_body" name="basic" /> </configure>
建议设置服务器时创建一个 buildout。以下 buildout 为 gocept.amqprun 创建了一个配置文件,以及一个用于组件配置的 ZCML 文件,并使用 ZDaemon 将进程守护化
[buildout] parts = config zcml app server [deployment] name = queue recipe = gocept.recipe.deploymentsandbox root = ${buildout:directory} [config] recipe = lovely.recipe:mkfile path = ${deployment:etc-directory}/queue.conf amqp-hostname = localhost amqp-username = guest amqp-password = guest amqp-virtualhost = / eventlog = <eventlog> level DEBUG <logfile> formatter zope.exceptions.log.Formatter path STDOUT </logfile> </eventlog> amqp-server = <amqp-server> hostname ${:amqp-hostname} username ${:amqp-username} password ${:amqp-password} virtual_host ${:amqp-virtualhost} </amqp-server> content = ${:eventlog} ${:amqp-server} <worker> component-configuration ${zcml:path} </worker> <settings> your.custom.settings here </settings> [zcml] recipe = lovely.recipe:mkfile path = ${deployment:etc-directory}/queue.zcml content = <configure xmlns="http://namespaces.zope.org/zope"> <include package="your.package" /> </configure> [app] recipe = zc.recipe.egg:script eggs = gocept.amqprun your.package zope.exceptions arguments = '${config:path}' scripts = server=app [server] recipe = zc.zdaemonrecipe deployment = deployment program = ${buildout:bin-directory}/app
发送消息
如果您只想发送消息,无需注册任何处理程序,但可以直接使用 gocept.amqprun.server.Server.send()。虽然处理程序通常在由 server 入口点启动的自己的进程中运行(如上所述),但如果您只是发送消息,也可以跳过额外的过程,并在原始进程中运行 gocept.amqprun.server.Server。以下是一些示例代码
def start_server(**kw): server = gocept.amqprun.server.Server(kw, setup_handlers=False) server.connect() return server
(当您使用 ZCA 时,您可能还想在该点将 Server 注册为实用程序,以便客户端可以轻松访问它以发送消息。)
设置
对于特定于应用程序的设置,gocept.amqprun 通过 ISettings 实用程序使配置中的 <settings> 部分可用
settings = zope.component.getUtility( gocept.amqprun.interfaces.ISettings) settings.get('your.settings.key')
限制
目前所有消息都通过 amq.topic 交换发送和接收。目前不支持其他交换。
与文件系统交互
阅读
在 setup.py 中有一个 send_files 入口点。它可以配置三个参数:zconfig 文件的路径、监控路径和路由键。它读取监控路径中名为 new 的目录中的新文件,并以文件内容作为消息体,将文件名作为 X-Filename 标题发送到路由。已发送的文件会被移动到监控路径中名为 cur 的目录。
开发
您可以通过环境变量设置运行测试的 AMQP 服务器参数
- AMQP_HOSTNAME:
默认:localhost
- AMQP_USERNAME:
默认:guest
- AMQP_PASSWORD:
默认:guest
- AMQP_VIRTUALHOST:
默认:None,因此会创建并自动删除一个具有临时名称的 vhost(使用 AMQP_RABBITMQCTL 命令)
- AMQP_RABBITMQCTL:
默认:‘sudo rabbitmqctl’
源代码可在 Mercurial 仓库中找到,网址为 https://github.com/gocept/gocept.amqprun
请将您发现的任何错误报告到 https://github.com/gocept/gocept.amqprun/issues
bin/test_sender 和 bin/test_server
test_sender 和 test_server 是提供基本发送和处理能力以进行烟测试当前实现行为的脚本。当启动 test_sender 时,会发出 10 条消息,这些消息被路由到 test.routing。 test.server 声明了一个 test.queue,所有来自 test.routing 的消息都会发送到该队列,并且有一个处理程序记录来自 test.queue 的每条消息。
bin/test_send_files
test_send_files 启动一个服务器,该服务器监控 ./testdir/new 目录,并将复制到其中的文件作为 amqp 消息发送到 test.routing。它的入口点是 gocept.amqprun.readfiles:main。
变更
3.0 (2020-10-06)
破坏性更改
停止支持 Python 2.7。
删除不再使用的异常 .interfaces.CounterBelowZero。
功能
重试在事务提交期间发生的错误,并且已配置为重新抛出。
添加对 Python 3.9 的支持。
2.1 (2020-09-18)
在发生 socket.error 时让服务器退出。我们在 2.0 中已经失去了重新连接功能。
2.1a1(2020-04-17)
添加对 Python 3.7 和 3.8 的支持。
2.0 (2020-04-15)
添加对 zope.interface >= 5 的支持。(#13)
将 CI 管道从 Travis 更改为 GitHub Actions。
2.0a1(2020-03-20)
破坏性更改
使用 kombu/py-amqp 代替 pika 在 amqp 方法上提供基本抽象。
删除多线程 - 服务器和工作者在当前进程中运行。
删除 gocept.amqprun.main.main_server 作为主循环。
删除 configure.zcml 和 meta.zcml。
删除将消息写入文件和 amqp:writefile 指令。
删除 setup.py 中的 writefiles 附加内容。
删除 amqp:readfile zcml 指令,并为 readfiles 进程添加 send_files 入口点。它接受三个参数:配置文件路径、监控目录路径和发送文件的路径。(见 bin/test_send_files,详细信息请参阅 README.rst)。
删除 .channel.Channel 并使用 amqp.channel.Channel 代替。
删除 .interfaces.IChannelManager。
删除 .interfaces.ProcessStarted。
删除 .interfaces.ProcessStopping。
删除 .connection.Connection 并使用 kombu.Connection 代替。
在zconfig配置中,将conf键heartbeat_interval重命名为heartbeat。
从worker zconfig中移除amount。
强制为unicode消息体启用content_encoding头。
将.message.Message.header.headers重命名为.message.Message.header.application_headers。
将.testing.ZCML_LAYER重命名为.testing.ZCA_LAYER。
移除.testing.LoopTestCase,使用unittest.TestCase代替。
移除.testing.MainTestCase.wait_for_response,使用wait_for_message代替。
移除未使用的.testing.SettingsLayer。
将.connection.Parameters移动到.server,它现在返回一个字典。
功能
添加bin/test_sender和bin/test_server脚本来进行发送和接收行为的冒烟测试。参见README.rst。
为.handler.Handler类添加表示。
1.8 (2019-09-11)
更新到当前的bootstrap.py。
提高与Python 3的向前兼容性。
修复弃用警告。
防止使用pytest >= 5以保持Python 2的兼容性。
将仓库迁移到github。
1.7.1 (2016-08-22)
修复当将字符串传递给gocept.amqprun.connection.Parameters作为端口号时的错误。
1.7 (2016-05-31)
修复在worker中发生异常后消息处理器可能出现的无响应问题。异常导致计数器欠载,防止切换通道。计数器不再绑定到事务,而是绑定到消息处理。
添加一个安全措施,如果通道上的引用计数低于零,则终止整个进程。
在将任务放入worker队列之前获取通道,以防止可能的竞态条件。
如果tpc_abort()失败,则释放提交锁。这样,下一次tpc_begin()在尝试获取锁并阻塞整个进程时不会无限期等待。
1.6 (2016-04-04)
允许在.testing.QueueTestCase.send_message()中使用任意关键字参数传递给生成的消息。
1.5 (2016-02-01)
添加一个testing额外功能,可以用于重用此包的测试基础设施来开发测试。
1.4 (2015-11-20)
在ErrorHandlingHandler上确认非可恢复错误的消息。在0.17.0中丢失了此功能。修复:https://bitbucket.org/gocept/gocept.amqprun/issue/5
ErrorHandlingHandler在非可恢复错误上的错误消息再次使用其correlation_id引用原始消息。在0.17.0中丢失了此功能。修复:https://bitbucket.org/gocept/gocept.amqprun/issue/6
1.3 (2015-09-11)
更新测试基础设施,使其能够运行比3.1.5版新版本的RabbitMQ。
1.2 (2015-09-11)
添加py.test作为测试运行器。
将settings属性添加到IHandler中,以简化对ISettings的访问。
1.1 (2015-09-01)
.connection.Parameters不再接受任意关键字参数。
1.0 (2015-04-10)
在完全配置了gocept.amqprun之后但在启动工作者之前,通知IConfigFinished事件。此事件可用于打开数据库连接等。
将版本号提升到1.0,因为此包已在生产环境中使用了多年。
0.17.0 (2015-04-09)
在ErrorHandlingHandler上非可恢复错误时中止事务。修复:https://bitbucket.org/gocept/gocept.amqprun/issue/4
0.16.0 (2015-03-25)
如果在连接后连接不活动,则引发RuntimeError(这可能发生在凭据错误的情况下)。
0.15.2 (2015-02-24)
修复.testing.QueueLayer.rabbitmqctl()中的错误报告。
0.15.1 (2015-01-21)
将代码和bug跟踪器移动到Bitbucket。
0.15.0 (2014-10-08)
假装支持保存点,以便我们可以与其他支持(并需要)它们的系统协同工作。
0.14.0 (2014-09-09)
引入gocept.amqprun.handler.ErrorHandlingHandler基类。
在单独的ZCA中运行集成测试(MainTestCase),与其他测试可能完成的任何注册隔离。
为每个测试运行在rabbitmq服务器上创建临时虚拟主机。
改进了测试用例方法send_message,以接受标题作为参数。
0.13.0 (2014-04-16)
引入IResponse以实现事务集成异常处理。
修复了时间错误:当远程端关闭套接字同时触发通道切换时,这曾经会崩溃(因为套接字已关闭)。我们现在忽略通道切换尝试,并简单地等待重新连接,无论如何都会打开一个新的通道。
0.12.2 (2014-02-20)
将xfilename变量添加到<amqp:writefiles pattern="">设置中。
0.12.1 (2014-02-20)
添加保护措施,确保两个处理程序不能绑定到同一个队列。
0.12 (2014-02-13)
将<amqp:writefiles>包含到事务处理中,即仅在提交时写入文件。以前,当发生错误时,例如在MessageStored事件中,会写入重复的文件(每次错误后重试处理消息时都会如此)。
0.11 (2014-01-21)
当<amqp:readfiles>发生文件系统错误时终止进程。以前,只有文件读取线程崩溃,但进程仍在运行,没有意识到它现在不再执行其工作,因为线程已死亡。
0.10 (2013-05-28)
支持单个进程中的多个服务器。
为服务器引入setup_handlers参数,以便客户端可以禁用设置处理程序。
修复了<amqp:readfiles>事务实现中的一个错误,该错误在回滚事务时导致崩溃(#12437)。
允许测试服务器在慢速计算机上启动时间更长。
0.9.5 (2013-04-16)
重构Session / DataManager职责(#9988)。
0.9.4 (2012-09-07)
修复了IDataManager实现:无论事务结果如何,abort()可能被多次调用。仅释放一次通道引用计数。
0.9.3 (2012-09-07)
改进了IDataManager的日志记录。
0.9.2 (2012-09-07)
改进了IChannelManager.acquire/release的日志记录。
0.9.1 (2012-09-06)
修复了IDataManager实现:即使没有先前的tpc_begin(),tpc_abort()也可能被调用(例如,在保存点中的错误发生时)。
修复了Connection.close()的方法签名。
0.9 (2012-08-31)
引入了与zope.security的可选集成:处理程序可以声明一个用于创建交互的principal id。
为非响应于接收到的消息的消息使用单独的通道发送。
引入了SETTINGS_LAYER,以便依赖于ISettings的测试。
0.8 (2012-04-04)
修复了导致消息在不同的通道上确认而不是接收的通道的竞争条件(#10635)。
修复了在服务器未正确启动之前发送消息的尝试导致的竞争条件(#10620)。
0.7 (2012-03-22)
修复了在DataManager中获取当前通道和在Server中切换当前通道之间的竞争条件(#10521)。
使AMQP服务器可配置以进行测试(#9232)。
0.6.1 (2012-02-23)
修复了在创建引用标题时,当父消息没有引用时产生的错误(#10478)。
0.6 (2012-02-22)
功能
将FileStoreReader从其自己的进程更改为使用gocep.amqprun发送的线程(以前它使用amqplib)。引入了amqp:readfiles ZCML指令。(#10177)
将filestore额外更改为readfiles额外。
从amqp:readfiles传输文件名作为X-Filename标题。
引入了ISender实用程序。
错误
修复了在0.5中引入的消息确认错误(#10030)。
内部
将MainTestCase的API从create_reader更改为start_server。
0.5.1 (2012-01-09)
修复了支持Unicode参数用于队列声明的bug,因为pika在这里只支持字节字符串。
修复了使arguments参数在amqp:writefiles中工作的问题 (#10115)。
0.5 (2011-12-08)
通用
添加了writefiles扩展,使ZCML指令amqp:writefiles成为可选。
添加了filestore扩展,使gocept.amqprun.filestore成为可选。
将amqp:writefiles的声明从configure.zcml移动到meta.zcml。
功能
将gocept.amqprun.server.MessageReader重命名为gocept.amqprun.server.Server,并添加了一个send方法,以便它可以启动消息发送。
添加了对队列声明参数的支持,例如支持RabbitMQ镜像队列部署的x-ha-policy头(#10036)。
内部
在server.AMQPDataManager.__init__中更改了内部API:现在message参数是可选的,因此它被移动到参数列表的末尾。
使用plone.testing进行层基础设施。
0.4.2 (2011-08-23)
为FileWriter添加了处理头文件的帮助方法(针对#9443)。
0.4.1 (2011-08-22)
记录消息ID。
0.4 (2011-07-25)
设置输出消息的消息ID。
将输出消息的关联ID设置为接收消息的消息ID(如果设置了)。
将自定义头references设置为接收消息的引用头+接收消息的消息ID(类似于RFC5322中的References)。
修复了损坏的测试。
允许设置键中使用大写字母。
扩展了AMQP服务器配置,为FileStoreReader包括凭据和虚拟主机。
允许指定多个路由键(#9326)。
允许指定文件名/路径模式(#9327)。
FileWriter除了存储正文外,还存储头信息(#9328)。
FileWriter发送IMessageStored事件(#9335)。
0.3 (2011-02-05)
将装饰器从handle重命名为declare。
向MainTestCase添加了帮助方法wait_for_response。
添加了一个在启动期间发送的IProcessStarting事件。
添加了设置处理程序将接收消息写入文件的<amqp:writefiles/>指令。
添加了对<logger>指令的处理。
0.2 (2010-09-14)
添加了装饰器gocept.amqprun.handler.handle(queue_name, routing_key)。
0.1 (2010-08-13)
第一个版本。
项目详情
下载文件
下载适用于您平台的自定义文件。如果您不确定该选择哪个,请了解更多关于安装软件包的信息。