跳转到主要内容

Microsoft Azure Event Hubs使用Blob存储客户端库的检查点实现

项目描述

使用存储Blob的Python Azure EventHubs Checkpoint Store客户端库

Azure EventHubs Checkpoint Store用于在处理来自Azure Event Hubs的事件时存储检查点。此检查点存储包作为插件包与EventHubConsumerClient一起工作。它使用Azure Storage Blob作为持久存储,以维护检查点和分区所有权信息。

请注意,这是一个同步库,对于Azure EventHubs Checkpoint Store客户端库的异步版本,请参阅azure-eventhub-checkpointstoreblob-aio

源代码 | 包(PyPi) | API参考文档 | Azure Eventhubs文档 | Azure存储文档

开始使用

先决条件

  • Python 2.7、Python 3.6或更高版本。

  • Microsoft Azure 订阅:要使用 Azure 服务,包括 Azure 事件中心,您需要一个订阅。如果您没有现有的 Azure 帐户,您可以在创建帐户时注册免费试用或使用您的 MSDN 订阅者福利。

  • 具有事件中心的事件中心命名空间:要与 Azure 事件中心交互,您还需要一个命名空间和事件中心。如果您不熟悉创建 Azure 资源,您可能希望遵循使用 Azure 门户创建事件中心的分步指南。在那里,您还可以找到使用 Azure CLI、Azure PowerShell 或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。

  • Azure 存储帐户:您需要一个 Azure 存储帐户并创建一个 Azure Blob 存储块容器来存储检查点数据。您可以遵循创建 Azure 块 Blob 存储帐户的指南。

安装包

$ pip install azure-eventhub-checkpointstoreblob

关键概念

检查点

检查点是读取器在分区事件序列中标记或提交其位置的过程。检查点是消费者的责任,并在消费者组内的每个分区基础上发生。这个责任意味着对于每个消费者组,每个分区读取器必须跟踪其在事件流中的当前位置,并在认为数据流完成时通知服务。如果读取器从分区断开连接,当它重新连接时,它将从消费者组中该分区的最后读取者之前提交的检查点开始读取。当读取器连接时,它将偏移量传递给事件中心以指定开始读取的位置。这样,您可以使用检查点将事件标记为“完成”,并为在不同机器上运行的读取器之间的故障转移提供弹性。您可以通过指定从检查点过程较低的偏移量来返回旧数据。通过这个机制,检查点可以实现故障转移弹性和事件流回放。

偏移量与序列号

偏移量和序列号都指的是事件在分区内的位置。您可以将它们视为客户端游标。偏移量是事件的字节编号。偏移量/序列号使事件消费者(读取器)能够指定他们希望从中开始读取事件的流中的点。您可以指定一个时间戳,这样您只会收到在给定时间戳之后入队的事件。消费者负责在其自己的 Event Hubs 服务之外存储其自己的偏移量值。在分区中,每个事件都包含一个偏移量、序列号以及入队时的时间戳。

示例

创建 EventHubConsumerClient

创建 EventHubConsumerClient 的最简单方法是使用连接字符串。

from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

有关创建 EventHubConsumerClient 的其他方法的详细信息,请参阅EventHubs 库

使用 BlobCheckpointStore 进行检查点以消费事件

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'


def on_event(partition_context, event):
    # Put your code here.
    partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    with client:
        client.receive(on_event)

if __name__ == '__main__':
    main()

使用不同版本的 Azure 存储服务 API 的 BlobCheckpointStore

一些环境有不同的 Azure 存储服务 API 版本。《BlobCheckpointStore》默认使用存储服务 API 版本 2019-07-07。要使用不同版本,请在创建《BlobCheckpointStore》对象时指定《api_version》。

故障排除

常规

启用日志记录将有助于进行故障排除。

