跳转到主要内容

Pika Python AMQP客户端库

项目描述

Pika是Python的RabbitMQ(AMQP 0-9-1)客户端库。

Version Python versions Actions Status Coverage License Documentation Status

简介

Pika是AMQP 0-9-1协议的纯Python实现,包括RabbitMQ的扩展。

  • 支持Python 3.7+(1.1.0是最后一个支持2.7的版本)

  • 由于线程并不适用于所有情况,因此不需要线程。Pika核心也确保不会禁止它们。绿色通道、回调、延续和生成器也是如此。然而,Pika内置的连接适配器实例并不是线程安全的。

  • 人们可能会使用直接套接字、简单的select()或任何将网络事件发送到和从Python应用程序中获取的广泛方式。Pika试图与所有这些保持兼容,并尽可能简化将其适应新环境的过程。

文档

Pika的文档可以在https://pika.readthedocs.io找到。

示例

以下是最简单的使用示例,使用

pika.BlockingConnection

适配器发送消息

import pika

connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_publish(exchange='test', routing_key='test',
                      body=b'Test message.')
connection.close()

以下是编写阻塞消费者的示例

import pika

connection = pika.BlockingConnection()
channel = connection.channel()

for method_frame, properties, body in channel.consume('test'):
    # Display the message parts and acknowledge the message
    print(method_frame, properties, body)
    channel.basic_ack(method_frame.delivery_tag)

    # Escape out of the loop after 10 messages
    if method_frame.delivery_tag == 10:
        break

# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
connection.close()

Pika提供以下适配器

  • pika.adapters.asyncio_connection.AsyncioConnection - Python 3 AsyncIO I/O循环的异步适配器。

  • pika.BlockingConnection - 在库之上同步适配器,用于简单使用。

  • pika.SelectConnection - 无第三方依赖的异步适配器。

  • pika.adapters.gevent_connection.GeventConnection - 与Gevent I/O循环一起使用的异步适配器。

  • pika.adapters.tornado_connection.TornadoConnection - 与Tornado I/O循环一起使用的异步适配器。

  • pika.adapters.twisted_connection.TwistedProtocolConnection - 与Twisted I/O循环一起使用的异步适配器。

多个连接参数

您还可以传递多个

pika.ConnectionParameters

实例以实现容错,如下面的代码片段所示(主机名仅为例子)。要启用重试,将最后一个序列中的

pika.ConnectionParameters

元素中的

connection_attempts

retry_delay

设置为所需的值。在所有给定连接参数的连接尝试失败后进行重试。

import pika

parameters = (
    pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
    pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
                              connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)

对于非阻塞适配器,如

pika.SelectConnection

pika.adapters.asyncio_connection.AsyncioConnection

,您可以通过连接适配器的

create_connection()

类方法使用多个连接参数实例请求连接。

从另一个线程请求消息确认

单个Pika连接适配器实例的单线程使用约束可能导致长时间处理传入消息的消费者中的AMQP/stream连接由于AMQP心跳超时而丢失。一个常见的解决方案是将传入消息的处理委托给另一个线程,同时连接适配器的线程继续服务其I/O循环的消息泵,以便及时处理AMQP心跳和其他I/O。

在另一个线程中处理的消息可能无法直接从该线程进行确认,因为必须从单个线程访问连接适配器实例,即运行适配器I/O循环的线程。这是通过请求在适配器I/O循环线程中执行回调来实现的。例如,回调函数的实现可能如下所示

def ack_message(channel, delivery_tag):
    """Note that `channel` must be the same Pika channel instance via which
    the message being acknowledged was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't acknowledge this message;
        # log and/or do something that makes sense for your app in this case.
        pass

在另一个线程中运行的代码可以使用适配器特定的机制请求在连接适配器的I/O循环线程中执行

ack_message()

函数

  • pika.BlockingConnection将它的I/O循环从应用程序中抽象出来,因此公开了

    pika.BlockingConnection.add_callback_threadsafe()

    。有关更多信息,请参阅此方法的文档字符串。例如

    connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
  • 当使用非阻塞连接适配器,例如 pika.adapters.asyncio_connection.AsyncioConnectionpika.SelectConnection 时,您会使用底层异步框架的原生API来从另一个线程请求与I/O循环绑定的回调。例如,pika.SelectConnection 的I/O循环提供了 add_callback_threadsafe()pika.adapters.tornado_connection.TornadoConnection 的I/O循环有 add_callback(),而 pika.adapters.asyncio_connection.AsyncioConnection 的I/O循环公开了 call_soon_threadsafe()

此线程安全回调请求机制还可以用于将消息的发布等操作从后台线程委托到连接适配器的线程。

连接恢复

某些RabbitMQ客户端(Bunny、Java、.NET、Objective-C、Swift)提供了一种在网络故障后自动恢复连接、通道和拓扑(例如队列、绑定和消费者)的方法。其他客户端要求应用程序代码执行连接恢复,并努力使其过程简单明了。Pika属于第二类。

Pika支持多种连接适配器。它们在连接恢复方面采用不同的方法。

对于 pika.BlockingConnection 适配器,可以使用异常处理来检查连接错误。以下是一个非常基本的示例

import pika

while True:
    try:
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)
        channel.start_consuming()
    # Don't recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Don't recover on channel errors
    except pika.exceptions.AMQPChannelError:
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        continue

此示例可以在 examples/consume_recover.py 中找到。

通用操作重试库(如 retry)可用于。装饰器使您可以配置一些额外的恢复行为,例如重试之间的延迟和限制重试次数

from retry import retry


@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_consume('test', on_message_callback)

    try:
        channel.start_consuming()
    # Don't recover connections closed by server
    except pika.exceptions.ConnectionClosedByBroker:
        pass


consume()

此示例可以在 examples/consume_recover_retry.py 中找到。

对于异步适配器,使用 on_close_callback 来响应连接故障事件。此回调可用于清理和恢复连接。

使用 on_close_callback 进行恢复的示例可以在 examples/asynchronous_consumer_example.py 中找到。

贡献

要为Pika做出贡献,请确保任何新功能或现有功能的更改都包含测试覆盖率。

没有足够测试覆盖率的添加或更改代码的拉取请求将被拒绝。

此外,请在发出拉取请求之前使用 Yapfgoogle 风格格式化您的代码。 注意:仅格式化您在拉取请求中更改的行。如果您格式化整个文件并更改PR范围之外的代码,则可能会被拒绝。

扩展以支持额外的I/O框架

可以通过以下两种方式实现新的非阻塞适配器

  • 通过将 pika.BaseConnection 作为子类,实现其抽象方法,并将其实例化为 pika.adapters.utils.nbio_interface.AbstractIOServices 的实现。 pika.BaseConnection 实现了 pika.connection.Connection 的抽象方法,包括内部触发的连接逻辑。有关示例,请参考 pika.adapters.asyncio_connection.AsyncioConnectionpika.adapters.gevent_connection.GeventConnectionpika.adapters.tornado_connection.TornadoConnection 的实现。

  • 通过将 pika.connection.Connection 作为子类并实现其抽象方法。此方法有助于实现自定义连接建立和传输机制。有关示例,请参考 pika.adapters.twisted_connection.TwistedProtocolConnection 的实现。

项目详情


下载文件

下载您平台对应的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

pika-1.3.2.tar.gz (145.0 kB 查看哈希值)

上传时间

构建分布

pika-1.3.2-py3-none-any.whl (155.4 kB 查看哈希值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面