Opinionated Kafka Python client on top of the Confluent Python library

kafkian

kafkian is an opinionated, high-level consumer and producer on top of confluent-kafka-python/librdkafka, partially inspired by confluent_kafka_helpers. It is intended primarily for CQRS/event-sourced systems where usage is mostly limited to producing and consuming encoded messages.

kafkian partially mimics the Kafka Java API, is partially more Pythonic, and is partially just the way the maintainer likes it.

Instead of configuring everything via properties, most things are meant to be configured explicitly and, when possible, via dependency injection for easier testing. The configuration dictionaries for both the producer and the consumer are passed through directly to the underlying confluent producer and consumer, which are hidden behind a facade.
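
Because the dictionary is passed through verbatim, standard librdkafka settings work out of the box. A minimal sketch (the bootstrap and registry URLs are placeholders):

from kafkian import Producer
from kafkian.serde.serialization import AvroSerializer, AvroStringKeySerializer

# This dict is handed straight to the underlying confluent-kafka producer,
# so librdkafka tuning knobs can ride along unchanged.
producer = Producer(
    {
        'bootstrap.servers': 'localhost:9092',
        'compression.type': 'snappy',  # librdkafka setting, passed through
        'linger.ms': 50,               # ditto
    },
    key_serializer=AvroStringKeySerializer(schema_registry_url='http://localhost:8081'),
    value_serializer=AvroSerializer(schema_registry_url='http://localhost:8081'),
)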

The library provides base serializer and deserializer classes, as well as their specialized Avro subclasses, AvroSerializer and AvroDeserializer. This allows having, say, a plain string key and an Avro-encoded message value, or vice versa. Quite often an Avro-encoded string is used as a key; for this purpose we provide AvroStringKeySerializer.
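
The base classes can also be subclassed for custom encodings. A hedged sketch, assuming the base class is named Serializer and exposes a serialize method taking the value and topic (both the class name and the signature are assumptions, not confirmed API):

from kafkian.serde.serialization import Serializer  # base class name assumed


class PlainStringKeySerializer(Serializer):
    """Hypothetical serializer that encodes string keys as UTF-8 bytes."""

    def serialize(self, value, topic, **kwargs):  # signature assumed
        return value.encode('utf-8')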

Unlike the Confluent library, we support supplying the specific Avro schema together with the message, just like the Kafka Java API. Schemas can be registered automatically with the schema registry, and we provide three SubjectNameStrategy options, again compatible with the Kafka Java API.
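
The strategies mirror the Java client's naming. RecordNameStrategy appears in the producer example below; the other two member names are assumed here to follow the Java API:

from kafkian.serde.serialization import SubjectNameStrategy

# Subject names each strategy would register under (per the Java API):
#   TopicNameStrategy       -> '<topic>-key' / '<topic>-value'
#   RecordNameStrategy      -> '<record full name>', e.g. 'auth.users.UserCreated'
#   TopicRecordNameStrategy -> '<topic>-<record full name>'
strategy = SubjectNameStrategy.RecordNameStrategy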

Usage

Producing messages

1. Initialize the producer

from kafkian import Producer
from kafkian.serde.serialization import AvroSerializer, AvroStringKeySerializer, SubjectNameStrategy

producer = Producer(
    {
        'bootstrap.servers': config.KAFKA_BOOTSTRAP_SERVERS,
    },
    key_serializer=AvroStringKeySerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),
    value_serializer=AvroSerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL,
                                    subject_name_strategy=SubjectNameStrategy.RecordNameStrategy)
)

2. Define your message schema(s)

from confluent_kafka import avro
from kafkian.serde.avroserdebase import AvroRecord


value_schema_str = """
{
   "namespace": "auth.users",
   "name": "UserCreated",
   "type": "record",
   "fields" : [
     {
       "name" : "uuid",
       "type" : "string"
     },
     {
       "name" : "name",
       "type" : "string"
     },
     {
        "name": "timestamp",
        "type": {
            "type": "long",
            "logicalType": "timestamp-millis"
        }
     }
   ]
}
"""


class UserCreated(AvroRecord):
    _schema = avro.loads(value_schema_str)

3. Produce the message

producer.produce(
    "auth.users.events",
    user.uuid,
    UserCreated({
        "uuid": user.uuid,
        "name": user.name,
        "timestamp": int(user.timestamp.timestamp() * 1000)
    }),
    sync=True
)
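
With sync=True the call blocks until delivery is confirmed. For higher throughput you can produce asynchronously and flush once at the end; a sketch, assuming sync defaults to False and that the facade re-exposes the underlying confluent producer's flush() (neither is confirmed by this README):

# Fire-and-forget produce: delivery happens in the background.
for user in new_users:
    producer.produce(
        "auth.users.events",
        user.uuid,
        UserCreated({
            "uuid": user.uuid,
            "name": user.name,
            "timestamp": int(user.timestamp.timestamp() * 1000)
        }),
    )
producer.flush()  # assumed pass-through; blocks until queued messages are delivered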

Consuming messages

1. Initialize the consumer

from kafkian import Consumer
from kafkian.serde.deserialization import AvroDeserializer

CONSUMER_CONFIG = {
    'bootstrap.servers': config.KAFKA_BOOTSTRAP_SERVERS,
    'default.topic.config': {
        'auto.offset.reset': 'latest',
    },
    'group.id': 'notifications'
}

consumer = Consumer(
    CONSUMER_CONFIG,
    topics=["auth.users.events"],
    key_deserializer=AvroDeserializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),
    value_deserializer=AvroDeserializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),
)

2. Consume the messages via the generator

for message in consumer:
    handle_message(message)
    consumer.commit()

Here, message is an instance of the Message class, which wraps the original message exposed by confluent-kafka-python; you can access the decoded key and value via the .key and .value properties respectively.

Note that deserialization happens on first access of these properties, so you can properly handle deserialization errors (log them, send the message to a DLQ, etc.).
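
A minimal sketch of that pattern (the broad except clause and the send_to_dlq helper are placeholders, not part of kafkian):

for message in consumer:
    try:
        key, value = message.key, message.value  # deserialization happens here
    except Exception as exc:  # kafkian's exact exception type isn't documented here
        log.warning("Undecodable message at %s[%s]@%s: %s",
                    message.topic, message.partition, message.offset, exc)
        send_to_dlq(message)  # hypothetical dead-letter-queue helper
        consumer.commit()
        continue
    handle_message(message)
    consumer.commit()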

Both key and value are wrapped in a dynamically generated class whose full name matches the full name of the corresponding Avro schema. In the example above, the value would have a class named auth.users.UserCreated.

Avro schemas for the consumed message key and value are accessible via the .schema property.

In addition, the topic, partition, offset, timestamp and headers properties are available.
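
Putting it together, a short sketch of inspecting a consumed message (whether .schema lives on the wrapped key/value object is an assumption based on the description above):

for message in consumer:
    event = message.value                      # lazily deserialized value
    print(type(event).__name__)                # e.g. UserCreated (full name auth.users.UserCreated)
    print(event.schema)                        # Avro schema of the value (assumed location)
    print(message.topic, message.partition, message.offset)
    print(message.timestamp, message.headers)
    consumer.commit()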

Contributing

This library is, as stated, quite opinionated; however, I'm open to suggestions. Post your questions and suggestions as issues here on GitHub!

Running tests

Both unit and system tests are provided.

To run the unit tests, install the requirements and just run

py.test tests/unit/

To run the system tests, a Kafka cluster together with a schema registry is required. A Docker Compose file is provided; just run

docker-compose up

and once the cluster is up and running, run the system tests via

py.test tests/system/

