跳转到主要内容

Microsoft Azure Schema Registry Avro Encoder客户端库(Python版)

项目描述

Azure Schema Registry Avro Encoder客户端库

Azure Schema Registry是一个由Azure Event Hubs托管的模式仓库服务,提供模式存储、版本管理和模式。此包提供了一种Avro编码器,能够对包含Schema Registry模式标识符和Avro编码内容的有效负载进行编码和解码。

源代码 | 包(PyPi) | API参考文档 | 示例 | 变更日志

免责声明

Azure SDK Python包对Python 2.7的支持已于2022年1月1日结束。有关更多信息,请参阅https://github.com/Azure/azure-sdk-for-python/issues/20691

入门

安装包

使用 pip 安装 Azure Schema Registry Avro Encoder Python 客户端库。

pip install azure-schemaregistry-avroencoder

先决条件

要使用此包,您必须具备以下条件:

验证客户端

与 Schema Registry Avro Encoder 的交互从 AvroEncoder 类的实例开始,该类接受架构组名称和 Schema Registry 客户端 类。客户端构造函数接受 Event Hubs 完整限定名称和 Azure Active Directory 凭据

  • Schema Registry 实例的完整限定名称应遵循以下格式:`<yournamespace>.servicebus.windows.net`。

  • 应将实现 TokenCredential 协议的 AAD 凭据传递给构造函数。在 azure-identity 包 中提供了 `TokenCredential` 协议的实现。要使用 `azure-identity` 提供的凭据类型,请使用 pip 安装 Azure Identity Python 客户端库。

pip install azure-identity
  • 另外,要使用异步 API,您必须首先安装异步传输,例如 aiohttp
pip install aiohttp

使用 azure-schemaregistry 库创建 AvroEncoder

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

关键概念

AvroEncoder

提供编码到和从 Avro 二进制编码以及带有架构 ID 的内容类型的 API。使用 SchemaRegistryClient 从架构内容获取架构 ID 或反之亦然。

支持的消息模型

已添加对某些 Azure Messaging SDK 模型类的支持,以与 `AvroEncoder` 互操作。这些模型是位于 `azure.schemaregistry.encoder.avroencoder` 命名空间下的 `MessageType` 协议定义的子类型。目前,支持以下模型类:

  • `azure.eventhub.EventData` (对于 `azure-eventhub>=5.9.0`)

消息格式

如果将遵循 `MessageType` 协议的消息类型提供给编码器进行编码,它将设置相应的 `content` 和 `content_type` 属性,其中

  • `content`:Avro 负载数据 (通常是特定于格式的负载数据)

    • Avro 二进制编码
    • 不是 Avro 对象容器文件,它包含架构并违背了此编码器将架构从消息有效负载中移除到架构注册表的初衷。
  • `content type`:格式为 `avro/binary+<schema ID>` 的字符串,其中

    • `avro/binary` 是格式指示符
    • `<schema ID>` 是 GUID 的十六进制表示,格式和字节顺序与 Schema Registry 服务的字符串相同。

如果将 `EventData` 作为消息类型传入,以下属性将设置在 `EventData` 对象上

  • `body` 属性将设置为 `content` 值。
  • `content_type` 属性将设置为 `content_type` 值。

