跳转到主要内容

Microsoft Azure Azure Schema Registry 客户端库 for Python

项目描述

Azure Schema Registry 客户端库 for Python

Azure Schema Registry 是由 Azure Event Hubs 托管的模式存储库服务,提供模式存储、版本管理和维护。该注册表由编码器利用以减少负载大小,并通过模式标识符而不是完整模式来描述负载结构。此包提供

  1. 用于注册和检索模式和相应属性的客户端库。

  2. 一个基于 JSON 模式的编码器,能够编码和解码包含 Schema Registry 模式标识符的负载,这些标识符对应于用于验证的 JSON 模式和编码内容。

源代码 | 包(PyPi) | 包(Conda) | API参考文档 | 示例 | 变更日志

免责声明

Azure SDK Python包对Python 2.7的支持已于2022年1月1日结束。有关更多信息及疑问,请参阅https://github.com/Azure/azure-sdk-for-python/issues/20691

入门指南

安装包

使用 pip 安装Azure Schema Registry客户端库

pip install azure-schemaregistry

要使用与JSON Schema Encoder集成的内置 jsonschema 验证器,请安装 jsonencoder 扩展

pip install azure-schemaregistry[jsonencoder]

先决条件

使用此包,您必须具备以下条件

客户端认证

与Schema Registry的交互从SchemaRegistryClient类的实例开始。客户端构造函数需要一个Azure Event Hubs完全限定的命名空间和一个Azure Active Directory凭证

  • Schema Registry实例的完全限定命名空间应遵循以下格式:<yournamespace>.servicebus.windows.net

  • 应将实现TokenCredential协议的AAD凭证传递给构造函数。在azure-identity包中提供了可用的 TokenCredential 协议实现。要使用 azure-identity 提供的凭证类型,请使用 pip 安装Azure Identity客户端库

pip install azure-identity
  • 另外,要使用异步API,您必须首先安装一个异步传输,例如 aiohttp
pip install aiohttp

使用azure-identity库创建客户端

from azure.schemaregistry import SchemaRegistryClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net/'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)

使用azure-schemaregistry库创建JsonSchemaEncoder

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = JsonSchemaEncoder(client=schema_registry_client, group_name=group_name)

关键概念

客户端概念

  • Schema:Schema是数据的组织或结构。更详细的信息可以在此处找到这里

  • Schema Group:基于业务标准的类似Schema的逻辑组,可以包含多个版本的Schema。更详细的信息可以在此处找到这里

  • SchemaRegistryClient:SchemaRegistryClient提供在Schema Registry中存储和检索Schema的API。

编码器概念

  • JsonSchemaEncoder:提供API将内容编码到二进制编码中,从二进制编码中解码内容,对内容进行JSON Schema验证,并使用SchemaRegistryClient在本地上缓存从注册表中检索的Schema/Schema ID。

  • OutboundMessageContent:在azure.schemaregistry下定义的协议,允许JsonSchemaEncoder.encode与某些Azure Messaging SDK消息类型进行互操作性。已添加对

    • azure.eventhub.EventData支持
  • InboundMessageContent:在azure.schemaregistry下定义的协议,允许JsonSchemaEncoder.decode与某些Azure Messaging SDK消息类型进行互操作性。已添加对

    • azure.eventhub.EventData支持

OutboundMessageContent/InboundMessageContent的支持

如果向 JsonSchemaEncoder 提供遵循 OutboundMessageContent 协议的消息类型,它将设置对应的内容和内容类型属性。如果向编码器提供遵循 InboundMessageContent 协议的消息类型对象,它将获取对应的内容和内容类型属性。这些定义为

  • content:二进制编码、经过 JSON 模式验证的有效负载(通常,特定格式的有效负载)

  • content type:格式为 application/json;serialization=Json+<schema ID> 的字符串,其中

    • application/json;serialization=Json 是格式指示器
    • <schema ID> 是 GUID 的十六进制表示,与来自 Schema Registry 服务的字符串格式和字节顺序相同。

如果将 EventData 作为消息类型传递,则将在 EventData 对象上设置以下属性

  • body 属性设置为编码后的内容值。

  • content_type 属性设置为内容类型值。

