ConfluentAvro
An Avro SerDe implementation that integrates with the Confluent Schema Registry and serializes and deserializes data according to the defined Confluent wire format.
Getting Started
Background
To solve schema management issues and ensure compatibility in the development of Kafka-based applications, the Confluent team introduced the Schema Registry to store and share schemas between different apps and to apply compatibility checks on each newly registered schema. To make schema sharing cheap, they extended the Avro binary format by prepending the schema id to the actual record instead of including the full schema.
» You can find more about Confluent and the Schema Registry in the Confluent documentation.
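For illustration, here is a minimal sketch of that wire format: one magic byte, a 4-byte big-endian schema id, then the Avro-encoded record. The helper names below are illustrative, not part of this library's API.

# Sketch of the Confluent wire format (illustrative, not this library's API)
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message

def pack_wire_format(schema_id: int, avro_payload: bytes) -> bytes:
    # 1-byte magic + 4-byte big-endian schema id, followed by the raw Avro binary
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def unpack_wire_format(message: bytes):
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("Not a Confluent-framed message")
    return schema_id, message[5:]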
Implementation
ConfluentAvro is implemented according to the above specification. Before publishing to a Kafka topic, the library prepends the schema id to the generated Avro binary, and when consuming from Kafka, it extracts the schema id and fetches the corresponding schema from the registry before deserializing the actual data.
The underlying API automatically registers new schemas used for data serialization and fetches the corresponding schema when deserializing. Newly registered schemas and fetched schemas are both cached locally to speed up processing of future records.
Instead of the full schema, the output includes only the schema id generated by the registry. Registering the same schema twice is idempotent, so no coordination between producers is needed.
» ConfluentAvro's key features:
- Supports the Confluent wire format
- Integrates with the Confluent Schema Registry
- Implements caching at the schema registry client level (see the sketch after this list)
- The underlying decoder/encoder is built once per schema and reused for all subsequent records
- Can be integrated with different Kafka clients
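As a rough sketch of the caching idea, a registry client can memoize lookups in both directions so each schema id is fetched and each schema is registered at most once. This is a simplified assumption of the behavior, not the library's actual code; fetch() and register() are hypothetical backend calls.

# Simplified sketch of schema caching (illustrative only)
class CachingRegistryClient:
    def __init__(self, registry):
        self._registry = registry
        self._id_to_schema = {}   # schema id -> schema, used when deserializing
        self._schema_to_id = {}   # (subject, schema) -> schema id, used when serializing

    def get_schema(self, schema_id):
        # Fetch from the registry only on a cache miss
        if schema_id not in self._id_to_schema:
            self._id_to_schema[schema_id] = self._registry.fetch(schema_id)
        return self._id_to_schema[schema_id]

    def register(self, subject, schema):
        # Registration is idempotent, so caching the id is safe
        key = (subject, schema)
        if key not in self._schema_to_id:
            self._schema_to_id[key] = self._registry.register(subject, schema)
        return self._schema_to_id[key]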
Built With
- fastavro (check fastavro benchmark)
- requests
Installation
» pip install confluent_avro
Usage
Check examples for a fully working demo.
Consumer App Example:
from kafka import KafkaConsumer

from confluent_avro.schema_registry import SchemaRegistry
from confluent_avro.schema_registry.auth import RegistryHTTPBasicAuth
from confluent_avro.serde import AvroKeyValueSerde

KAFKA_TOPIC = "confluent_avro-example-topic"

# Client for the schema registry; basic auth and extra headers are optional.
registry_client = SchemaRegistry(
    "https://myschemaregistry.com",
    RegistryHTTPBasicAuth("username", "password"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    group_id="random_group_id",
    bootstrap_servers=["localhost:9092"],
)

for msg in consumer:
    # Deserialize key and value according to the Confluent wire format
    v = avroSerde.value.deserialize(msg.value)
    k = avroSerde.key.deserialize(msg.key)
    print(msg.offset, msg.partition, k, v)
Producer App Example:
from kafka import KafkaProducer

from confluent_avro.schema_registry import SchemaRegistry
from confluent_avro.schema_registry.auth import RegistryHTTPBasicAuth
from confluent_avro.serde import AvroKeyValueSerde

KAFKA_TOPIC = "confluent_avro-example-topic"

registry_client = SchemaRegistry(
    "https://myschemaregistry.com",
    RegistryHTTPBasicAuth("username", "password"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
producer.send(
    KAFKA_TOPIC,
    # serialize() registers the schema if needed and prepends its id
    key=avroSerde.key.serialize({...}, key_schema),
    value=avroSerde.value.serialize({...}, value_schema),
)