
Python Rest Client to interact with a Confluent Schema Registry server


Python Rest Client Schema Registry


Python Rest Client to interact with a schema-registry Confluent server to manage Avro and JSON schema resources.

Requirements

python 3.8+

Installation

pip install python-schema-registry-client

If you want the Faust functionality:

pip install python-schema-registry-client[faust]

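Note that this automatically adds a dependency on the faust-streaming fork of faust. If you want to use the old faust version, install it manually and then install python-schema-registry-client without the faust extra; the functionality is the same.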

Client API, Serializer, Faust Integration and Schema Server description

Documentation: https://marcosschroh.github.io/python-schema-registry-client.io

Avro Schema Usage

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

schema_id = client.register("test-deployment", avro_schema)

Or async:

import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}


async def main():
    async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
    avro_schema = schema.AvroSchema(deployment_schema)

    # await must run inside a coroutine (or an async-aware REPL such as IPython)
    schema_id = await async_client.register("test-deployment", avro_schema)
    print(schema_id)


asyncio.run(main())
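Before registering a schema or serializing data with it, it can help to check a record against the record schema locally. The following is a minimal sketch using only the standard library; validate_record and AVRO_PRIMITIVES are hypothetical helpers (not part of python-schema-registry-client) and only cover flat records built from a few primitive Avro types.

```python
# Minimal sketch: validate a flat record against an Avro "record" schema.
# validate_record and AVRO_PRIMITIVES are hypothetical helpers, not part of
# python-schema-registry-client; only a few primitive Avro types are covered.
from typing import Any, Dict

# Python types accepted for each supported primitive Avro type.
AVRO_PRIMITIVES = {
    "string": (str,),
    "int": (int,),
    "long": (int,),
    "boolean": (bool,),
    "double": (float, int),
}
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1
INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1


def validate_record(avro_schema: Dict[str, Any], record: Dict[str, Any]) -> None:
    """Raise ValueError if record does not match the record schema."""
    if avro_schema.get("type") != "record":
        raise ValueError("this sketch only handles record schemas")
    for field in avro_schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            raise ValueError(f"missing field: {name}")
        # Unions (lists), nested records, etc. are out of scope here.
        if not isinstance(avro_type, str) or avro_type not in AVRO_PRIMITIVES:
            raise ValueError(f"unsupported type in this sketch: {avro_type!r}")
        value = record[name]
        # bool is a subclass of int in Python, so only "boolean" may hold a bool.
        if isinstance(value, bool) != (avro_type == "boolean"):
            raise ValueError(f"{name}: expected {avro_type}, got {type(value).__name__}")
        if not isinstance(value, AVRO_PRIMITIVES[avro_type]):
            raise ValueError(f"{name}: expected {avro_type}, got {type(value).__name__}")
        if avro_type == "int" and not INT32_MIN <= value <= INT32_MAX:
            raise ValueError(f"{name}: {value} does not fit in a 32-bit Avro int")
        if avro_type == "long" and not INT64_MIN <= value <= INT64_MAX:
            raise ValueError(f"{name}: {value} does not fit in a 64-bit Avro long")


deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

validate_record(deployment_schema, {"image": "nginx", "replicas": 3, "port": 80})  # passes silently
```

Full Avro validation (unions, nested records, logical types) is better left to an Avro library; the sketch only shows how the fields list of a record schema drives the check.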

JSON Schema Usage

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}

json_schema = schema.JsonSchema(deployment_schema)

schema_id = client.register("test-deployment", json_schema)

Or async:

import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}


async def main():
    async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
    json_schema = schema.JsonSchema(deployment_schema)

    # await must run inside a coroutine (or an async-aware REPL such as IPython)
    schema_id = await async_client.register("test-deployment", json_schema)
    print(schema_id)


asyncio.run(main())
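The JSON schema above keeps its object definition under "definitions" and points to it with "$ref": "#/definitions/JsonDeployment", a local JSON Pointer. As a rough illustration of how such a reference resolves, here is a small standard-library sketch; resolve_local_ref is a hypothetical helper that only follows a single top-level local reference and does not handle remote references.

```python
# Minimal sketch: follow a top-level local "$ref" (a JSON Pointer such as
# "#/definitions/JsonDeployment") to the definition it names.
# resolve_local_ref is a hypothetical helper; remote refs are not supported.
from typing import Any, Dict


def resolve_local_ref(json_schema: Dict[str, Any]) -> Dict[str, Any]:
    ref = json_schema.get("$ref")
    if ref is None:
        return json_schema
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are supported: {ref}")
    node: Any = json_schema
    for token in ref[2:].split("/"):
        # JSON Pointer escapes (RFC 6901): "~1" means "/" and "~0" means "~".
        node = node[token.replace("~1", "/").replace("~0", "~")]
    return node


deployment_schema = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}

definition = resolve_local_ref(deployment_schema)
print(definition["required"])  # ['image', 'replicas', 'port']
```

A full JSON Schema validator resolves references like this one internally; the sketch only makes the pointer lookup visible.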

Usage with dataclasses-avroschema for Avro schemas

You can generate the Avro schema directly from a Python class using dataclasses-avroschema and use it in the API to register schemas, check versions and test compatibility:
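dataclasses-avroschema produces the schema through the class's avro_schema() method. To show the underlying idea only (this is not how that library is implemented), here is a standard-library sketch that walks a dataclass's fields and maps their annotations to Avro types. to_avro_schema and its type mapping are assumptions for illustration: it supports only a few primitives plus typing.Optional[...], and maps int to "long" because Python integers are unbounded while an Avro "int" is 32-bit.

```python
# Illustration only: a hypothetical to_avro_schema that derives an Avro record
# schema from a dataclass. It is NOT how dataclasses-avroschema is implemented
# and supports only the primitive types below plus typing.Optional[...].
import dataclasses
import json
import typing

# Assumed mapping; int becomes "long" because Avro "int" is only 32-bit.
PRIMITIVES = {str: "string", int: "long", bool: "boolean", float: "double", bytes: "bytes"}


def to_avro_schema(cls: type) -> str:
    hints = typing.get_type_hints(cls)
    fields = []
    for f in dataclasses.fields(cls):
        py_type = hints[f.name]
        args = typing.get_args(py_type)
        if typing.get_origin(py_type) is typing.Union and type(None) in args:
            # Optional[X] becomes the union ["null", X]; null comes first so a None default is valid.
            inner = next(a for a in args if a is not type(None))
            avro_type: typing.Any = ["null", PRIMITIVES[inner]]
        else:
            avro_type = PRIMITIVES[py_type]  # KeyError for unsupported types
        field: typing.Dict[str, typing.Any] = {"name": f.name, "type": avro_type}
        if f.default is not dataclasses.MISSING:
            field["default"] = f.default
        fields.append(field)
    return json.dumps({"type": "record", "name": cls.__name__, "fields": fields})


@dataclasses.dataclass
class User:
    name: str
    age: int
    has_car: bool = False
    address: typing.Optional[str] = None


print(to_avro_schema(User))
```

The library handles many more cases (lists, maps, enums, nested records, default factories), so use UserAdvance.avro_schema() as in the example that follows rather than a hand-rolled mapper.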

import dataclasses
import enum
import typing

from dataclasses_avroschema import AvroModel

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")
subject = "dataclasses-avroschema-subject-2"


class FavoriteColor(enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclasses.dataclass
class UserAdvance(AvroModel):
    name: str
    age: int
    pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    # a None default needs an Optional (nullable) type
    address: typing.Optional[str] = None

# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())

print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')

compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatibility)

# >>> True
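test_compatibility asks the Schema Registry server whether a schema is compatible with the latest registered version of the subject (or with all versions for the *_TRANSITIVE levels), according to the compatibility level configured for that subject; Confluent Schema Registry uses BACKWARD by default. To build intuition for the BACKWARD rule, here is a deliberately simplified standard-library sketch for flat Avro records. backward_compatible is hypothetical and much stricter and narrower than the server's real check.

```python
# Deliberately simplified sketch of the BACKWARD rule for flat Avro records:
# a reader using new_schema must be able to decode data written with old_schema.
# backward_compatible is hypothetical; real schema resolution also handles type
# promotion (e.g. int -> long), unions, aliases and nested types.
from typing import Any, Dict, List


def backward_compatible(old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means compatible."""
    old_fields = {field["name"]: field for field in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        previous = old_fields.get(field["name"])
        if previous is None and "default" not in field:
            # Old data has no value for this field, so the reader needs a default.
            problems.append(f"added field {field['name']!r} has no default")
        elif previous is not None and previous["type"] != field["type"]:
            problems.append(f"field {field['name']!r} changed type")
    # Fields removed in new_schema are fine: the reader simply skips them.
    return problems


v1 = {"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}
v2 = {"type": "record", "name": "User", "fields": [
    {"name": "name", "type": "string"},
    {"name": "country", "type": "string", "default": "Argentina"},
]}
v3 = {"type": "record", "name": "User", "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
]}

print(backward_compatible(v1, v2))  # []
print(backward_compatible(v1, v3))  # ["added field 'age' has no default"]
```

This is why fields with defaults, such as has_car or country in UserAdvance, can be added to an existing subject without breaking BACKWARD compatibility.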

Usage with pydantic for JSON schemas

You can generate the JSON schema directly from a Python class using pydantic and use it in the API to register schemas, check versions and test compatibility:

import typing
from enum import Enum

from pydantic import BaseModel

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")
subject = "pydantic-jsonschema-subject"


class ColorEnum(str, Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(BaseModel):
    name: str
    age: int
    pets: typing.List[str] = ["dog", "cat"]
    accounts: typing.Dict[str, int] = {"key": 1}
    has_car: bool = False
    favorite_colors: ColorEnum = ColorEnum.BLUE
    country: str = "Argentina"
    # a None default needs an Optional (nullable) type
    address: typing.Optional[str] = None

# register the schema
schema_id = client.register(subject, UserAdvance.model_json_schema(), schema_type="JSON")

print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(result)
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)

compatibility = client.test_compatibility(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(compatibility)

# >>> True

Serializers

You can use AvroMessageSerializer to encode/decode Avro messages:

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer


client = SchemaRegistryClient("http://127.0.0.1:8081")
avro_message_serializer = AvroMessageSerializer(client)

avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})

# We want to encode the user_record with avro_user_schema
user_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": 20,
}

# Encode the record
message_encoded = avro_message_serializer.encode_record_with_schema(
    "user", avro_user_schema, user_record)

print(message_encoded)
# >>> b'\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name('

Or with JSON schemas:

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer


client = SchemaRegistryClient("http://127.0.0.1:8081")
json_message_serializer = JsonMessageSerializer(client)

json_schema = schema.JsonSchema({
  "definitions" : {
    "record:python.test.basic.basic" : {
      "description" : "basic schema for tests",
      "type" : "object",
      "required" : [ "number", "name" ],
      "properties" : {
        "number" : {
          "oneOf" : [ {
            "type" : "integer"
          }, {
            "type" : "null"
          } ]
        },
        "name" : {
          "oneOf" : [ {
            "type" : "string"
          } ]
        }
      }
    }
  },
  "$ref" : "#/definitions/record:python.test.basic.basic"
})

# Encode the record
basic_record = {
    "number": 10,
    "name": "a_name",
}

message_encoded = json_message_serializer.encode_record_with_schema(
    "basic", json_schema, basic_record)

print(message_encoded)
# >>> b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'
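Both encoded messages above follow the Confluent wire format: a magic byte 0, the schema id as a 4-byte big-endian integer, then the payload. In the Avro payload, strings are written as a zigzag-varint length followed by UTF-8 bytes and int values as zigzag varints, which is why \x1a (26, i.e. length 13) precedes my_first_name and ( (byte 40, i.e. 20) encodes the age. The sketch below decodes both example messages by hand with the standard library; split_message, read_long and read_string are hypothetical helpers, and the Avro part relies on knowing the field order of AvroUsers. In practice you would let the serializers decode messages for you.

```python
# Minimal sketch: decode the Confluent wire format by hand.
# split_message, read_long and read_string are hypothetical helpers.
import json
import struct
from typing import Tuple


def split_message(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, payload) from a magic-byte + 4-byte-id framed message."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message[5:]


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode an Avro zigzag varint starting at pos; return (value, new_pos)."""
    shift = result = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos  # undo zigzag encoding


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Decode an Avro string: varint length, then UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# Avro message from the AvroMessageSerializer example (fields in schema order).
avro_message = b'\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name('
schema_id, payload = split_message(avro_message)
first_name, pos = read_string(payload, 0)
last_name, pos = read_string(payload, pos)
age, pos = read_long(payload, pos)
print(schema_id, first_name, last_name, age)  # 1 my_first_name my_last_name 20

# JSON message from the JsonMessageSerializer example: the payload is plain JSON.
json_message = b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'
schema_id, payload = split_message(json_message)
print(schema_id, json.loads(payload))  # 2 {'number': 10, 'name': 'a_name'}
```

The schema id in the header is what lets a consumer fetch the right schema from the registry before decoding the payload.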

When to use this library

Usually, we have a situation like this:

[Figure: Confluent Architecture]

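So our producers/consumers have to serialize/deserialize messages every time they send them to or receive them from Kafka topics. In this picture, we can imagine a Faust application that receives messages (encoded with an Avro schema) and needs to deserialize them, so it asks the Schema Server for help. In this scenario, the MessageSerializer is a perfect fit.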
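Asking the Schema Server on every message would add a network round trip per message. In Confluent Schema Registry a schema id always refers to the same schema, so a consumer can cache lookups by id. The sketch below illustrates that idea with functools.lru_cache; it is not how MessageSerializer is implemented, and fetch_schema / fake_fetch are hypothetical stand-ins for a real registry request.

```python
# Minimal sketch of a consumer-side schema cache. fetch_schema is a
# hypothetical callable standing in for a Schema Registry request; this is
# an illustration, not how MessageSerializer is implemented.
import functools
import struct
from typing import Any, Callable, Dict


def make_schema_lookup(fetch_schema: Callable[[int], Dict[str, Any]]) -> Callable[[int], Dict[str, Any]]:
    # Each id always refers to the same schema, so results can be cached indefinitely.
    @functools.lru_cache(maxsize=None)
    def lookup(schema_id: int) -> Dict[str, Any]:
        return fetch_schema(schema_id)

    return lookup


def schema_id_of(message: bytes) -> int:
    # Confluent framing: 1 magic byte (0) followed by a 4-byte big-endian schema id.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id


requested = []


def fake_fetch(schema_id: int) -> Dict[str, Any]:
    requested.append(schema_id)  # record each simulated round trip
    return {"type": "string"}  # placeholder schema


lookup = make_schema_lookup(fake_fetch)
for message in [b"\x00\x00\x00\x00\x01a", b"\x00\x00\x00\x00\x01b", b"\x00\x00\x00\x00\x02c"]:
    lookup(schema_id_of(message))

print(requested)  # [1, 2]
```

Three messages trigger only two simulated registry requests, because the second message reuses the cached schema for id 1.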

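There may also be a use case where we only want an application that administers Avro schemas (registering them, updating compatibility, deleting old schemas, etc.); in that case the SchemaRegistryClient is a perfect fit.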
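Under the hood, a client like SchemaRegistryClient talks to the Schema Registry REST API. For illustration, the sketch below builds (but does not send) the HTTP requests for the three admin operations mentioned above using only urllib. The helper names are hypothetical; the paths and content type follow the Confluent Schema Registry REST API as I understand it, so check the API reference for your server version.

```python
# Minimal sketch: build (but do not send) Schema Registry admin requests with
# the standard library. Helper names are hypothetical; the paths follow the
# Confluent Schema Registry REST API.
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional

CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def _request(base_url: str, method: str, path: str, body: Optional[Dict[str, Any]] = None) -> urllib.request.Request:
    data = None if body is None else json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=data,
        method=method,
        headers={"Content-Type": CONTENT_TYPE},
    )


def _subject(subject: str) -> str:
    # Percent-encode the subject so it is safe inside a URL path segment.
    return urllib.parse.quote(subject, safe="")


def register_request(base_url: str, subject: str, avro_schema: Dict[str, Any]) -> urllib.request.Request:
    # The schema itself is sent as a JSON-encoded string under "schema".
    return _request(base_url, "POST", f"/subjects/{_subject(subject)}/versions", {"schema": json.dumps(avro_schema)})


def update_compatibility_request(base_url: str, subject: str, level: str) -> urllib.request.Request:
    # level is e.g. "BACKWARD", "FORWARD", "FULL" or "NONE".
    return _request(base_url, "PUT", f"/config/{_subject(subject)}", {"compatibility": level})


def delete_subject_request(base_url: str, subject: str) -> urllib.request.Request:
    return _request(base_url, "DELETE", f"/subjects/{_subject(subject)}")


req = update_compatibility_request("http://127.0.0.1:8081", "test-deployment", "FULL")
print(req.get_method(), req.full_url)  # PUT http://127.0.0.1:8081/config/test-deployment
# urllib.request.urlopen(req) would send it to a running Schema Registry.
```

Using SchemaRegistryClient instead of raw requests saves you from handling these paths, payload encodings and error responses yourself.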

Development

Poetry is used to install dependencies and for local development:

  1. Install dependencies: poetry install --all-extras
  2. Code linting: ./scripts/format
  3. Run tests: ./scripts/test

For commit messages we use commitizen to standardize commit rules.

Note: tests run against the Schema Server started with docker compose, so you need Docker and Docker Compose installed.

In a terminal, run docker-compose up. Then, in a different terminal, run the tests:

./scripts/test

All additional arguments are passed to pytest, for example:

./scripts/test ./tests/client/

Tests using the python shell

To run tests from the python shell, you can start the project with docker-compose.

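  1. Run docker-compose up. The Schema Registry server will then be running on http://127.0.0.1:8081, and you can interact with it using SchemaRegistryClient
  2. Start a python interpreter (type python in a terminal to get a python shell)
  3. Play with the Schema Server: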
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# do some operations with the client...
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>>> Out[5]: 1

Then you can inspect the schema in your browser at http://127.0.0.1:8081/schemas/ids/1
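The same endpoint can be read programmatically. The sketch below fetches a schema by id with only the standard library; get_schema_by_id is a hypothetical helper, and the opener parameter is there only so the function can be exercised without a running server. It assumes the registry's usual response shape, a JSON object whose "schema" key holds the schema definition as a JSON-encoded string.

```python
# Minimal sketch: fetch a schema by id over HTTP with the standard library.
# get_schema_by_id is a hypothetical helper; opener is injectable for testing.
import json
import urllib.request
from typing import Any, Callable, Dict


def get_schema_by_id(base_url: str, schema_id: int, opener: Callable = urllib.request.urlopen) -> Dict[str, Any]:
    url = f"{base_url.rstrip('/')}/schemas/ids/{schema_id}"
    with opener(url) as response:
        body = json.load(response)
    # The registry returns the schema definition as a JSON-encoded string
    # under "schema" (non-Avro schemas also carry a "schemaType" key).
    return json.loads(body["schema"])
```

With the docker-compose setup running and the deployment schema registered, get_schema_by_id("http://127.0.0.1:8081", 1) should return the AvroDeployment schema as a Python dict.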


Download files

Source distribution: python_schema_registry_client-2.6.0.tar.gz (21.7 kB)

Built distribution: python_schema_registry_client-2.6.0-py3-none-any.whl (23.0 kB, Python 3)