如果没有提供消息类型,默认情况下,编码器将创建以下字典:{"content": <encoded payload>, "content_type": 'application/json;serialization=Json+<schema ID>'}

示例

以下部分提供了一些代码片段,涵盖了 Schema Registry 和 Json Schema Encoder 的许多常见任务,包括

注册模式

使用 SchemaRegistryClient.register_schema 方法注册模式。

import os

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = "your-schema-name"
format = "Avro"
definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
    id = schema_properties.id

通过 ID 获取模式

通过模式 ID 获取模式定义及其属性。

import os

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
schema_id = 'your-schema-id'

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema = schema_registry_client.get_schema(schema_id)
    definition = schema.definition
    properties = schema.properties

通过版本获取模式

通过模式版本获取模式定义及其属性。

import os

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ["SCHEMAREGISTRY_GROUP"]
name = "your-schema-name"
version = int("<your schema version>")

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema = schema_registry_client.get_schema(group_name=group_name, name=name, version=version)
    definition = schema.definition
    properties = schema.properties

获取模式 ID

通过模式定义及其属性获取模式 ID。

import os

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_AVRO_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = "your-schema-name"
format = "Avro"
definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
    id = schema_properties.id

编码

使用 SchemaRegistryClient 预注册模式。使用 JsonSchemaEncoder 对内容进行编码和验证。

encode 方法会自动从 Schema Registry 服务检索模式,对内容进行验证,并本地缓存模式。

import os
import json
from azure.schemaregistry import SchemaRegistryClient, SchemaFormat
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_JSON_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
format = SchemaFormat.JSON
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"

schema = {
    "$id": "https://example.com/person.schema.json",
    "$schema": "https://json-schema.fullstack.org.cn/draft/2020-12/schema",
    "title": "Person",
    "type": "object",
    "properties": {
        "name": {
            "type": "string",
            "description": "Person's name."
        },
        "favorite_color": {
            "type": "string",
            "description": "Favorite color."
        },
        "favorite_number": {
            "description": "Favorite number.",
            "type": "integer",
        }
    }
}
name = schema["title"]
definition = json.dumps(schema)

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
schema_id = schema_properties.id

# group_name only needed if passing `schema` to encode
encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema_id=schema_id, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema_id=schema_id)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

    # OR

    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    message_content = encoder.encode(dict_content, schema=definition)  # group_name required in constructor when `schema` is passed

解码

使用 JsonSchemaEncoder 对内容进行解码。

decode 方法会自动从 Schema Registry 服务检索模式,对内容进行验证,并本地缓存模式。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ["SCHEMAREGISTRY_GROUP"]
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)

with encoder:
    # event_data is an EventData object with encoded body
    decoded_content = encoder.decode(event_data)

    # OR 

    # content_dict is a TypedDict with encoded content and JSON content type 
    decoded_content = encoder.decode(content_dict)

事件中心发送集成