如果未提供消息类型,默认情况下,编码器将创建以下字典:`{"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

示例

以下部分提供了一些代码片段,涵盖了一些最常见 Schema Registry 任务,包括

编码

使用AvroEncoder.encode方法使用给定的Avro模式编码内容。该方法将使用先前注册到Schema Registry服务的模式,并将模式缓存起来以供未来编码使用。为了避免预先将模式注册到服务,并自动使用encode方法注册,应将关键字参数auto_register=True传递给AvroEncoder构造函数。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

解码

使用AvroEncoder.decode方法通过以下方式解码Avro编码的内容:

  • 传递一个消息对象,该对象是MessageType协议的子类型。
  • 传递一个字典,其中包含键content(类型bytes)和content_type(类型字符串)。方法会自动从Schema Registry服务检索模式,并将模式缓存起来以供未来解码使用。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Event Hubs 发送集成

Event Hubs集成,发送一个EventData对象,其中body设置为Avro编码的内容和相应的content_type

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Event Hubs 接收集成

Event Hubs集成,接收一个EventData对象,并解码Avro编码的body值。

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

故障排除

一般

Azure Schema Registry Avro Encoder在遇到与Schema Registry服务通信错误时会抛出在Azure Core中定义的异常。与无效内容/内容类型和无效模式相关的错误将分别作为azure.schemaregistry.encoder.avroencoder.InvalidContentErrorazure.schemaregistry.encoder.avroencoder.InvalidSchemaError抛出,其中__cause__将包含由Apache Avro库抛出的基本异常。

日志记录

此库使用标准的logging库进行日志记录。HTTP会话的基本信息(URL、头等)以INFO级别记录。

可以通过在客户端使用logging_enable参数启用详细的DEBUG级别日志记录,包括请求/响应体和未标记的头。

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

类似地,logging_enable可以启用单个操作的详细日志记录,即使对于客户端没有启用

encoder.encode(dict_content, schema=definition, logging_enable=True)

下一步

更多示例代码

samples目录中有更多示例代码,展示了常见的Azure Schema Registry Avro Encoder场景。

贡献

此项目欢迎贡献和建议。大多数贡献需要您同意贡献者许可协议(CLA),声明您有权,并且确实授予我们使用您的贡献的权利。有关详细信息,请访问https://cla.microsoft.com

提交拉取请求时,CLA-bot将自动确定您是否需要提供CLA,并适当地装饰PR(例如,标签、评论)。只需遵循bot提供的说明即可。您只需在整个使用我们的CLA的仓库中这样做一次。

此项目采用了Microsoft Open Source Code of Conduct。有关更多信息,请参阅Code of Conduct FAQ或联系opencode@microsoft.com以获取任何额外的问题或评论。

版本历史

1.0.0 (2022-05-10)

注意:这是我们为创建一个用户友好的Pythonic Avro Encoder库而努力的第一版稳定版本,该库与Azure Schema Registry的Python客户端库集成。

新增功能

  • AvroEncoder同步和异步类提供了对遵循Apache Avro规范中定义的RecordSchema格式的模式进行编码和解码内容的功能。Apache Avro库用作编码和解码的实现。编码器将自动从Azure Schema Registry服务注册和检索模式。它提供了以下方法
    • 构造函数:如果传递了auto_register=True关键字,将自动将传递给encode方法的模式进行注册。否则,默认情况下,需要预先注册传递给encode的模式。接受一个可选的group_name参数,在解码时可选,但在编码时是必需的。
    • encode:根据给定的模式将字典内容编码为字节,如果需要则注册模式。根据提供的参数,返回一个包含编码内容和对应内容类型的字典,或者一个MessageType子类型对象。
    • decode:通过从服务中自动检索模式将字节内容解码为字典内容。
  • 已引入名为MessageContent的TypedDict,具有以下必需键
    • content:字节内容。
    • content_type:字符串内容类型,其中包含模式ID和记录格式指示符。
  • 已引入名为MessageType的,具有以下方法的类
    • from_message_content:一个类方法,用于创建一个具有给定字节内容和字符串内容类型的对象。
    • __message_content__:返回一个MessageContent对象,其内容值和内容类型值分别设置为对象上的相应属性。
  • 模式和模式ID被本地缓存,这样对于相同的模式/模式ID的多个调用将不会触发多个服务调用。
  • 当添加新条目时,将在info级别记录模式/模式ID缓存的命中次数、未命中次数和总条目数。
  • 已引入InvalidContentError,用于处理与无效内容和相关内容类型相关的错误,其中__cause__将包含由Avro库引发的底层异常。
  • 已引入InvalidSchemaError,用于处理与无效模式相关的错误,其中__cause__将包含由Apache Avro库引发的底层异常。
  • AvroEncoder上的encodedecode方法支持以下消息模型
    • azure.eventhub.EventDataazure-eventhub>=5.9.0

其他更改

  • 此包旨在替换不再支持的azure-schemaregistry-avroserializer包。
  • group_name现在在同步和异步AvroEncoder构造函数中是一个可选参数。

1.0.0b3 (2022-04-05)

破坏性更改

  • 在同步和异步AvroEncoder构造函数中的auto_register_schemas关键字已重命名为auto_register
  • SchemaParseErrorSchemaEncodeErrorSchemaDecodeError已被InvalidContentErrorInvalidSchemaError替换。错误已添加到azure.schemaregistry.encoder.avroencoder命名空间下。
  • 已删除azure.schemaregistry.encoder.avroencoder中的exceptions模块。
  • 在同步和异步AvroEncoder上的encode方法只允许将MessageType协议的子类型作为message_type可选参数的值,而不是任何具有方法签名(content: bytes, content_type: str, **kwargs: Any)的可调用对象。
  • 当添加新条目时,将在info级别记录模式/模式ID缓存中的命中次数/未命中次数,以及条目数。

其他更改

  • 此版本和未来的版本将不支持使用AvroSerializer编码的数据的解码。
  • AvroEncoder上的encodedecode方法支持以下消息模型
    • azure.eventhub.EventDataazure-eventhub==5.9.0b3

1.0.0b2 (2022-03-09)

新增功能

  • 已将request_options添加到AvroEncoder上的encodedecode,作为传递给客户端请求的可选参数。
  • 当添加新条目时,将在info级别记录当前模式/模式ID缓存的大小。

破坏性更改

  • MessageMetadataDict已重命名为MessageContent
  • MessageContent中的data已重命名为content
  • 在同步和异步AvroEncoder上的encodedecode中的data参数已重命名为content
  • MessageType 协议中的 from_message_data 方法已被重命名为 from_message_content。在 from_message_content 中的 data 参数已被重命名为 content
  • MessageType 协议中的 __message_data__ 方法已被重命名为 __message_content__

其他更改

  • 此 beta 版本将向后兼容使用 AvroSerializer 编码的数据。
  • AvroEncoder上的encodedecode方法支持以下消息模型
    • azure.eventhub.EventDataazure-eventhub==5.9.0b2

1.0.0b1 (2022-02-09)

此版本及所有未来版本将需要 Python 3.6+。Python 2.7 已不再支持。

新增功能

  • 此包旨在替换 azure-schemaregistry-avroserializer。
  • API 已更新,允许直接将数据编码到消息类型对象,并从消息类型对象解码,其中数据值是 Avro 编码的有效负载。
  • 消息的内容类型将包含模式 ID 和记录格式指示符。

其他更改

  • 此 beta 版本将向后兼容使用 AvroSerializer 编码的数据。
  • AvroEncoder上的encodedecode方法支持以下消息模型
    • azure.eventhub.EventDataazure-eventhub==5.9.0b1

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

azure-schemaregistry-avroencoder-1.0.0.zip (76.5 kB 查看哈希值)

上传时间

构建分发

azure_schemaregistry_avroencoder-1.0.0-py3-none-any.whl (27.9 kB 查看哈希值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面