Generate Avro schemas from Python classes. Serialize/deserialize Python instances with Avro schemas

Project description

Dataclasses Avro Schema Generator

Generate avro schemas from Python dataclasses. Generate code from avro schemas. Serialize/deserialize Python instances with avro schemas

Requirements

python 3.8+

Installation

With pip or poetry:

pip install dataclasses-avroschema
poetry add dataclasses-avroschema

Extras

  • pydantic: pip install 'dataclasses-avroschema[pydantic]' or poetry add dataclasses-avroschema --extras "pydantic"
  • faust-streaming: pip install 'dataclasses-avroschema[faust]' or poetry add dataclasses-avroschema --extras "faust"
  • faker: pip install 'dataclasses-avroschema[faker]' or poetry add dataclasses-avroschema --extras "faker"
  • dc-avro: pip install 'dataclasses-avroschema[cli]' or poetry add dataclasses-avroschema --with cli

Note: you can install all the extra dependencies with pip install 'dataclasses-avroschema[faust,pydantic,faker,cli]' or poetry add dataclasses-avroschema --extras "pydantic faust faker cli"

Documentation

https://marcosschroh.github.io/dataclasses-avroschema/

Usage

Generating the avro schema

from dataclasses import dataclass
import enum

import typing

from dataclasses_avroschema import AvroModel


class FavoriteColor(str, enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class User(AvroModel):
    "An User"
    name: str
    age: int
    pets: typing.List[str]
    accounts: typing.Dict[str, int]
    favorite_colors: FavoriteColor
    country: str = "Argentina"
    address: typing.Optional[str] = None

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


print(User.avro_schema())

# {
#    "type": "record",
#    "name": "User",
#    "fields": [
#        {"name": "name", "type": "string"},
#        {"name": "age", "type": "long"},
#        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}},
#        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}},
#        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}},
#        {"name": "country", "type": "string", "default": "Argentina"},
#        {"name": "address", "type": ["null", "string"], "default": null}
#    ], 
#    "doc": "An User",
#    "namespace": "User.v1", 
#    "aliases": ["user-v1", "super user"]
# }

assert User.avro_schema_to_python() == {
    "type": "record",
    "name": "User",
    "doc": "An User",
    "namespace": "User.v1",
    "aliases": ["user-v1", "super user"],
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "long"},
        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}},
        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}},
        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}},
        {"name": "country", "type": "string", "default": "Argentina"},
        {"name": "address", "type": ["null", "string"], "default": None}
    ],
}
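
To make the mapping in the output above easier to follow, here is a minimal standard-library sketch of how type hints could be translated into Avro types. It is not the library's implementation: `to_avro_type`, `to_avro_record`, `PRIMITIVES`, `Color` and `Pet` are hypothetical names, and details such as the extra "name" key on arrays and maps, the Meta options and the doc string are left out.

```python
import dataclasses
import enum
import typing

# Assumed, simplified mapping from Python primitives to Avro type names.
PRIMITIVES = {
    str: "string",
    int: "long",
    float: "double",
    bool: "boolean",
    bytes: "bytes",
    type(None): "null",
}


def to_avro_type(hint):
    """Translate one type hint into an Avro type (illustrative only)."""
    if hint in PRIMITIVES:
        return PRIMITIVES[hint]
    if isinstance(hint, type) and issubclass(hint, enum.Enum):
        symbols = [member.value for member in hint]
        return {"type": "enum", "name": hint.__name__, "symbols": symbols}
    origin, args = typing.get_origin(hint), typing.get_args(hint)
    if origin is list:
        return {"type": "array", "items": to_avro_type(args[0])}
    if origin is dict:
        return {"type": "map", "values": to_avro_type(args[1])}
    if origin is typing.Union:
        # Optional[X] is Union[X, None]; move "null" to the front of the union.
        branches = [to_avro_type(arg) for arg in args]
        branches.sort(key=lambda branch: branch != "null")
        return branches
    raise TypeError(f"unsupported type hint: {hint!r}")


def to_avro_record(cls):
    """Build a record schema dict from a plain dataclass."""
    hints = typing.get_type_hints(cls)
    fields = []
    for f in dataclasses.fields(cls):
        field = {"name": f.name, "type": to_avro_type(hints[f.name])}
        if f.default is not dataclasses.MISSING:
            field["default"] = f.default
        fields.append(field)
    return {"type": "record", "name": cls.__name__, "fields": fields}


class Color(enum.Enum):
    BLUE = "BLUE"
    GREEN = "GREEN"


@dataclasses.dataclass
class Pet:
    name: str
    age: int
    tags: typing.List[str]
    color: Color
    owner: typing.Optional[str] = None


schema = to_avro_record(Pet)
```

In this sketch, as in the output above, `int` becomes `long` and an optional field becomes a union with "null" first and a `None` default.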

Serialization to avro or avro-json and json payload

Serialization requires an instance of the Python class/dataclass

from dataclasses import dataclass

import typing

from dataclasses_avroschema import AvroModel


@dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


@dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]

address_data = {
    "street": "test",
    "street_number": 10,
}

# create an Address instance
address = Address(**address_data)

data_user = {
    "name": "john",
    "age": 20,
    "addresses": [address],
}

# create an User instance
user = User(**data_user)

# serialization
assert user.serialize() == b"\x08john(\x02\x08test\x14\x00"

assert user.serialize(
    serialization_type="avro-json"
) == b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# Get the json from the instance
assert user.to_json() == '{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# Get a python dict
assert user.to_dict() == {
    "name": "john", 
    "age": 20, 
    "addresses": [
        {"street": "test", "street_number": 10}
    ]
}
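
The binary payload in the first assertion can be reproduced by hand from the Avro binary encoding rules: longs are zig-zag encoded and written as variable-length integers, strings are a length followed by UTF-8 bytes, and arrays are written as counted blocks terminated by a zero count. The sketch below uses only the standard library; the function names are hypothetical and it covers only the types in this example.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a 64-bit int as a zig-zag, base-128 variable-length integer."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(value: str) -> bytes:
    """A string is its byte length (as a long) followed by UTF-8 data."""
    raw = value.encode("utf-8")
    return zigzag_varint(len(raw)) + raw


def encode_address(address: dict) -> bytes:
    """A record is its fields concatenated in schema order."""
    return encode_string(address["street"]) + zigzag_varint(address["street_number"])


def encode_user(user: dict) -> bytes:
    out = encode_string(user["name"]) + zigzag_varint(user["age"])
    addresses = user["addresses"]
    if addresses:
        # One block: item count, then the items.
        out += zigzag_varint(len(addresses))
        for address in addresses:
            out += encode_address(address)
    # A zero count marks the end of the array.
    out += zigzag_varint(0)
    return out


payload = encode_user(
    {
        "name": "john",
        "age": 20,
        "addresses": [{"street": "test", "street_number": 10}],
    }
)
assert payload == b"\x08john(\x02\x08test\x14\x00"
```

Reading the bytes left to right: `\x08` is the length 4 of "john", `(` (0x28) is 20 zig-zag encoded as 40, `\x02` is an array block with one item, `\x14` is 10 encoded as 20, and the final `\x00` closes the array.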

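Deserialization

Deserialization can be called on a dataclass instance or on the dataclass itself. It can return either the dict representation or a new class instance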

import typing
import dataclasses

from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int

@dataclasses.dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]

avro_binary = b"\x08john(\x02\x08test\x14\x00"
avro_json_binary = b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# return a new class instance!!
assert User.deserialize(avro_binary) == User(
    name='john', 
    age=20,
    addresses=[Address(street='test', street_number=10)]
)

# return a python dict
assert User.deserialize(avro_binary, create_instance=False) == {
    "name": "john",
    "age": 20,
    "addresses": [
        {"street": "test", "street_number": 10}
    ]
}

# return a new class instance!!
assert User.deserialize(avro_json_binary, serialization_type="avro-json") == User(
    name='john',
    age=20,
    addresses=[Address(street='test', street_number=10)]
)

# return a python dict
assert User.deserialize(
    avro_json_binary,
    serialization_type="avro-json",
    create_instance=False
) == {"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}
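
Avro binary data carries no field names or type tags, so a reader has to walk the bytes in schema order. As an illustration of what deserialization involves for `avro_binary`, here is a minimal standard-library decoder for this one schema. The function names are hypothetical and this is a sketch, not how the library reads data.

```python
import io


def read_long(buf: io.BytesIO) -> int:
    """Read a base-128 variable-length integer and undo the zig-zag encoding."""
    shift = 0
    acc = 0
    while True:
        byte = buf.read(1)[0]
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:  # continuation bit clear: last byte
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)


def read_string(buf: io.BytesIO) -> str:
    """A string is a length followed by that many UTF-8 bytes."""
    return buf.read(read_long(buf)).decode("utf-8")


def read_address(buf: io.BytesIO) -> dict:
    return {"street": read_string(buf), "street_number": read_long(buf)}


def read_array(buf: io.BytesIO, read_item) -> list:
    """Arrays are counted blocks; a zero count ends the array."""
    items = []
    while True:
        count = read_long(buf)
        if count == 0:
            return items
        if count < 0:
            # A negative count is followed by the block size in bytes.
            read_long(buf)
            count = -count
        items.extend(read_item(buf) for _ in range(count))


def read_user(buf: io.BytesIO) -> dict:
    # Fields are read in the order the schema declares them.
    return {
        "name": read_string(buf),
        "age": read_long(buf),
        "addresses": read_array(buf, read_address),
    }


decoded = read_user(io.BytesIO(b"\x08john(\x02\x08test\x14\x00"))
assert decoded == {
    "name": "john",
    "age": 20,
    "addresses": [{"street": "test", "street_number": 10}],
}
```

Because the field order comes from the schema alone, the reader must use a schema compatible with the one used to write the data.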

Pydantic integration

To add dataclasses-avroschema functionality to pydantic, you only need to replace BaseModel with AvroBaseModel

import typing
import enum

from dataclasses_avroschema.pydantic import AvroBaseModel

from pydantic import Field, ValidationError


class FavoriteColor(str, enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(AvroBaseModel):
    name: str
    age: int
    pets: typing.List[str] = Field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = Field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None

    class Meta:
        schema_doc = False


assert UserAdvance.avro_schema_to_python() == {
    "type": "record",
    "name": "UserAdvance",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "long"},
        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}, "default": ["dog", "cat"]},
        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}, "default": {"key": 1}},
        {"name": "has_car", "type": "boolean", "default": False},
        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}, "default": "BLUE"},
        {"name": "country", "type": "string", "default": "Argentina"},
        {"name": "address", "type": ["null", "string"], "default": None}
    ]
}

print(UserAdvance.json_schema())

# {
#   "$defs": {"FavoriteColor": {"enum": ["BLUE", "YELLOW", "GREEN"], "title": "FavoriteColor", "type": "string"}},
#   "properties": {
#       "name": {"title": "Name", "type": "string"},
#       "age": {"title": "Age", "type": "integer"},
#       "pets": {"items": {"type": "string"}, "title": "Pets", "type": "array"},
#       "accounts": {"additionalProperties": {"type": "integer"}, "title": "Accounts", "type": "object"},
#       "has_car": {"default": false, "title": "Has Car", "type": "boolean"},
#       "favorite_colors": {"allOf": [{"$ref": "#/$defs/FavoriteColor"}], "default": "BLUE"},
#       "country": {"default": "Argentina", "title": "Country", "type": "string"},
#       "address": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Address"}
#   }, 
#   "required": ["name", "age"],
#   "title": "UserAdvance",
#   "type": "object"
# }

user = UserAdvance(name="bond", age=50)

# pydantic
assert user.dict() == {
    'name': 'bond',
    'age': 50,
    'pets': ['dog', 'cat'],
    'accounts': {'key': 1},
    'has_car': False,
    'favorite_colors': FavoriteColor.BLUE,
    'country': 'Argentina',
    'address': None
}

# pydantic
print(user.json())

assert user.json() == '{"name":"bond","age":50,"pets":["dog","cat"],"accounts":{"key":1},"has_car":false,"favorite_colors":"BLUE","country":"Argentina","address":null}'

# pydantic
try:
    user = UserAdvance(name="bond")
except ValidationError as exc:
    ...

# dataclasses-avroschema
event = user.serialize()
assert event == b'\x08bondd\x04\x06dog\x06cat\x00\x02\x06key\x02\x00\x00\x00\x12Argentina\x00'

assert UserAdvance.deserialize(data=event) == UserAdvance(
    name='bond',
    age=50, 
    pets=['dog', 'cat'],
    accounts={'key': 1},
    has_car=False, 
    favorite_colors=FavoriteColor.BLUE,
    country='Argentina', 
    address=None
)

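Examples with Python streaming drivers (kafka and redis)

In the examples folder you can find three different kafka examples. One uses aiokafka (async) and shows the simplest use case: an AvroModel instance is serialized, sent through kafka, and the event is then consumed. The other two are sync examples using the kafka-python driver, showing avro-json serialization and schema evolution (FULL compatibility). There are also two redis examples using redis streams with walrus and redisgears-py

Factory and fixtures

Dataclasses Avro Schema also includes a factory feature, so you can quickly generate Python instances and use them, for example, to test your data streaming pipelines. Instances can be generated using the fake method.

Note: this feature is not enabled by default; it requires the faker extra. You can install it with pip install 'dataclasses-avroschema[faker]'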

import typing
import dataclasses

from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


@dataclasses.dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]


Address.fake()
# >>>> Address(street='PxZJILDRgbXyhWrrPWxQ', street_number=2067)

User.fake()
# >>>> User(name='VGSBbOGfSGjkMDnefHIZ', age=8974, addresses=[Address(street='vNpPYgesiHUwwzGcmMiS', street_number=4790)])
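
To illustrate the idea behind a type-driven factory, here is a minimal standard-library sketch that fills a dataclass with random values based on its type hints. It is an assumption-laden illustration, not the library's `fake` implementation (which relies on the faker extra): `fake_value` and `fake_instance` are hypothetical helpers, values come from `random`, and only a handful of types are handled.

```python
import dataclasses
import random
import string
import typing


def fake_value(hint, rng: random.Random):
    """Return a random value matching a (small) set of supported type hints."""
    if hint is str:
        return "".join(rng.choice(string.ascii_letters) for _ in range(20))
    if hint is int:
        return rng.randint(0, 9999)
    if hint is float:
        return rng.random()
    if hint is bool:
        return rng.choice([True, False])
    if dataclasses.is_dataclass(hint):
        # Nested records are generated recursively.
        return fake_instance(hint, rng)
    origin, args = typing.get_origin(hint), typing.get_args(hint)
    if origin is list:
        return [fake_value(args[0], rng) for _ in range(rng.randint(1, 3))]
    raise TypeError(f"unsupported type hint: {hint!r}")


def fake_instance(cls, rng: random.Random):
    """Build an instance of a dataclass with a random value for every field."""
    hints = typing.get_type_hints(cls)
    values = {f.name: fake_value(hints[f.name], rng) for f in dataclasses.fields(cls)}
    return cls(**values)


@dataclasses.dataclass
class Address:
    street: str
    street_number: int


@dataclasses.dataclass
class User:
    name: str
    age: int
    addresses: typing.List[Address]


fake_user = fake_instance(User, random.Random(0))
```

Passing a seeded `random.Random` keeps generated fixtures reproducible between test runs, which is a design choice of this sketch rather than something the source describes.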

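Features

  • Primitive types: int, long, double, float, boolean, string and null support
  • Complex types: enum, array, map, fixed, unions and records support
  • typing.Annotated supported
  • typing.Literal supported
  • Logical types: date, time (millis and micros), datetime (millis and micros), uuid support
  • Schema relations (oneToOne, oneToMany)
  • Recursive schemas
  • Generate Avro schemas from faust.Record
  • Instance serialization corresponding to the generated avro schema
  • Data deserialization. Return a Python dict or a class instance
  • Generate json from a Python class instance
  • Case schemas
  • Generate models from avsc files
  • Examples of integration with kafka drivers: aiokafka, kafka-python
  • Examples of integration with redis drivers: walrus and redisgears-py
  • Factory instances
  • Pydantic integration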

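Development

Poetry is needed to install the dependencies and to develop locally

  1. Install dependencies: poetry install --all-extras
  2. Code linting: ./scripts/format
  3. Run tests: ./scripts/test
  4. Test documentation: ./scripts/test-documentation

For commit messages we use commitizen in order to standardize the commit rules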
