Microsoft Azure Schema Registry Avro Encoder客户端库(Python版)
项目描述
Azure Schema Registry Avro Encoder客户端库
Azure Schema Registry是一个由Azure Event Hubs托管的模式仓库服务,提供模式存储、版本管理和模式。此包提供了一种Avro编码器,能够对包含Schema Registry模式标识符和Avro编码内容的有效负载进行编码和解码。
源代码 | 包(PyPi) | API参考文档 | 示例 | 变更日志
免责声明
Azure SDK Python包对Python 2.7的支持已于2022年1月1日结束。有关更多信息,请参阅https://github.com/Azure/azure-sdk-for-python/issues/20691
入门
安装包
使用 pip 安装 Azure Schema Registry Avro Encoder Python 客户端库。
pip install azure-schemaregistry-avroencoder
先决条件
要使用此包,您必须具备以下条件:
- Azure 订阅 - 创建免费账户
- Azure Schema Registry - 这里 是使用 Azure 门户创建 Schema Registry 组的快速入门指南。
- Python 3.6 或更高版本 - 安装 Python
验证客户端
与 Schema Registry Avro Encoder 的交互从 AvroEncoder 类的实例开始,该类接受架构组名称和 Schema Registry 客户端 类。客户端构造函数接受 Event Hubs 完整限定名称和 Azure Active Directory 凭据
-
Schema Registry 实例的完整限定名称应遵循以下格式:`<yournamespace>.servicebus.windows.net`。
-
应将实现 TokenCredential 协议的 AAD 凭据传递给构造函数。在 azure-identity 包 中提供了 `TokenCredential` 协议的实现。要使用 `azure-identity` 提供的凭据类型,请使用 pip 安装 Azure Identity Python 客户端库。
pip install azure-identity
- 另外,要使用异步 API,您必须首先安装异步传输,例如 aiohttp
pip install aiohttp
使用 azure-schemaregistry 库创建 AvroEncoder
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
关键概念
AvroEncoder
提供编码到和从 Avro 二进制编码以及带有架构 ID 的内容类型的 API。使用 SchemaRegistryClient 从架构内容获取架构 ID 或反之亦然。
支持的消息模型
已添加对某些 Azure Messaging SDK 模型类的支持,以与 `AvroEncoder` 互操作。这些模型是位于 `azure.schemaregistry.encoder.avroencoder` 命名空间下的 `MessageType` 协议定义的子类型。目前,支持以下模型类:
- `azure.eventhub.EventData` (对于 `azure-eventhub>=5.9.0`)
消息格式
如果将遵循 `MessageType` 协议的消息类型提供给编码器进行编码,它将设置相应的 `content` 和 `content_type` 属性,其中
-
`content`:Avro 负载数据 (通常是特定于格式的负载数据)
- Avro 二进制编码
- 不是 Avro 对象容器文件,它包含架构并违背了此编码器将架构从消息有效负载中移除到架构注册表的初衷。
-
`content type`:格式为 `
avro/binary+<schema ID>
` 的字符串,其中- `avro/binary` 是格式指示符
- `<schema ID>` 是 GUID 的十六进制表示,格式和字节顺序与 Schema Registry 服务的字符串相同。
如果将 `EventData` 作为消息类型传入,以下属性将设置在 `EventData` 对象上
- `body` 属性将设置为 `content` 值。
- `content_type` 属性将设置为 `content_type` 值。
如果未提供消息类型,默认情况下,编码器将创建以下字典:`{"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
示例
以下部分提供了一些代码片段,涵盖了一些最常见 Schema Registry 任务,包括
编码
使用AvroEncoder.encode
方法使用给定的Avro模式编码内容。该方法将使用先前注册到Schema Registry服务的模式,并将模式缓存起来以供未来编码使用。为了避免预先将模式注册到服务,并自动使用encode
方法注册,应将关键字参数auto_register=True
传递给AvroEncoder
构造函数。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
解码
使用AvroEncoder.decode
方法通过以下方式解码Avro编码的内容:
- 传递一个消息对象,该对象是MessageType协议的子类型。
- 传递一个字典,其中包含键
content
(类型bytes)和content_type
(类型字符串)。方法会自动从Schema Registry服务检索模式,并将模式缓存起来以供未来解码使用。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Event Hubs 发送集成
与Event Hubs集成,发送一个EventData
对象,其中body
设置为Avro编码的内容和相应的content_type
。
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Event Hubs 接收集成
与Event Hubs集成,接收一个EventData
对象,并解码Avro编码的body
值。
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
故障排除
一般
Azure Schema Registry Avro Encoder在遇到与Schema Registry服务通信错误时会抛出在Azure Core中定义的异常。与无效内容/内容类型和无效模式相关的错误将分别作为azure.schemaregistry.encoder.avroencoder.InvalidContentError
和azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
抛出,其中__cause__
将包含由Apache Avro库抛出的基本异常。
日志记录
此库使用标准的logging库进行日志记录。HTTP会话的基本信息(URL、头等)以INFO级别记录。
可以通过在客户端使用logging_enable
参数启用详细的DEBUG级别日志记录,包括请求/响应体和未标记的头。
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
类似地,logging_enable
可以启用单个操作的详细日志记录,即使对于客户端没有启用
encoder.encode(dict_content, schema=definition, logging_enable=True)
下一步
更多示例代码
在samples目录中有更多示例代码,展示了常见的Azure Schema Registry Avro Encoder场景。
贡献
此项目欢迎贡献和建议。大多数贡献需要您同意贡献者许可协议(CLA),声明您有权,并且确实授予我们使用您的贡献的权利。有关详细信息,请访问https://cla.microsoft.com。
提交拉取请求时,CLA-bot将自动确定您是否需要提供CLA,并适当地装饰PR(例如,标签、评论)。只需遵循bot提供的说明即可。您只需在整个使用我们的CLA的仓库中这样做一次。
此项目采用了Microsoft Open Source Code of Conduct。有关更多信息,请参阅Code of Conduct FAQ或联系opencode@microsoft.com以获取任何额外的问题或评论。
版本历史
1.0.0 (2022-05-10)
注意:这是我们为创建一个用户友好的Pythonic Avro Encoder库而努力的第一版稳定版本,该库与Azure Schema Registry的Python客户端库集成。
新增功能
AvroEncoder
同步和异步类提供了对遵循Apache Avro规范中定义的RecordSchema格式的模式进行编码和解码内容的功能。Apache Avro库用作编码和解码的实现。编码器将自动从Azure Schema Registry服务注册和检索模式。它提供了以下方法- 构造函数:如果传递了
auto_register=True
关键字,将自动将传递给encode
方法的模式进行注册。否则,默认情况下,需要预先注册传递给encode
的模式。接受一个可选的group_name
参数,在解码时可选,但在编码时是必需的。 encode
:根据给定的模式将字典内容编码为字节,如果需要则注册模式。根据提供的参数,返回一个包含编码内容和对应内容类型的字典,或者一个MessageType
子类型对象。decode
:通过从服务中自动检索模式将字节内容解码为字典内容。
- 构造函数:如果传递了
- 已引入名为
MessageContent
的TypedDict,具有以下必需键content
:字节内容。content_type
:字符串内容类型,其中包含模式ID和记录格式指示符。
- 已引入名为
MessageType
的,具有以下方法的类from_message_content
:一个类方法,用于创建一个具有给定字节内容和字符串内容类型的对象。__message_content__
:返回一个MessageContent
对象,其内容值和内容类型值分别设置为对象上的相应属性。
- 模式和模式ID被本地缓存,这样对于相同的模式/模式ID的多个调用将不会触发多个服务调用。
- 当添加新条目时,将在info级别记录模式/模式ID缓存的命中次数、未命中次数和总条目数。
- 已引入
InvalidContentError
,用于处理与无效内容和相关内容类型相关的错误,其中__cause__
将包含由Avro库引发的底层异常。 - 已引入
InvalidSchemaError
,用于处理与无效模式相关的错误,其中__cause__
将包含由Apache Avro库引发的底层异常。 AvroEncoder
上的encode
和decode
方法支持以下消息模型azure.eventhub.EventData
在azure-eventhub>=5.9.0
其他更改
- 此包旨在替换不再支持的azure-schemaregistry-avroserializer包。
group_name
现在在同步和异步AvroEncoder
构造函数中是一个可选参数。
1.0.0b3 (2022-04-05)
破坏性更改
- 在同步和异步
AvroEncoder
构造函数中的auto_register_schemas
关键字已重命名为auto_register
。 SchemaParseError
、SchemaEncodeError
和SchemaDecodeError
已被InvalidContentError
和InvalidSchemaError
替换。错误已添加到azure.schemaregistry.encoder.avroencoder
命名空间下。- 已删除
azure.schemaregistry.encoder.avroencoder
中的exceptions
模块。 - 在同步和异步
AvroEncoder
上的encode
方法只允许将MessageType
协议的子类型作为message_type
可选参数的值,而不是任何具有方法签名(content: bytes, content_type: str, **kwargs: Any)
的可调用对象。 - 当添加新条目时,将在info级别记录模式/模式ID缓存中的命中次数/未命中次数,以及条目数。
其他更改
- 此版本和未来的版本将不支持使用AvroSerializer编码的数据的解码。
AvroEncoder
上的encode
和decode
方法支持以下消息模型azure.eventhub.EventData
在azure-eventhub==5.9.0b3
1.0.0b2 (2022-03-09)
新增功能
- 已将
request_options
添加到AvroEncoder
上的encode
和decode
,作为传递给客户端请求的可选参数。 - 当添加新条目时,将在info级别记录当前模式/模式ID缓存的大小。
破坏性更改
MessageMetadataDict
已重命名为MessageContent
。MessageContent
中的data
已重命名为content
。- 在同步和异步
AvroEncoder
上的encode
和decode
中的data
参数已重命名为content
。 MessageType
协议中的from_message_data
方法已被重命名为from_message_content
。在from_message_content
中的data
参数已被重命名为content
。MessageType
协议中的__message_data__
方法已被重命名为__message_content__
。
其他更改
- 此 beta 版本将向后兼容使用 AvroSerializer 编码的数据。
AvroEncoder
上的encode
和decode
方法支持以下消息模型azure.eventhub.EventData
在azure-eventhub==5.9.0b2
1.0.0b1 (2022-02-09)
此版本及所有未来版本将需要 Python 3.6+。Python 2.7 已不再支持。
新增功能
- 此包旨在替换 azure-schemaregistry-avroserializer。
- API 已更新,允许直接将数据编码到消息类型对象,并从消息类型对象解码,其中数据值是 Avro 编码的有效负载。
- 消息的内容类型将包含模式 ID 和记录格式指示符。
其他更改
- 此 beta 版本将向后兼容使用 AvroSerializer 编码的数据。
AvroEncoder
上的encode
和decode
方法支持以下消息模型azure.eventhub.EventData
在azure-eventhub==5.9.0b1
项目详情
azure-schemaregistry-avroencoder-1.0.0.zip 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f928850ffd6106473c4523654ae13897142414a9765aae9302e42a7f018d0ce3 |
|
MD5 | e3a3ca80eea8b65ac011f452bdeb51d0 |
|
BLAKE2b-256 | 41c8ce8de6845535463e7e3c900a5bf5f3a66aad9660542deeee4a801165571e |
azure_schemaregistry_avroencoder-1.0.0-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e8bbf930001240294882b7030afd90b44f0af6f027e9eebca4f475c616d57c5b |
|
MD5 | 09ce1f1334da253d5c3470140dc77463 |
|
BLAKE2b-256 | fdf30b656259789dbb0ef8ee74f71ec208f9ef99af9de1a03e26676cf7b30288 |