Framework with scalable Kafka consumer/producer logic for IDU FastAPI services.
Project description
🦦 otteroad
Kafka framework for FastAPI microservices in the IDU (Institute of Design and Urban Studies).
** The name was inspired by this text.
✨ Overview
otteroad is a Kafka framework designed for FastAPI-based microservices. It simplifies integration with Apache Kafka and supports:
- ✅ Unified consumer & producer APIs
- ✅ AVRO + Schema Registry support via Pydantic
- ✅ Pluggable settings from
.env,.yaml, or custom config - ✅ Structured event handlers with lifecycle hooks
- ✅ Flexible handler registry and extensible processing pipeline
- ✅ Designed for FastAPI services but works standalone
📦 Installation
pip install otteroad
Or via poetry:
poetry add otteroad
⚙️ Configuration
Kafka settings are defined via two classes:
KafkaConsumerSettingsKafkaProducerSettings
They can be created from any source:
from otteroad import KafkaConsumerSettings, KafkaProducerSettings
consumer_settings = KafkaConsumerSettings.from_env()
producer_settings = KafkaProducerSettings.from_yaml("config/kafka.yaml")
# define pydantic model/dataclass/dict/etc.
config = {"bootstrap.servers": "localhost: 9092"}
settings = KafkaProducerSettings.from_custom_config(config)
📡 Event Models (AVRO + Schema Registry)
Use AvroEventModel as the base for your event schemas. These are strict, typed messages validated via Pydantic.
from typing import ClassVar
from pydantic import Field
from otteroad.avro import AvroEventModel
class TerritoryCreated(AvroEventModel):
"""Model for message indicates that a territory has been created."""
topic: ClassVar[str] = "urban.events"
namespace: ClassVar[str] = "territories"
schema_version: ClassVar[int] = 1
schema_compatibility: ClassVar[str] = "BACKWARD"
territory_id: int = Field(..., description="new territory identifier")
🧠 Handlers
Handlers process typed events. Extend BaseMessageHandler and implement core logic in handle(). Optional hooks: pre_process, post_process, on_startup, on_shutdown, handle_error.
ℹ️ Note for IDU services: It is strongly recommended to use only models from the
models/directory to ensure schema consistency and maintainability across services.
from otteroad.consumer import BaseMessageHandler
from otteroad.models import TerritoryCreated # please, use only models from the models/ directory
class TerritoryCreatedHandler(BaseMessageHandler[TerritoryCreated]):
async def handle(self, event, ctx):
print(f"Territory created: {event.territory_id}")
async def on_startup(self): ...
async def on_shutdown(self): ...
🔄 Consumer
KafkaConsumerService manages lifecycle and worker threads; KafkaConsumerWorker pulls messages, resolves handlers and runs processing logic.
from otteroad import KafkaConsumerService
service = KafkaConsumerService(consumer_settings)
service.register_handler(TerritoryCreatedHandler())
service.add_worker(topics=["urban.events"]).start()
Under the hood, the pipeline is:
receive message -> validate -> pre_process -> handle -> post_process
If an error occurs, custom error handling or DQL logic can be added.
🚀 Producer
Use KafkaProducerClient to send strongly typed Avro events:
from otteroad import KafkaProducerClient
from otteroad.models import TerritoryCreated
async def send_event():
async with KafkaProducerClient(producer_settings) as producer:
event = TerritoryCreated(territory_id=1)
await producer.send(event)
🧩 FastAPI Integration
For a simple integration example with FastAPI, see:
📄 Example
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file otteroad-0.1.4.tar.gz.
File metadata
- Download URL: otteroad-0.1.4.tar.gz
- Upload date:
- Size: 33.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.11.9 Windows/10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7f77e64b1d40fb839c5270996c6638aa7b3acd83119896bc4a04c0d377981b2f
|
|
| MD5 |
67ecba48afebace648022800f58a35d3
|
|
| BLAKE2b-256 |
5a04b35fd3905061c492cbab0f1cb713c8293f790aa9cd456215555283462a11
|
File details
Details for the file otteroad-0.1.4-py3-none-any.whl.
File metadata
- Download URL: otteroad-0.1.4-py3-none-any.whl
- Upload date:
- Size: 52.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.11.9 Windows/10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0e65f02c4ab1d5fd9baf20d53f97811af6b7578a0ab9b25a24783a6a18033375
|
|
| MD5 |
bfa6e3c8791f9946d82eb457a94c9edb
|
|
| BLAKE2b-256 |
0a3606afae17b45a9c1eae37c234184cfad0c3d963e48300508e514213e05bfd
|