Generate Avro schemas from Python classes. Serialize/deserialize Python instances with Avro schemas
Dataclasses Avro Schema Generator
Generate avro schemas from Python dataclasses. Generate code from avro schemas. Serialize/deserialize Python instances with avro schemas
Requirements
python 3.8+
Installation
With pip or poetry:
pip install dataclasses-avroschema
or poetry add dataclasses-avroschema
Extras
- pydantic:
pip install 'dataclasses-avroschema[pydantic]'
or poetry add dataclasses-avroschema --extras "pydantic"
- faust-streaming:
pip install 'dataclasses-avroschema[faust]'
or poetry add dataclasses-avroschema --extras "faust"
- faker:
pip install 'dataclasses-avroschema[faker]'
or poetry add dataclasses-avroschema --extras "faker"
- dc-avro:
pip install 'dataclasses-avroschema[cli]'
or poetry add dataclasses-avroschema --extras "cli"
Note: you can install all the extra dependencies with pip install 'dataclasses-avroschema[faust,pydantic,faker,cli]'
or poetry add dataclasses-avroschema --extras "pydantic faust faker cli"
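After installing, it can be handy to check which optional extras are actually importable in the current environment. The following is a minimal sketch using only the standard library. The import names in `EXTRA_MODULES` (in particular `dc_avro` for the cli extra) are assumptions and may need adjusting.

```python
import importlib.util

# Assumed import name for each extra; adjust if your environment differs.
EXTRA_MODULES = {
    "pydantic": "pydantic",
    "faust": "faust",
    "faker": "faker",
    "cli": "dc_avro",
}


def installed_extras(modules=EXTRA_MODULES):
    """Return {extra_name: bool}, True when the extra's module can be located."""
    return {
        extra: importlib.util.find_spec(module) is not None
        for extra, module in modules.items()
    }


if __name__ == "__main__":
    for extra, available in installed_extras().items():
        print(f"{extra}: {'installed' if available else 'missing'}")
```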
Documentation
https://marcosschroh.github.io/dataclasses-avroschema/
Usage
Generating the avro schema
from dataclasses import dataclass
import enum
import typing
from dataclasses_avroschema import AvroModel


class FavoriteColor(str, enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class User(AvroModel):
    "An User"
    name: str
    age: int
    pets: typing.List[str]
    accounts: typing.Dict[str, int]
    favorite_colors: FavoriteColor
    country: str = "Argentina"
    address: typing.Optional[str] = None

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


print(User.avro_schema())
# {
# "type": "record",
# "name": "User",
# "fields": [
# {"name": "name", "type": "string"},
# {"name": "age", "type": "long"},
# {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}},
# {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}},
# {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}},
# {"name": "country", "type": "string", "default": "Argentina"},
# {"name": "address", "type": ["null", "string"], "default": null}
# ],
# "doc": "An User",
# "namespace": "User.v1",
# "aliases": ["user-v1", "super user"]
# }
assert User.avro_schema_to_python() == {
    "type": "record",
    "name": "User",
    "doc": "An User",
    "namespace": "User.v1",
    "aliases": ["user-v1", "super user"],
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "long"},
        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}},
        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}},
        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}},
        {"name": "country", "type": "string", "default": "Argentina"},
        {"name": "address", "type": ["null", "string"], "default": None}
    ],
}
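To make the mapping from type hints to Avro types more concrete, here is a simplified standard-library sketch. It is not the library's implementation: `avro_type`, `sketch_schema` and the `Pet` class are hypothetical names, only a handful of types are handled, and details such as `doc`, `namespace`, enums and the `name` attribute on arrays and maps are left out.

```python
import dataclasses
import typing

# Simplified mapping from Python primitives to Avro primitive type names.
PRIMITIVES = {str: "string", int: "long", float: "double", bool: "boolean", type(None): "null"}


def avro_type(py_type):
    """Translate a small subset of Python type hints into Avro types."""
    if py_type in PRIMITIVES:
        return PRIMITIVES[py_type]
    origin = typing.get_origin(py_type)
    args = typing.get_args(py_type)
    if origin is list:
        return {"type": "array", "items": avro_type(args[0])}
    if origin is dict:
        return {"type": "map", "values": avro_type(args[1])}
    if origin is typing.Union:  # typing.Optional[X] is Union[X, None]
        return [avro_type(arg) for arg in args]
    raise TypeError(f"unsupported type hint: {py_type!r}")


def sketch_schema(cls):
    """Build a record schema (as a dict) from a dataclass' fields."""
    hints = typing.get_type_hints(cls)
    fields = []
    for field in dataclasses.fields(cls):
        entry = {"name": field.name, "type": avro_type(hints[field.name])}
        if field.default is not dataclasses.MISSING:
            if field.default is None and isinstance(entry["type"], list):
                # Avro expects a union default to match the first branch.
                entry["type"] = ["null"] + [t for t in entry["type"] if t != "null"]
            entry["default"] = field.default
        fields.append(entry)
    return {"type": "record", "name": cls.__name__, "fields": fields}


@dataclasses.dataclass
class Pet:  # hypothetical example class
    name: str
    age: int
    tags: typing.List[str]
    owner: typing.Optional[str] = None


print(sketch_schema(Pet))
```

Note how the optional field ends up as a `["null", "string"]` union with a `None` default, the same shape as the `address` field in the schema above.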
Serialization to avro or avro-json and json payload
Serialization requires an instance of the Python class/dataclass
from dataclasses import dataclass
import typing
from dataclasses_avroschema import AvroModel


@dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


@dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]


address_data = {
    "street": "test",
    "street_number": 10,
}

# create an Address instance
address = Address(**address_data)

data_user = {
    "name": "john",
    "age": 20,
    "addresses": [address],
}

# create a User instance
user = User(**data_user)

# serialization
assert user.serialize() == b"\x08john(\x02\x08test\x14\x00"

assert user.serialize(
    serialization_type="avro-json"
) == b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# Get the json from the instance
assert user.to_json() == '{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# Get a python dict
assert user.to_dict() == {
    "name": "john",
    "age": 20,
    "addresses": [
        {"street": "test", "street_number": 10}
    ]
}
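The Avro binary payload above can be reproduced by hand, which helps when reading raw events. The sketch below follows the Avro binary encoding rules (zigzag varints for int/long, length-prefixed UTF-8 strings, arrays as counted blocks closed by a zero count). The helper names are hypothetical and this is not the library's code.

```python
def encode_long(value: int) -> bytes:
    """Avro int/long: zigzag-encode, then emit as a base-128 varint (low bits first)."""
    zigzag = (value << 1) ^ (value >> 63)
    out = bytearray()
    while True:
        byte = zigzag & 0x7F
        zigzag >>= 7
        if zigzag:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(text: str) -> bytes:
    """Avro string: byte length as a long, followed by the UTF-8 bytes."""
    data = text.encode("utf-8")
    return encode_long(len(data)) + data


def encode_array(encoded_items) -> bytes:
    """Avro array: a single block (item count + items), closed by a zero count."""
    items = list(encoded_items)
    if not items:
        return encode_long(0)
    return encode_long(len(items)) + b"".join(items) + encode_long(0)


# Fields are written in schema order, without names or separators.
address = encode_string("test") + encode_long(10)
payload = encode_string("john") + encode_long(20) + encode_array([address])
print(payload)  # b'\x08john(\x02\x08test\x14\x00'
```

Reading the bytes: `\x08` is the length 4 of "john", `(` (0x28) is the age 20, `\x02` is the array count 1, `\x14` is the street number 10, and the final `\x00` closes the array.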
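Deserialization
Deserialization can be done from a dataclass instance or from the dataclass itself. It can return either the dict representation or a new class instance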
import typing
import dataclasses
from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


@dataclasses.dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]


avro_binary = b"\x08john(\x02\x08test\x14\x00"
avro_json_binary = b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# return a new class instance!!
assert User.deserialize(avro_binary) == User(
    name='john',
    age=20,
    addresses=[Address(street='test', street_number=10)]
)

# return a python dict
assert User.deserialize(avro_binary, create_instance=False) == {
    "name": "john",
    "age": 20,
    "addresses": [
        {"street": "test", "street_number": 10}
    ]
}

# return a new class instance!!
assert User.deserialize(avro_json_binary, serialization_type="avro-json") == User(
    name='john',
    age=20,
    addresses=[Address(street='test', street_number=10)]
)

# return a python dict
assert User.deserialize(
    avro_json_binary,
    serialization_type="avro-json",
    create_instance=False
) == {"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}
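Avro binary data carries no field names, so decoding only works when the reader walks the fields in schema order. The following standard-library sketch decodes the `avro_binary` payload from the example by hand. The `read_*` helpers are hypothetical and hard-code the User/Address layout; the library derives this from the schema instead.

```python
import io


def read_long(buf: io.BytesIO) -> int:
    """Read a base-128 varint and undo the zigzag encoding."""
    shift = 0
    accum = 0
    while True:
        raw = buf.read(1)
        if not raw:
            raise EOFError("unexpected end of Avro data")
        byte = raw[0]
        accum |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this number
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1)


def read_string(buf: io.BytesIO) -> str:
    return buf.read(read_long(buf)).decode("utf-8")


def read_address(buf: io.BytesIO) -> dict:
    return {"street": read_string(buf), "street_number": read_long(buf)}


def read_user(buf: io.BytesIO) -> dict:
    user = {"name": read_string(buf), "age": read_long(buf), "addresses": []}
    while True:
        count = read_long(buf)
        if count == 0:  # a zero count closes the array
            break
        if count < 0:  # negative count: a block byte size follows
            read_long(buf)
            count = -count
        for _ in range(count):
            user["addresses"].append(read_address(buf))
    return user


decoded = read_user(io.BytesIO(b"\x08john(\x02\x08test\x14\x00"))
print(decoded)
# {'name': 'john', 'age': 20, 'addresses': [{'street': 'test', 'street_number': 10}]}
```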
Pydantic integration
To add dataclasses-avroschema functionality to pydantic, you only need to replace BaseModel with AvroBaseModel
import typing
import enum
from dataclasses_avroschema.pydantic import AvroBaseModel
from pydantic import Field, ValidationError


class FavoriteColor(str, enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(AvroBaseModel):
    name: str
    age: int
    pets: typing.List[str] = Field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = Field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None

    class Meta:
        schema_doc = False


assert UserAdvance.avro_schema_to_python() == {
    "type": "record",
    "name": "UserAdvance",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "long"},
        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}, "default": ["dog", "cat"]},
        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}, "default": {"key": 1}},
        {"name": "has_car", "type": "boolean", "default": False},
        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}, "default": "BLUE"},
        {"name": "country", "type": "string", "default": "Argentina"},
        {"name": "address", "type": ["null", "string"], "default": None}
    ]
}

print(UserAdvance.json_schema())
# {
# "$defs": {"FavoriteColor": {"enum": ["BLUE", "YELLOW", "GREEN"], "title": "FavoriteColor", "type": "string"}},
# "properties": {
# "name": {"title": "Name", "type": "string"},
# "age": {"title": "Age", "type": "integer"},
# "pets": {"items": {"type": "string"}, "title": "Pets", "type": "array"},
# "accounts": {"additionalProperties": {"type": "integer"}, "title": "Accounts", "type": "object"},
# "has_car": {"default": false, "title": "Has Car", "type": "boolean"},
# "favorite_colors": {"allOf": [{"$ref": "#/$defs/FavoriteColor"}], "default": "BLUE"},
# "country": {"default": "Argentina", "title": "Country", "type": "string"},
# "address": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Address"}
# },
# "required": ["name", "age"],
# "title": "UserAdvance",
# "type": "object"
# }

user = UserAdvance(name="bond", age=50)

# pydantic
assert user.dict() == {
    'name': 'bond',
    'age': 50,
    'pets': ['dog', 'cat'],
    'accounts': {'key': 1},
    'has_car': False,
    'favorite_colors': FavoriteColor.BLUE,
    'country': 'Argentina',
    'address': None
}

# pydantic
print(user.json())
assert user.json() == '{"name":"bond","age":50,"pets":["dog","cat"],"accounts":{"key":1},"has_car":false,"favorite_colors":"BLUE","country":"Argentina","address":null}'

# pydantic
try:
    user = UserAdvance(name="bond")
except ValidationError as exc:
    ...

# dataclasses-avroschema
event = user.serialize()
assert event == b'\x08bondd\x04\x06dog\x06cat\x00\x02\x06key\x02\x00\x00\x00\x12Argentina\x00'

assert UserAdvance.deserialize(data=event) == UserAdvance(
    name='bond',
    age=50,
    pets=['dog', 'cat'],
    accounts={'key': 1},
    has_car=False,
    favorite_colors=FavoriteColor.BLUE,
    country='Argentina',
    address=None
)
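Examples with Python streaming drivers (kafka and redis)
In the examples folder you can find 3 different kafka examples. One uses aiokafka (async) and shows the simplest use case: an AvroModel instance is serialized and sent through kafka, and the event is then consumed. The other two are sync examples using the kafka-python driver, showing avro-json serialization and schema evolution (FULL compatibility). There are also two redis examples using redis streams, with walrus and redisgears-py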
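As a rough illustration of what FULL compatibility means for schema evolution: a field added with a default can be filled in when an old event is read with the new schema, and ignored when a new event is read with the old schema. The sketch below models that with plain dicts and the standard library. `resolve`, `USER_V1_FIELDS` and `USER_V2_FIELDS` are hypothetical names and are not taken from the examples folder.

```python
def resolve(record: dict, reader_fields: list) -> dict:
    """Project a decoded record onto the reader's fields, filling gaps with defaults."""
    resolved = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            resolved[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return resolved


USER_V1_FIELDS = [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "long"},
]
# v2 adds a field WITH a default, which keeps both directions readable.
USER_V2_FIELDS = USER_V1_FIELDS + [
    {"name": "country", "type": "string", "default": "Argentina"},
]

old_event = {"name": "john", "age": 20}
new_event = {"name": "john", "age": 20, "country": "Uruguay"}

# Backward: new reader, old data. The default fills the missing field.
print(resolve(old_event, USER_V2_FIELDS))  # {'name': 'john', 'age': 20, 'country': 'Argentina'}
# Forward: old reader, new data. The unknown field is dropped.
print(resolve(new_event, USER_V1_FIELDS))  # {'name': 'john', 'age': 20}
```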
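Factory and fixtures
Dataclasses Avro Schema also includes a factory feature, so you can quickly generate Python instances and use them, for example, to test your data streaming pipelines. Instances can be generated with the fake method.
Note: this feature is not enabled by default and requires the faker extra. You can install it with pip install 'dataclasses-avroschema[faker]'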
import typing
import dataclasses
from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


@dataclasses.dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]


Address.fake()
# >>>> Address(street='PxZJILDRgbXyhWrrPWxQ', street_number=2067)
User.fake()
# >>>> User(name='VGSBbOGfSGjkMDnefHIZ', age=8974, addresses=[Address(street='vNpPYgesiHUwwzGcmMiS', street_number=4790)])
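The idea behind such a factory can be sketched with the standard library alone: walk the dataclass fields and produce a random value for each type hint, recursing into nested dataclasses. This is an illustration only, with hypothetical helper names (`fake_value`, `fake_instance`); the library's fake method relies on the faker extra and supports many more types.

```python
import dataclasses
import random
import string
import typing


def fake_value(py_type, rng: random.Random):
    """Produce a random value for a small subset of type hints."""
    if py_type is str:
        return "".join(rng.choice(string.ascii_letters) for _ in range(20))
    if py_type is int:
        return rng.randint(0, 9999)
    if py_type is float:
        return rng.random()
    if py_type is bool:
        return rng.choice([True, False])
    if dataclasses.is_dataclass(py_type):
        return fake_instance(py_type, rng)
    if typing.get_origin(py_type) is list:
        (item_type,) = typing.get_args(py_type)
        return [fake_value(item_type, rng) for _ in range(rng.randint(1, 3))]
    raise TypeError(f"unsupported type hint: {py_type!r}")


def fake_instance(cls, rng: typing.Optional[random.Random] = None):
    """Build an instance of a dataclass with random field values."""
    rng = rng or random.Random()
    hints = typing.get_type_hints(cls)
    values = {f.name: fake_value(hints[f.name], rng) for f in dataclasses.fields(cls)}
    return cls(**values)


@dataclasses.dataclass
class Address:
    street: str
    street_number: int


@dataclasses.dataclass
class User:
    name: str
    age: int
    addresses: typing.List[Address]


# Passing a seeded random.Random makes the generated fixtures repeatable.
print(fake_instance(User, random.Random(0)))
```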
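Features
- Primitive types: int, long, double, float, boolean, string and null support
- Complex types: enum, array, map, fixed, unions and records support
- typing.Annotated supported
- typing.Literal supported
- Logical types: date, time (millis and micros), datetime (millis and micros), uuid support
- Schema relations (oneToOne, oneToMany)
- Recursive schemas
- Generate Avro schemas from faust.Record
- Instance serialization corresponding to the generated avro schema
- Data deserialization. Returns a Python dict or a class instance
- Generate json from a Python class instance
- Case schemas
- Generate models from avsc files
- Examples of integration with kafka drivers: aiokafka, kafka-python
- Examples of integration with redis drivers: walrus and redisgears-py
- Factory instances
- Pydantic integration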
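Development
Poetry is needed to install the dependencies and to develop locally
- Install dependencies: poetry install --all-extras
- Code linting: ./scripts/format
- Run tests: ./scripts/test
- Test the documentation: ./scripts/test-documentation
For commit messages we use commitizen, in order to standardize the commit rules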
Hashes for dataclasses_avroschema-0.63.2.tar.gz
Algorithm | Hash digest |
---|---|
SHA256 | 6e5f25c9db874a071874f05d6d46d2afd5c7417630fde2f8153cea25667e1296 |
MD5 | ce1730ee3e49f2eb95627103dedde3d9 |
BLAKE2b-256 | 320a622350fe51224301a3457a4e84ef48077426da2fc59b46c4cd9010a76512 |
Hashes for dataclasses_avroschema-0.63.2-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 03ffa7fe2d9ac6d7424766b4fae1cd64d7596224d87be2b83dd44d1ab6f9ac5c |
MD5 | e612f70b47b108e41c70383f1e08db63 |
BLAKE2b-256 | 2f7b035ce333b192c4baa499fb9f5494b756ae08abb3353ae8862e6508c80ae3 |