跳转到主要内容

Microsoft Azure Event Hubs Blob存储客户端库的检查点实现

项目描述

使用存储块实现的Azure EventHubs Checkpoint Store Python客户端库

Azure EventHubs Checkpoint Store用于在处理来自Azure Event Hubs的事件时存储检查点。此Checkpoint Store软件包作为插件包与EventHubConsumerClient一起使用。它使用Azure Storage Blob作为持久存储来维护检查点和分区所有权信息。

请注意,这是一个异步库,对于Azure EventHubs Checkpoint Store客户端库的同步版本,请参阅azure-eventhub-checkpointstoreblob

源代码 | 软件包(PyPi) | API参考文档 | Azure Eventhubs文档 | Azure Storage文档

入门

先决条件

  • Python 3.6或更高版本。

  • Microsoft Azure 订阅:要使用 Azure 服务,包括 Azure Event Hubs,您需要一个订阅。如果您没有现有的 Azure 账户,您可以在 创建账户 时注册免费试用或使用您的 MSDN 订阅者福利。

  • 具有事件中心的命名空间:要与 Azure Event Hubs 交互,您还需要一个命名空间和可用的事件中心。如果您不熟悉创建 Azure 资源,您可以遵循 使用 Azure 门户创建事件中心的分步指南。在那里,您还可以找到使用 Azure CLI、Azure PowerShell 或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。

  • Azure 存储账户:您需要有一个 Azure 存储账户并创建一个 Azure Blob 存储块容器来存储带有块的检查点数据。您可以按照 创建 Azure 块 Blob 存储账户的指南 进行操作。

安装包

$ pip install azure-eventhub-checkpointstoreblob-aio

关键概念

检查点:

检查点是一个过程,其中读取器在分区事件序列中标记或提交其位置。检查点是消费者的责任,在消费者组内按分区基础发生。这个责任意味着对于每个消费者组,每个分区读取器必须跟踪其在事件流中的当前位置,并且可以在认为数据流完成时通知服务。如果读取器从一个分区断开连接,当它重新连接时,它将从该消费者组中该分区的最后读取器之前提交的检查点开始读取。当读取器连接时,它将偏移量传递给事件中心,以指定开始读取的位置。这样,您可以使用检查点将事件标记为“完成”,并为在不同机器上运行的读取器之间的故障转移提供弹性。您可以通过指定低于此检查点过程的偏移量来返回旧数据。通过此机制,检查点既提供了故障转移弹性,也提供了事件流重放。

偏移量与序列号:

偏移量和序列号都指代事件在分区中的位置。您可以将其视为客户端游标。偏移量是事件的字节编号。偏移量/序列号使事件消费者(读取器)能够指定他们想要从其中开始读取事件的事件流中的点。您可以指定一个时间戳,以便只接收在给定时间戳之后入队的事件。消费者负责在 Event Hubs 服务之外存储自己的偏移量值。在分区中,每个事件都包括偏移量、序列号以及它在入队时的戳记。

示例

创建 EventHubConsumerClient

创建 EventHubConsumerClient 的最简单方法是使用连接字符串。

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

有关创建 EventHubConsumerClient 的其他方法,请参阅 EventHubs 库 以获取更多详细信息。

使用 BlobCheckpointStore 进行检查点以消费事件

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

使用不同版本的 Azure 存储服务 API 与 BlobCheckpointStore 配合使用

某些环境有不同的 Azure 存储服务 API 版本。《BlobCheckpointStore》默认使用存储服务 API 版本 2019-07-07。要使用它针对不同的版本,在创建 BlobCheckpointStore 对象时指定 api_version

故障排除

通用

启用日志记录将有助于进行故障排除。

