
ConfluentAvro

An Avro SerDe implementation that integrates with the Confluent Schema Registry and serializes and deserializes data according to the Confluent wire format.

View Demo · Report Bug · Request Feature

Getting Started

Background

To solve schema-management issues and ensure compatibility in the development of Kafka-based applications, the Confluent team introduced the Schema Registry to store and share schemas between applications and to apply compatibility checks to each newly registered schema. To make schema sharing cheap, they extended the Avro binary format by prepending the schema ID to the actual record instead of including the full schema.

» You can find more about Confluent and the Schema Registry in the Confluent documentation.
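
For illustration, here is a minimal sketch of that framing: one magic byte (0x00), the schema ID as a 4-byte big-endian integer, then the Avro-encoded record. The frame/unframe helpers below are hypothetical names for this sketch, not part of this library:

import struct

MAGIC_BYTE = 0

def frame(schema_id: int, avro_payload: bytes) -> bytes:
    # ">bI": big-endian magic byte followed by a 4-byte unsigned schema ID.
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def unframe(message: bytes):
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    return schema_id, message[5:]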

Implementation

ConfluentAvro is implemented according to the above specification. Before publishing to a Kafka topic, the library prepends the schema ID to the generated Avro binary, and when consuming from Kafka, it extracts the schema ID and fetches the corresponding schema from the registry before deserializing the actual data.

The underlying API automatically registers new schemas used for serialization and fetches the corresponding schema when deserializing. Newly registered and fetched schemas are both cached locally to speed up processing of future records.
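
As a rough sketch of that caching idea (the class and method names here are assumptions for illustration, not the library's internals), assuming a client object exposing a fetch_schema(schema_id) call:

class CachingRegistry:
    def __init__(self, client):
        self._client = client   # any object exposing fetch_schema(schema_id)
        self._by_id = {}        # schema_id -> fetched schema

    def schema_for(self, schema_id):
        # The first record with this schema ID pays the network round trip;
        # every later record reuses the cached entry.
        if schema_id not in self._by_id:
            self._by_id[schema_id] = self._client.fetch_schema(schema_id)
        return self._by_id[schema_id]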

» ConfluentAvro highlights:

  • Supports the Confluent wire format
  • Integrates with the Confluent Schema Registry
  • Retries with exponential backoff if the connection to the registry fails (see the sketch after this list)
  • Caches schemas at the schema-registry level
  • Builds the underlying decoder/encoder once per schema and reuses it for all subsequent records
  • Can be integrated with different Kafka clients
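
A minimal sketch of that retry behavior (the function name, parameters, and values here are assumptions; the library's actual policy may differ):

import time
import requests

def get_with_backoff(url, retries=5, base_delay=0.5):
    # Retry a GET with exponentially growing delays: 0.5s, 1s, 2s, 4s, ...
    for attempt in range(retries):
        try:
            response = requests.get(url, timeout=5)
            response.raise_for_status()
            return response
        except requests.RequestException:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt)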

Installation

» pip install confluent-avro

Usage

Check the examples for a fully working demo.

Consumer App Example:

from kafka import KafkaConsumer

from confluent_avro import AvroKeyValueSerde, SchemaRegistry
from confluent_avro.schema_registry import HTTPBasicAuth

KAFKA_TOPIC = "confluent_avro-example-topic"

# Client for the Confluent schema registry (here with HTTP basic auth).
registry_client = SchemaRegistry(
    "https://myschemaregistry.com",
    HTTPBasicAuth("username", "password"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    group_id="random_group_id",
    bootstrap_servers=["localhost:9092"],
)

for msg in consumer:
    # deserialize() reads the schema ID from each message and fetches the
    # schema from the registry (cached after the first hit) before decoding.
    v = avroSerde.value.deserialize(msg.value)
    k = avroSerde.key.deserialize(msg.key)
    print(msg.offset, msg.partition, k, v)

Producer App Example:

from kafka import KafkaProducer

from confluent_avro import AvroKeyValueSerde, SchemaRegistry
from confluent_avro.schema_registry import HTTPBasicAuth

KAFKA_TOPIC = "confluent_avro-example-topic"

registry_client = SchemaRegistry(
    "https://myschemaregistry.com",
    HTTPBasicAuth("username", "password"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)

avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
# serialize() registers the schema with the registry if needed and prepends
# its ID to the Avro binary; key_schema and value_schema are Avro schemas
# defined elsewhere.
producer.send(
    KAFKA_TOPIC,
    key=avroSerde.key.serialize({...}, key_schema),
    value=avroSerde.value.serialize({...}, value_schema),
)
