FlowODM
A lightweight ODM for Apache Kafka® with Avro schema support.
FlowODM provides a Pydantic v2-based interface for working with Kafka messages and Avro schemas. It supports both synchronous and asynchronous operations, which makes it well suited to building microservices.
Features
- Pydantic v2 Models: Define Kafka messages as Pydantic models
- Avro Schema Support: Auto-generate schemas or validate against existing ones
- Schema Registry Integration: Full support for Confluent Schema Registry
- Dual Sync/Async API: Both sync and async methods for all operations
- Consumer Loop Patterns: Ready-to-use patterns for building microservices
- Predefined Settings: Optimized configurations for different use cases
- CLI Tools: Validate schemas, upload to registry, check compatibility
Installation
```bash
pip install flowodm
```
Quick Start
Define a Model
```python
from datetime import datetime

from flowodm import FlowBaseModel, connect

# Connect to Kafka
connect(
    bootstrap_servers="localhost:9092",
    schema_registry_url="http://localhost:8081",
)

# Define your model
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"
        consumer_group = "my-service"
        key_field = "user_id"

    user_id: str
    action: str
    timestamp: datetime
```
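The README does not show which Avro schema FlowODM derives from a model. The sketch below assumes a straightforward mapping in which `str` becomes `string` and `datetime` becomes a `timestamp-millis` long. Under that assumption, a record schema can be built from the class annotations using only the standard library. `to_avro_schema` and the type table are hypothetical and are not part of the FlowODM API.

```python
import json
from datetime import datetime
from typing import get_type_hints

# Hypothetical annotation -> Avro type table; FlowODM's real mapping
# (Optional, Decimal, nested models, ...) is not documented here.
_AVRO_TYPES = {
    str: "string",
    int: "long",
    float: "double",
    bool: "boolean",
    bytes: "bytes",
    datetime: {"type": "long", "logicalType": "timestamp-millis"},
}


def to_avro_schema(cls: type, namespace: str = "example.events") -> dict:
    """Build an Avro record schema from a class's field annotations, in order."""
    fields = []
    for name, hint in get_type_hints(cls).items():
        if hint not in _AVRO_TYPES:
            raise TypeError(f"no Avro mapping for field {name!r}: {hint!r}")
        fields.append({"name": name, "type": _AVRO_TYPES[hint]})
    return {"type": "record", "name": cls.__name__,
            "namespace": namespace, "fields": fields}


class UserEvent:
    """Plain annotated class standing in for the FlowBaseModel above."""
    user_id: str
    action: str
    timestamp: datetime


print(json.dumps(to_avro_schema(UserEvent), indent=2))
```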
Produce Messages
```python
# Sync
event = UserEvent(user_id="123", action="login", timestamp=datetime.now())
event.produce()

# Async (inside a coroutine)
await event.aproduce()

# Batch
events = [
    UserEvent(user_id=str(i), action="login", timestamp=datetime.now())
    for i in range(100)
]
UserEvent.produce_many(events)
```
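The value of a produced message is Avro binary data. The sketch below hand-encodes a `UserEvent` following the Avro specification:

- a `long` is zigzag-mapped and written as a base-128 varint;
- a `string` is its UTF-8 byte length (as a `long`) followed by the bytes;
- record fields are concatenated in schema order.

It assumes `timestamp` is mapped to a `timestamp-millis` long. `encode_user_event` is a hypothetical helper; FlowODM presumably delegates this work to an Avro library.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def encode_long(n: int) -> bytes:
    """Avro `long`: zigzag-map the signed value, then emit a base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # 0, -1, 1, -2, ... -> 0, 1, 2, 3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro `string`: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_user_event(user_id: str, action: str, timestamp: datetime) -> bytes:
    """Encode the three UserEvent fields in schema order (hypothetical helper)."""
    millis = (timestamp - EPOCH) // timedelta(milliseconds=1)  # needs an aware datetime
    return encode_string(user_id) + encode_string(action) + encode_long(millis)


payload = encode_user_event("123", "login", EPOCH + timedelta(milliseconds=1))
print(payload)  # b'\x06123\nlogin\x02'
```

Avro stores no field names or tags in the payload, so a reader needs the writer's schema to decode these bytes.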
Consume Messages
```python
# Single message
event = UserEvent.consume_one(timeout=5.0)

# Iterator
for event in UserEvent.consume_iter():
    print(f"User {event.user_id} performed {event.action}")

# Async iterator (inside a coroutine; `process` is your own async handler)
async for event in UserEvent.aconsume_iter():
    await process(event)
```
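Confluent Schema Registry serializers prefix each message value with a 5-byte header: a zero magic byte, then the schema ID as a big-endian 32-bit integer. The Avro payload follows. A consumer reads that ID to fetch (and cache) the writer's schema before decoding. This sketch assumes FlowODM uses the standard Confluent framing; `frame` and `unframe` are illustrative helpers, not FlowODM functions.

```python
import struct

MAGIC_BYTE = 0
_HEADER = struct.Struct(">BI")  # 1 magic byte + 4-byte big-endian schema ID


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro payload with the Confluent wire-format header."""
    return _HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a message value into (schema_id, avro_payload)."""
    if len(message) < _HEADER.size:
        raise ValueError("message too short for wire-format header")
    magic, schema_id = _HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[_HEADER.size:]


schema_id, payload = unframe(frame(42, b"\x06123"))
print(schema_id, payload)  # 42 b'\x06123'
```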
Consumer Loop for Microservices
```python
from flowodm import ConsumerLoop, LongRunningSettings

def process_event(event: UserEvent) -> None:
    # Your processing logic
    print(f"Processing {event.user_id}")

loop = ConsumerLoop(
    model=UserEvent,
    handler=process_event,
    settings=LongRunningSettings(),  # Tolerates long processing
    commit_strategy="before_processing",  # Commit first: prevents duplicates across pods (at-most-once)
)
loop.run()  # Blocking; handles SIGTERM gracefully
```
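The `commit_strategy` setting reflects a standard trade-off. This Kafka-free simulation assumes `before_processing` means the offset is committed before the handler runs. It crashes the "pod" once on message `b`, then restarts from the last committed offset. All names (`consume`, `simulate`, `Crash`) are hypothetical.

```python
from typing import Callable, List


class Crash(Exception):
    """Stands in for the pod being killed while handling a message."""


def consume(log: List[str], start: int, handle: Callable[[str], None],
            commit_before: bool) -> int:
    """Handle log[start:] until done or crashed; return the committed offset."""
    committed = start
    try:
        for offset in range(start, len(log)):
            if commit_before:
                committed = offset + 1  # commit first, then handle
                handle(log[offset])
            else:
                handle(log[offset])
                committed = offset + 1  # handle first, then commit
    except Crash:
        pass  # the pod dies; only the committed offset survives the restart
    return committed


def simulate(commit_before: bool, crash_after_side_effect: bool) -> List[str]:
    """Crash once on message "b", then restart from the committed offset."""
    log, effects, crashed = ["a", "b", "c"], [], False

    def handle(msg: str) -> None:
        nonlocal crashed
        if crash_after_side_effect:
            effects.append(msg)  # e.g. a DB write that already happened
        if msg == "b" and not crashed:
            crashed = True
            raise Crash()
        if not crash_after_side_effect:
            effects.append(msg)

    committed = consume(log, 0, handle, commit_before)
    consume(log, committed, handle, commit_before)  # the restarted pod
    return effects


for before in (True, False):
    for late in (False, True):
        print(f"commit_before={before}, crash_after_side_effect={late}:",
              simulate(before, late))
```

Committing first never repeats a side effect but can drop `b`. Committing afterwards never drops it but can apply it twice, so handlers in that mode should be idempotent.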
Async Consumer Loop
```python
from flowodm import AsyncConsumerLoop

async def process_event(event: UserEvent) -> None:
    await external_api.submit(event)  # `external_api` stands in for your own async client

loop = AsyncConsumerLoop(
    model=UserEvent,
    handler=process_event,
    max_concurrent=20,  # Process up to 20 messages concurrently
)
await loop.run()  # Inside a coroutine, e.g. started with asyncio.run()
```
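The README does not say how `max_concurrent` is enforced. A common approach is an `asyncio.Semaphore` that each handler call must acquire first, and this sketch assumes that approach. `run_bounded` and `demo` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def run_bounded(items: Iterable[T],
                      handler: Callable[[T], Awaitable[None]],
                      max_concurrent: int) -> None:
    """Run handler on every item, with at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item: T) -> None:
        async with semaphore:  # waits while max_concurrent calls are running
            await handler(item)

    await asyncio.gather(*(guarded(item) for item in items))


async def demo() -> int:
    """Process 100 fake messages and report the peak number in flight."""
    in_flight = peak = 0

    async def handler(_: int) -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for await external_api.submit(...)
        in_flight -= 1

    await run_bounded(range(100), handler, max_concurrent=20)
    return peak


print("peak concurrency:", asyncio.run(demo()))
```

A real consumer loop must also decide which offsets are safe to commit when messages finish out of order. This sketch leaves that out.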
Schema Registry Integration
Validate Models Against Registry
```python
from flowodm import validate_against_registry

result = validate_against_registry(UserEvent, "user-events-value")
if not result.is_valid:
    print(f"Errors: {result.errors}")
```
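The README does not list which checks `validate_against_registry` performs. This sketch assumes the check compares field names and types between the locally derived schema and the registered one. `ValidationResult` here only mirrors the `is_valid`/`errors` attributes used above; it is not FlowODM's class.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ValidationResult:
    errors: List[str] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        return not self.errors


def compare_record_schemas(local: dict, registered: dict) -> ValidationResult:
    """Report field-level differences between two Avro record schemas."""
    ours = {f["name"]: f["type"] for f in local["fields"]}
    theirs = {f["name"]: f["type"] for f in registered["fields"]}
    result = ValidationResult()
    for name in sorted(theirs.keys() - ours.keys()):
        result.errors.append(f"missing field: {name}")
    for name in sorted(ours.keys() - theirs.keys()):
        result.errors.append(f"unexpected field: {name}")
    for name in sorted(ours.keys() & theirs.keys()):
        if ours[name] != theirs[name]:
            result.errors.append(
                f"type mismatch for {name}: {ours[name]!r} != {theirs[name]!r}")
    return result


local = {"fields": [{"name": "user_id", "type": "string"},
                    {"name": "action", "type": "string"}]}
registered = {"fields": [{"name": "user_id", "type": "string"},
                         {"name": "action", "type": "int"},
                         {"name": "timestamp", "type": "long"}]}
result = compare_record_schemas(local, registered)
if not result.is_valid:
    print(f"Errors: {result.errors}")
```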
Generate Models from Schema
```python
from flowodm import generate_model_from_registry

# Generate Pydantic model from registered schema
UserEvent = generate_model_from_registry(
    subject="user-events-value",
    topic="user-events",
)
```
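`generate_model_from_registry` returns a Pydantic model. To show the idea without a registry or Pydantic, this sketch swaps in a standard-library dataclass built from an already-fetched Avro record schema. It handles only primitive types and `timestamp-millis`. `model_from_schema` is a hypothetical name.

```python
from dataclasses import fields, make_dataclass
from datetime import datetime

_PY_TYPES = {"string": str, "bytes": bytes, "int": int, "long": int,
             "float": float, "double": float, "boolean": bool}


def _py_type(avro_type):
    """Map one Avro field type to a Python type (primitives + timestamp-millis)."""
    if isinstance(avro_type, list):
        raise NotImplementedError("unions are not handled in this sketch")
    if isinstance(avro_type, dict):
        if avro_type.get("logicalType") == "timestamp-millis":
            return datetime
        avro_type = avro_type["type"]
    return _PY_TYPES[avro_type]


def model_from_schema(schema: dict) -> type:
    """Create a dataclass with one field per Avro record field, in order."""
    return make_dataclass(
        schema["name"],
        [(f["name"], _py_type(f["type"])) for f in schema["fields"]],
    )


schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}
UserEvent = model_from_schema(schema)
print([(f.name, f.type.__name__) for f in fields(UserEvent)])
```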
CLI Tools
```bash
# Validate models against Schema Registry
flowodm validate --models myapp.events --registry

# Upload schema to registry
flowodm upload-schema --avro schemas/user_event.avsc --subject user-events-value

# Check compatibility
flowodm check-compatibility --model myapp.events.UserEvent --level BACKWARD

# List all subjects
flowodm list-subjects
```
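`--level BACKWARD` presumably follows Schema Registry's definition: consumers using the new schema must be able to read data written with the previous one. Under Avro's schema-resolution rules, that means:

- every field added in the new schema needs a default;
- removed fields are fine;
- a changed type must be a permitted promotion, such as `int` to `long`.

This simplified, hypothetical checker applies only those rules and ignores unions, aliases and nested records, so a real compatibility check covers considerably more.

```python
from typing import List

# (writer type, reader type) pairs that Avro schema resolution promotes.
_PROMOTIONS = {
    ("int", "long"), ("int", "float"), ("int", "double"),
    ("long", "float"), ("long", "double"), ("float", "double"),
    ("string", "bytes"), ("bytes", "string"),
}


def backward_problems(old: dict, new: dict) -> List[str]:
    """List reasons `new` (reader) cannot read records written with `old`."""
    writer_fields = {f["name"]: f["type"] for f in old["fields"]}
    problems = []
    for f in new["fields"]:
        name, reader_type = f["name"], f["type"]
        if name not in writer_fields:
            if "default" not in f:  # old data lacks this field, so a default is required
                problems.append(f"added field {name!r} has no default")
            continue
        writer_type = writer_fields[name]
        promotable = (isinstance(writer_type, str) and isinstance(reader_type, str)
                      and (writer_type, reader_type) in _PROMOTIONS)
        if writer_type != reader_type and not promotable:
            problems.append(f"field {name!r}: {writer_type!r} -> {reader_type!r}")
    return problems


old = {"fields": [{"name": "user_id", "type": "string"},
                  {"name": "count", "type": "int"}]}
ok = {"fields": [{"name": "user_id", "type": "string"},
                 {"name": "count", "type": "long"},
                 {"name": "region", "type": "string", "default": "eu"}]}
bad = {"fields": [{"name": "user_id", "type": "string"},
                  {"name": "region", "type": "string"}]}
print(backward_problems(old, ok))   # []
print(backward_problems(old, bad))  # ["added field 'region' has no default"]
```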
Predefined Settings
FlowODM provides optimized settings for different use cases:
| Profile | Use Case |
|---|---|
| `LongRunningSettings` | ML inference, complex processing (10 min timeout) |
| `BatchSettings` | ETL jobs, data aggregation (large batches) |
| `RealTimeSettings` | Event-driven, notifications (low latency) |
| `HighThroughputSettings` | High-volume ingestion (max throughput) |
| `ReliableSettings` | Financial transactions (exactly-once) |
```python
from flowodm import LongRunningSettings, ConsumerLoop

loop = ConsumerLoop(
    model=MyModel,
    handler=my_handler,
    settings=LongRunningSettings(),
)
```
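The README does not show what each profile actually sets. This sketch assumes the profiles wrap ordinary Kafka client properties; the Schema Registry integration suggests the confluent-kafka/librdkafka stack, but that is also an assumption. Under it, a profile can be modelled as a base config merged with per-profile overrides. The property names are real librdkafka consumer settings. The values and the `PROFILES` table are illustrative, not FlowODM's defaults.

```python
from typing import Any, Dict, Optional

Config = Dict[str, Any]

# Assumed shared base: the consumer loop commits offsets itself.
BASE: Config = {"enable.auto.commit": False, "auto.offset.reset": "earliest"}

# Illustrative values only; names are librdkafka consumer properties.
PROFILES: Dict[str, Config] = {
    "long_running": {"max.poll.interval.ms": 10 * 60 * 1000},  # 10 min between polls
    "batch": {"fetch.min.bytes": 1024 * 1024},  # wait for larger fetches
    "real_time": {"fetch.wait.max.ms": 10},  # return fetches quickly
    "reliable": {"isolation.level": "read_committed"},  # skip aborted transactions
}


def build_config(profile: str, overrides: Optional[Config] = None) -> Config:
    """Merge base < profile < caller overrides into a fresh dict."""
    return {**BASE, **PROFILES[profile], **(overrides or {})}


print(build_config("long_running", {"group.id": "my-service"}))
```

Building a fresh dict on every call keeps caller overrides from leaking into the shared profile table.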
Configuration
Configure via environment variables:
```bash
# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret

# Schema Registry
SCHEMA_REGISTRY_URL=https://your-registry.confluent.cloud
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=your-sr-api-key:your-sr-api-secret
# Or use separate key/secret:
# SCHEMA_REGISTRY_API_KEY=your-sr-api-key
# SCHEMA_REGISTRY_API_SECRET=your-sr-api-secret
```
Or programmatically:
```python
from flowodm import connect

connect(
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="api-key",
    sasl_password="api-secret",
    schema_registry_url="https://registry.confluent.cloud",
    # Option 1: Combined format
    schema_registry_basic_auth_user_info="sr-key:sr-secret",
    # Option 2: Separate key/secret
    # schema_registry_api_key="sr-key",
    # schema_registry_api_secret="sr-secret",
)
```
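The environment variables correspond to the `connect()` keyword arguments shown above. The hypothetical loader below turns one into the other. It assumes the combined `SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO` takes precedence when both credential styles are set; the README does not specify precedence. It also splits the combined value on the first colon only, so a secret may itself contain colons.

```python
from typing import Dict, Mapping, Optional, Tuple

# Environment variable -> connect() keyword argument (names from this README).
_ENV_TO_KWARG = {
    "KAFKA_BOOTSTRAP_SERVERS": "bootstrap_servers",
    "KAFKA_SECURITY_PROTOCOL": "security_protocol",
    "KAFKA_SASL_MECHANISM": "sasl_mechanism",
    "KAFKA_SASL_USERNAME": "sasl_username",
    "KAFKA_SASL_PASSWORD": "sasl_password",
    "SCHEMA_REGISTRY_URL": "schema_registry_url",
}


def registry_credentials(env: Mapping[str, str]) -> Optional[Tuple[str, str]]:
    """Return (key, secret) from either credential style, or None."""
    user_info = env.get("SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO")
    if user_info:
        key, sep, secret = user_info.partition(":")  # split on the first colon only
        if not sep:
            raise ValueError("SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO must be 'key:secret'")
        return key, secret
    key = env.get("SCHEMA_REGISTRY_API_KEY")
    secret = env.get("SCHEMA_REGISTRY_API_SECRET")
    return (key, secret) if key and secret else None


def connect_kwargs_from_env(env: Mapping[str, str]) -> Dict[str, str]:
    """Collect the variables that are set into keyword arguments for connect()."""
    kwargs = {kw: env[var] for var, kw in _ENV_TO_KWARG.items() if env.get(var)}
    creds = registry_credentials(env)
    if creds:
        kwargs["schema_registry_api_key"], kwargs["schema_registry_api_secret"] = creds
    return kwargs


example_env = {
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO": "sr-key:sr-secret",
}
print(connect_kwargs_from_env(example_env))
```

With the real package, the result could then be passed as `connect(**connect_kwargs_from_env(os.environ))`.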
CI/CD Schema Validation
Add schema validation to your CI pipeline:
```yaml
# .github/workflows/schema-validation.yml
- name: Validate schemas
  run: |
    pip install flowodm
    flowodm validate --models myapp.events --registry --strict
```
See the documentation for more CI/CD examples.
License
Apache License 2.0
Trademark Notice
Apache Kafka is a registered trademark of the Apache Software Foundation.
Download files
File details
Details for the file flowodm-0.3.1.tar.gz.
File metadata
- Download URL: flowodm-0.3.1.tar.gz
- Upload date:
- Size: 50.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `ec4b8092ba2a32751ee00dba3766d40c54020bbf16559af19c4c29ac169e5c21` |
| MD5 | `f2d3d1c7b9a095b79f5ce472a22b6ff6` |
| BLAKE2b-256 | `6bea026bdedc7ec5f1df07b42a2d8e4fae405cc41998f743da820523b4a1a175` |
File details
Details for the file flowodm-0.3.1-py3-none-any.whl.
File metadata
- Download URL: flowodm-0.3.1-py3-none-any.whl
- Upload date:
- Size: 31.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `80093a6f239d2b36caa26ffc99f689106524affc88c186eb169e5887cd018baa` |
| MD5 | `3d14281144ae779ce44c58f438a185c1` |
| BLAKE2b-256 | `e93d472f725e72a2f6cbc505c54c7bcdb9f1024da79a6c57f72da55a4c3cf5a7` |