Skip to main content

Framework with scalable Kafka consumer/producer logic for IDU FastAPI services.

Project description

🦦 otteroad

code style: black PyPI version CI codecov License: Apache 2.0

logo

Kafka framework for FastAPI microservices in the IDU (Institute of Design and Urban Studies).

** The name was inspired by this text.


✨ Overview

otteroad is a Kafka framework designed for FastAPI-based microservices. It simplifies integration with Apache Kafka and supports:

  • ✅ Unified consumer & producer APIs
  • ✅ AVRO + Schema Registry support via Pydantic
  • ✅ Pluggable settings from .env, .yaml, or custom config
  • ✅ Structured event handlers with lifecycle hooks
  • ✅ Flexible handler registry and extensible processing pipeline
  • ✅ Designed for FastAPI services but works standalone

📦 Installation

pip install otteroad

Or via poetry:

poetry add otteroad

⚙️ Configuration

Kafka settings are defined via two classes:

  • KafkaConsumerSettings
  • KafkaProducerSettings

They can be created from any source:

from otteroad import KafkaConsumerSettings, KafkaProducerSettings

consumer_settings = KafkaConsumerSettings.from_env()
producer_settings = KafkaProducerSettings.from_yaml("config/kafka.yaml")

# define pydantic model/dataclass/dict/etc.
config = {"bootstrap.servers": "localhost: 9092"}
settings = KafkaProducerSettings.from_custom_config(config)

📡 Event Models (AVRO + Schema Registry)

Use AvroEventModel as the base for your event schemas. These are strict, typed messages validated via Pydantic.

from typing import ClassVar
from pydantic import Field
from otteroad.avro import AvroEventModel


class TerritoryCreated(AvroEventModel):
    """Model for message indicates that a territory has been created."""

    topic: ClassVar[str] = "urban.events"
    namespace: ClassVar[str] = "territories"
    schema_version: ClassVar[int] = 1
    schema_compatibility: ClassVar[str] = "BACKWARD"

    territory_id: int = Field(..., description="new territory identifier")

🧠 Handlers

Handlers process typed events. Extend BaseMessageHandler and implement core logic in handle(). Optional hooks: pre_process, post_process, on_startup, on_shutdown, handle_error.

ℹ️ Note for IDU services: It is strongly recommended to use only models from the models/ directory to ensure schema consistency and maintainability across services.

from otteroad.consumer import BaseMessageHandler
from otteroad.models import TerritoryCreated  # please, use only models from the models/ directory

class TerritoryCreatedHandler(BaseMessageHandler[TerritoryCreated]):
    async def handle(self, event, ctx):
        print(f"Territory created: {event.territory_id}")
        
    async def on_startup(self): ...
    
    async def on_shutdown(self): ...

🔄 Consumer

KafkaConsumerService manages lifecycle and worker threads; KafkaConsumerWorker pulls messages, resolves handlers and runs processing logic.

from otteroad import KafkaConsumerService

service = KafkaConsumerService(consumer_settings)
service.register_handler(TerritoryCreatedHandler())
service.add_worker(topics=["urban.events"]).start()

Under the hood, the pipeline is:

receive message -> validate -> pre_process -> handle -> post_process

If an error occurs, custom error handling or DQL logic can be added.


🚀 Producer

Use KafkaProducerClient to send strongly typed Avro events:

from otteroad import KafkaProducerClient
from otteroad.models import TerritoryCreated

async def send_event():
    async with KafkaProducerClient(producer_settings) as producer:
        event = TerritoryCreated(territory_id=1)
        await producer.send(event)

🧩 FastAPI Integration

For a simple integration example with FastAPI, see:

📄 Example

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

otteroad-0.1.3.post1.tar.gz (33.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

otteroad-0.1.3.post1-py3-none-any.whl (52.8 kB view details)

Uploaded Python 3

File details

Details for the file otteroad-0.1.3.post1.tar.gz.

File metadata

  • Download URL: otteroad-0.1.3.post1.tar.gz
  • Upload date:
  • Size: 33.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.9 Windows/10

File hashes

Hashes for otteroad-0.1.3.post1.tar.gz
Algorithm Hash digest
SHA256 8823a407d15737b055d897ea02ff6e98254237262aae35339eb79449d0ea1e40
MD5 8f3796e726ffc6716b1c9e2872d754d6
BLAKE2b-256 6346ad946291174b4609b2951b2ab949ad7c9109a8d3fc1ab8dbb6455c879a79

See more details on using hashes here.

File details

Details for the file otteroad-0.1.3.post1-py3-none-any.whl.

File metadata

  • Download URL: otteroad-0.1.3.post1-py3-none-any.whl
  • Upload date:
  • Size: 52.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.9 Windows/10

File hashes

Hashes for otteroad-0.1.3.post1-py3-none-any.whl
Algorithm Hash digest
SHA256 173c39af165d7200f5d81bc6fcb54460451d4bb8ed06fd2852900ba647c5a408
MD5 2f7939035022a9d267a60751b5d15037
BLAKE2b-256 4ffecfa5949d9079aa581d66713ed190a9e100f5aca5f4d70c579765f7cddb11

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page