日志记录

  • 启用 azure.eventhub.extensions.checkpointstoreblob 日志记录器以收集库中的跟踪信息。
  • 启用 azure.eventhub 日志记录器以收集主 azure-eventhub 库中的跟踪信息。
  • 启用 azure.eventhub.extensions.checkpointstoreblob._vendor.storage 日志记录器以收集 azure 存储blob库中的跟踪信息。
  • 启用 uamqp 日志记录器以收集底层 uAMQP 库中的跟踪信息。
  • 在创建客户端时设置 logging_enable=True 以启用 AMQP 帧级别跟踪。

下一步

更多示例代码

开始使用我们的 EventHubs Checkpoint Store 示例

文档

参考文档可在 此处 获取。

提供反馈

如果您遇到任何错误或有所建议,请在项目的 问题 部分提交问题。

贡献

本项目欢迎贡献和建议。大多数贡献都需要您同意贡献者许可协议(CLA),声明您有权利,并且实际上确实授予我们使用您的贡献的权利。有关详细信息,请访问 https://cla.microsoft.com

当您提交拉取请求时,CLA-bot 将自动确定您是否需要提供 CLA 并相应地装饰 PR(例如,标签、注释)。只需遵循 bot 提供的说明即可。您只需在整个使用我们的 CLA 的所有存储库中这样做一次。

本项目已采用 Microsoft Open Source Code of Conduct。有关更多信息,请参阅 行为准则 FAQ 或通过 opencode@microsoft.com 联系我们,以了解任何额外的问题或评论。

Impressions

发布历史

1.1.4 (2021-04-07)

此版本和所有未来版本将需要 Python 2.7 或 Python 3.6+,Python 3.5 不再受支持。

新功能

  • 已更新 list_ownershipclaim_ownershipupdate_checkpointlist_checkpointsBlobCheckpointStore 上,以支持接受 **kwargs

1.1.3 (2021-03-09)

此版本将是最后支持 Python 3.5 的官方版本,未来版本将需要 Python 2.7 或 Python 3.6+。

错误修复

  • 已更新供应商 azure-storage-blob 依赖项到 v12.7.1。
    • 修复了由于请求日期头过旧导致的存储 blob 身份验证失败 (#16192)。

1.1.2 (2021-01-11)

错误修复

  • 修复了一个错误,当与启用 Data Lake 的 Blob Storage 一起工作时,由于读取父节点空元数据而触发 BlobCheckpointStore.list_ownershipBlobCheckpointStore.list_checkpointsKeyError

1.1.1 (2020-09-08)

错误修复

  • 修复了一个错误,如果启用存储帐户 "File share soft delete",则可能会逐渐减慢从存储 blob 获取检查点数据。#12836

1.1.0 (2020-03-09)

新功能

  • BlobCheckpointStoreapi_version 参数现在支持 Azure 存储服务 API 的旧版本。

1.0.0 (2020-01-13)

稳定版本。没有新功能或 API 变更。

1.0.0b6 (2019-12-04)

破坏性变更

  • 已将 BlobPartitionManager 重命名为 BlobCheckpointStore
  • BlobCheckpointStore 的构造函数已更新,可以直接接受存储容器详情,而不是 ContainerClient 实例。
  • 为 Blob 存储连接字符串添加了 from_connection_string 构造函数。
  • 模块 blobstoragepm 现在是内部的,所有导入应直接从 azure.eventhub.extensions.checkpointstoreblob 进行。
  • BlobCheckpointStore 现在有一个 close() 函数,用于关闭 HTTP 连接池,此外该对象还可以用作上下文管理器来管理连接。

1.0.0b5 (2019-11-04)

新功能

  • 使用 Azure Blob Storage 块 Blob 存储事件处理器检查点数据的 BlobPartitionManager

Impressions

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

azure-eventhub-checkpointstoreblob-1.1.4.zip (372.8 kB 查看哈希值)

上传时间

构建分布

azure_eventhub_checkpointstoreblob-1.1.4-py2.py3-none-any.whl (349.2 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下支持