跳转到主要内容

Python的低级AMQP客户端(amqplib的分支)。

项目描述

Build status coverage BSD License Python AMQP can be installed via wheel Supported Python versions. Support Python implementations.

版本:

5.2.0

网站:

https://amqp.readthedocs.io/

下载:

https://pypi.ac.cn/project/amqp/

源码:

http://github.com/celery/py-amqp/

关键词:

amqp, rabbitmq

关于

这是一个由Barry Pederson最初编写的amqplib的分支。它由Celery项目维护,当librabbitmq不可用时,由kombu用作纯Python替代方案。

该库应与librabbitmq具有API兼容性。

amqplib的区别

  • 支持从多个通道中提取事件(Connection.drain_events

  • 支持超时

  • 在通道发生错误后,通道会恢复,而不是必须关闭连接。

  • 支持心跳

    • Connection.heartbeat_tick(rate=2)必须在固定间隔内调用(如果速率是2,则为心跳值的一半)。

    • 或者使用Connection.send_heartbeat采用其他方案。

  • 支持RabbitMQ扩展
    • 消费者取消通知
      • 默认情况下,取消会导致ChannelError被引发

      • 但是如果将on_cancel回调传递给basic_consume,则不会发生这种情况。

    • 发布者确认
      • Channel.confirm_select()启用发布者确认。

      • Channel.events['basic_ack'].append(my_callback)将一个回调添加到在消息被确认时将被调用的回调。然后,该回调将使用签名(delivery_tag, multiple)调用。

    • 交换到交换的绑定:exchange_bind / exchange_unbind
      • Channel.confirm_select()启用发布者确认。

      • Channel.events['basic_ack'].append(my_callback)将一个回调添加到在消息被确认时将被调用的回调。然后,该回调将使用签名(delivery_tag, multiple)调用。

    • 认证失败通知

      在连接到rabbitmq-server 3.2.0或更高版本时,py-amqp不会在无效凭据上突然关闭连接,而是引发一个AccessRefused错误。

  • 支持basic_return

  • 使用AMQP 0-9-1而不是0-8。
    • Channel.access_requestticket参数从方法中移除。

    • 支持basic_consumearguments参数。

    • exchange_declareinternal参数已移除。

    • exchange_declareauto_delete参数已弃用

    • Connectioninsist参数已移除。

    • Channel.alerts已移除。

    • 支持Channel.basic_recover_async

    • Channel.basic_recover已弃用。

  • 异常已重命名,以具有习惯性名称
    • AMQPException -> AMQPError

    • AMQPConnectionException -> ConnectionError``

    • AMQPChannelException -> ChannelError``

    • Connection.known_hosts已移除。

    • Connection不再支持重定向。

    • queue_bindexchange参数现在可以为空,以使用“默认交换”。

  • 添加了Connection.is_alive,它试图检测连接是否还可以使用。

  • 添加了Connection.connection_errors.channel_errors,这是一个可恢复错误的列表。

  • 将底层套接字公开为Connection.sock

  • 添加了Channel.no_ack_consumers以跟踪设置了no_ack标志的消费者标签。

  • 在错误恢复方面略有改进

简要概述

简单的生产者向默认交换机发布消息到test队列

import amqp

with amqp.Connection('broker.example.com') as c:
    ch = c.channel()
    ch.basic_publish(amqp.Message('Hello World'), routing_key='test')

生产者向具有发布确认启用并使用虚拟主机test_vhosttest_exchange交换机发布消息

import amqp

with amqp.Connection(
    'broker.example.com', exchange='test_exchange',
    confirm_publish=True, virtual_host='test_vhost'
) as c:
    ch = c.channel()
    ch.basic_publish(amqp.Message('Hello World'), routing_key='test')

启用了确认的消费者

import amqp

with amqp.Connection('broker.example.com') as c:
    ch = c.channel()
    def on_message(message):
        print('Received message (delivery tag: {}): {}'.format(message.delivery_tag, message.body))
        ch.basic_ack(message.delivery_tag)
    ch.basic_consume(queue='test', callback=on_message)
    while True:
        c.drain_events()

未启用确认的消费者

import amqp

with amqp.Connection('broker.example.com') as c:
    ch = c.channel()
    def on_message(message):
        print('Received message (delivery tag: {}): {}'.format(message.delivery_tag, message.body))
    ch.basic_consume(queue='test', callback=on_message, no_ack=True)
    while True:
        c.drain_events()

速度提升

此库对速度提升有实验性支持。速度提升是通过Cython实现的。要启用速度提升,必须在构建/安装期间设置环境变量CELERY_ENABLE_SPEEDUPS。目前,速度提升可以通过以下方式安装

  1. 使用源包(使用--no-binary开关)

CELERY_ENABLE_SPEEDUPS=true pip install --no-binary :all: amqp
  1. 直接构建源代码

CELERY_ENABLE_SPEEDUPS=true python setup.py install

进一步

py-amqp作为Tidelift订阅的一部分

py-amqp的维护者以及成千上万的其他包的维护者正在与Tidelift合作,为构建应用程序时使用的开源依赖项提供商业支持和维护。节省时间,降低风险,并提高代码质量,同时为使用的确切依赖项的维护者付款。[了解更多信息。](https://tidelift.com/subscription/pkg/pypi-amqp?utm_source=pypi-amqp&utm_medium=referral&utm_campaign=readme&utm_term=repo)

项目详情


发布历史 发布通知 | RSS源

下载文件

下载您平台对应的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

amqp-5.2.0.tar.gz (128.8 kB 查看散列值)

上传时间 源代码

构建分发

amqp-5.2.0-py3-none-any.whl (50.9 kB 查看散列值)

上传时间 Python 3

由...