Microsoft Azure Azure Schema Registry 客户端库 for Python
项目描述
Azure Schema Registry 客户端库 for Python
Azure Schema Registry 是由 Azure Event Hubs 托管的模式存储库服务,提供模式存储、版本管理和维护。该注册表由编码器利用以减少负载大小,并通过模式标识符而不是完整模式来描述负载结构。此包提供
-
用于注册和检索模式和相应属性的客户端库。
-
一个基于 JSON 模式的编码器,能够编码和解码包含 Schema Registry 模式标识符的负载,这些标识符对应于用于验证的 JSON 模式和编码内容。
源代码 | 包(PyPi) | 包(Conda) | API参考文档 | 示例 | 变更日志
免责声明
Azure SDK Python包对Python 2.7的支持已于2022年1月1日结束。有关更多信息及疑问,请参阅https://github.com/Azure/azure-sdk-for-python/issues/20691
入门指南
安装包
使用 pip 安装Azure Schema Registry客户端库
pip install azure-schemaregistry
要使用与JSON Schema Encoder集成的内置 jsonschema
验证器,请安装 jsonencoder
扩展
pip install azure-schemaregistry[jsonencoder]
先决条件
使用此包,您必须具备以下条件
- Azure订阅 - 创建免费账户
- Azure Schema Registry - 在此处是使用Azure门户创建Schema Registry组的快速入门指南。
- Python 3.8或更高版本 - 安装Python
客户端认证
与Schema Registry的交互从SchemaRegistryClient类的实例开始。客户端构造函数需要一个Azure Event Hubs完全限定的命名空间和一个Azure Active Directory凭证
-
Schema Registry实例的完全限定命名空间应遵循以下格式:
<yournamespace>.servicebus.windows.net
。 -
应将实现TokenCredential协议的AAD凭证传递给构造函数。在azure-identity包中提供了可用的
TokenCredential
协议实现。要使用azure-identity
提供的凭证类型,请使用 pip 安装Azure Identity客户端库
pip install azure-identity
- 另外,要使用异步API,您必须首先安装一个异步传输,例如 aiohttp
pip install aiohttp
使用azure-identity库创建客户端
from azure.schemaregistry import SchemaRegistryClient
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net/'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
使用azure-schemaregistry库创建JsonSchemaEncoder
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = JsonSchemaEncoder(client=schema_registry_client, group_name=group_name)
关键概念
客户端概念
-
Schema:Schema是数据的组织或结构。更详细的信息可以在此处找到这里。
-
Schema Group:基于业务标准的类似Schema的逻辑组,可以包含多个版本的Schema。更详细的信息可以在此处找到这里。
-
SchemaRegistryClient:
SchemaRegistryClient
提供在Schema Registry中存储和检索Schema的API。
编码器概念
-
JsonSchemaEncoder:提供API将内容编码到二进制编码中,从二进制编码中解码内容,对内容进行JSON Schema验证,并使用
SchemaRegistryClient
在本地上缓存从注册表中检索的Schema/Schema ID。 -
OutboundMessageContent:在
azure.schemaregistry
下定义的协议,允许JsonSchemaEncoder.encode
与某些Azure Messaging SDK消息类型进行互操作性。已添加对azure.eventhub.EventData
的支持
-
InboundMessageContent:在
azure.schemaregistry
下定义的协议,允许JsonSchemaEncoder.decode
与某些Azure Messaging SDK消息类型进行互操作性。已添加对azure.eventhub.EventData
的支持
OutboundMessageContent/InboundMessageContent的支持
如果向 JsonSchemaEncoder
提供遵循 OutboundMessageContent 协议的消息类型,它将设置对应的内容和内容类型属性。如果向编码器提供遵循 InboundMessageContent 协议的消息类型对象,它将获取对应的内容和内容类型属性。这些定义为
-
content
:二进制编码、经过 JSON 模式验证的有效负载(通常,特定格式的有效负载) -
content type
:格式为application/json;serialization=Json+<schema ID>
的字符串,其中application/json;serialization=Json
是格式指示器<schema ID>
是 GUID 的十六进制表示,与来自 Schema Registry 服务的字符串格式和字节顺序相同。
如果将 EventData
作为消息类型传递,则将在 EventData
对象上设置以下属性
-
将
body
属性设置为编码后的内容值。 -
将
content_type
属性设置为内容类型值。
如果没有提供消息类型,默认情况下,编码器将创建以下字典:{"content": <encoded payload>, "content_type": 'application/json;serialization=Json+<schema ID>'}
示例
以下部分提供了一些代码片段,涵盖了 Schema Registry 和 Json Schema Encoder 的许多常见任务,包括
注册模式
使用 SchemaRegistryClient.register_schema
方法注册模式。
import os
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = "your-schema-name"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
id = schema_properties.id
通过 ID 获取模式
通过模式 ID 获取模式定义及其属性。
import os
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
schema_id = 'your-schema-id'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
schema = schema_registry_client.get_schema(schema_id)
definition = schema.definition
properties = schema.properties
通过版本获取模式
通过模式版本获取模式定义及其属性。
import os
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ["SCHEMAREGISTRY_GROUP"]
name = "your-schema-name"
version = int("<your schema version>")
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
schema = schema_registry_client.get_schema(group_name=group_name, name=name, version=version)
definition = schema.definition
properties = schema.properties
获取模式 ID
通过模式定义及其属性获取模式 ID。
import os
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = "your-schema-name"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
id = schema_properties.id
编码
使用 SchemaRegistryClient
预注册模式。使用 JsonSchemaEncoder
对内容进行编码和验证。
encode
方法会自动从 Schema Registry 服务检索模式,对内容进行验证,并本地缓存模式。
import os
import json
from azure.schemaregistry import SchemaRegistryClient, SchemaFormat
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_JSON_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
format = SchemaFormat.JSON
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"
schema = {
"$id": "https://example.com/person.schema.json",
"$schema": "https://json-schema.fullstack.org.cn/draft/2020-12/schema",
"title": "Person",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Person's name."
},
"favorite_color": {
"type": "string",
"description": "Favorite color."
},
"favorite_number": {
"description": "Favorite number.",
"type": "integer",
}
}
}
name = schema["title"]
definition = json.dumps(schema)
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
schema_id = schema_properties.id
# group_name only needed if passing `schema` to encode
encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema_id=schema_id, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema_id=schema_id)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
# OR
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
message_content = encoder.encode(dict_content, schema=definition) # group_name required in constructor when `schema` is passed
解码
使用 JsonSchemaEncoder
对内容进行解码。
decode
方法会自动从 Schema Registry 服务检索模式,对内容进行验证,并本地缓存模式。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ["SCHEMAREGISTRY_GROUP"]
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)
with encoder:
# event_data is an EventData object with encoded body
decoded_content = encoder.decode(event_data)
# OR
# content_dict is a TypedDict with encoded content and JSON content type
decoded_content = encoder.decode(content_dict)
事件中心发送集成
与 Event Hubs 集成以发送设置 body
为编码内容的 EventData
对象和相应的 content_type
。
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_JSON_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_id = os.environ['PERSON_JSON_SCHEMA_ID']
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
json_schema_encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, json_schema_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = json_schema_encoder.encode(dict_content, schema_id=schema_id, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
事件中心接收集成
与 Event Hubs 集成以接收 EventData
对象并解码编码后的 body
值。
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_JSON_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
json_schema_encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = json_schema_encoder.decode(event)
with eventhub_consumer, json_schema_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
故障排除
通用
在与 Schema Registry 服务通信时遇到错误时,Schema Registry 客户端会引发定义在 Azure Core 中的异常。
与无效内容/内容类型相关的编码和解码错误将引发 azure.schemaregistry.encoder.jsonencoder.InvalidContentError
异常,其中 __cause__
可能包含底层异常。
日志记录
此库使用标准的 logging 库进行日志记录。HTTP 会话的基本信息(URL、标题等)以 INFO 级别记录。
可以使用 logging_enable
参数在客户端上启用详细的 DEBUG 级别日志记录,包括请求/响应体和未脱敏的标题。
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"
credential = DefaultAzureCredential()
# This client will log detailed information about its HTTP sessions, at DEBUG level
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)
同样,即使未为客户端启用,logging_enable
也可以为单个操作启用详细日志记录。
schema_registry_client.get_schema(schema_id, logging_enable=True)
下一步
更多示例代码
请查看 samples 目录,以获取如何使用此库注册和检索模式到/从 Schema Registry 的详细示例。
贡献
本项目欢迎贡献和建议。大多数贡献都需要您同意一份贡献者许可协议(CLA),声明您有权利并且实际上确实授予我们使用您贡献的权利。有关详细信息,请访问 https://cla.microsoft.com。
当您提交拉取请求时,CLA机器人会自动确定您是否需要提供CLA,并适当装饰PR(例如,标签,注释)。只需遵循机器人提供的说明。您只需在整个使用我们CLA的所有存储库中执行此操作一次。
本项目已采用微软开源行为准则。有关更多信息,请参阅行为准则FAQ或通过opencode@microsoft.com联系以提出任何额外的问题或评论。
发行历史
1.3.0 (2024-09-18)
此版本和所有未来版本都将需要Python 3.8+。Python 3.7不再受支持。
新增功能
- 在
azure.schemaregistry.encoder.jsonencoder
下添加了同步和异步的JsonSchemaEncoder
。 - 在
azure.schemaregistry.encoder.jsonencoder
下添加了InvalidContentError
,用于与JsonSchemaEncoder
一起使用。 - 在
azure.schemaregistry
下添加了MessageContent
、OutboundMessageContent
、InboundMessageContent
和SchemaContentValidate
,作为与JsonSchemaEncoder
和/或未来编码器实现一起使用的协议。 - 在
SchemaFormat
中添加了支持的格式Json
和Custom
。 - 在
ApiVersion
中添加了V2022_10
并将其设置为默认API版本。
已修复的错误
- 修复了同步/异步的
register_schema
和get_schema_properties
中的错误,这些错误不接受作为format
参数的参数不区分大小写的字符串。 - 修复了从服务中接收到的未知内容类型字符串引发客户端错误的问题,而不是作为SchemaProperties
format
属性中的字符串返回。
其他更改
- 将azure-core最低依赖项更新为1.28.0。
- 添加了对Python 3.11和3.12的支持。
- 以下功能已被暂时移除,并将在未来预览中添加回来,因为我们致力于稳定发布
- 在
ApiVersion
中添加了V2023_07_01
。- 在
SchemaFormat
中添加了PROTOBUF
。
- 在
- 在
1.3.0b3 (2023-11-09)
新增功能
- 在
ApiVersion
中添加了V2023_07_01
并将其设置为默认API版本。- 在
SchemaFormat
中添加了Protobuf
。
- 在
其他更改
- 添加了对Python 3.12的支持。
1.3.0b2 (2023-08-09)
新增功能
以下功能是实验性的,可能会被移除
- 在
azure.schemaregistry.encoder.jsonencoder
下添加了同步和异步的JsonSchemaEncoder
。 - 在
azure.schemaregistry.encoder.jsonencoder
下添加了InvalidContentError
和JsonSchemaDraftIdentifier
,用于与JsonSchemaEncoder
一起使用。 - 在
azure.schemaregistry
下添加了MessageType
、MessageContent
、SchemaContentValidate
和SchemaEncoder
,作为定义/与JsonSchemaEncoder
和/或未来编码器实现一起使用的协议。
1.3.0b1 (2023-01-12)
新增功能
- 在
ApiVersion
中添加了V2022_10
并将其设置为默认api版本。- 在
SchemaFormat
中添加了支持的格式Json
和Custom
。 - 在发布此版本时,服务目前只支持JSON方案的草稿3。
- 在
已修复的错误
- 修复了同步/异步的
register_schema
和get_schema_properties
中的错误,这些错误不接受作为format
参数的参数不区分大小写的字符串。
其他更改
- 添加了对Python 3.11的支持。
1.2.0 (2022-10-10)
此版本和所有未来版本将需要Python 3.7+,Python 3.6不再受支持。
新增功能
group_name
、name
和version
已添加为同步和异步SchemaRegistryClient
上get_schema
方法的可选参数。- 在
SchemaProperties
中添加了version
。
其他更改
- 将azure-core最低依赖项更新为1.24.0。
- 为同步和异步
SchemaRegistryClient
添加了分布式跟踪支持。
1.1.0 (2022-05-10)
此版本和所有未来版本将需要Python 3.6+,Python 2.7不再受支持。
新增功能
- 已将
group_name
和name
添加为SchemaProperties
的实例变量。
其他更改
- 已更新azure-core最低依赖版本为1.23.0。
1.0.0 (2021-11-10)
注意:这是我们为Azure Schema Registry创建用户友好且Python风格的客户端库的努力的第一个稳定版本。
新增功能
SchemaRegistryClient
是顶层客户端类,与Azure Schema Registry服务进行交互。它提供了三个方法:register_schema
:通过提供方案组名、方案名、方案定义和方案格式将方案存储在服务中。get_schema
:通过方案ID获取方案定义及其属性。get_schema_properties
:通过提供方案组名、方案名、方案定义和方案格式获取方案属性。
SchemaProperties
具有以下实例变量:id
和format
。format
的类型已从str
更改为SchemaFormat
。
Schema
具有以下属性:properties
和definition
。SchemaFormat
提供要由服务存储的方案格式。目前,唯一支持的格式是Avro
。- 已将
api_version
添加为同步和异步SchemaRegistryClient
构造函数的关键字参数。
重大变更
SchemaProperties
中的version
实例变量已被删除。Schema
中的schema_definition
实例变量已重命名为definition
。- 同步和异步
SchemaRegistryClient
上的get_schema
方法的id
参数已重命名为schema_id
。 - 同步和异步
SchemaRegistryClient
上的register_schema
和get_schema_properties
方法的schema_definition
参数已重命名为definition
。 azure.schemaregistry
中的serializer
命名空间已被删除。
1.0.0b3 (2021-10-05)
重大变更
- 同步和异步
SchemaRegistryClient
上的get_schema_id
方法已重命名为get_schema_properties
。 - 同步和异步
SchemaRegistryClient
上的get_schema
方法的schema_id
参数已重命名为id
。 - 同步和异步
SchemaRegistryClient
上的register_schema
和get_schema_properties
方法现在按以下顺序接受以下参数:group_name
,已从schema_group
重命名name
,已从schema_name
重命名schema_definition
,已从schema_content
重命名format
,已从serialization_type
重命名
SchemaRegistryClient
构造函数中的endpoint
参数已重命名为fully_qualified_namespace
。SchemaProperties
中的location
实例变量已被删除。Schema
和SchemaProperties
不再具有位置参数,因为它们将由用户构建。
其他更改
- 已更新azure-core依赖项到1.19.0。
- 已删除已注册方案的缓存支持,因此请求将发送到服务以注册方案、获取方案属性和获取方案。
1.0.0b2 (2021-08-17)
此版本以及所有未来版本将需要Python 2.7或Python 3.6+,Python 3.5不再受支持。
新增功能
- 支持已注册方案的缓存,并且仅在缓存中没有找到查找的方案/方案ID时才向服务发送请求。
1.0.0b1 (2020-09-09)
1.0.0b1是我们为Azure Schema Registry创建用户友好且Python风格的客户端库努力的第一个预览版本。
新功能
SchemaRegistryClient
是顶层客户端类,与Azure Schema Registry服务进行交互。它提供了三个方法:register_schema
:将方案存储到服务中。get_schema
:通过方案ID获取方案内容及其属性。get_schema_id
:通过方案组、方案名、序列化类型和方案内容获取方案ID及其属性。
项目详情
下载文件
下载适合您平台文件。如果您不确定选择哪个,请了解有关安装包的更多信息。