Microsoft Azure Event Hubs Python客户端库
项目描述
Azure Event Hubs Python客户端库
Azure Event Hubs是一个高度可扩展的发布/订阅服务,可以每秒处理数百万个事件并将其流式传输到多个消费者。这使您能够处理和分析由您的连接设备和应用程序产生的海量数据。一旦Event Hubs收集了数据,您就可以使用任何实时分析提供商或使用批处理/存储适配器来检索、转换和存储它。如果您想了解更多关于Azure Event Hubs的信息,您可能希望查看:什么是Event Hubs?
Azure Event Hubs 客户端库允许发布和消费 Azure Event Hubs 事件,可以用于
- 发布关于您的应用程序的遥测数据,用于业务智能和诊断目的。
- 发布关于您的应用程序状态的实时信息,感兴趣的相关方可以观察并用作采取行动的触发器。
- 观察您业务或其他生态系统中发生的有趣操作和交互,允许松散耦合的系统进行交互,而无需将它们绑定在一起。
- 接收来自一个或多个发布者的事件,将它们转换为更符合您生态系统需求的形式,然后将转换后的事件发布到新的流中以供消费者观察。
源代码 | 包(PyPi) | 包(Conda) | API 参考文档 | 产品文档 | 示例
入门指南
先决条件
-
Python 3.8 或更高版本。
-
Microsoft Azure 订阅:要使用 Azure 服务,包括 Azure Event Hubs,您需要一个订阅。如果您没有现有的 Azure 账户,您可以在创建账户时注册免费试用或使用您的 MSDN 订阅者权益。
-
Event Hubs 命名空间与 Event Hub:要与 Azure Event Hubs 交互,您还需要一个命名空间和 Event Hub。如果您不熟悉创建 Azure 资源,您可能希望遵循使用 Azure 门户创建 Event Hub 的分步指南。在那里,您还可以找到使用 Azure CLI、Azure PowerShell 或 Azure 资源管理器(ARM)模板创建 Event Hub 的详细说明。
安装包
使用 pip 安装 Azure Event Hubs 客户端库 for Python
$ pip install azure-eventhub
验证客户端
与 Event Hubs 的交互从 EventHubConsumerClient 或 EventHubProducerClient 类的实例开始。您需要主机名、SAS/AAD 凭据和事件中心名称,或者一个连接字符串来实例化客户端对象。
对于 Event Hubs 客户端库与 Event Hub 交互来说,最简单的方法是使用连接字符串,该字符串在创建 Event Hubs 命名空间时自动创建。如果您不熟悉 Azure 中的共享访问策略,您可能希望遵循获取 Event Hubs 连接字符串的分步指南。
from_connection_string
方法接受形式为Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey>
的连接字符串以及您的 Event Hub 实例的实体名称。您可以从Azure 门户获取连接字符串。
或者,可以使用凭据对象通过 azure-identity 包进行 AAD 验证。
- 上面链接的示例中演示的构造函数采用您的 Event Hub 实例的主机名和实体名称以及实现 TokenCredential 协议的凭据。在 azure-identity 包 中有可用的
TokenCredential
协议实现。主机名的格式为<yournamespace.servicebus.windows.net>
。 - 要使用
azure-identity
提供的凭据类型,请安装该包:pip install azure-identity
- 此外,要使用异步API,您必须首先安装异步传输,例如
aiohttp
:pip install aiohttp
- 当使用Azure Active Directory时,您的主体必须分配有允许访问事件中心的角色,例如Azure事件中心数据所有者角色。有关使用Azure Active Directory授权与事件中心的更多信息,请参阅相关文档。
关键概念
-
EventHubProducerClient 是遥测数据、诊断信息、使用日志或其他日志数据的来源,作为嵌入式设备解决方案、移动设备应用程序、在控制台或其他设备上运行的游戏标题、某些基于客户端或服务器的业务解决方案或网站的一部分。
-
EventHubConsumerClient 从事件中心获取此类信息并进行处理。处理可能涉及聚合、复杂计算和过滤。处理还可能涉及以原始或转换形式分发或存储信息。事件中心消费者通常是强大且高规模的平台基础设施部分,具有内置的分析功能,如Azure流分析、Apache Spark或Apache Storm。
-
分区 是在事件中心中保存的有序事件序列。Azure事件中心通过分区消费者模式提供消息流,其中每个消费者只读取消息流的一个特定子集或分区。随着新事件的到来,它们将被添加到该序列的末尾。分区的数量在创建事件中心时指定,并且不能更改。
-
消费者组 是整个事件中心的视图。消费者组允许多个消费应用程序各自拥有事件流的单独视图,并且可以独立以自己的速度和位置读取流。每个消费者组在一个分区的分区上最多可以有5个并发读取器;然而,建议对于给定的分区和消费者组配对,只有一个活动消费者。每个活动读取器接收其分区的所有事件;如果同一分区上有多个读取器,则它们将接收重复事件。
有关更多概念和深入讨论,请参阅: 事件中心功能。此外,OASIS高级消息队列协议(AMQP)版本1.0的概念在OASIS中有很好的文档。
线程安全
我们不保证EventHubProducerClient或EventHubConsumerClient是线程安全的。我们不推荐在多个线程之间重用这些实例。这些类在运行的应用程序中以线程安全的方式使用取决于运行的应用程序。
数据模型类型EventDataBatch
不是线程安全的。它不应在多个线程之间共享,也不应与客户端方法同时使用。
示例
以下部分提供了几个代码片段,涵盖了事件中心的一些常见任务,包括
- 检查事件中心
- 向事件中心发布事件
- 从事件中心消费事件
- 批量从事件中心消费事件
- 异步向事件中心发布事件
- 异步从事件中心消费事件
- 异步批量从事件中心消费事件
- 使用检查点存储来消费事件并保存检查点
- 使用EventHubConsumerClient与IoT Hub一起工作
检查事件中心
获取事件中心的分区ID。
import os
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential
FULLY_QUALIFIED_NAMESPACE = os.environ["EVENT_HUB_HOSTNAME"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
consumer_client = EventHubConsumerClient(
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
credential=DefaultAzureCredential(),
)
with consumer_client:
pass # consumer_client is now ready to be used.
向事件中心发布事件
在EventHubProducerClient
上使用create_batch
方法创建一个EventDataBatch
对象,然后可以使用send_batch
方法发送该对象。可以通过使用add
方法将事件添加到EventDataBatch
中,直到达到以字节为单位的最大批量大小限制。
def send_event_data_batch(producer):
# Without specifying partition_id or partition_key
# the events will be distributed to available partitions via round-robin.
event_data_batch = producer.create_batch()
event_data_batch.add(EventData('Single message'))
producer.send_batch(event_data_batch)
从事件中心消费事件
有几种从EventHub消费事件的方法。要简单地在接收到事件时触发回调,可以使用以下EventHubConsumerClient.receive
方法
import logging
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential
fully_qualified_namespace = '<< EVENT HUBS FULLY QUALIFIED NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group=consumer_group,
credential=DefaultAzureCredential(),
)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
partition_context.update_checkpoint(event)
with client:
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive(on_event=on_event, partition_id='0')
批量从事件中心消费事件
以上示例在接收到每条消息时都会触发回调,而以下示例在触发一批事件时触发回调,尝试一次接收一定数量的消息。
import logging
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential
fully_qualified_namespace = '<< EVENT HUBS FULLY QUALIFIED NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group=consumer_group,
credential=DefaultAzureCredential(),
)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event_batch(partition_context, events):
logger.info("Received event from partition {}".format(partition_context.partition_id))
partition_context.update_checkpoint()
with client:
client.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive_batch(on_event_batch=on_event_batch, partition_id='0')
异步向事件中心发布事件
使用EventHubProducer
的create_batch
方法创建一个EventDataBatch
对象,然后可以使用send_batch
方法发送。可以通过add
方法向EventDataBatch
中添加事件,直到达到最大批量大小的字节数限制。
import asyncio
from azure.eventhub.aio import EventHubProducerClient # The package name suffixed with ".aio" for async
from azure.eventhub import EventData
from azure.identity.aio import DefaultAzureCredential
fully_qualified_namespace = '<< EVENT HUBS FULLY QUALIFIED NAMESPACE >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
async def create_batch(client):
event_data_batch = await client.create_batch()
can_add = True
while can_add:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.
return event_data_batch
async def send():
client = EventHubProducerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=DefaultAzureCredential(),
)
batch_data = await create_batch(client)
async with client:
await client.send_batch(batch_data)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(send())
异步从事件中心消费事件
此SDK支持同步和基于asyncio的代码。要在aio中接收,如上述示例所示,但需要在以下情况下
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
fully_qualified_namespace = '<< EVENT HUBS FULLY QUALIFIED NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
async def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
await partition_context.update_checkpoint(event)
async def receive():
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group=consumer_group,
credential=DefaultAzureCredential(),
)
async with client:
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# await client.receive(on_event=on_event, partition_id='0')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(receive())
异步批量从事件中心消费事件
所有同步函数在aio中也得到支持。如上例中同步批量接收所示,可以在asyncio中完成相同的事情,如下所示
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
fully_qualified_namespace = '<< EVENT HUBS FULLY QUALIFIED NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
async def on_event_batch(partition_context, events):
logger.info("Received event from partition {}".format(partition_context.partition_id))
await partition_context.update_checkpoint()
async def receive_batch():
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group=consumer_group,
credential=DefaultAzureCredential(),
)
async with client:
await client.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(receive_batch())
使用检查点存储来消费事件并保存检查点
EventHubConsumerClient
是一个高级构造,允许您同时从多个分区接收事件,并使用相同的Event Hub和消费者组与其他消费者进行负载均衡。
这还允许用户通过检查点跟踪事件处理进度。
检查点用来表示用户从Event Hub实例中的消费者组的特定分区成功处理过的最后一条事件。使用CheckpointStore
实例来更新检查点和存储负载均衡算法所需的相关信息。
使用前缀azure-eventhub-checkpointstore
在pypi中搜索,以找到支持此功能的软件包,并使用来自此类软件包的CheckpointStore
实现。请注意,同步和异步库都提供了。
在以下示例中,我们创建了一个EventHubConsumerClient
实例并使用了一个BlobCheckpointStore
。您需要创建一个Azure Storage账户和一个Blob容器来运行此代码。
Azure Blob Storage Checkpoint Store Async和Azure Blob Storage Checkpoint Store Sync是我们提供的一些CheckpointStore
实现之一,它们将Azure Blob Storage作为持久化存储。
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.identity.aio import DefaultAzureCredential
fully_qualified_namespace = '<< EVENT HUBS FULLY QUALIFIED NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
blob_account_url = '<< STORAGE ACCOUNT URL >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'
async def on_event(partition_context, event):
# do something
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def receive(client):
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
async def main():
checkpoint_store = BlobCheckpointStore(
blob_account_url=blob_account_url,
container_name=container_name,
credential=DefaultAzureCredential()
)
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=DefaultAzureCredential(),
consumer_group=consumer_group,
checkpoint_store=checkpoint_store, # For load balancing and checkpoint. Leave None for no load balancing
)
async with client:
await receive(client)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
使用EventHubConsumerClient与IoT Hub一起工作
您还可以使用EventHubConsumerClient
与IoT Hub一起工作。这对于从链接的Event Hub接收IoT Hub的遥测数据很有用。相关的连接字符串将不包含发送声明,因此无法发送事件。
请注意,连接字符串需要是与Event Hub兼容的端点,例如:"Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
有两种方法可以获得与Event Hubs兼容的端点
- 手动在Azure Portal中获取IoT Hub的“内置端点”并从中接收。
from azure.eventhub import EventHubConsumerClient
connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)
partition_ids = client.get_partition_ids()
- 通过编程方式检索内置的与Event Hubs兼容的端点。请参阅IoT Hub连接字符串示例。
故障排除
有关如何诊断各种失败场景的详细信息,请参阅azure-eventhub
的故障排除指南。
日志记录
- 启用
azure.eventhub
日志记录器以收集库中的跟踪信息。 - 通过在创建客户端时设置
logging_enable=True
来启用AMQP帧级别的跟踪。 - 有关如何为Python的Azure库配置日志记录的更多信息,请参阅此指南。
import logging
import sys
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.eventhub')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
...
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
producer = EventHubProducerClient(..., logging_enable=True)
consumer = EventHubConsumerClient(..., logging_enable=True)
下一步
更多示例代码
请查看samples目录,以了解如何使用此库发送和接收Event Hubs的事件。
文档
参考文档可在此处找到。
模式注册表和Avro编码器
EventHubs SDK与模式注册表服务和Avro集成良好。有关更多信息,请参阅模式注册表SDK和模式注册表Avro编码器SDK。
纯Python AMQP传输和向后兼容性支持
Azure Event Hubs客户端库现在基于纯Python AMQP实现。《uAMQP》作为必需依赖项已被移除。
要使用uAMQP
作为底层传输
- 使用pip安装
uamqp
。
$ pip install uamqp
- 在客户端构造期间传递
uamqp_transport=True
。
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
from azure.identity import DefaultAzureCredential
fully_qualified_namespace = '<< EVENT HUBS FULLY QUALIFIED NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=DefaultAzureCredential(),
uamqp_transport=True
)
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=DefaultAzureCredential(),
consumer_group=consumer_group,
uamqp_transport=True
)
注意:之前暴露了《uamqp.Message》的《EventData》/《EventDataBatch》上的message
属性已被弃用。已引入由《EventData.message》/《EventDataBatch.message》返回的“Legacy”对象,以帮助促进过渡。
从源构建uAMQP轮
如果uAMQP打算用作《azure-eventhub》底层AMQP协议实现,则大多数主要操作系统上都可以找到uAMQP轮。
如果您打算使用uAMQP
并且您正在运行的平台上没有提供uAMQP轮,请按照uAMQP安装指南从源安装。
提供反馈
如果您遇到任何错误或有所建议,请在项目的问题部分提交问题。
贡献
本项目欢迎贡献和建议。大多数贡献都需要您同意贡献者许可协议(CLA),声明您有权,并且实际上确实授予我们使用您的贡献的权利。有关详细信息,请访问https://cla.microsoft.com。
当您提交拉取请求时,CLA-bot将自动确定您是否需要提供CLA,并适当地装饰PR(例如,标签,评论)。只需遵循bot提供的说明即可。您只需在整个使用我们CLA的repo中进行一次。
本项目已采用Microsoft开源代码行为准则。有关更多信息,请参阅代码行为准则FAQ或通过opencode@microsoft.com联系以获得任何额外的问题或评论。
版本历史
5.12.2 (2024-10-02)
已修复的错误
- 为异步消费者实现背压以解决内存泄漏问题。(#36398)
5.12.1 (2024-06-11)
已修复的错误
- 修复了一个错误,其中消费者身份验证未使用正确的URI,导致在消费者组级别分配角色时出现问题。(#35337)
5.12.0 (2024-05-16)
添加的功能
- 添加了对非TLS连接的支持(#34272)
5.11.7 (2024-04-10)
已修复的错误
- 修复了在缓冲模式下使用《EventHubProducerClient》可能会丢失缓冲消息而未实际发送的错误。(#34712)
其他更改
- 更新网络跟踪日志,根据OpenTelemetry规范用空字符串替换AMQP连接信息中的
None
值。
5.11.6 (2024-02-12)
此版本及所有未来版本将需要Python 3.8+。Python 3.7不再受支持。
添加的功能
- 在《EventHubProducerClient》上添加了
keep_alive
功能,以允许长生命周期生产者。#33726
其他更改
- 添加了对Python 3.12的支持。
5.11.5 (2023-11-13)
已修复的错误
- 修复了pyAMQP的错误,该错误导致事件跨越多个TransferFrames时,每个消息消耗的连接信用超过1个信用。(#32767)
5.11.4 (2023-08-08)
添加的功能
- 在同步和异步的
EventHubConsumerClient
和EventHubProducerClient
中添加了新的浮点型关键字参数socket_timeout
。
已修复的错误
- 修复了错误#31258,该错误导致异步
BlobCheckpointStore
在错误后重新处理旧事件。
5.11.3 (2023-07-12)
已修复的错误
- 修复了错误
end frame received on invalid channel
,该错误在服务发送断开连接时引发(#30860) - 修复了错误
link already closed
,该错误在客户端关闭并与服务断开连接时引发(#30836)
其他更改
- 更新了跟踪(#29934)
- 修改了跨度名称
Azure.EventHubs.send
更改为EventHubs.send
Azure.EventHubs.message
更改为EventHubs.message
Azure.EventHubs.process
更改为EventHubs.process
- 在接收到事件时将创建一个
EventHubs.receive
跨度。 - 向跨度添加了额外的属性
messaging.system
- 消息系统(例如,eventhubs
)messaging.operation
- 操作类型(例如,publish
、receive
或process
)messaging.batch.message_count
- 发送、接收或处理的消息数(如果超过一个)
- 从所有跨度中删除了
component
属性。 - 现在,所有
send
跨度都包含指向message
跨度的链接。现在,message
跨度将不再包含指向send
跨度的链接。 - 消息应用程序属性现在将包含
traceparent
(如果适用,还包括tracestate
)的值 - 当基于每个消息进行事件处理时,进程跨度现在将是消息跨度上下文的直接子项。(#30537)
- 修改了跨度名称
5.11.2 (2023-03-20)
已修复的错误
- 修复了一个长期空闲期和网络中断后无法重新连接的bug(问题#28996)
5.11.1 (2023-01-25)
已修复的错误
- 修复了一个bug,当未安装
websocket-client
时,错误没有被正确捕获/引发(问题#28453)。
5.11.0 (2023-01-19)
版本5.11.0是基于纯Python实现的AMQP堆栈的Azure Event Hubs客户端库的第一个稳定版本。
添加的功能
- 在同步和异步的
EventHubProducerClient
/EventHubConsumerClient
构造函数中添加了新的布尔型关键字参数uamqp_transport
,它指示是否使用uamqp
库或默认纯Python AMQP库作为底层传输。
已修复的错误
- 修复了在启用跟踪时发送批次的错误(问题#27986)。
- 修复了当
EventHubSharedKeyCredential
返回一个类型为bytes
的AccessToken.token
而不是str
时的问题,现在与文档匹配。
其他更改
- 从
EventData
/EventDataBatch
上删除了message
属性,该属性以前公开了uamqp.Message
。- 引入了由
EventData
/EventDataBatch
上的message
属性返回的LegacyMessage
/LegacyBatchMessage
对象,以帮助促进过渡。
- 引入了由
- 删除了uAMQP作为必需依赖项。
- 将
uamqp >= 1.6.3
添加为使用uamqp_transport
关键字的可选依赖项。- 添加了对Python 3.11的支持。
5.8.0b2(2022-10-11)
添加的功能
- 将异步传输AMQP的依赖项从
websocket-client
更新到aiohttp
(问题#24315,感谢@hansmbakker的建议)。
5.8.0b1(2022-09-22)
此版本和所有未来的版本将需要Python 3.7+。Python 3.6不再受支持。
其他更改
- 向客户端添加了
uamqp_transport
可选参数,允许切换到作为传输的uamqp
库。
5.10.1 (2022-08-22)
本版本及所有未来版本将需要 Python 3.7+,Python 3.6 已不再支持。
已修复的错误
- 修复了在异步
BufferedProducer
中存在的bug,当清空队列时会导致阻塞,使客户端冻结(问题 #23510)。 - 修复了异步
EventHubProducerClient
和EventHubConsumerClient
中的bug,将from_connection_string
方法中的transport_type
参数的默认值设置为None
而不是TransportType.Amqp
。
其他更改
- 内部重构以支持即将推出的基于纯Python AMQP的版本。
- 已将uAMQP依赖项更新到1.6.0。
5.8.0a5(2022-07-19)
已修复的错误
- 修复了阻止定期刷新令牌的bug。
- 修复了错误传递调试关键字参数的bug,因此当请求时,会输出网络跟踪调试日志。
其他更改
- 添加了日志记录以跟踪适当的令牌刷新和检索,输出生产者初始化失败的原因。
5.10.0 (2022-06-08)
添加的功能
- 包括以下与事件缓冲发送相关的功能
- 为
EventHubProducerClient
添加了一个新的方法send_event
,允许发送单个EventData
或AmqpAnnotatedMessage
。 - 为
EventHubProducerClient
实现了缓冲模式发送,旨在允许在无需显式管理应用程序中的批次的情况下高效发布事件。EventHubProducerClient
构造函数和from_connection_string
方法现在接受以下新关键字参数进行配置buffered_mode
:启用/禁用缓冲模式发送的标志。on_success
:事件成功发布后将被调用的回调。on_error
:事件发布失败后将被调用的回调。max_buffer_length
:在触发刷新之前,每个分区可以缓存的最多事件数。max_wait_time
:在发布之前,等待缓冲区中的事件构建批次的时间量。
- 添加了一个新的方法
EventHubProducerClient.flush
,该方法将缓冲区中的事件立即发送。 - 添加了一个新的方法
EventHubProducerClient.get_buffered_event_count
,该方法返回给定分区中等待发布的事件的数量。 - 添加了一个新的属性
EventHubProducerClient.total_buffered_event_count
,该方法返回当前所有分区中等待发布的事件总数。 - 在
EventHubProducerClient.close
中添加了一个新的布尔关键字参数flush
,指示在关闭时是否刷新缓冲区。
- 为
5.8.0a4(2022-06-07)
添加的功能
- 添加了对使用WebSocket和HTTP代理建立连接的支持。
- 添加了对通过WebSocket使用自定义端点连接的支持。
5.9.0 (2022-05-10)
添加的功能
- 为
EventData
添加了类方法from_message_content
,以与Schema Registry Avro Encoder库进行互操作性,并接受content
和content_type
作为位置参数。
其他更改
- 与事件缓冲发送相关的功能仍在测试版中,不会包含在本版本中。
5.9.0b3(2022-04-20)
添加的功能
- 为
EventHubProducerClient
引入了新的方法send_event
,允许发送单个EventData
或AmqpAnnotatedMessage
。 - 为
EventHubProducerClient
引入了缓冲模式发送,旨在允许在无需显式管理应用程序中的批次的情况下高效发布事件。- 现在,
EventHubProducerClient
构造函数和from_connection_string
方法接受以下新关键字参数进行配置buffered_mode
:启用/禁用缓冲模式发送的标志。on_success
:事件成功发布后将被调用的回调。on_error
:事件发布失败后将被调用的回调。max_buffer_length
:在触发刷新之前,每个分区可以缓存的最多事件数。max_wait_time
:在发布之前,等待缓冲区中的事件构建批次的时间量。
- 引入了新的方法
EventHubProducerClient.flush
,该方法将缓冲区中的事件立即发送。 - 引入了新的方法
EventHubProducerClient.get_buffered_event_count
,该方法返回给定分区中等待发布的事件数量。 - 引入了新的属性
EventHubProducerClient.total_buffered_event_count
,该方法返回当前所有分区中等待发布的事件总数。 - 在
EventHubProducerClient.close
中引入了新的布尔关键字参数flush
,用于指示关闭时是否刷新缓冲区。
- 现在,
其他更改
- 更新了
EventData
的内部实现,以便与Schema Registry Avro Encoder库兼容。
5.9.0b2 (2022-03-09)
重大更改
- 为了与Schema Registry Avro Encoder库兼容,将
EventData
上的from_message_data
重命名为from_message_content
。将data
参数重命名为content
。
5.8.0a3 (2022-03-08)
其他更改
- 提高了异步发送和接收的性能。
5.9.0b1 (2022-02-09)
- 以下功能已暂时从异步
EventHubProducerClient
和EventHubConsumerClient
中移除,我们将在未来预览中将其添加回来,因为我们致力于发布一个稳定的版本:- 不支持将以下关键字参数传递给
EventHubProducerClient
和EventHubConsumerClient
的构造函数和from_connection_string
方法:transport_type
、http_proxy
、custom_endpoint_address
和connection_verify
。
- 不支持将以下关键字参数传递给
5.8.0a2 (2022-02-09)
添加的功能
- 添加了对异步
EventHubProducerClient
和EventHubConsumerClient
的支持。
5.8.0a1 (2022-01-13)
版本5.8.0a1是我们首次尝试基于纯Python实现的AMQP堆栈构建Azure Event Hubs客户端库的努力。
重大更改
- 以下功能已暂时移除,我们将在未来预览中将其添加回来,因为我们致力于发布一个稳定的版本:
- 不支持异步。
- 不支持将以下关键字参数传递给
EventHubProducerClient
和EventHubConsumerClient
的构造函数和from_connection_string
方法:transport_type
、http_proxy
、custom_endpoint_address
和connection_verify
。
其他更改
- 移除了uAMQP依赖项。
5.7.0 (2022-01-12)
此版本及所有未来版本将需要Python 3.6+。Python 2.7不再受支持。
添加的功能
- 添加了对固定(线性)重试退避的支持
- 同步/异步
EventHubProducerClient
和EventHubConsumerClient
的构造函数和from_connection_string
方法将retry_mode
作为关键字参数。
- 同步/异步
已修复的错误
- 修复了一个问题,即当客户端先前已关闭时,
EventHubProducerClient
可以重新打开以发送事件,而不是遇到KeyError
(问题#21849)。
其他更改
- 改进了令牌刷新时机,以防止在令牌即将过期时阻塞主流程。
- 将uAMQP依赖项更新到1.5.1。
5.6.1 (2021-10-06)
已修复的错误
- 修复了检查
azure.eventhub.amqp.AmqpMessageHeader
和azure.eventhub.amqp.AmqpMessageProperties
是否包含特定属性时使用in
关键字的问题。
其他更改
- 将uAMQP依赖项更新到1.4.3。
- 添加了对Python 3.10的支持。
- 修复了win32 socketio和tlsio中的内存泄漏(问题#19777)。
- 修复了在将AMQPValue转换为字符串的过程中出现的内存泄漏(问题#19777)。
5.6.0 (2021-07-07)
添加的功能
- 添加了对发送AMQP注释消息的支持,这允许完全访问AMQP消息字段。
- 引入了新的命名空间
azure.eventhub.amqp
。 - 添加了新的枚举类
azure.eventhub.amqp.AmqpMessageBodyType
,用于表示消息的正文类型,包括DATA
:消息的正文由一个或多个数据部分组成,每个部分包含不可见二进制数据。SEQUENCE
:消息的正文由一个或多个序列部分组成,每个部分包含任意数量的结构化数据元素。VALUE
:消息的正文由一个AMQP-value部分组成,该部分包含一个AMQP值。
- 引入了新的类
azure.eventhub.amqp.AmqpAnnotatedMessage
,用于访问低级别的AMQP消息部分,可以用于发送。 - 引入了新的类
azure.eventhub.amqp.AmqpMessageHeader
和azure.eventhub.amqp.AmqpMessageProperties
,用于访问AMQP头和属性。 - 在
azure.eventhub.EventData
上添加了新的属性body_type
,它返回azure.eventhub.amqp.AmqpMessageBodyType
。 - 在
azure.eventhub.EventData
上添加了新的只读属性raw_amqp_message
,它返回azure.eventhub.amqp.AmqpAnnotatedMessage
。
- 引入了新的命名空间
修复
- 将uAMQP依赖项更新到1.4.1。
- 修复了在MessageProperties上的属性creation_time、absolute_expiry_time和group_sequence应与Python 2.7上的整数类型兼容的问题。
5.5.0 (2021-05-13)
新功能
- 添加了对使用
azure.core.credentials.AzureNamedKeyCredential
作为生产者和消费者客户端凭证的支持。
错误修复
- 修复了自定义用户代理字符串应置于内置用户代理字符串之前,而不是附加在后面的bug。
- 将uAMQP依赖项更新到1.4.0。
- 修复了在链接附加过程中内存泄漏的问题,其中源和目标cython对象未正确释放(#15747)。
- 改进了管理操作回调,不再将非AMQP_TYPE_STRING类型的描述值解析为字符串(#18361)。
注意
- 将azure-core依赖项更新到1.14.0。
5.4.0 (2021-04-07)
本版本继版本5.3.1之后,而不是5.4.0b1,因此不包括预览idempotent生产者功能。
新功能
- 添加了对使用
azure.core.credentials.AzureSasCredential
作为生产者和消费者客户端认证凭证的支持。 - 更新了同步和异步的
CheckpointStore
上的list_ownership
、claim_ownership
、update_checkpoint
和list_checkpoints
,以支持接受**kwargs
。- 警告:实现不支持在上述方法中接受
**kwargs
的自定义checkpointstore将导致以下pylint错误:W0221: 参数与覆盖的______方法(arguments-differ)不同
。
- 警告:实现不支持在上述方法中接受
- 更新了同步和异步的
PartitionContext
上的update_checkpoint
,以支持接受**kwargs
。
错误修复
- 将uAMQP依赖项更新到1.3.0。
- 修复了在底层数据包连接丢失时,发送大尺寸消息引发段错误的问题(#13739、#14543)。
- 修复了链接流控制中的bug,其中链接信用和交付计数应根据每条消息而不是每个传输帧来计算(#16934)。
注意
- 将azure-core依赖项更新到1.13.0。
5.4.0b1(2021-03-09)
本版本及所有未来版本将需要Python 2.7或Python 3.6+,Python 3.5不再受支持。
新功能
- 添加了对幂等发布的支持,以尽量减少发布的事件重复次数。
EventHubProducerClient
构造函数接受两个新的幂等发布参数enable_idempotent_partitions
:一个布尔值,告诉EventHubProducerClient
是否启用幂等性。partition_config
:可以指定以影响特定于配置的事件中心分区的发布行为的配置集合。
- 在
EventHubProducerClient
上引入了新的方法get_partition_publishing_properties
,用于检查分区的发布状态信息。 - 在
EventData
上引入了新的属性published_sequence_number
,用于获取在事件成功发布时分配的发布序列号。 - 在
EventDataBatch
上引入了新的属性starting_published_sequence_number
,用于获取在批次成功发布时分配给批次中第一个事件的发布序列号。 - 引入了新的类
azure.eventhub.PartitionPublishingConfiguration
,它是一组配置,可以指定以影响直接向事件中心分区发布时的行为。
注意
- 将uAMQP依赖项更新到1.2.15。
5.3.1 (2021-03-09)
本版本将是官方支持Python 3.5的最后一个版本,未来的版本将需要Python 2.7或Python 3.6+。
错误修复
- 现在发送空的
event_data_batch
将不会执行任何操作,而不是引发错误。
5.3.0 (2021-02-08)
新功能
- 添加了一个
parse_connection_string
方法,它将连接字符串解析为包含其组成部分的属性包EventHubConnectionStringProperties
。 EventHubConsumerClient
和EventHubProducerClient
的构造函数和from_connection_string
方法现在接受两个新的可选参数custom_endpoint_address
,它允许指定用于与事件中心服务通信的端点,当您的网络不允许与标准事件中心端点通信时非常有用。connection_verify
允许指定用于验证连接端点身份的 SSL 证书的自定义 CA_BUNDLE 文件的路径。
注意
- 更新 uAMQP 依赖到 1.2.14。
5.2.1 (2021-01-11)
错误修复
- 更新
azure.eventhub.extension.__init__.py
以兼容 pkgutil 风格的命名空间(PR #13210,感谢 @pjachowi)。 - 更新 uAMQP 依赖到 1.2.13。
- 添加了对 Python 3.9 的支持。
- 修复了 macOS 无法检测网络错误的 bug(#15473)。
- 修复了在连接建立期间,
uamqp.ReceiveClient
和uamqp.ReceiveClientAsync
接收消息的 bug(#15555)。 - 修复了在 macOS 上使用 Clang 12 建立连接时触发未识别选择器异常的 bug(#15567)。
- 修复了访问消息属性时,当底层 C 字节为 NULL 时触发段错误的 bug(#15568)。
5.2.0 (2020-09-08)
新功能
- 使用
from_connection_string
方法使用的连接字符串现在支持使用SharedAccessSignature
密钥代替sharedaccesskey
和sharedaccesskeyname
,使用正确构造的令牌字符串作为值。
5.2.0b1 (2020-07-06)
新功能
EventHubConsumerClient
构造函数接受两个新的负载均衡器参数。load_balancing_strategy
,可以是 "greedy" 或 "balanced"。在贪婪策略中,一次负载均衡执行将声明尽可能多的分区以平衡负载,而在平衡策略中,一次负载均衡执行最多声明 1 个分区。partition_ownership_expiration_interval
,允许您自定义用于负载均衡的分区所有权过期时间。当过期间隔较小时,消费者客户端可能更频繁地丢失其拥有的分区。但较大的间隔可能导致空闲分区更长的时间未被声明。
- 为
load_balancing_strategy
添加了枚举类azure.eventhub.LoadBalancingStrategy
。
5.1.0 (2020-05-04)
新功能
EventHubProducerClient.send_batch
接受一个EventDataBatch
或有限列表的EventData
。 #9181- 添加了分布式追踪的 span 链的 enqueueTime。 #9599
错误修复
- 修复了在没有检查点存储时,将
azure.eventhub.EventhubConsumerClient
转换为独占接收器的 bug(#11181)。 - 更新 uAMQP 依赖到 1.2.7。
- 修复了在 MacOS 上设置 tlsio 证书时的 bug(#7201)。
- 修复了当在
EventHubConsumerClient
和EventHubProducerClient
中将logging_enable
设置为True
时,导致网络追踪中段错误的 bug。
5.1.0b1 (2020-04-06)
新功能
- 添加了
EventHubConsumerClient.receive_batch()
以批量接收和处理事件,而不是逐个处理。 #9184 EventHubConsumerCliuent.receive()
新增了max_wait_time
参数。在没有接收到事件且max_wait_time
不是None
或 0 时,每max_wait_time
调用一次on_event
。PartitionContext.update_checkpoint
的 event 参数现在是可选的。如果没有传递参数 event,则使用最后一个接收的事件。- 在从 IotHub 消费消息时,
EventData.system_properties
添加了缺失的属性。 #10408
5.0.1 (2020-03-09)
错误修复
- 修复了在接收事件时使用
azure.eventhub.EventHubConsumerClient
吞吐错误的 bug(#9660)。 - 修复了在 Azure Stack 上调用
get_eventhub_properties
、get_partition_ids
和get_partition_properties
时引发错误的 bug(#9920)。
5.0.0 (2020-01-13)
重大更改
EventData
- 移除了已弃用的属性
application_properties
和已弃用的方法encode_message()
。
- 移除了已弃用的属性
EventHubConsumerClient
- 当
EventHubConsumerClient
失败于声明分区所有权时,会调用on_error
。 - 在
on_event
回调抛出异常的情况下,会调用on_partition_close
和on_partition_initialize
。- 在这种情况下,
EventHubConsumerClient
将关闭并重新打开内部分区接收器。
- 在这种情况下,
- 在从错误中恢复后,
EventHubConsumerClient
应从哪里恢复接收的默认起始位置已被重新优先排序。- 如果有检查点,它将从检查点恢复。
- 如果没有检查点但提供了
starting_position
,它将从starting_position
恢复。 - 如果没有检查点或
starting_position
,它将从最新位置恢复。
- 当
PartitionContext
update_checkpoint
会在没有明确提供检查点存储时执行内存中的检查点,而不是什么也不做。- 内存中的检查点将用于
EventHubConsumerClient
接收恢复。
- 内存中的检查点将用于
- 当服务返回错误状态码时,
get_partition_ids
、get_partition_properties
、get_eventhub_properties
将引发错误。- 当服务返回错误代码401时,将引发
AuthenticationError
。 - 当服务返回错误代码404时,将引发
ConnectError
。 - 当服务返回其他错误代码时,将引发
EventHubError
。
- 当服务返回错误代码401时,将引发
5.0.0b6 (2019-12-03)
重大更改
- 现在所有异常都应该从
azure.eventhub.exceptions
导入。 - 引入了用于同步和异步操作的单独的
EventHubSharedKeyCredential
对象。对于异步,从azure.eventhub.aio
命名空间导入凭据对象。 EventData
- 已将属性
application_properties
重命名为properties
。 EventData
不再具有属性last_enqueued_event_properties
- 请在PartitionContext
上使用此属性。
- 已将属性
EvenDataBatch
EventDataBatch.try_add
已重命名为EventDataBatch.add
。- 已将属性
size
重命名为size_in_bytes
。 - 已将属性
max_size
重命名为max_size_in_bytes
。
EventHubConsumerClient
和EventHubProducerClient
- 已将方法
get_properties
重命名为get_eventhub_properties
。 - 在构造函数中已重命名参数:
host
重命名为fully_qualified_namespace
,event_hub_path
重命名为eventhub_name
。 - 在
get_partition_properties
中已重命名参数:partition
重命名为partition_id
。 - 已重命名参数
consumer_group_name
为consumer_group
,并将其从receive
方法移动到EventHubConsumerClient
的构造函数中。 - 在
EventHubConsumerClient
的receive
方法上已重命名参数initial_event_position
为starting_position
。 - 在构造函数和客户端对象的
from_connection_string
方法中将event_hub_path
重命名为eventhub_name
。 EventHubProducerClient.send
已重命名为send_batch
,它只接受EventDataBatch
对象作为输入。EventHubProducerClient.create_batch
现在还接受可选的partition_id
和partition_key
参数(这些参数不再在发送时指定)。
- 已将方法
- 已将模块
PartitionManager
重命名为CheckpointStore
。 - 接收事件回调参数已重命名为
on_event
,现在它在一个单独的事件上操作,而不是在事件列表上。 - 已删除类
EventPostition
。receive
方法的starting_position
参数接受offset(str
)、序列号(int
)、datetime (datetime.datetime
)或这些类型的dict
。receive
方法的starting_position_inclusive
参数接受bool
或dict
,指示给定的事件位置是否包含。
PartitionContext
不再具有属性owner_id
。PartitionContext
现在具有属性last_enqueued_event_properties
,如果receive
方法中将track_last_enqueued_event_properties
设置为True
,则填充此属性。
新功能
- 在
EventHubConsumerClient
和EventHubProducerClient
的构造和from_connection_string
中添加了新参数idle_timeout
,在此之后,如果没有任何进一步的活动,将关闭底层连接。
5.0.0b5 (2019-11-04)
重大更改
- 已删除
EventHubClient
、EventHubConsumer
和EventHubProducer
。请使用EventHubProducerClient
和EventHubConsumerClient
。- 两个对象的构建与之前客户的相同。
- 引入了
EventHubProducerClient
作为EventHubProducer
的替代。EventHubProducerClient
支持向不同的分区发送事件。
- 引入了
EventHubConsumerClient
作为EventHubConsumer
的替代。EventHubConsumerClient
支持从单个/所有分区接收事件。- 不再有直接返回
EventData
的方法,所有接收操作都通过回调方法on_events
完成。
EventHubConsumerClient
承担了EventProcessor
的责任。EventHubConsumerClient
现在接受PartitionManager
以进行负载均衡和检查点。
- 用四个独立的回调方法(由
EventHubConsumerClient
上的receive
方法接受)替换了PartitionProcessor
。- 在收到事件时调用
on_events(partition_context, events)
。 - 当发生错误时调用
on_error(partition_context, exception)
。 - 当分区消费者打开时调用
on_partition_initialize(partition_context)
。 - 当分区消费者关闭时调用
on_partition_close(partition_context, reason)
。
- 在收到事件时调用
- 一些可以从几个不同地方导入的模块和类已被移除。
- 已移除
azure.eventhub.common
。从azure.eventhub
导入。 - 已移除
azure.eventhub.client_abstract
。使用azure.eventhub.EventHubProducerClient
或azure.eventhub.EventHubConsumerClient
代替。 - 已移除
azure.eventhub.client
。使用azure.eventhub.EventHubProducerClient
或azure.eventhub.EventHubConsumerClient
代替。 - 已移除
azure.eventhub.producer
。使用azure.eventhub.EventHubProducerClient
代替。 - 已移除
azure.eventhub.consumer
。使用azure.eventhub.EventHubConsumerClient
代替。 - 已移除
azure.eventhub.aio.client_async
。使用azure.eventhub.aio.EventHubProducerClient
或azure.eventhub.aio.EventHubConsumerClient
代替。 - 已移除
azure.eventhub.aio.producer_async
。使用azure.eventhub.aio.EventHubProducerClient
代替。 - 已移除
azure.eventhub.aio.consumer_async
。使用azure.eventhub.aio.EventHubConsumerClient
代替。 - 已移除
azure.eventhub.aio.event_processor.event_processor
。使用azure.eventhub.aio.EventHubConsumerClient
代替。 - 已移除
azure.eventhub.aio.event_processor.partition_processor
。使用回调方法代替。 - 已移除
azure.eventhub.aio.event_processor.partition_manager
。从azure.eventhub.aio
导入。 - 已移除
azure.eventhub.aio.event_processor.partition_context
。从azure.eventhub.aio
导入。 - 已移除
azure.eventhub.aio.event_processor.sample_partition_manager
。
- 已移除
错误修复
- 修复了用户代理字符串解析错误。
5.0.0b4 (2019-10-08)
新功能
- 添加了对跟踪的支持(问题编号 #7153)。
- 向
EventHubConsumer
添加了跟踪分区最后入队事件属性的功能。- 在方法
EventHubClient.create_consumer()
中添加了新的布尔类型参数track_last_enqueued_event_properties
。 EventHubConsumer
添加了新的属性last_enqueued_event_properties
,它包含序列号、偏移量、入队时间和检索时间信息。- 默认情况下,该功能是禁用的,因为它会在打开时额外消耗带宽以传输更多信息。
- 在方法
重大更改
- 移除了对IoT Hub直接连接的支持。
- 可以使用与EventHubs兼容的连接字符串创建
EventHubClient
并从IoT Hub读取属性或事件。IoT Hub的EventHubs兼容连接字符串
- 可以使用与EventHubs兼容的连接字符串创建
- 移除了向IoT Hub发送EventData的支持。
- 移除了
EventHubConsumer
和EventHubProcuer
方法close()
中的参数exception
。 - 更新uAMQP依赖项到1.2.3。
5.0.0b3 (2019-09-10)
新功能
- 添加了对多个
EventProcessor
之间自动负载均衡的支持。 - 添加了实现
PartitionManager
的BlobPartitionManager
。- Azure Blob Storage用于存储由
EventProcessor
使用的数据。 - 作为插件单独打包,以供
EventProcessor
使用。 - 有关详细信息,请参阅Azure Blob Storage Partition Manager。
- Azure Blob Storage用于存储由
- 在
EventData
上添加了system_properties
属性。
重大更改
- 移除了
PartitionProcessor
的构造函数方法。请实现initialize
方法进行初始化。 - 将
CheckpointManager
替换为PartitionContext
。PartitionContext
包含分区上下文信息和update_checkpoint
方法。
- 将
PartitionProcessor
的所有方法更新,将PartitionContext
作为参数的一部分。 - 将
EventHub/EventHubConsumer/EventHubProducer
中的类成员的访问性更改为私有。 - 将
azure.eventhub.eventprocessor
移动到aio
包下,现在为azure.eventhub.aio.eventprocessor
。
5.0.0b2 (2019-08-06)
新功能
- 在
EventHubProducer
上添加了create_batch
方法,用于创建一个EventDataBatch
,可以用来添加事件,直到达到最大大小。- 然后可以使用此批处理对象在
send()
方法中发送所有添加的事件到事件中心。 - 这允许发布者构建批处理,而不会在发送事件时遇到超出支持限制的错误。
- 这也允许关注带宽的发布者控制每个发布的批次大小。
- 然后可以使用此批处理对象在
- 添加了用于重试操作之间的指数延迟的新配置参数。
retry_total
:重做失败操作的总尝试次数。backoff_factor
:延迟时间因子。backoff_max
:总的延迟时间最大值。
- 为
EventHubClient
添加了上下文管理器支持。 - 为发送操作添加了新的错误类型
OperationTimeoutError
。 - 引入了一个新的类
EventProcessor
,它取代了较旧的 事件处理器主机 概念。这个早期预览旨在允许用户使用单个EventProcessor
实例测试新设计。将检查点保存到持久存储的能力将在未来的更新中添加。EventProcessor
:EventProcessor 为事件中心的所有分区创建并运行消费者。PartitionManager
:PartitionManager 定义了获取/声明分区所有权和更新检查点的接口。PartitionProcessor
:PartitionProcessor 定义了处理事件的接口。CheckpointManager
:CheckpointManager 负责在事件处理期间更新检查点。
重大更改
EventProcessorHost
已被EventProcessor
替换,请参阅详细信息中的新功能。- 将 EventHubClient 的
max_retries
配置参数替换为retry_total
。
5.0.0b1 (2019-06-25)
版本 5.0.0b1 是我们创建一个对 Python 生态系统友好且符合习惯的客户端库的预览。这次更新中大多数更改的原因可以在 Azure SDK Python 设计指南 中找到。更多信息,请访问 https://aka.ms/azure-sdk-preview1-python。
新功能
- 为创建 EventHubClient 添加了新的配置参数。
credential
:用于身份验证的凭据对象,实现了获取令牌的TokenCredential
接口。transport_type
:用于与事件中心服务通信的传输协议类型。max_retries
:当发生错误时重做失败操作的最大尝试次数。- 有关配置参数的详细信息,请阅读参考文档。
- 为 EventHubClient 添加了新的方法
get_partition_properties
和get_partition_ids
。 - 添加了对 http 代理的支持。
- 添加了对使用 azure-identity 凭据进行身份验证的支持。
- 添加了对通过 WebSocket 使用 AMQP 传输的支持。
重大更改
- 新的错误层次结构
azure.error.EventHubError
azure.error.ConnectionLostError
azure.error.ConnectError
azure.error.AuthenticationError
azure.error.EventDataError
azure.error.EventDataSendError
- 将发送器/接收器重命名为 EventHubProducer/EventHubConsumer。
- 将EventHubClient中的
add_sender
重命名为create_producer
,将add_receiver
重命名为create_consumer
。 - EventHubConsumer现在可以迭代。
- 将EventHubClient中的
- 将类azure.eventhub.Offset重命名为azure.eventhub.EventPosition。
- 将EventHubClient中的方法
get_eventhub_info
重命名为get_properties
。 - 重新组织连接管理,EventHubClient不再负责打开/关闭EventHubProducer/EventHubConsumer。
- 每个EventHubProducer/EventHubConsumer负责自己的连接管理。
- 为EventHubProducer和EventHubConsumer添加了对上下文管理器的支持。
- 将异步API组织到"azure.eventhub.aio"命名空间中,并删除了"_async"后缀。
- 将uAMQP依赖更新到1.2。
1.3.1 (2019-02-28)
错误修复
- 修复了datetime offset过滤器使用本地时间戳而不是UTC的问题。
- 修复了连续连接重试尝试中的stackoverflow错误。
1.3.0 (2019-01-29)
错误修复
- 添加了对令牌过期和其他认证错误自动重连的支持(问题#89)。
特性
- 添加了从现有的SAS认证令牌创建ServiceBusClient的功能,包括提供在令牌过期时自动续签该令牌的功能。
- 添加了对在检查点中存储自定义EPH上下文值的支持(问题#84,感谢@konstantinmiller)
1.2.0 (2018-11-29)
- 支持在azure.eventhub模块中使用Python 2.7(azure.eventprocessorhost将不支持Python 2.7)。
- 将EventData.enqueued_time解析为UTC时间戳(问题#72,感谢@vjrantal)
1.1.1 (2018-10-03)
- 修复了Azure命名空间包中的错误。
1.1.0 (2018-09-21)
-
更改了
AzureStorageCheckpointLeaseManager
参数以支持其他连接选项(问题#61)storage_account_name
、storage_account_key
和lease_container_name
参数现在是可选关键字参数。- 添加了一个必须与
storage_account_name
一起指定的sas_token
参数,以替代storage_account_key
。 - 添加了一个
endpoint_suffix
参数以支持国家云中的存储端点。 - 添加了一个
connection_string
参数,如果指定,将覆盖所有其他端点参数。 - 如果没有指定,
lease_container_name
参数现在默认为"eph-leases"
。
-
修复了如果运行多次调用时客户端无法启动的问题(问题#64)。
-
为EventData对象添加了方便的方法
body_as_str
和body_as_json
以简化消息数据的处理。
1.0.0 (2018-08-22)
- API稳定。
- 将内部
_async
模块重命名为async_ops
以生成文档。 - 为
EventHubClient
和EventHubClientAsync
添加了可选的auth_timeout
参数,用于配置令牌协商完成允许的最长时间。默认值为60秒。 - 为
EventHubClient.add_sender
和EventHubClientAsync.add_async_sender
添加了可选的send_timeout
参数,以确定成功发送事件的超时时间。默认值为60秒。 - 对日志进行了格式化以优化性能。
0.2.0 (2018-08-06)
-
对EPH进行了稳定性改进。
-
更新了uAMQP版本。
-
为发送器和接收器添加了新的配置选项:
keep_alive
和auto_reconnect
。这些标志已添加到以下内容中:EventHubClient.add_receiver
EventHubClient.add_sender
EventHubClientAsync.add_async_receiver
EventHubClientAsync.add_async_sender
EPHOptions.keey_alive_interval
EPHOptions.auto_reconnect_on_error
0.2.0rc2 (2018-07-29)
- 重大变更
EventData.offset
现在将返回类型为~uamqp.common.Offset
的对象,而不是str。原始字符串值可以从~uamqp.common.Offset.value
中获取。 - 每个发送器/接收器现在将在其独立连接中运行。
- 将uAMQP依赖更新到0.2.0。
- 修复了IoTHub客户端无法检索分区信息的问题。
- 为EventHubClient和EPH添加了对HTTP代理设置的 支持。
- 添加了错误处理策略,在可重试错误时自动重连。
- 添加了保持连接的线程以维护未使用的连接。
0.2.0rc1 (2018-07-06)
- 重大变更 重新结构化库以支持Python 3.7。子模块
async
已重命名,并且现在可以直接从azure.eventhub导入该模块中的所有类。 - 重大变更 从
Receiver.receive
和AsyncReceiver.receive
中删除了可选的callback
参数。 - 重大变更
EventData.properties
已重命名为EventData.application_properties
。这消除了消息可能通过回调被处理,而这些回调尚未在批次中返回的情况。 - 更新了 uAMQP 依赖项到 v0.1.0
- 增加了构建 IoTHub 连接的支持。
- 修复了接收操作中的内存泄漏。
- 取消了对 Python 2.7 轮子支持。
0.2.0b2 (2018-05-29)
- 将
namespace_suffix
添加到 EventHubConfig(),以支持国家云。 - 将
device_id
属性添加到 EventData,以支持 IoT Hub 用例。 - 添加了消息头,以解决支持 PartitionKey 的服务错误。
- 更新了 uAMQP 依赖项到 vRC1。
0.2.0b1 (2018-04-20)
- 将 uAMQP 更新到最新版本。
- 进一步测试和修复了一些小错误。
0.2.0a2 (2018-04-02)
- 更新了 uAQMP 依赖项。
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分发
构建分发
azure_eventhub-5.12.2.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 92bd42a73e426947d98e7688837e99df4b5028ab245677680c6135836ca59558 |
|
MD5 | 1150cac315c47265588f1f02baf20f3e |
|
BLAKE2b-256 | 6531bf73f1f2816dc2b606b06b85b557637e7322bd4cac963d9f720aedb570b0 |
azure_eventhub-5.12.2-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a75fc0e21919cd164b111e9a09b6e3e59749dd35956402294af8d0d2a5bfca27 |
|
MD5 | 48a0aa40a92c2244857a1fff8c860221 |
|
BLAKE2b-256 | ea9ab53475d934953e57e9bba18faecd762cc43bbb9525df72fa508001c3713e |