Python REST client for interacting with the Confluent Schema Registry server
Project description
Python Rest Client Schema Registry
A Python REST client that talks to the Confluent schema-registry server to manage Avro and JSON schema resources.
Requirements
python 3.8+
Installation
pip install python-schema-registry-client
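If you want the Faust functionality:
pip install python-schema-registry-client[faust]
Note that this automatically adds a dependency on faust-streaming, a fork of faust. If you want to use the old faust version, install it manually and then install python-schema-registry-client without the faust extra enabled; the functionality will be the same.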
Client API, Serializer, Faust Integration and Schema Server description
Documentation: https://marcosschroh.github.io/python-schema-registry-client.io
Avro Schema Usage
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)
schema_id = client.register("test-deployment", avro_schema)
Or async
import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)


async def main():
    # await is only valid inside a coroutine, so the call is wrapped in one
    return await async_client.register("test-deployment", avro_schema)


schema_id = asyncio.run(main())
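To make the registration step more concrete, here is a minimal sketch of the HTTP request that registering a schema corresponds to in the Confluent Schema Registry REST API (POST /subjects/{subject}/versions). It uses only the standard library and does not send anything. The helper name build_register_request is hypothetical and this is not the library's own code, only an illustration of the request shape.

```python
import json
from typing import Tuple

# Same Avro schema as in the examples above.
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}


def build_register_request(base_url: str, subject: str, schema_dict: dict) -> Tuple[str, bytes]:
    """Hypothetical helper: return (url, body) for POST /subjects/{subject}/versions."""
    url = f"{base_url.rstrip('/')}/subjects/{subject}/versions"
    # The registry expects the schema as a JSON *string* nested inside the JSON body.
    body = json.dumps({"schema": json.dumps(schema_dict)}).encode("utf-8")
    return url, body


url, body = build_register_request("http://127.0.0.1:8081", "test-deployment", deployment_schema)
print(url)
print(body.decode("utf-8"))
```

The double json.dumps is the point of the sketch: the schema travels as an escaped string, which is why a registered schema comes back from the server as text rather than as a nested object.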
JSON Schema Usage
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}

json_schema = schema.JsonSchema(deployment_schema)
schema_id = client.register("test-deployment", json_schema)
Or async
import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}

json_schema = schema.JsonSchema(deployment_schema)


async def main():
    # await is only valid inside a coroutine, so the call is wrapped in one
    return await async_client.register("test-deployment", json_schema)


schema_id = asyncio.run(main())
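The JSON schema above keeps its actual definition under "definitions" and points to it through "$ref". The sketch below shows how that pointer resolves, using only the standard library. The helpers resolve_ref and missing_required are hypothetical names introduced for illustration; they handle only local "#/..." references without escaped characters and are not a full JSON Schema validator.

```python
deployment_schema = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}


def resolve_ref(schema: dict) -> dict:
    """Follow a local '#/a/b' reference from the schema root (hypothetical helper)."""
    ref = schema["$ref"]
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are supported, got {ref!r}")
    node = schema
    for part in ref[2:].split("/"):
        node = node[part]
    return node


def missing_required(record: dict, schema: dict) -> list:
    """Return the required property names that are absent from the record."""
    target = resolve_ref(schema)
    return [name for name in target["required"] if name not in record]


print(resolve_ref(deployment_schema)["required"])
print(missing_required({"image": "nginx", "replicas": 2}, deployment_schema))
```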
Usage with dataclasses-avroschema for avro schemas
You can generate the avro schema directly from a python class using dataclasses-avroschema and use it in the API to register schemas, check versions and test compatibility.
import dataclasses
import typing

from dataclasses_avroschema import AvroModel, types
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# Subject name taken from the sample output below
subject = "dataclasses-avroschema-subject-2"


@dataclasses.dataclass
class UserAdvance(AvroModel):
    name: str
    age: int
    pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
    country: str = "Argentina"
    address: typing.Optional[str] = None


# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())
print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')

compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatibility)
# >>> True
Usage with pydantic for json schemas
You can generate the json schema directly from a python class using pydantic and use it in the API to register schemas, check versions and test compatibility.
import typing
from enum import Enum

from pydantic import BaseModel
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# Subject name taken from the sample output below
subject = "pydantic-jsonschema-subject"


class ColorEnum(str, Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(BaseModel):
    name: str
    age: int
    pets: typing.List[str] = ["dog", "cat"]
    accounts: typing.Dict[str, int] = {"key": 1}
    has_car: bool = False
    favorite_colors: ColorEnum = ColorEnum.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None


# register the schema
schema_id = client.register(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(result)
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)

compatibility = client.test_compatibility(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(compatibility)
# >>> True
Serializers
You can use AvroMessageSerializer to encode/decode messages in avro.
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer

client = SchemaRegistryClient("http://127.0.0.1:8081")
avro_message_serializer = AvroMessageSerializer(client)

avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})

# We want to encode the user_record with avro_user_schema
user_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": 20,
}

# Encode the record
message_encoded = avro_message_serializer.encode_record_with_schema(
    "user", avro_user_schema, user_record)

print(message_encoded)
# >>> b'\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name('
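The encoded bytes printed above follow the Confluent wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro binary payload. The following sketch decodes that exact message by hand with the standard library only, to show what each byte means. The helper names are hypothetical, and the field order is hard-coded from the AvroUsers schema; a real consumer would use the serializer instead.

```python
import struct

message_encoded = b"\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name("


def split_header(message: bytes):
    """Split a message into (schema_id, payload) after checking the magic byte."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte")
    return schema_id, message[5:]


def read_long(buf: bytes, pos: int):
    """Read one Avro int/long (variable-length, zigzag encoded) starting at pos."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear means this was the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def read_string(buf: bytes, pos: int):
    """Read an Avro string: a length (as a long) followed by UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


schema_id, payload = split_header(message_encoded)

# Fields are read in the order the AvroUsers schema declares them.
first_name, pos = read_string(payload, 0)
last_name, pos = read_string(payload, pos)
age, pos = read_long(payload, pos)

user = {"first_name": first_name, "last_name": last_name, "age": age}
print(schema_id, user)
```

Avro binary data carries no field names or types, which is why the reader needs the writer's schema; the schema id in the header is what lets a consumer look that schema up.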
Or with json schemas
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer

client = SchemaRegistryClient("http://127.0.0.1:8081")
json_message_serializer = JsonMessageSerializer(client)

json_schema = schema.JsonSchema({
    "definitions": {
        "record:python.test.basic.basic": {
            "description": "basic schema for tests",
            "type": "object",
            "required": ["number", "name"],
            "properties": {
                "number": {
                    "oneOf": [{"type": "integer"}, {"type": "null"}]
                },
                "name": {
                    "oneOf": [{"type": "string"}]
                },
            },
        }
    },
    "$ref": "#/definitions/record:python.test.basic.basic",
})

# Encode the record
basic_record = {
    "number": 10,
    "name": "a_name",
}

message_encoded = json_message_serializer.encode_record_with_schema(
    "basic", json_schema, basic_record)

print(message_encoded)
# >>> b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'
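For JSON messages the framing is the same 5-byte header, followed by the record as plain JSON text. As a rough sketch with the standard library only, the message printed above can be rebuilt and parsed as follows. The function names frame and unframe are hypothetical, and the schema id 2 is simply the one that appears in the sample output; the real serializer also validates the record against the schema, which this sketch does not do.

```python
import json
import struct

MAGIC_BYTE = 0


def frame(schema_id: int, record: dict) -> bytes:
    """Prefix the JSON-encoded record with the magic byte and the schema id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + json.dumps(record).encode("utf-8")


def unframe(message: bytes):
    """Return (schema_id, record) from a framed JSON message."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    return schema_id, json.loads(message[5:])


basic_record = {"number": 10, "name": "a_name"}
message = frame(2, basic_record)
print(message)
print(unframe(message))
```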
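When to use this library
In a typical setup, producers and consumers have to serialize and deserialize messages every time they send to or receive from Kafka topics. For example, imagine a Faust application that receives messages encoded with an Avro schema and wants to deserialize them; it can ask the schema server for the schema needed to do so. In this scenario, the MessageSerializer is a good fit.
There can also be a use case where an application exists only to administer Avro schemas (register them, update compatibility settings, delete old schemas, and so on); for that, the SchemaRegistryClient is a good fit.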
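The consumer scenario described above can be sketched as follows: the consumer reads the schema id from each message header and asks the schema server for that schema only the first time it sees the id. Everything here is hypothetical and standard-library only; CachingDecoder and fake_fetch are illustrative names, and fake_fetch stands in for a real registry lookup.

```python
import json
import struct
from typing import Callable, Dict, List, Tuple


class CachingDecoder:
    """Hypothetical consumer-side helper that looks up each schema id at most once."""

    def __init__(self, fetch_schema: Callable[[int], dict]) -> None:
        self._fetch_schema = fetch_schema
        self._schemas: Dict[int, dict] = {}

    def decode(self, message: bytes) -> Tuple[dict, dict]:
        """Return (schema, record) for a message framed with magic byte + schema id."""
        magic, schema_id = struct.unpack(">bI", message[:5])
        if magic != 0:
            raise ValueError("unexpected magic byte")
        if schema_id not in self._schemas:
            # Only the first message with this id triggers a lookup.
            self._schemas[schema_id] = self._fetch_schema(schema_id)
        return self._schemas[schema_id], json.loads(message[5:])


lookups: List[int] = []


def fake_fetch(schema_id: int) -> dict:
    """Stand-in for a schema server request; records which ids were requested."""
    lookups.append(schema_id)
    return {"type": "object"}


decoder = CachingDecoder(fake_fetch)
message = b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'
first = decoder.decode(message)
second = decoder.decode(message)
print(lookups)
```

Caching by id is safe because a schema id always refers to the same schema in the registry, so a busy consumer does not need to contact the server for every message.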
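Development
Poetry is needed to install the dependencies and develop locally.
- Install dependencies:
poetry install --all-extras
- Code linting:
./scripts/format
- Run tests:
./scripts/test
For commit messages we use commitizen to standardize the commit rules.
Note: the tests run against the Schema Server using docker compose, so you need Docker and Docker Compose installed.
In a terminal, run docker-compose up. Then, in a different terminal, run the tests:
./scripts/test
All additional arguments are passed to pytest, for example:
./scripts/test ./tests/client/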
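Tests using the python shell
To perform tests using the python shell, you can run the project using docker-compose.
- Run docker-compose up. The schema registry server will then run on http://127.0.0.1:8081, and you can interact with it using the SchemaRegistryClient.
- Use the python interpreter (type python in your command line to get a python shell).
- Play with the schema server: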
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# do some operations with the client...
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>>> Out[5]: 1
Then you can check the schema in your browser by going to the URL http://127.0.0.1:8081/schemas/ids/1
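The same check can be done from Python instead of the browser. The sketch below uses only the standard library and assumes the docker-compose schema server is running on http://127.0.0.1:8081; schema_url and fetch_schema are hypothetical helper names. The endpoint returns a JSON object whose "schema" field holds the schema as a string, so it is parsed a second time.

```python
import json
import urllib.request


def schema_url(base_url: str, schema_id: int) -> str:
    """Build the URL of the GET /schemas/ids/{id} endpoint."""
    return f"{base_url.rstrip('/')}/schemas/ids/{schema_id}"


def fetch_schema(base_url: str, schema_id: int, timeout: float = 5.0) -> dict:
    """Fetch a schema by id; requires a running schema server."""
    with urllib.request.urlopen(schema_url(base_url, schema_id), timeout=timeout) as response:
        payload = json.load(response)
    # The schema is returned as a JSON string inside the response body.
    return json.loads(payload["schema"])


print(schema_url("http://127.0.0.1:8081", 1))
# With the server running, fetch_schema("http://127.0.0.1:8081", 1) returns the schema as a dict.
```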
Project details
Hashes for python_schema_registry_client-2.6.0.tar.gz
Algorithm | Hash digest
---|---
SHA256 | e5495899c2bf4fd33bc6689a2068a3423dc94875677f3fd343b6e492a7877ba0
MD5 | 64cb322d1f92cbe5265d67f74c5566d8
BLAKE2b-256 | 4a78840c35e9f17b947ca5f3fee6de7ea1ac5ac7e6ff4963892e7b2d589f58f2
Hashes for python_schema_registry_client-2.6.0-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | a0688a9cd6e2a616a79fb46a6615c531cd5f9e2a5145f5c95932f792417731cb
MD5 | 1e35e27efb14cefde649cfb37096870e
BLAKE2b-256 | 0e2fa472b6202071dec88154901d975b91491708ba8b3587c55c6eafc1977751