Python的低级AMQP客户端(amqplib的分支)。
项目描述
- 版本:
5.2.0
- 网站:
- 下载:
- 源码:
- 关键词:
amqp, rabbitmq
关于
这是一个由Barry Pederson最初编写的amqplib的分支。它由Celery项目维护,当librabbitmq不可用时,由kombu用作纯Python替代方案。
该库应与librabbitmq具有API兼容性。
与amqplib的区别
支持从多个通道中提取事件(Connection.drain_events)
支持超时
在通道发生错误后,通道会恢复,而不是必须关闭连接。
支持心跳
Connection.heartbeat_tick(rate=2)必须在固定间隔内调用(如果速率是2,则为心跳值的一半)。
或者使用Connection.send_heartbeat采用其他方案。
- 支持RabbitMQ扩展
- 消费者取消通知
默认情况下,取消会导致ChannelError被引发
但是如果将on_cancel回调传递给basic_consume,则不会发生这种情况。
- 发布者确认
Channel.confirm_select()启用发布者确认。
Channel.events['basic_ack'].append(my_callback)将一个回调添加到在消息被确认时将被调用的回调。然后,该回调将使用签名(delivery_tag, multiple)调用。
- 交换到交换的绑定:exchange_bind / exchange_unbind。
Channel.confirm_select()启用发布者确认。
Channel.events['basic_ack'].append(my_callback)将一个回调添加到在消息被确认时将被调用的回调。然后,该回调将使用签名(delivery_tag, multiple)调用。
- 认证失败通知
在连接到rabbitmq-server 3.2.0或更高版本时,py-amqp不会在无效凭据上突然关闭连接,而是引发一个AccessRefused错误。
支持basic_return
- 使用AMQP 0-9-1而不是0-8。
Channel.access_request和ticket参数从方法中移除。
支持basic_consume的arguments参数。
exchange_declare的internal参数已移除。
exchange_declare的auto_delete参数已弃用
Connection的insist参数已移除。
Channel.alerts已移除。
支持Channel.basic_recover_async。
Channel.basic_recover已弃用。
- 异常已重命名,以具有习惯性名称
AMQPException -> AMQPError
AMQPConnectionException -> ConnectionError``
AMQPChannelException -> ChannelError``
Connection.known_hosts已移除。
Connection不再支持重定向。
queue_bind的exchange参数现在可以为空,以使用“默认交换”。
添加了Connection.is_alive,它试图检测连接是否还可以使用。
添加了Connection.connection_errors和.channel_errors,这是一个可恢复错误的列表。
将底层套接字公开为Connection.sock。
添加了Channel.no_ack_consumers以跟踪设置了no_ack标志的消费者标签。
在错误恢复方面略有改进
简要概述
简单的生产者向默认交换机发布消息到test队列
import amqp
with amqp.Connection('broker.example.com') as c:
ch = c.channel()
ch.basic_publish(amqp.Message('Hello World'), routing_key='test')
生产者向具有发布确认启用并使用虚拟主机test_vhost的test_exchange交换机发布消息
import amqp
with amqp.Connection(
'broker.example.com', exchange='test_exchange',
confirm_publish=True, virtual_host='test_vhost'
) as c:
ch = c.channel()
ch.basic_publish(amqp.Message('Hello World'), routing_key='test')
启用了确认的消费者
import amqp
with amqp.Connection('broker.example.com') as c:
ch = c.channel()
def on_message(message):
print('Received message (delivery tag: {}): {}'.format(message.delivery_tag, message.body))
ch.basic_ack(message.delivery_tag)
ch.basic_consume(queue='test', callback=on_message)
while True:
c.drain_events()
未启用确认的消费者
import amqp
with amqp.Connection('broker.example.com') as c:
ch = c.channel()
def on_message(message):
print('Received message (delivery tag: {}): {}'.format(message.delivery_tag, message.body))
ch.basic_consume(queue='test', callback=on_message, no_ack=True)
while True:
c.drain_events()
速度提升
此库对速度提升有实验性支持。速度提升是通过Cython实现的。要启用速度提升,必须在构建/安装期间设置环境变量CELERY_ENABLE_SPEEDUPS。目前,速度提升可以通过以下方式安装
使用源包(使用--no-binary开关)
CELERY_ENABLE_SPEEDUPS=true pip install --no-binary :all: amqp
直接构建源代码
CELERY_ENABLE_SPEEDUPS=true python setup.py install
进一步
AMQP 0.8与0.9.1之间的差异
AMQP 0.9.1 快速参考
RabbitMQ 扩展
有关AMQP的更多信息,请访问
有关其他Python客户端库,请参阅
py-amqp作为Tidelift订阅的一部分
py-amqp的维护者以及成千上万的其他包的维护者正在与Tidelift合作,为构建应用程序时使用的开源依赖项提供商业支持和维护。节省时间,降低风险,并提高代码质量,同时为使用的确切依赖项的维护者付款。[了解更多信息。](https://tidelift.com/subscription/pkg/pypi-amqp?utm_source=pypi-amqp&utm_medium=referral&utm_campaign=readme&utm_term=repo)
项目详情
下载文件
下载您平台对应的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。