Microsoft Azure Event Hubs使用Blob存储客户端库的检查点实现
项目描述
使用存储Blob的Python Azure EventHubs Checkpoint Store客户端库
Azure EventHubs Checkpoint Store用于在处理来自Azure Event Hubs的事件时存储检查点。此检查点存储包作为插件包与EventHubConsumerClient
一起工作。它使用Azure Storage Blob作为持久存储,以维护检查点和分区所有权信息。
请注意,这是一个同步库,对于Azure EventHubs Checkpoint Store客户端库的异步版本,请参阅azure-eventhub-checkpointstoreblob-aio。
源代码 | 包(PyPi) | API参考文档 | Azure Eventhubs文档 | Azure存储文档
开始使用
先决条件
-
Python 2.7、Python 3.6或更高版本。
-
Microsoft Azure 订阅:要使用 Azure 服务,包括 Azure 事件中心,您需要一个订阅。如果您没有现有的 Azure 帐户,您可以在创建帐户时注册免费试用或使用您的 MSDN 订阅者福利。
-
具有事件中心的事件中心命名空间:要与 Azure 事件中心交互,您还需要一个命名空间和事件中心。如果您不熟悉创建 Azure 资源,您可能希望遵循使用 Azure 门户创建事件中心的分步指南。在那里,您还可以找到使用 Azure CLI、Azure PowerShell 或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。
-
Azure 存储帐户:您需要一个 Azure 存储帐户并创建一个 Azure Blob 存储块容器来存储检查点数据。您可以遵循创建 Azure 块 Blob 存储帐户的指南。
安装包
$ pip install azure-eventhub-checkpointstoreblob
关键概念
检查点
检查点是读取器在分区事件序列中标记或提交其位置的过程。检查点是消费者的责任,并在消费者组内的每个分区基础上发生。这个责任意味着对于每个消费者组,每个分区读取器必须跟踪其在事件流中的当前位置,并在认为数据流完成时通知服务。如果读取器从分区断开连接,当它重新连接时,它将从消费者组中该分区的最后读取者之前提交的检查点开始读取。当读取器连接时,它将偏移量传递给事件中心以指定开始读取的位置。这样,您可以使用检查点将事件标记为“完成”,并为在不同机器上运行的读取器之间的故障转移提供弹性。您可以通过指定从检查点过程较低的偏移量来返回旧数据。通过这个机制,检查点可以实现故障转移弹性和事件流回放。
偏移量与序列号
偏移量和序列号都指的是事件在分区内的位置。您可以将它们视为客户端游标。偏移量是事件的字节编号。偏移量/序列号使事件消费者(读取器)能够指定他们希望从中开始读取事件的流中的点。您可以指定一个时间戳,这样您只会收到在给定时间戳之后入队的事件。消费者负责在其自己的 Event Hubs 服务之外存储其自己的偏移量值。在分区中,每个事件都包含一个偏移量、序列号以及入队时的时间戳。
示例
创建 EventHubConsumerClient
创建 EventHubConsumerClient
的最简单方法是使用连接字符串。
from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
有关创建 EventHubConsumerClient
的其他方法的详细信息,请参阅EventHubs 库。
使用 BlobCheckpointStore
进行检查点以消费事件
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
def on_event(partition_context, event):
# Put your code here.
partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
with client:
client.receive(on_event)
if __name__ == '__main__':
main()
使用不同版本的 Azure 存储服务 API 的 BlobCheckpointStore
一些环境有不同的 Azure 存储服务 API 版本。《BlobCheckpointStore》默认使用存储服务 API 版本 2019-07-07。要使用不同版本,请在创建《BlobCheckpointStore》对象时指定《api_version》。
故障排除
常规
启用日志记录将有助于进行故障排除。
日志记录
- 启用
azure.eventhub.extensions.checkpointstoreblob
日志记录器以收集库中的跟踪信息。 - 启用
azure.eventhub
日志记录器以收集主 azure-eventhub 库中的跟踪信息。 - 启用
azure.eventhub.extensions.checkpointstoreblob._vendor.storage
日志记录器以收集 azure 存储blob库中的跟踪信息。 - 启用
uamqp
日志记录器以收集底层 uAMQP 库中的跟踪信息。 - 在创建客户端时设置
logging_enable=True
以启用 AMQP 帧级别跟踪。
下一步
更多示例代码
开始使用我们的 EventHubs Checkpoint Store 示例。
- receive_events_using_checkpoint_store.py - 使用 blob checkpoint store 的 EventHubConsumerClient 示例
- receive_events_using_checkpoint_store_storage_api_version.py - 使用 blob checkpoint store 和存储版本示例的 EventHubConsumerClient
文档
参考文档可在 此处 获取。
提供反馈
如果您遇到任何错误或有所建议,请在项目的 问题 部分提交问题。
贡献
本项目欢迎贡献和建议。大多数贡献都需要您同意贡献者许可协议(CLA),声明您有权利,并且实际上确实授予我们使用您的贡献的权利。有关详细信息,请访问 https://cla.microsoft.com。
当您提交拉取请求时,CLA-bot 将自动确定您是否需要提供 CLA 并相应地装饰 PR(例如,标签、注释)。只需遵循 bot 提供的说明即可。您只需在整个使用我们的 CLA 的所有存储库中这样做一次。
本项目已采用 Microsoft Open Source Code of Conduct。有关更多信息,请参阅 行为准则 FAQ 或通过 opencode@microsoft.com 联系我们,以了解任何额外的问题或评论。
发布历史
1.1.4 (2021-04-07)
此版本和所有未来版本将需要 Python 2.7 或 Python 3.6+,Python 3.5 不再受支持。
新功能
- 已更新
list_ownership
、claim_ownership
、update_checkpoint
、list_checkpoints
在BlobCheckpointStore
上,以支持接受**kwargs
。
1.1.3 (2021-03-09)
此版本将是最后支持 Python 3.5 的官方版本,未来版本将需要 Python 2.7 或 Python 3.6+。
错误修复
- 已更新供应商 azure-storage-blob 依赖项到 v12.7.1。
- 修复了由于请求日期头过旧导致的存储 blob 身份验证失败 (#16192)。
1.1.2 (2021-01-11)
错误修复
- 修复了一个错误,当与启用 Data Lake 的 Blob Storage 一起工作时,由于读取父节点空元数据而触发
BlobCheckpointStore.list_ownership
和BlobCheckpointStore.list_checkpoints
的KeyError
。
1.1.1 (2020-09-08)
错误修复
- 修复了一个错误,如果启用存储帐户 "File share soft delete",则可能会逐渐减慢从存储 blob 获取检查点数据。#12836
1.1.0 (2020-03-09)
新功能
BlobCheckpointStore
的api_version
参数现在支持 Azure 存储服务 API 的旧版本。
1.0.0 (2020-01-13)
稳定版本。没有新功能或 API 变更。
1.0.0b6 (2019-12-04)
破坏性变更
- 已将
BlobPartitionManager
重命名为BlobCheckpointStore
。 BlobCheckpointStore
的构造函数已更新,可以直接接受存储容器详情,而不是ContainerClient
实例。- 为 Blob 存储连接字符串添加了
from_connection_string
构造函数。 - 模块
blobstoragepm
现在是内部的,所有导入应直接从azure.eventhub.extensions.checkpointstoreblob
进行。 BlobCheckpointStore
现在有一个close()
函数,用于关闭 HTTP 连接池,此外该对象还可以用作上下文管理器来管理连接。
1.0.0b5 (2019-11-04)
新功能
- 使用 Azure Blob Storage 块 Blob 存储事件处理器检查点数据的
BlobPartitionManager
项目详情
azure-eventhub-checkpointstoreblob-1.1.4.zip 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 9bd98666ebefdc3e9e3682092bd80edf4956083003b0354d0ce8c7fd9b28b0d5 |
|
MD5 | 35af79961245d4dccf2d523174b513f4 |
|
BLAKE2b-256 | a74e6425436dc6365909d810c75dc913eb3f7cfb172691ac3841ea106c2e7aac |
azure_eventhub_checkpointstoreblob-1.1.4-py2.py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | b1089c9b87473be8c1b67ae09f0da33590f99fb747e4126d2187b2d14d16acb7 |
|
MD5 | 85c79ba018ead70e5253b2840ece9adc |
|
BLAKE2b-256 | f783233253f67f69e4a0722bb0ca46fbbe44e0fac2aba39ef6c60a95c494bb0f |