Event Hubs 集成以发送设置 body 为编码内容的 EventData 对象和相应的 content_type

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_JSON_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_id = os.environ['PERSON_JSON_SCHEMA_ID']
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
json_schema_encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, json_schema_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = json_schema_encoder.encode(dict_content, schema_id=schema_id, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

事件中心接收集成

Event Hubs 集成以接收 EventData 对象并解码编码后的 body 值。

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_JSON_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
json_schema_encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = json_schema_encoder.decode(event)

with eventhub_consumer, json_schema_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

故障排除

通用

在与 Schema Registry 服务通信时遇到错误时,Schema Registry 客户端会引发定义在 Azure Core 中的异常。

与无效内容/内容类型相关的编码和解码错误将引发 azure.schemaregistry.encoder.jsonencoder.InvalidContentError 异常,其中 __cause__ 可能包含底层异常。

日志记录

此库使用标准的 logging 库进行日志记录。HTTP 会话的基本信息(URL、标题等)以 INFO 级别记录。

可以使用 logging_enable 参数在客户端上启用详细的 DEBUG 级别日志记录,包括请求/响应体和未脱敏的标题。

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.jsonencoder import JsonSchemaEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
DRAFT2020_12_SCHEMA_IDENTIFIER = "https://json-schema.fullstack.org.cn/draft/2020-12/schema"
credential = DefaultAzureCredential()
# This client will log detailed information about its HTTP sessions, at DEBUG level
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
encoder = JsonSchemaEncoder(client=schema_registry_client, validate=DRAFT2020_12_SCHEMA_IDENTIFIER)

同样,即使未为客户端启用,logging_enable 也可以为单个操作启用详细日志记录。

schema_registry_client.get_schema(schema_id, logging_enable=True)

下一步

更多示例代码

请查看 samples 目录,以获取如何使用此库注册和检索模式到/从 Schema Registry 的详细示例。

贡献

本项目欢迎贡献和建议。大多数贡献都需要您同意一份贡献者许可协议(CLA),声明您有权利并且实际上确实授予我们使用您贡献的权利。有关详细信息,请访问 https://cla.microsoft.com

当您提交拉取请求时,CLA机器人会自动确定您是否需要提供CLA,并适当装饰PR(例如,标签,注释)。只需遵循机器人提供的说明。您只需在整个使用我们CLA的所有存储库中执行此操作一次。

本项目已采用微软开源行为准则。有关更多信息,请参阅行为准则FAQ或通过opencode@microsoft.com联系以提出任何额外的问题或评论。

发行历史

1.3.0 (2024-09-18)

此版本和所有未来版本都将需要Python 3.8+。Python 3.7不再受支持。

新增功能

  • azure.schemaregistry.encoder.jsonencoder下添加了同步和异步的JsonSchemaEncoder
  • azure.schemaregistry.encoder.jsonencoder下添加了InvalidContentError,用于与JsonSchemaEncoder一起使用。
  • azure.schemaregistry下添加了MessageContentOutboundMessageContentInboundMessageContentSchemaContentValidate,作为与JsonSchemaEncoder和/或未来编码器实现一起使用的协议。
  • SchemaFormat中添加了支持的格式JsonCustom
  • ApiVersion中添加了V2022_10并将其设置为默认API版本。

已修复的错误

  • 修复了同步/异步的register_schemaget_schema_properties中的错误,这些错误不接受作为format参数的参数不区分大小写的字符串。
  • 修复了从服务中接收到的未知内容类型字符串引发客户端错误的问题,而不是作为SchemaProperties format属性中的字符串返回。

其他更改

  • 将azure-core最低依赖项更新为1.28.0。
  • 添加了对Python 3.11和3.12的支持。
  • 以下功能已被暂时移除,并将在未来预览中添加回来,因为我们致力于稳定发布
    • ApiVersion中添加了V2023_07_01
      • SchemaFormat中添加了PROTOBUF

1.3.0b3 (2023-11-09)

新增功能

  • ApiVersion中添加了V2023_07_01并将其设置为默认API版本。
    • SchemaFormat中添加了Protobuf

其他更改

  • 添加了对Python 3.12的支持。

1.3.0b2 (2023-08-09)

新增功能

以下功能是实验性的,可能会被移除

  • azure.schemaregistry.encoder.jsonencoder下添加了同步和异步的JsonSchemaEncoder
  • azure.schemaregistry.encoder.jsonencoder下添加了InvalidContentErrorJsonSchemaDraftIdentifier,用于与JsonSchemaEncoder一起使用。
  • azure.schemaregistry下添加了MessageTypeMessageContentSchemaContentValidateSchemaEncoder,作为定义/与JsonSchemaEncoder和/或未来编码器实现一起使用的协议。

1.3.0b1 (2023-01-12)

新增功能

  • ApiVersion中添加了V2022_10并将其设置为默认api版本。
    • SchemaFormat中添加了支持的格式JsonCustom
    • 在发布此版本时,服务目前只支持JSON方案的草稿3。

已修复的错误

  • 修复了同步/异步的register_schemaget_schema_properties中的错误,这些错误不接受作为format参数的参数不区分大小写的字符串。

其他更改

  • 添加了对Python 3.11的支持。

1.2.0 (2022-10-10)

此版本和所有未来版本将需要Python 3.7+,Python 3.6不再受支持。

新增功能

  • group_namenameversion已添加为同步和异步SchemaRegistryClientget_schema方法的可选参数。
  • SchemaProperties中添加了version

其他更改

  • 将azure-core最低依赖项更新为1.24.0。
  • 为同步和异步SchemaRegistryClient添加了分布式跟踪支持。

1.1.0 (2022-05-10)

此版本和所有未来版本将需要Python 3.6+,Python 2.7不再受支持。

新增功能

  • 已将group_namename添加为SchemaProperties的实例变量。

其他更改

  • 已更新azure-core最低依赖版本为1.23.0。

1.0.0 (2021-11-10)

注意:这是我们为Azure Schema Registry创建用户友好且Python风格的客户端库的努力的第一个稳定版本。

新增功能

  • SchemaRegistryClient是顶层客户端类,与Azure Schema Registry服务进行交互。它提供了三个方法:
    • register_schema:通过提供方案组名、方案名、方案定义和方案格式将方案存储在服务中。
    • get_schema:通过方案ID获取方案定义及其属性。
    • get_schema_properties:通过提供方案组名、方案名、方案定义和方案格式获取方案属性。
  • SchemaProperties具有以下实例变量:idformat
    • format的类型已从str更改为SchemaFormat
  • Schema具有以下属性:propertiesdefinition
  • SchemaFormat提供要由服务存储的方案格式。目前,唯一支持的格式是Avro
  • 已将api_version添加为同步和异步SchemaRegistryClient构造函数的关键字参数。

重大变更

  • SchemaProperties中的version实例变量已被删除。
  • Schema中的schema_definition实例变量已重命名为definition
  • 同步和异步SchemaRegistryClient上的get_schema方法的id参数已重命名为schema_id
  • 同步和异步SchemaRegistryClient上的register_schemaget_schema_properties方法的schema_definition参数已重命名为definition
  • azure.schemaregistry中的serializer命名空间已被删除。

1.0.0b3 (2021-10-05)

重大变更

  • 同步和异步SchemaRegistryClient上的get_schema_id方法已重命名为get_schema_properties
  • 同步和异步SchemaRegistryClient上的get_schema方法的schema_id参数已重命名为id
  • 同步和异步SchemaRegistryClient上的register_schemaget_schema_properties方法现在按以下顺序接受以下参数:
    • group_name,已从schema_group重命名
    • name,已从schema_name重命名
    • schema_definition,已从schema_content重命名
    • format,已从serialization_type重命名
  • SchemaRegistryClient构造函数中的endpoint参数已重命名为fully_qualified_namespace
  • SchemaProperties中的location实例变量已被删除。
  • SchemaSchemaProperties不再具有位置参数,因为它们将由用户构建。

其他更改

  • 已更新azure-core依赖项到1.19.0。
  • 已删除已注册方案的缓存支持,因此请求将发送到服务以注册方案、获取方案属性和获取方案。

1.0.0b2 (2021-08-17)

此版本以及所有未来版本将需要Python 2.7或Python 3.6+,Python 3.5不再受支持。

新增功能

  • 支持已注册方案的缓存,并且仅在缓存中没有找到查找的方案/方案ID时才向服务发送请求。

1.0.0b1 (2020-09-09)

1.0.0b1是我们为Azure Schema Registry创建用户友好且Python风格的客户端库努力的第一个预览版本。

新功能

  • SchemaRegistryClient是顶层客户端类,与Azure Schema Registry服务进行交互。它提供了三个方法:
    • register_schema:将方案存储到服务中。
    • get_schema:通过方案ID获取方案内容及其属性。
    • get_schema_id:通过方案组、方案名、序列化类型和方案内容获取方案ID及其属性。

项目详情


下载文件

下载适合您平台文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码发行版

azure_schemaregistry-1.3.0.tar.gz (96.8 kB 查看哈希值)

上传时间 源代码

构建发行版

azure_schemaregistry-1.3.0-py3-none-any.whl (86.1 kB 查看哈希值)

上传时间 Python 3

由以下提供支持