FlowODM

A lightweight ODM for Apache Kafka® with Avro schema support.

FlowODM provides a Pydantic v2-based interface for working with Kafka messages and Avro schemas. Every operation is available in both a synchronous and an asynchronous form, so it fits blocking workers as well as asyncio-based microservices.

Features

  • Pydantic v2 Models: Define Kafka messages as Pydantic models
  • Avro Schema Support: Auto-generate schemas or validate against existing ones
  • Schema Registry Integration: Full support for Confluent Schema Registry
  • Dual Sync/Async API: Both sync and async methods for all operations
  • Consumer Loop Patterns: Ready-to-use patterns for building microservices
  • Predefined Settings: Optimized configurations for different use cases
  • CLI Tools: Validate schemas, upload to registry, check compatibility

Installation

pip install flowodm

Quick Start

Define a Model

from datetime import datetime
from flowodm import FlowBaseModel, connect

# Connect to Kafka
connect(
    bootstrap_servers="localhost:9092",
    schema_registry_url="http://localhost:8081"
)

# Define your model
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"
        consumer_group = "my-service"
        key_field = "user_id"

    user_id: str
    action: str
    timestamp: datetime
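
To make the model-to-schema idea concrete, here is a minimal sketch of how type hints could be translated into an Avro record schema. It uses a plain dataclass and a hand-written `AVRO_TYPES` table rather than FlowODM's own generator, so `avro_schema_for` and the type mapping are illustrative assumptions, not the library's API.

```python
from dataclasses import dataclass, fields
from datetime import datetime

# Hypothetical mapping from Python types to Avro types (small subset).
AVRO_TYPES = {
    str: "string",
    int: "long",
    float: "double",
    bool: "boolean",
    bytes: "bytes",
    # Avro represents timestamps as a long annotated with a logical type.
    datetime: {"type": "long", "logicalType": "timestamp-millis"},
}


@dataclass
class UserEvent:
    user_id: str
    action: str
    timestamp: datetime


def avro_schema_for(cls) -> dict:
    """Build an Avro record schema from the dataclass field annotations."""
    return {
        "type": "record",
        "name": cls.__name__,
        "fields": [
            {"name": f.name, "type": AVRO_TYPES[f.type]} for f in fields(cls)
        ],
    }


schema = avro_schema_for(UserEvent)
```

The resulting `schema` is a plain dict that can be serialized with `json.dumps` into the usual `.avsc` form.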

Produce Messages

# Sync
event = UserEvent(user_id="123", action="login", timestamp=datetime.now())
event.produce()

# Async (call from inside a coroutine)
await event.aproduce()

# Batch
events = [
    UserEvent(user_id=str(i), action="login", timestamp=datetime.now())
    for i in range(100)
]
UserEvent.produce_many(events)

Consume Messages

# Single message
event = UserEvent.consume_one(timeout=5.0)

# Iterator
for event in UserEvent.consume_iter():
    print(f"User {event.user_id} performed {event.action}")

# Async iterator
async for event in UserEvent.aconsume_iter():
    await process(event)

Consumer Loop for Microservices

from flowodm import ConsumerLoop, LongRunningSettings

def process_event(event: UserEvent) -> None:
    # Your processing logic
    print(f"Processing {event.user_id}")

loop = ConsumerLoop(
    model=UserEvent,
    handler=process_event,
    settings=LongRunningSettings(),  # Tolerates long processing
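    # Commit offsets before handling: avoids duplicate processing across
    # parallel pods, but a message can be lost if the handler crashes mid-way
    commit_strategy="before_processing",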
)
loop.run()  # Blocking, handles SIGTERM gracefully
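
The trade-off behind the commit strategy can be simulated without a broker. The sketch below models a partition as a list and a crash as an exception; `consume`, `make_handler`, and the `"after_processing"` strategy name are hypothetical and only serve the illustration (the example above shows `"before_processing"` only).

```python
LOG = ["a", "b", "c"]  # stand-in for one partition's messages


def consume(log, start_offset, handler, commit_strategy):
    """Process messages from start_offset; return the last committed offset."""
    committed = start_offset
    for offset in range(start_offset, len(log)):
        if commit_strategy == "before_processing":
            committed = offset + 1  # commit first, then handle
        try:
            handler(log[offset])
        except RuntimeError:
            break  # simulate the pod crashing inside the handler
        if commit_strategy == "after_processing":
            committed = offset + 1  # commit only after a successful handle
    return committed


def make_handler(seen):
    """Return a handler that crashes once on message 'b'."""
    state = {"crashed": False}

    def handler(msg):
        if msg == "b" and not state["crashed"]:
            state["crashed"] = True
            raise RuntimeError("simulated crash")
        seen.append(msg)

    return handler


def run_with_restart(strategy):
    """Run until the crash, then restart from the committed offset."""
    seen = []
    handler = make_handler(seen)
    offset = consume(LOG, 0, handler, strategy)
    consume(LOG, offset, handler, strategy)
    return seen


before = run_with_restart("before_processing")
after = run_with_restart("after_processing")
```

With commit-before-processing the crashed message is skipped on restart (at-most-once); with commit-after-processing it is retried (at-least-once), which in a real system can produce duplicates if the handler had side effects before failing.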

Async Consumer Loop

from flowodm import AsyncConsumerLoop

async def process_event(event: UserEvent) -> None:
    await external_api.submit(event)

loop = AsyncConsumerLoop(
    model=UserEvent,
    handler=process_event,
    max_concurrent=20,  # Process up to 20 messages concurrently
)
await loop.run()
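
A common way to cap concurrent handlers in asyncio is a semaphore. The sketch below shows that general technique with the standard library only; it is an assumption about how a limit like `max_concurrent` could be enforced, not FlowODM's actual implementation, and `run_bounded` is a hypothetical helper.

```python
import asyncio


async def run_bounded(messages, handler, max_concurrent):
    """Run handler for every message, with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(msg):
        async with semaphore:  # waits while max_concurrent handlers are running
            await handler(msg)

    await asyncio.gather(*(guarded(m) for m in messages))


async def demo():
    in_flight = 0
    peak = 0

    async def handler(msg):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for an external API call
        in_flight -= 1

    await run_bounded(range(50), handler, max_concurrent=20)
    return peak


peak = asyncio.run(demo())
```

`peak` records the highest number of handlers that were running at the same time, which never exceeds the configured limit.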

Schema Registry Integration

Validate Models Against Registry

from flowodm import validate_against_registry

result = validate_against_registry(UserEvent, "user-events-value")
if not result.is_valid:
    print(f"Errors: {result.errors}")
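
Conceptually, validating a model against a registered schema comes down to comparing field names and types. The following is a simplified, broker-free sketch of that comparison; `ValidationResult` and `compare_fields` are hypothetical names that only mirror the `is_valid`/`errors` shape shown above, and real validation would also cover nested records, unions, and defaults.

```python
from dataclasses import dataclass, field


@dataclass
class ValidationResult:
    """Hypothetical result type with the is_valid/errors pair."""

    errors: list = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        return not self.errors


def compare_fields(model_fields: dict, registered: dict) -> ValidationResult:
    """Compare {name: avro_type} from a model with a registered Avro schema."""
    result = ValidationResult()
    schema_fields = {f["name"]: f["type"] for f in registered["fields"]}
    for name, avro_type in model_fields.items():
        if name not in schema_fields:
            result.errors.append(f"'{name}' is missing from the registered schema")
        elif schema_fields[name] != avro_type:
            result.errors.append(
                f"'{name}': model has {avro_type}, registry has {schema_fields[name]}"
            )
    for name in sorted(schema_fields.keys() - model_fields.keys()):
        result.errors.append(f"'{name}' is missing from the model")
    return result


registered_schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": "long"},
    ],
}

ok = compare_fields(
    {"user_id": "string", "action": "string", "timestamp": "long"},
    registered_schema,
)
bad = compare_fields(
    {"user_id": "string", "action": "string", "timestamp": "string"},
    registered_schema,
)
```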

Generate Models from Schema

from flowodm import generate_model_from_registry

# Generate Pydantic model from registered schema
UserEvent = generate_model_from_registry(
    subject="user-events-value",
    topic="user-events",
)
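
The reverse direction, building a class from a schema, can be sketched with the standard library. This example uses `dataclasses.make_dataclass` instead of Pydantic and handles primitive Avro types only; `model_from_schema` and `PYTHON_TYPES` are illustrative assumptions and say nothing about how `generate_model_from_registry` works internally.

```python
from dataclasses import fields, make_dataclass

# Hypothetical mapping from primitive Avro types to Python types.
PYTHON_TYPES = {
    "string": str,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "boolean": bool,
    "bytes": bytes,
}


def model_from_schema(schema: dict):
    """Create a dataclass whose fields follow an Avro record schema."""
    return make_dataclass(
        schema["name"],
        [(f["name"], PYTHON_TYPES[f["type"]]) for f in schema["fields"]],
    )


avro_schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
    ],
}

GeneratedEvent = model_from_schema(avro_schema)
event = GeneratedEvent(user_id="123", action="login")
```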

CLI Tools

# Validate models against Schema Registry
flowodm validate --models myapp.events --registry

# Upload schema to registry
flowodm upload-schema --avro schemas/user_event.avsc --subject user-events-value

# Check compatibility
flowodm check-compatibility --model myapp.events.UserEvent --level BACKWARD

# List all subjects
flowodm list-subjects
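
For context on the `BACKWARD` level: it means a consumer using the new schema can still read data written with the previous one. Under Avro's schema resolution rules, that requires every field added in the new schema to carry a default, while removing a field is allowed. The sketch below checks only that one rule for flat records; `backward_incompatibilities` is a hypothetical helper, and a real check (as done by the registry) also covers type promotions, unions, and nested types.

```python
def backward_incompatibilities(old_schema: dict, new_schema: dict) -> list:
    """List fields that stop the new schema from reading old data."""
    old_names = {f["name"] for f in old_schema["fields"]}
    problems = []
    for f in new_schema["fields"]:
        # A field unknown to old writers needs a default to be filled in.
        if f["name"] not in old_names and "default" not in f:
            problems.append(f"new field '{f['name']}' has no default")
    return problems


old = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
    ],
}
new_without_default = {
    "type": "record",
    "name": "UserEvent",
    "fields": old["fields"] + [{"name": "source", "type": "string"}],
}
new_with_default = {
    "type": "record",
    "name": "UserEvent",
    "fields": old["fields"]
    + [{"name": "source", "type": "string", "default": "web"}],
}

broken = backward_incompatibilities(old, new_without_default)
fine = backward_incompatibilities(old, new_with_default)
```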

Predefined Settings

FlowODM provides optimized settings for different use cases:

Profile                  Use Case
LongRunningSettings      ML inference, complex processing (10 min timeout)
BatchSettings            ETL jobs, data aggregation (large batches)
RealTimeSettings         Event-driven, notifications (low latency)
HighThroughputSettings   High-volume ingestion (max throughput)
ReliableSettings         Financial transactions (exactly-once)

from flowodm import LongRunningSettings, ConsumerLoop

loop = ConsumerLoop(
    model=MyModel,
    handler=my_handler,
    settings=LongRunningSettings(),
)
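
As a rough idea of what a profile like this might translate to, the sketch below builds a Kafka consumer configuration for long-running handlers. The values and the choice of properties are assumptions for illustration, not FlowODM's actual profile definitions; `max.poll.interval.ms` and `enable.auto.commit` are standard Kafka consumer properties, and the 10-minute figure is one plausible reading of the "10 min timeout" in the table.

```python
def long_running_config(timeout_minutes: int = 10) -> dict:
    """Hypothetical consumer config that tolerates slow message handlers."""
    return {
        # Maximum time between polls before the consumer is considered failed
        # and its partitions are reassigned.
        "max.poll.interval.ms": timeout_minutes * 60 * 1000,
        # Assumption: offsets are committed explicitly by the consumer loop.
        "enable.auto.commit": False,
    }


config = long_running_config()
```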

Configuration

Configure via environment variables:

# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret

# Schema Registry
SCHEMA_REGISTRY_URL=https://your-registry.confluent.cloud
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=your-sr-api-key:your-sr-api-secret
# Or use separate key/secret:
# SCHEMA_REGISTRY_API_KEY=your-sr-api-key
# SCHEMA_REGISTRY_API_SECRET=your-sr-api-secret
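
To show how these variables could be resolved into a single configuration, here is a minimal sketch using only `os.environ`. The `load_settings` helper, the `localhost:9092` fallback, and the precedence of the combined credential over the separate key/secret pair are assumptions made for the example, not documented FlowODM behavior.

```python
import os


def load_settings(env=None) -> dict:
    """Collect Kafka and Schema Registry settings from environment variables."""
    env = os.environ if env is None else env
    settings = {
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "security_protocol": env.get("KAFKA_SECURITY_PROTOCOL"),
        "sasl_mechanism": env.get("KAFKA_SASL_MECHANISM"),
        "sasl_username": env.get("KAFKA_SASL_USERNAME"),
        "sasl_password": env.get("KAFKA_SASL_PASSWORD"),
        "schema_registry_url": env.get("SCHEMA_REGISTRY_URL"),
    }
    # Assumed precedence: the combined "key:secret" value wins if present.
    user_info = env.get("SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO")
    if user_info is None:
        key = env.get("SCHEMA_REGISTRY_API_KEY")
        secret = env.get("SCHEMA_REGISTRY_API_SECRET")
        if key and secret:
            user_info = f"{key}:{secret}"
    settings["schema_registry_basic_auth_user_info"] = user_info
    return settings


example = load_settings(
    {
        "KAFKA_BOOTSTRAP_SERVERS": "broker:9092",
        "SCHEMA_REGISTRY_API_KEY": "sr-key",
        "SCHEMA_REGISTRY_API_SECRET": "sr-secret",
    }
)
```

Passing a plain dict instead of reading `os.environ` keeps the helper easy to test.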

Or programmatically:

from flowodm import connect

connect(
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="api-key",
    sasl_password="api-secret",
    schema_registry_url="https://registry.confluent.cloud",
    # Option 1: Combined format
    schema_registry_basic_auth_user_info="sr-key:sr-secret",
    # Option 2: Separate key/secret
    # schema_registry_api_key="sr-key",
    # schema_registry_api_secret="sr-secret",
)

CI/CD Schema Validation

Add schema validation to your CI pipeline:

# .github/workflows/schema-validation.yml
- name: Validate schemas
  run: |
    pip install flowodm
    flowodm validate --models myapp.events --registry --strict

See the documentation for more CI/CD examples.

License

Apache License 2.0

Trademark Notice

Apache Kafka is a registered trademark of the Apache Software Foundation.
