Generate Avro Schemas from Python classes. Serialize/Deserialize python instances with avro schemas

Dataclasses Avro Schema

Generate avro schemas from python dataclasses, Pydantic models and Faust Records. Code generation from avro schemas. Serialize/Deserialize python instances with avro schemas

Requirements

python 3.10+

Installation

with pip or poetry:

pip install dataclasses-avroschema or poetry add dataclasses-avroschema

Extras

  • pydantic: pip install 'dataclasses-avroschema[pydantic]' or poetry add dataclasses-avroschema --extras "pydantic"
  • faust-streaming: pip install 'dataclasses-avroschema[faust]' or poetry add dataclasses-avroschema --extras "faust"
  • faker: pip install 'dataclasses-avroschema[faker]' or poetry add dataclasses-avroschema --extras "faker"
  • dc-avro: pip install 'dataclasses-avroschema[cli]' or poetry add dataclasses-avroschema --with cli

Note: You can install all extra dependencies with pip install 'dataclasses-avroschema[faust,pydantic,faker,cli]' or poetry add dataclasses-avroschema --extras "pydantic faust faker cli"

Documentation

https://marcosschroh.github.io/dataclasses-avroschema/

Usage

Generating the avro schema

from dataclasses import dataclass
import enum

import typing

from dataclasses_avroschema import AvroModel


class FavoriteColor(str, enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class User(AvroModel):
    "An User"
    name: str
    age: int
    pets: typing.List[str]
    accounts: typing.Dict[str, int]
    favorite_colors: FavoriteColor
    country: str = "Argentina"
    address: typing.Optional[str] = None

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


print(User.avro_schema())

# {
#    "type": "record",
#    "name": "User",
#    "fields": [
#        {"name": "name", "type": "string"},
#        {"name": "age", "type": "long"},
#        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}},
#        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}},
#        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}},
#        {"name": "country", "type": "string", "default": "Argentina"},
#        {"name": "address", "type": ["null", "string"], "default": null}
#    ], 
#    "doc": "An User",
#    "namespace": "User.v1", 
#    "aliases": ["user-v1", "super user"]
# }

assert User.avro_schema_to_python() == {
    "type": "record",
    "name": "User",
    "doc": "An User",
    "namespace": "User.v1",
    "aliases": ["user-v1", "super user"],
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "long"},
        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}},
        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}},
        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}},
        {"name": "country", "type": "string", "default": "Argentina"},
        {"name": "address", "type": ["null", "string"], "default": None}
    ],
}
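
The output above follows a regular mapping from type hints to Avro types (str to string, int to long, List to array, Dict to map, Optional to a union with null). The sketch below illustrates that mapping with the standard library only. It is not the library's implementation: avro_type and record_schema are hypothetical helper names, only the hints shown in this README are handled, and the "name" entries that the library adds to arrays and maps are left out.

```python
import dataclasses
import typing

# Only the primitive mappings visible in the schema output above.
PRIMITIVES = {str: "string", int: "long", bool: "boolean", type(None): "null"}


def avro_type(hint):
    """Translate one type hint into an Avro type (hypothetical helper)."""
    if hint in PRIMITIVES:
        return PRIMITIVES[hint]
    origin, args = typing.get_origin(hint), typing.get_args(hint)
    if origin is list:
        return {"type": "array", "items": avro_type(args[0])}
    if origin is dict:
        return {"type": "map", "values": avro_type(args[1])}
    if origin is typing.Union:
        members = [avro_type(arg) for arg in args]
        # Stable sort that moves "null" to the front, as in the output above.
        return sorted(members, key=lambda member: member != "null")
    raise TypeError(f"unsupported hint: {hint!r}")


def record_schema(cls):
    """Build a record schema dict from a plain dataclass (hypothetical helper)."""
    fields = []
    for field in dataclasses.fields(cls):
        entry = {"name": field.name, "type": avro_type(field.type)}
        if field.default is not dataclasses.MISSING:
            entry["default"] = field.default
        fields.append(entry)
    return {"type": "record", "name": cls.__name__, "fields": fields}


@dataclasses.dataclass
class Pet:  # plain dataclass, used only for this illustration
    name: str
    age: int
    tags: typing.List[str]
    owner: typing.Optional[str] = None


schema = record_schema(Pet)
```

For real schemas, AvroModel.avro_schema() remains the source of truth, since it also covers docs, namespaces, aliases, enums and logical types.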

Serialization to avro or avro-json and json payload

Serialization requires an instance of the python class/dataclass

from dataclasses import dataclass

import typing

from dataclasses_avroschema import AvroModel


@dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


@dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]

address_data = {
    "street": "test",
    "street_number": 10,
}

# create an Address instance
address = Address(**address_data)

data_user = {
    "name": "john",
    "age": 20,
    "addresses": [address],
}

# create an User instance
user = User(**data_user)

# serialization
assert user.serialize() == b"\x08john(\x02\x08test\x14\x00"

assert user.serialize(
    serialization_type="avro-json"
) == b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# Get the json from the instance
assert user.to_json() == '{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# Get a python dict
assert user.to_dict() == {
    "name": "john", 
    "age": 20, 
    "addresses": [
        {"street": "test", "street_number": 10}
    ]
}
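
The binary payload asserted above can be reproduced by hand from the Avro binary encoding rules: a long is zigzag-encoded and written as a variable-length integer, a string is its byte length followed by UTF-8 bytes, and an array is a block count, the items, and a terminating zero. The following is a minimal stdlib-only sketch of those rules, not the code the library runs; encode_long, encode_string and encode_array are names made up for this illustration.

```python
def encode_long(value: int) -> bytes:
    """Zigzag-map the sign bit, then emit 7 bits per byte, low bits first."""
    zigzag = (value << 1) ^ (value >> 63)
    out = bytearray()
    while zigzag > 0x7F:
        out.append((zigzag & 0x7F) | 0x80)  # high bit set: more bytes follow
        zigzag >>= 7
    out.append(zigzag)
    return bytes(out)


def encode_string(text: str) -> bytes:
    """Length prefix (as a long) followed by the UTF-8 bytes."""
    raw = text.encode("utf-8")
    return encode_long(len(raw)) + raw


def encode_array(items: list[bytes]) -> bytes:
    """One block with len(items) entries, closed by a zero block count."""
    if not items:
        return encode_long(0)
    return encode_long(len(items)) + b"".join(items) + encode_long(0)


# Record fields are written in schema order, with no field names or separators.
address = encode_string("test") + encode_long(10)
payload = encode_string("john") + encode_long(20) + encode_array([address])

assert payload == b"\x08john(\x02\x08test\x14\x00"
```

Because the payload carries no field names, a reader needs the same (or a compatible) schema to interpret it.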

Deserialization

Deserialization can be called on a dataclass instance or on the dataclass itself. It returns either the dict representation or a new class instance

import typing
import dataclasses

from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int

@dataclasses.dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]

avro_binary = b"\x08john(\x02\x08test\x14\x00"
avro_json_binary = b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'

# return a new class instance!!
assert User.deserialize(avro_binary) == User(
    name='john', 
    age=20,
    addresses=[Address(street='test', street_number=10)]
)

# return a python dict
assert User.deserialize(avro_binary, create_instance=False) == {
    "name": "john",
    "age": 20,
    "addresses": [
        {"street": "test", "street_number": 10}
    ]
}

# return a new class instance!!
assert User.deserialize(avro_json_binary, serialization_type="avro-json") == User(
    name='john',
    age=20,
    addresses=[Address(street='test', street_number=10)]
)

# return a python dict
assert User.deserialize(
    avro_json_binary,
    serialization_type="avro-json",
    create_instance=False
) == {"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}
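
To see why the schema is needed for deserialization, the avro binary payload from this section can be walked by hand. The sketch below uses only the standard library and hard-codes the field order of User and Address; the read_* helpers are hypothetical names for this illustration and are not part of dataclasses-avroschema.

```python
import io


def read_long(buf: io.BytesIO) -> int:
    """Read a variable-length integer, then undo the zigzag mapping."""
    shift = result = 0
    while True:
        byte = buf.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this number
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def read_string(buf: io.BytesIO) -> str:
    return buf.read(read_long(buf)).decode("utf-8")


def read_array(buf: io.BytesIO, read_item) -> list:
    """Read blocks of items until a zero block count is found."""
    items = []
    while (count := read_long(buf)) != 0:
        if count < 0:  # negative count: the block's byte size follows
            count = -count
            read_long(buf)
        items.extend(read_item(buf) for _ in range(count))
    return items


def read_address(buf: io.BytesIO) -> dict:
    # Dict entries are evaluated left to right, matching the schema field order.
    return {"street": read_string(buf), "street_number": read_long(buf)}


def read_user(buf: io.BytesIO) -> dict:
    return {
        "name": read_string(buf),
        "age": read_long(buf),
        "addresses": read_array(buf, read_address),
    }


decoded = read_user(io.BytesIO(b"\x08john(\x02\x08test\x14\x00"))

assert decoded == {
    "name": "john",
    "age": 20,
    "addresses": [{"street": "test", "street_number": 10}],
}
```

This corresponds to what User.deserialize(avro_binary, create_instance=False) returns in the example above.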

Pydantic integration

To add dataclasses-avroschema functionality to pydantic you only need to replace BaseModel with AvroBaseModel:

import typing
import enum

from dataclasses_avroschema.pydantic import AvroBaseModel

from pydantic import Field, ValidationError


class FavoriteColor(str, enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(AvroBaseModel):
    name: str
    age: int
    pets: typing.List[str] = Field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = Field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None

    class Meta:
        schema_doc = False


assert UserAdvance.avro_schema_to_python() == {
    "type": "record",
    "name": "UserAdvance",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "long"},
        {"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}, "default": ["dog", "cat"]},
        {"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}, "default": {"key": 1}},
        {"name": "has_car", "type": "boolean", "default": False},
        {"name": "favorite_colors", "type": {"type": "enum", "name": "FavoriteColor", "symbols": ["BLUE", "YELLOW", "GREEN"]}, "default": "BLUE"},
        {"name": "country", "type": "string", "default": "Argentina"},
        {"name": "address", "type": ["null", "string"], "default": None}
    ]
}

print(UserAdvance.json_schema())

# {
#   "$defs": {"FavoriteColor": {"enum": ["BLUE", "YELLOW", "GREEN"], "title": "FavoriteColor", "type": "string"}},
#   "properties": {
#       "name": {"title": "Name", "type": "string"},
#       "age": {"title": "Age", "type": "integer"},
#       "pets": {"items": {"type": "string"}, "title": "Pets", "type": "array"},
#       "accounts": {"additionalProperties": {"type": "integer"}, "title": "Accounts", "type": "object"},
#       "has_car": {"default": false, "title": "Has Car", "type": "boolean"},
#       "favorite_colors": {"allOf": [{"$ref": "#/$defs/FavoriteColor"}], "default": "BLUE"},
#       "country": {"default": "Argentina", "title": "Country", "type": "string"},
#       "address": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Address"}
#   }, 
#   "required": ["name", "age"],
#   "title": "UserAdvance",
#   "type": "object"
# }

user = UserAdvance(name="bond", age=50)

# pydantic
assert user.dict() == {
    'name': 'bond',
    'age': 50,
    'pets': ['dog', 'cat'],
    'accounts': {'key': 1},
    'has_car': False,
    'favorite_colors': FavoriteColor.BLUE,
    'country': 'Argentina',
    'address': None
}

# pydantic
print(user.json())

assert user.json() == '{"name":"bond","age":50,"pets":["dog","cat"],"accounts":{"key":1},"has_car":false,"favorite_colors":"BLUE","country":"Argentina","address":null}'

# pydantic
try:
    user = UserAdvance(name="bond")
except ValidationError as exc:
    ...

# dataclasses-avroschema
event = user.serialize()
assert event == b'\x08bondd\x04\x06dog\x06cat\x00\x02\x06key\x02\x00\x00\x00\x12Argentina\x00'

assert UserAdvance.deserialize(data=event) == UserAdvance(
    name='bond',
    age=50, 
    pets=['dog', 'cat'],
    accounts={'key': 1},
    has_car=False, 
    favorite_colors=FavoriteColor.BLUE,
    country='Argentina', 
    address=None
)

Examples with python streaming drivers (kafka and redis)

Under the examples folder you can find three different kafka examples. One uses aiokafka (async) and shows the simplest use case: an AvroModel instance is serialized, sent through kafka, and the event is consumed. The other two are sync examples using the kafka-python driver, which show avro-json serialization and schema evolution (FULL compatibility). There are also two redis examples using redis streams with walrus and redisgears-py

Factory and fixtures

Dataclasses Avro Schema also includes a factory feature, so you can quickly generate python instances and use them, for example, to test your data streaming pipelines. Instances can be generated using the fake method.

Note: This feature is not enabled by default and requires the faker extra to be installed. You may install it with pip install 'dataclasses-avroschema[faker]'

import typing
import dataclasses

from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


@dataclasses.dataclass
class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]


Address.fake()
# >>>> Address(street='PxZJILDRgbXyhWrrPWxQ', street_number=2067)

User.fake()
# >>>> User(name='VGSBbOGfSGjkMDnefHIZ', age=8974, addresses=[Address(street='vNpPYgesiHUwwzGcmMiS', street_number=4790)])
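
The idea behind such a factory can be sketched with the standard library: walk the dataclass fields and produce a random value for each type hint, recursing into nested dataclasses. This is only an illustration under simplifying assumptions; the library itself relies on the faker extra, and fake_value and fake_instance are hypothetical names that handle just str, int, lists and nested dataclasses.

```python
import dataclasses
import random
import string
import typing


def fake_value(hint, rng: random.Random):
    """Return a random value for one type hint (hypothetical helper)."""
    if hint is str:
        return "".join(rng.choices(string.ascii_letters, k=20))
    if hint is int:
        return rng.randint(0, 9999)
    if typing.get_origin(hint) is list:
        (item_hint,) = typing.get_args(hint)
        return [fake_value(item_hint, rng) for _ in range(rng.randint(1, 3))]
    if dataclasses.is_dataclass(hint):
        return fake_instance(hint, rng)
    raise TypeError(f"unsupported hint: {hint!r}")


def fake_instance(cls, rng: random.Random):
    """Build an instance of a dataclass with random field values."""
    kwargs = {f.name: fake_value(f.type, rng) for f in dataclasses.fields(cls)}
    return cls(**kwargs)


# Plain dataclasses (no AvroModel) so the sketch runs without extra packages.
@dataclasses.dataclass
class Address:
    street: str
    street_number: int


@dataclasses.dataclass
class User:
    name: str
    age: int
    addresses: typing.List[Address]


user = fake_instance(User, random.Random(0))  # seeded for repeatable tests
```

Passing a seeded random.Random keeps generated fixtures stable between test runs, which is a design choice of this sketch rather than a documented option of fake().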

Features

  • Primitive types: int, long, double, float, boolean, string and null support
  • Complex types: enum, array, map, fixed, unions and records support
  • typing.Annotated supported
  • typing.Literal supported
  • Logical Types: date, time (millis and micro), datetime (millis and micro), uuid support
  • Schema relations (oneToOne, oneToMany)
  • Recursive Schemas
  • Generate Avro Schemas from faust.Record
  • Instance serialization according to the generated avro schema
  • Data deserialization. Return python dict or class instance
  • Generate json from python class instance
  • Case Schemas
  • Generate models from avsc files
  • Examples of integration with kafka drivers: aiokafka, kafka-python
  • Example of integration with redis drivers: walrus and redisgears-py
  • Factory instances
  • Pydantic integration
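
The logical types in the list above are stored as plain Avro primitives: per the Avro specification, a date is an int counting days since 1970-01-01, time-millis is an int counting milliseconds after midnight, timestamp-millis is a long counting milliseconds since the Unix epoch (UTC), and a uuid is a string. The sketch below shows those conversions with the standard library; the to_avro_* function names are made up for this illustration and are not the library's API.

```python
import datetime
import uuid

EPOCH_DATE = datetime.date(1970, 1, 1)
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def to_avro_date(value: datetime.date) -> int:
    """logicalType "date": days since the Unix epoch."""
    return (value - EPOCH_DATE).days


def to_avro_time_millis(value: datetime.time) -> int:
    """logicalType "time-millis": milliseconds after midnight."""
    seconds = (value.hour * 60 + value.minute) * 60 + value.second
    return seconds * 1000 + value.microsecond // 1000


def to_avro_timestamp_millis(value: datetime.datetime) -> int:
    """logicalType "timestamp-millis": expects a timezone-aware datetime."""
    return (value - EPOCH) // datetime.timedelta(milliseconds=1)


def to_avro_uuid(value: uuid.UUID) -> str:
    """logicalType "uuid": canonical string form."""
    return str(value)


days = to_avro_date(datetime.date(2000, 1, 1))
millis = to_avro_time_millis(datetime.time(1, 2, 3, 4000))
stamp = to_avro_timestamp_millis(
    datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)
)

assert (days, millis, stamp) == (10957, 3723004, 946684800000)
```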

Development

Poetry is needed to install the dependencies and develop locally

  1. Install dependencies: poetry install --all-extras
  2. Code linting: ./scripts/format
  3. Run tests: ./scripts/test
  4. Tests documentation: ./scripts/test-documentation

For commit messages we use commitizen to keep the commit format consistent