日志记录

  • 启用 azure.eventhub.extensions.checkpointstoreblobaio 记录器以收集库中的跟踪信息。
  • 启用 azure.eventhub 记录器以收集主 azure-eventhub 库中的跟踪信息。
  • 启用 azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage 记录器以收集 azure 存储blob库中的跟踪信息。
  • 启用 uamqp 记录器以收集底层 uAMQP 库中的跟踪信息。
  • 通过在创建客户端时设置 logging_enable=True 来启用 AMQP 帧级跟踪。

下一步

更多示例代码

开始使用我们的EventHubs Checkpoint Store 异步示例

文档

参考文档可在此处找到。

提供反馈

如果您遇到任何错误或建议,请在此项目的问题部分提交问题。

贡献

此项目欢迎贡献和建议。大多数贡献需要您同意贡献者许可协议(CLA),声明您有权,并且确实授予我们使用您的贡献的权利。有关详细信息,请访问https://cla.microsoft.com

在提交拉取请求时,CLA-bot 将自动确定您是否需要提供 CLA 并适当地装饰 PR(例如,标签、注释)。只需遵循机器人提供的说明即可。您只需在整个使用我们的 CLA 的所有存储库中做一次。

此项目已采用Microsoft 开源行为准则。有关更多信息,请参阅行为准则常见问题解答或联系opencode@microsoft.com以获取任何额外的问题或评论。

Impressions

发布历史

1.1.4 (2021-04-07)

此版本和所有未来的版本将需要 Python 2.7 或 Python 3.6+,Python 3.5 将不再受支持。

新功能

  • BlobCheckpointStore 上更新了 list_ownershipclaim_ownershipupdate_checkpointlist_checkpoints 以支持使用 **kwargs

1.1.3 (2021-03-09)

此版本将是最后一个官方支持 Python 3.5 的版本,未来的版本将需要 Python 2.7 或 Python 3.6+。

错误修复

  • 更新了供应商依赖项 azure-storage-blob 到 v12.7.1。
    • 修复了由于请求日期头过旧导致的存储blob身份验证失败 (#16192)。

1.1.2 (2021-01-11)

错误修复

  • 修复了在启用 Data Lake 的 Blob Storage 读取父节点空元数据时,BlobCheckpointStore.list_ownershipBlobCheckpointStore.list_checkpoints 触发 KeyError 的错误。

1.1.1 (2020-09-08)

错误修复

  • 修复了如果存储帐户 "文件共享软删除" 启用时,可能会逐渐减慢从存储 blob 获取检查点数据的错误。#12836

1.1.0 (2020-03-09)

新功能

  • BlobCheckpointStoreapi_version 参数现在支持 Azure 存储服务 API 的旧版本。

1.0.0 (2020-01-13)

稳定版本。没有新功能或 API 更改。

1.0.0b6 (2019-12-04)

重大变更

  • BlobPartitionManager 重命名为 BlobCheckpointStore
  • 《BlobCheckpointStore》的构造函数已被更新,现在直接接收存储容器详情,而不是《ContainerClient》实例。
  • 为Blob存储连接字符串添加了from_connection_string构造函数。
  • 模块blobstoragepmaio现在是内部模块,所有导入应直接来自azure.eventhub.extensions.checkpointstoreblobaio
  • 《BlobCheckpointStore》现在具有一个close()函数,用于关闭HTTP连接池,此外该对象还可以用作上下文管理器来管理连接。

1.0.0b5 (2019-11-04)

新功能

  • 添加了list_checkpoints方法,该方法列出给定事件中心命名空间、事件中心名称和消费者组下的所有检查点。

1.0.0b4 (2019-10-09)

本版本仅进行了微小的内部更改。没有功能更改。

1.0.0b1 (2019-09-10)

新功能

  • 使用Azure Blob存储块Blob存储EventProcessor检查点数据的《BlobPartitionManager》

Impressions

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

azure-eventhub-checkpointstoreblob-aio-1.1.4.zip (374.8 kB 查看哈希)

上传时间: 源代码

构建分发

azure_eventhub_checkpointstoreblob_aio-1.1.4-py3-none-any.whl (349.7 kB 查看哈希)

上传时间: Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面