AVRO-GEN

Avro record class and specific record reader generator.

The current Avro implementation in Python is completely typeless and operates on dicts. While this is often convenient and pythonic, not being able to discover the schema by looking at the code, not enforcing the schema during record construction, and not getting any context help from the IDE can hamper developer productivity and introduce bugs.

This project aims to rectify this situation by providing a generator that constructs concrete record classes, together with a reader that wraps the Avro DatumReader and returns concrete classes instead of dicts. In order not to violate Avro internals, this functionality is built strictly on top of the DatumReader, and all the specific record classes are dict wrappers that define accessor properties with proper type hints for each field in the schema. For this exact reason the generator does not need to provide an overloaded DatumWriter: to a writer, each specific record appears to be a regular dictionary.
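The dict-wrapper idea can be sketched in a few lines. The stand-in below is illustrative (the class and property names are not avrogen's actual API); it shows how typed property accessors sit on top of a plain dict, which is why a regular DatumWriter can serialize the records unchanged:

```python
class DictWrapper(object):
    """Illustrative stand-in for avrogen's dict-wrapper base class."""

    def __init__(self, inner_dict=None):
        # All field values live in a plain dict underneath.
        self._inner_dict = inner_dict if inner_dict is not None else {}


class TweetClass(DictWrapper):
    @property
    def ID(self):
        """
        :rtype: long
        """
        return self._inner_dict.get('ID', None)

    @ID.setter
    def ID(self, value):
        """
        :param long value:
        """
        self._inner_dict['ID'] = value


t = TweetClass()
t.ID = 1
```

The record behaves like a typed object in the IDE, but its state is just the `_inner_dict` dictionary underneath.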

This is a fork of https://github.com/rbystrit/avro_gen. It adds better Python 3 support, including types, better namespace handling, support for documentation generation, and JSON (de-)serialization.

pip install avro-gen3
Usage:
from avrogen import write_schema_files

schema_json = "....."
output_directory = "....."
write_schema_files(schema_json, output_directory)

The generator will create the output directory if it does not exist and put the generated files there. The generated files will be:

OUTPUT_DIR

  • __init__.py
  • schema_classes.py
  • submodules*

Since Python doesn't support circular imports, the generator deals with Avro namespaces by emitting all records into schema_classes.py as nested classes. The top-level class there will be SchemaClasses, whose children will be classes representing namespaces. Each namespace class will in turn contain classes for the records belonging to that namespace.

Consider the following schema:

 {"type": "record", "name": "tweet", "namespace": "com.twitter.avro", "fields": [{"name": "ID", "type": "long"}]}

Then schema_classes.py would contain:

class SchemaClasses(object):
    class com(object):
        class twitter(object):
            class avro(object):
                class tweetClass(DictWrapper):
                    def __init__(self, inner_dict=None):
                        ....
                    @property
                    def ID(self):
                        """
                        :rtype: long
                        """
                        return self._inner_dict.get('ID', None)

                    @ID.setter
                    def ID(self, value):
                        """
                        :param long value:
                        """
                        self._inner_dict['ID'] = value

In order to map specific record types and namespaces to modules, so that proper importing can be supported, the generator will create a sub-module under the output directory for each namespace, which exports the names of all types contained in that namespace. Types declared with an empty namespace are exported from the root module.

So for the example above, the output directory will look as follows:

OUTPUT_DIR

  • __init__.py
  • schema_classes.py
  • com
    • twitter
      • avro
        • __init__.py

The contents of OUTPUT_DIR/com/twitter/avro/__init__.py will be:

from ....schema_classes import SchemaClasses
tweet = SchemaClasses.com.twitter.avro.tweetClass

So in your code you will be able to say:

from OUTPUT_DIR.com.twitter.avro import tweet
from OUTPUT_DIR import SpecificDatumReader as TweetReader, SCHEMA as your_schema
from avro import datafile, io

my_tweet = tweet()
my_tweet.ID = 1

with open('somefile', 'w+b') as f:
    writer = datafile.DataFileWriter(f, io.DatumWriter(), your_schema)
    writer.append(my_tweet)
    writer.close()

with open('somefile', 'rb') as f:
    reader = datafile.DataFileReader(f, TweetReader(readers_schema=your_schema))
    my_tweet1 = next(reader)
    reader.close()

Avro protocol support

Avro protocol support is implemented the same way as schema support. To generate classes for a protocol:

protocol_json = "....."
output_directory = "....."
from avrogen import write_protocol_files

write_protocol_files(protocol_json, output_directory)

The structure of the generated code will be exactly the same as for schemas, but in addition to the regular types, a *Request type will be generated in the root namespace of the protocol for each message defined.

Logical types support

Avrogen implements logical types on top of the standard avro package and supports generating classes typed accordingly. To enable logical type support, pass use_logical_types=True to the schema and protocol generators. If you implement custom logical types, and those types map to anything other than simple types or datetime.* / decimal.* types, pass the custom_imports parameter to the generator functions so that your types are imported. The types implemented out of the box are:

  • decimal (using string representation only)
  • date
  • time-millis
  • time-micros
  • timestamp-millis
  • timestamp-micros
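The note on decimal ("string representation only") means decimal values travel through Avro as strings and are parsed back into decimal.Decimal on read. A self-contained sketch of that round trip (the helper names here are illustrative, not avrogen's API):

```python
import decimal


def decimal_to_str(value):
    # On write, the decimal logical type serializes the value as its string form.
    return str(value)


def str_to_decimal(raw):
    # On read, the string is parsed back into a decimal.Decimal.
    return decimal.Decimal(raw)


round_tripped = str_to_decimal(decimal_to_str(decimal.Decimal("1234.5678")))
```

The string form preserves exact precision, which is the point of using decimal rather than float in the first place.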

To register your own custom logical type, inherit from avrogen.logical.LogicalTypeProcessor, implement its abstract methods, and add an instance to the avrogen.logical.DEFAULT_LOGICAL_TYPES dictionary under the name of your logical type. A sample implementation looks as follows:

import datetime

from avro import schema

from avrogen.logical import LogicalTypeProcessor

# Constants used by the conversions below
EPOCH_DATE = datetime.date(1970, 1, 1)
SECONDS_IN_DAY = 24 * 60 * 60


class DateLogicalTypeProcessor(LogicalTypeProcessor):
    _matching_types = {'int', 'long', 'float', 'double'}

    def can_convert(self, writers_schema):
        return isinstance(writers_schema, schema.PrimitiveSchema) and writers_schema.type == 'int'

    def validate(self, expected_schema, datum):
        return isinstance(datum, datetime.date)

    def convert(self, writers_schema, value):
        if not isinstance(value, datetime.date):
            raise Exception("Wrong type for date conversion")
        return (value - EPOCH_DATE).total_seconds() // SECONDS_IN_DAY

    def convert_back(self, writers_schema, readers_schema, value):
        return EPOCH_DATE + datetime.timedelta(days=int(value))

    def does_match(self, writers_schema, readers_schema):
        if isinstance(writers_schema, schema.PrimitiveSchema):
            if writers_schema.type in DateLogicalTypeProcessor._matching_types:
                return True
        return False

    def typename(self):
        return 'datetime.date'

    def initializer(self, value=None):
        return ((
                    'logical.DateLogicalTypeProcessor().convert_back(None, None, %s)' % value) if value is not None
                else 'datetime.datetime.today().date()')
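As a sanity check, the conversion arithmetic used by the sample processor above can be exercised on its own. This standalone sketch mirrors convert and convert_back without depending on avrogen (EPOCH_DATE and SECONDS_IN_DAY are the constants the sample assumes):

```python
import datetime

EPOCH_DATE = datetime.date(1970, 1, 1)
SECONDS_IN_DAY = 24 * 60 * 60


def convert(value):
    # date -> days since the Unix epoch, as in DateLogicalTypeProcessor.convert
    return int((value - EPOCH_DATE).total_seconds() // SECONDS_IN_DAY)


def convert_back(value):
    # days since the Unix epoch -> date, as in convert_back
    return EPOCH_DATE + datetime.timedelta(days=int(value))


days = convert(datetime.date(1970, 1, 2))  # one day after the epoch
```

Round-tripping any date through convert and convert_back should return the original value, which is the invariant a custom LogicalTypeProcessor must maintain.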

To read and write data with logical type support, use the generated SpecificDatumReader and the LogicalDatumWriter from avrogen.logical.
