
FlowODM


A lightweight ODM for Apache Kafka® with Avro schema support.

FlowODM provides a Pydantic v2-based interface for working with Kafka messages and Avro schemas. It supports both synchronous and asynchronous operations, making it ideal for building microservices.

Features

  • Pydantic v2 Models: Define Kafka messages as Pydantic models
  • Avro Schema Support: Auto-generate schemas or validate against existing ones
  • Schema Registry Integration: Full support for Confluent Schema Registry
  • Dual Sync/Async API: Both sync and async methods for all operations
  • Consumer Loop Patterns: Ready-to-use patterns for building microservices
  • Predefined Settings: Optimized configurations for different use cases
  • CLI Tools: Validate schemas, upload to registry, check compatibility

Installation

pip install flowodm

Quick Start

Define a Model

from datetime import datetime
from flowodm import FlowBaseModel, connect

# Connect to Kafka
connect(
    bootstrap_servers="localhost:9092",
    schema_registry_url="http://localhost:8081"
)

# Define your model
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"
        consumer_group = "my-service"
        key_field = "user_id"

    user_id: str
    action: str
    timestamp: datetime
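FlowODM can auto-generate an Avro schema from a model like this. This page does not document the mapping it uses, so the stdlib-only sketch below assumes a common convention (`str` → `"string"`, `int` → `"long"`, `datetime` → `"long"` with the `timestamp-millis` logical type, and so on). It uses a plain dataclass as a stand-in for `UserEvent` so it runs without FlowODM or Pydantic installed. `avro_schema_for` is a hypothetical helper, not part of the FlowODM API.

```python
import json
from dataclasses import dataclass, fields
from datetime import datetime
from typing import get_type_hints

# Assumed Python -> Avro type mapping (illustrative; FlowODM's may differ).
_AVRO_TYPES = {
    str: "string",
    int: "long",
    float: "double",
    bool: "boolean",
    datetime: {"type": "long", "logicalType": "timestamp-millis"},
}


@dataclass
class UserEvent:  # stand-in for the FlowBaseModel subclass above
    user_id: str
    action: str
    timestamp: datetime


def avro_schema_for(cls: type, namespace: str = "example") -> dict:
    """Build an Avro record schema from a class's annotated fields (hypothetical)."""
    hints = get_type_hints(cls)
    return {
        "type": "record",
        "name": cls.__name__,
        "namespace": namespace,
        "fields": [
            {"name": f.name, "type": _AVRO_TYPES[hints[f.name]]}
            for f in fields(cls)
        ],
    }


print(json.dumps(avro_schema_for(UserEvent), indent=2))
```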

Produce Messages

# Sync
event = UserEvent(user_id="123", action="login", timestamp=datetime.now())
event.produce()

# Async (inside an async function)
await event.aproduce()

# Batch
events = [
    UserEvent(user_id=str(i), action="login", timestamp=datetime.now())
    for i in range(100)
]
UserEvent.produce_many(events)
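When values are serialized through Confluent Schema Registry, each message value carries the Confluent wire-format header: a zero magic byte, then the 4-byte big-endian schema ID, then the Avro-encoded payload. The sketch below shows only that framing with the stdlib `struct` module. The schema ID and payload bytes are placeholders, and this page does not say whether FlowODM frames messages itself or delegates to the confluent-kafka serializers; `frame` and `unframe` are hypothetical helpers.

```python
import struct

MAGIC_BYTE = 0
# ">" = big-endian, no padding; "b" = magic byte, "I" = 4-byte unsigned schema ID.
_HEADER = struct.Struct(">bI")


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the Confluent wire-format header to an Avro-encoded payload."""
    return _HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < _HEADER.size:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = _HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[_HEADER.size:]


# Placeholder schema ID 42; b"\x06123" is the Avro binary encoding of the string "123".
framed = frame(42, b"\x06123")
print(framed.hex())  # 000000002a06313233
```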

Consume Messages

# Single message
event = UserEvent.consume_one(timeout=5.0)

# Iterator
for event in UserEvent.consume_iter():
    print(f"User {event.user_id} performed {event.action}")

# Async iterator (inside an async function; process() is your own coroutine)
async for event in UserEvent.aconsume_iter():
    await process(event)
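An iterator such as `consume_iter()` is usually built on a poll loop. The following is a hypothetical illustration of that pattern, not FlowODM's implementation. `poll` stands in for a Kafka consumer's poll call (returning raw bytes, or `None` when the timeout expires), `decode` turns bytes into a model, and the `max_empty_polls` parameter is an invention of this sketch so the demo terminates.

```python
from collections import deque
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def consume_iter(
    poll: Callable[[float], Optional[bytes]],
    decode: Callable[[bytes], T],
    timeout: float = 1.0,
    max_empty_polls: Optional[int] = None,  # hypothetical stop condition
) -> Iterator[T]:
    """Yield decoded messages from repeated polls (hypothetical sketch)."""
    empty = 0
    while True:
        raw = poll(timeout)
        if raw is None:
            empty += 1
            if max_empty_polls is not None and empty >= max_empty_polls:
                return
            continue
        empty = 0
        yield decode(raw)


# Fake topic so the sketch runs without a broker.
pending = deque([b"alice:login", b"bob:logout"])


def fake_poll(timeout: float) -> Optional[bytes]:
    return pending.popleft() if pending else None


def split_event(raw: bytes) -> tuple:
    return tuple(raw.decode().split(":"))


for user, action in consume_iter(fake_poll, split_event, max_empty_polls=1):
    print(f"User {user} performed {action}")
```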

Consumer Loop for Microservices

from flowodm import ConsumerLoop, LongRunningSettings

def process_event(event: UserEvent) -> None:
    # Your processing logic
    print(f"Processing {event.user_id}")

loop = ConsumerLoop(
    model=UserEvent,
    handler=process_event,
    settings=LongRunningSettings(),  # Tolerates long processing
)
loop.run()  # Blocking, handles SIGTERM gracefully
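`loop.run()` is described as handling SIGTERM gracefully. The stdlib sketch below shows one common way to do that; it is not taken from FlowODM's source. The signal handler only sets a flag, and the loop checks the flag between messages, so the in-flight message finishes before the loop exits. `GracefulLoop` and its `messages` iterable are hypothetical.

```python
import signal
import threading
from typing import Callable, Iterable


class GracefulLoop:
    """Run handler on each message until SIGTERM/SIGINT arrives (hypothetical sketch)."""

    def __init__(self, messages: Iterable, handler: Callable) -> None:
        self._messages = messages
        self._handler = handler
        self._stop = threading.Event()
        self.processed = 0

    def _on_signal(self, signum, frame) -> None:
        # Only set a flag: the current message finishes before the loop exits.
        self._stop.set()

    def run(self) -> None:
        previous = {}
        # signal.signal() may only be called from the main thread.
        if threading.current_thread() is threading.main_thread():
            for sig in (signal.SIGTERM, signal.SIGINT):
                previous[sig] = signal.signal(sig, self._on_signal)
        try:
            it = iter(self._messages)
            while not self._stop.is_set():
                try:
                    message = next(it)
                except StopIteration:
                    break
                self._handler(message)
                self.processed += 1
        finally:
            # A real consumer would also commit offsets and close the client here.
            for sig, old_handler in previous.items():
                if old_handler is not None:
                    signal.signal(sig, old_handler)
```

Checking the flag only between messages means a shutdown never interrupts a handler midway, which matters for long-running handlers like the ones `LongRunningSettings` targets.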

Async Consumer Loop

import asyncio

from flowodm import AsyncConsumerLoop

async def process_event(event: UserEvent) -> None:
    # external_api stands for your own async client
    await external_api.submit(event)

loop = AsyncConsumerLoop(
    model=UserEvent,
    handler=process_event,
    max_concurrent=20,  # Process up to 20 messages concurrently
)
asyncio.run(loop.run())  # or `await loop.run()` inside an existing event loop

Schema Registry Integration

Validate Models Against Registry

from flowodm import validate_against_registry

result = validate_against_registry(UserEvent, "user-events-value")
if not result.is_valid:
    print(f"Errors: {result.errors}")
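`validate_against_registry` returns a result with `is_valid` and `errors`. This page does not describe how FlowODM compares schemas. The sketch below assumes a simple field-level comparison between the model's Avro schema and the registered one. `ValidationResult` is a hypothetical stand-in that mirrors only the two attributes used above, and `compare_record_schemas` is likewise hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ValidationResult:  # hypothetical stand-in for the result object above
    errors: list = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        return not self.errors


def compare_record_schemas(model_schema: dict, registry_schema: dict) -> ValidationResult:
    """Report fields that are missing or typed differently (simplified check)."""
    model_fields = {f["name"]: f["type"] for f in model_schema["fields"]}
    registry_fields = {f["name"]: f["type"] for f in registry_schema["fields"]}
    result = ValidationResult()
    for name, reg_type in registry_fields.items():
        if name not in model_fields:
            result.errors.append(f"field '{name}' missing from model")
        elif model_fields[name] != reg_type:
            result.errors.append(
                f"field '{name}': model has {model_fields[name]!r}, registry has {reg_type!r}"
            )
    for name in sorted(model_fields.keys() - registry_fields.keys()):
        result.errors.append(f"field '{name}' not in registered schema")
    return result


registered = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
]}
model = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "action", "type": "string"},
    {"name": "source", "type": "string"},
]}

result = compare_record_schemas(model, registered)
if not result.is_valid:
    print(f"Errors: {result.errors}")
```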

Generate Models from Schema

from flowodm import generate_model_from_registry

# Generate Pydantic model from registered schema
UserEvent = generate_model_from_registry(
    subject="user-events-value",
    topic="user-events",
)
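`generate_model_from_registry` builds a Pydantic model from a registered schema. The stdlib sketch below shows the underlying idea with `dataclasses.make_dataclass` instead of Pydantic so it runs without extra dependencies. The reverse type mapping and the `model_from_avro` helper are assumptions, and the sketch handles only primitive types plus `timestamp-millis`, with no unions, arrays, or nested records.

```python
from dataclasses import fields, make_dataclass
from datetime import datetime

# Assumed Avro -> Python mapping (primitives only).
_PY_TYPES = {
    "string": str, "long": int, "int": int, "double": float,
    "float": float, "boolean": bool, "bytes": bytes,
}


def _python_type(avro_type):
    if isinstance(avro_type, dict):
        if avro_type.get("logicalType") == "timestamp-millis":
            return datetime
        return _python_type(avro_type["type"])
    return _PY_TYPES[avro_type]


def model_from_avro(schema: dict) -> type:
    """Create a dataclass whose fields mirror an Avro record schema (hypothetical)."""
    return make_dataclass(
        schema["name"],
        [(f["name"], _python_type(f["type"])) for f in schema["fields"]],
    )


schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}
UserEvent = model_from_avro(schema)
print(UserEvent(user_id="123", action="login", timestamp=datetime(2024, 1, 1)))
```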

CLI Tools

# Validate models against Schema Registry
flowodm validate --models myapp.events --registry

# Upload schema to registry
flowodm upload-schema --avro schemas/user_event.avsc --subject user-events-value

# Check compatibility
flowodm check-compatibility --model myapp.events.UserEvent --level BACKWARD

# List all subjects
flowodm list-subjects
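In Schema Registry terms, BACKWARD compatibility means consumers using the new schema can read data written with the previous one. Under Avro schema resolution, a reader ignores writer fields it does not know, but every field the reader adds must have a default. The sketch below checks only that rule plus exact type equality for shared fields. The registry's real check also allows type promotions and covers unions and nested types, so treat this as a simplified illustration, not what `flowodm check-compatibility` runs. `backward_compatible` is a hypothetical helper.

```python
def backward_compatible(old: dict, new: dict) -> list:
    """Return reasons `new` cannot read data written with `old` (empty list = OK)."""
    old_fields = {f["name"]: f for f in old["fields"]}
    problems = []
    for f in new["fields"]:
        writer_field = old_fields.get(f["name"])
        if writer_field is None:
            # Field added in the new schema: old data lacks it, so a default is required.
            if "default" not in f:
                problems.append(f"new field '{f['name']}' has no default")
        elif writer_field["type"] != f["type"]:
            # Conservative: treats any type change as incompatible.
            problems.append(f"field '{f['name']}' changed type")
    return problems


v1 = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
]}
v2_ok = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "source", "type": "string", "default": "web"},
]}
v2_bad = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "source", "type": "string"},
]}

print(backward_compatible(v1, v2_ok))   # []
print(backward_compatible(v1, v2_bad))  # ["new field 'source' has no default"]
```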

Predefined Settings

FlowODM provides optimized settings for different use cases:

Profile                  Use case
LongRunningSettings      ML inference, complex processing (10 min timeout)
BatchSettings            ETL jobs, data aggregation (large batches)
RealTimeSettings         Event-driven, notifications (low latency)
HighThroughputSettings   High-volume ingestion (max throughput)
ReliableSettings         Financial transactions (exactly-once)

from flowodm import LongRunningSettings, ConsumerLoop

loop = ConsumerLoop(
    model=MyModel,
    handler=my_handler,
    settings=LongRunningSettings(),
)
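The table only says that `LongRunningSettings` allows a 10-minute timeout. One plausible way a profile like that could be expressed is as standard Kafka consumer properties, where the relevant knob for slow handlers is `max.poll.interval.ms`. The property names below are real librdkafka/confluent-kafka settings, but the `SettingsProfile` class, the assumption that the 10-minute figure maps to `max.poll.interval.ms`, and every other value are hypothetical, not FlowODM's actual defaults.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SettingsProfile:  # hypothetical; not FlowODM's class
    max_poll_interval_s: int
    session_timeout_s: int = 45
    auto_offset_reset: str = "earliest"

    def to_consumer_config(self, bootstrap_servers: str, group_id: str) -> dict:
        """Translate the profile into librdkafka-style consumer properties."""
        return {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            # Max time between poll() calls before the consumer is considered failed
            # and its partitions are reassigned; slow handlers need a high value.
            "max.poll.interval.ms": self.max_poll_interval_s * 1000,
            "session.timeout.ms": self.session_timeout_s * 1000,
            "auto.offset.reset": self.auto_offset_reset,
        }


# "10 min timeout" from the table above, expressed in seconds.
long_running = SettingsProfile(max_poll_interval_s=10 * 60)
config = long_running.to_consumer_config("localhost:9092", "my-service")
print(config["max.poll.interval.ms"])  # 600000
```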

Configuration

Configure via environment variables:

# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret

# Schema Registry
SCHEMA_REGISTRY_URL=https://your-registry.confluent.cloud
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=your-sr-api-key:your-sr-api-secret
# Or use a separate key and secret:
# SCHEMA_REGISTRY_API_KEY=your-sr-api-key
# SCHEMA_REGISTRY_API_SECRET=your-sr-api-secret

Or programmatically:

from flowodm import connect

connect(
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="api-key",
    sasl_password="api-secret",
    schema_registry_url="https://registry.confluent.cloud",
    # Option 1: Combined format
    schema_registry_basic_auth_user_info="sr-key:sr-secret",
    # Option 2: Separate key/secret
    # schema_registry_api_key="sr-key",
    # schema_registry_api_secret="sr-secret",
)
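Schema Registry credentials can be supplied either as one `key:secret` string or as a separate key and secret. The sketch below shows one way to resolve the environment variables listed above into client configs, preferring the combined form when both are present. `resolve_config` and the env-to-property mapping are hypothetical. The output keys follow confluent-kafka naming (`bootstrap.servers`, `security.protocol`, `sasl.mechanisms`, `sasl.username`, `sasl.password`, and `url` / `basic.auth.user.info` for the Schema Registry client). The function takes a plain mapping so it can be exercised without touching `os.environ`.

```python
import os
from typing import Mapping, Optional

# Environment variable -> confluent-kafka / librdkafka property (assumed mapping).
_KAFKA_ENV = {
    "KAFKA_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "KAFKA_SECURITY_PROTOCOL": "security.protocol",
    "KAFKA_SASL_MECHANISM": "sasl.mechanisms",
    "KAFKA_SASL_USERNAME": "sasl.username",
    "KAFKA_SASL_PASSWORD": "sasl.password",
}


def resolve_config(env: Mapping[str, str] = os.environ) -> tuple:
    """Return (kafka_config, schema_registry_config) built from env (hypothetical)."""
    kafka = {prop: env[name] for name, prop in _KAFKA_ENV.items() if name in env}

    registry: dict = {}
    if "SCHEMA_REGISTRY_URL" in env:
        registry["url"] = env["SCHEMA_REGISTRY_URL"]

    # Option 1: combined "key:secret" wins; Option 2: separate key and secret.
    user_info: Optional[str] = env.get("SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO")
    if user_info is None and "SCHEMA_REGISTRY_API_KEY" in env:
        api_key = env["SCHEMA_REGISTRY_API_KEY"]
        api_secret = env.get("SCHEMA_REGISTRY_API_SECRET", "")
        user_info = f"{api_key}:{api_secret}"
    if user_info is not None:
        registry["basic.auth.user.info"] = user_info
    return kafka, registry


kafka_cfg, sr_cfg = resolve_config({
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "SCHEMA_REGISTRY_URL": "https://registry.example.com",
    "SCHEMA_REGISTRY_API_KEY": "sr-key",
    "SCHEMA_REGISTRY_API_SECRET": "sr-secret",
})
print(sr_cfg["basic.auth.user.info"])  # sr-key:sr-secret
```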

CI/CD Schema Validation

Add schema validation to your CI pipeline:

# .github/workflows/schema-validation.yml
- name: Validate schemas
  run: |
    pip install flowodm
    flowodm validate --models myapp.events --registry --strict

See the documentation for more CI/CD examples.

License

Apache License 2.0

Trademark Notice

Apache Kafka is a registered trademark of the Apache Software Foundation.


Download files


Source Distribution

flowodm-0.1.0.tar.gz (33.9 kB)

Uploaded Source

Built Distribution


flowodm-0.1.0-py3-none-any.whl (29.7 kB)

Uploaded Python 3

File details

Details for the file flowodm-0.1.0.tar.gz.

File metadata

  • Download URL: flowodm-0.1.0.tar.gz
  • Upload date:
  • Size: 33.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flowodm-0.1.0.tar.gz
Algorithm     Hash digest
SHA256        06630c7bbdbf44a4a1935a28e50b6fb2a89769d1563aa4fc51024bb11ee19413
MD5           a832f579cddc87067656099a38dfd446
BLAKE2b-256   3b5eeb2b0ba3348719e99e1444ebb4c29b9fa619f74f9244a243cf64c1546b40


File details

Details for the file flowodm-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: flowodm-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 29.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flowodm-0.1.0-py3-none-any.whl
Algorithm     Hash digest
SHA256        0187df188ebe2b3087e574e240be418b247dadd80f37f86049b09d46443d3bcd
MD5           a1abf96eb59500bd77d30bc1073456ee
BLAKE2b-256   faf890c5060c531ccfcfcfa95c599f035ca5e4d7b8729c79f1d07a8d2eb89243

