Skip to main content

Re-usable Kafka broker wrapper for microservices

Project description

broker

Reusable Kafka broker wrapper for Python microservices. Built on aiokafka and pydantic.

Installation

pip install broker

Quick Start

Consumer

import asyncio
from pydantic import BaseModel
from kafka.consumers.base_consumer import BaseConsumer

class OrderEvent(BaseModel):
    order_id: int
    amount: float

class OrderConsumer(BaseConsumer[OrderEvent]):
    topic = "orders"
    group_id = "orders-group"
    schema_class = OrderEvent

    async def process_message(self, event: OrderEvent):
        print(f"Processing order {event.order_id}")

async def main():
    consumer = OrderConsumer("localhost:9092")
    async with consumer:
        await asyncio.sleep(60)

Producer

from pydantic import BaseModel
from kafka.producers.base_producer import BaseProducer
from kafka.client.kafka_client import KafkaClient

class OrderEvent(BaseModel):
    order_id: int
    amount: float

class OrderProducer(BaseProducer):
    topic = "orders"

async def main():
    client = KafkaClient("localhost:9092")
    await client.start()

    producer = OrderProducer(client)
    await producer.publish(OrderEvent(order_id=1, amount=99.99))

Features

Retries & Error Policy

class OrderConsumer(BaseConsumer[OrderEvent]):
    topic = "orders"
    group_id = "orders-group"
    schema_class = OrderEvent
    max_processing_retries = 3
    retry_backoff = 1.0       # 1s, 2s, 3s
    on_error = ErrorPolicy.SKIP  # or FAIL (default)

Dead Letter Queue

class OrderConsumer(BaseConsumer[OrderEvent]):
    def __init__(self):
        super().__init__(
            "localhost:9092",
            dlq_handler=self._on_dlq,
        )

    async def _on_dlq(self, value: bytes, error: Exception):
        await my_dlq_producer.send(value)

Schema Versioning

class OrderConsumer(BaseConsumer[OrderEvent]):
    schema_version = 2
    migrations = {
        1: lambda d: {**d, "currency": "USD"},
    }

Messages are serialized as {"_schema_version": N, "data": {...}}. On deserialization, migrations are applied automatically.

Reconnection

class OrderConsumer(BaseConsumer[OrderEvent]):
    max_reconnect_attempts = 10   # 0 = infinite (default)
    reconnect_backoff = 1.0       # exponential: 1s, 2s, 4s, 8s...

Deduplication

class OrderConsumer(BaseConsumer[OrderEvent]):
    def __init__(self):
        super().__init__(
            "localhost:9092",
            deduplicator=self._is_duplicate,
        )

    async def _is_duplicate(self, key: bytes | None, value: bytes | None) -> bool:
        return key in self._seen

Middleware

from kafka.middleware import ConsumerMiddleware

class MetricsMiddleware(ConsumerMiddleware[OrderEvent]):
    async def before_consume(self, event: OrderEvent) -> OrderEvent:
        self._start = time.monotonic()
        return event

    async def after_consume(self, event: OrderEvent):
        duration = time.monotonic() - self._start
        metrics.histogram("process_duration", duration)

    async def on_consume_error(self, event: OrderEvent, error: Exception, value: bytes) -> bool:
        metrics.counter("process_errors").inc()
        return False  # fall through to error policy

class OrderConsumer(BaseConsumer[OrderEvent]):
    def __init__(self):
        super().__init__(
            "localhost:9092",
            middleware=[MetricsMiddleware()],
        )

Health Check

consumer = OrderConsumer("localhost:9092")
status: HealthStatus = consumer.health()
# status.alive, status.messages_consumed

Multi-Topic Consumer

class MultiTopicConsumer(BaseConsumer[OrderEvent]):
    topic = ["orders", "order-events"]
    group_id = "orders-group"
    schema_class = OrderEvent

    async def process_message(self, event: OrderEvent):
        ...

Lifecycle

All main classes (KafkaClient, BaseConsumer, BaseProducer) implement ClientLifecycle:

# Option 1: async with
async with consumer:
    ...

# Option 2: manual
await consumer.start()
try:
    ...
finally:
    await consumer.stop()

BaseConsumer.start() is a long-running coroutine. The async with block runs it in a background task.

API

BaseConsumer[T]

Attribute Default Description
topic Topic(s) to subscribe to (str | list[str])
group_id Consumer group ID
schema_class pydantic.BaseModel subclass for deserialization
enable_auto_commit False Kafka auto-commit
max_processing_retries 3 Retries per message
retry_backoff 1.0 Seconds between retries (× attempt number)
process_timeout 30.0 Timeout per process_message (None = no timeout)
connection_timeout 30.0 Connect/stop timeout
poll_timeout 1.0 Poll wait time
commit_interval_messages 100 Commit batch size
commit_interval_seconds 5.0 Commit batch interval
on_error FAIL FAIL, SKIP
schema_version 1 Schema version for envelope
migrations {} Schema migration functions
max_reconnect_attempts 0 Max reconnects (0 = infinite)
reconnect_backoff 1.0 Base reconnect backoff

BaseProducer

Attribute Default Description
topic Topic to produce to
publish_timeout 10.0 Timeout for send_and_wait (None = no timeout)
schema_version 1 Schema version in envelope

KafkaClient

KafkaClient(bootstrap_servers: str, **producer_options)

Manages a single AIOKafkaProducer instance. Used by BaseProducer.

Exceptions

BrokerError
├── ConsumerError
├── ConnectionError
├── ConfigurationError
├── ProcessingError
└── SerializationError

Development

pip install -e ".[dev]"
pytest tests/
ruff check .
mypy kafka/

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rise_broker-0.1.0.tar.gz (11.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rise_broker-0.1.0-py3-none-any.whl (10.0 kB view details)

Uploaded Python 3

File details

Details for the file rise_broker-0.1.0.tar.gz.

File metadata

  • Download URL: rise_broker-0.1.0.tar.gz
  • Upload date:
  • Size: 11.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for rise_broker-0.1.0.tar.gz
Algorithm Hash digest
SHA256 048d5055f73e79a6e0b4b4955a4d2bbb39cb431de570b021fab9e513f6ecc46e
MD5 81fb76f237ab3c11c8ed63d0d9f4c14c
BLAKE2b-256 5280f2c8cc6d342a48ac73f4ba7106ad8f455e409336b56541bc49e2650300c0

See more details on using hashes here.

File details

Details for the file rise_broker-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rise_broker-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 10.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for rise_broker-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 daeaef17e1d74298ca63e04f9723ee67512b77a40fede9c56a3767c055d0d786
MD5 67e20f79be4a50211ab59511baf35157
BLAKE2b-256 9b6ea53a0b868847bb4c8e8ce6f520d9ae72a6ecc035481a54b4bf2e81629d95

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page