FlowODM

A lightweight ODM for Apache Kafka® with Avro schema support.

FlowODM provides a Pydantic v2-based interface for working with Kafka messages and Avro schemas. Every operation is available in both synchronous and asynchronous form, so the same models can be used from blocking workers and from async microservices.

Features

  • Pydantic v2 Models: Define Kafka messages as Pydantic models
  • Avro Schema Support: Auto-generate schemas or validate against existing ones
  • Schema Registry Integration: Full support for Confluent Schema Registry
  • Dual Sync/Async API: Both sync and async methods for all operations
  • Consumer Loop Patterns: Ready-to-use patterns for building microservices
  • Predefined Settings: Optimized configurations for different use cases
  • CLI Tools: Validate schemas, upload to registry, check compatibility

Installation

pip install flowodm

Quick Start

Define a Model

from datetime import datetime
from flowodm import FlowBaseModel, connect

# Connect to Kafka
connect(
    bootstrap_servers="localhost:9092",
    schema_registry_url="http://localhost:8081"
)

# Define your model
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"
        consumer_group = "my-service"
        key_field = "user_id"

    user_id: str
    action: str
    timestamp: datetime
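
To make the "auto-generate schemas" feature concrete, here is a minimal sketch of how typed fields like those on UserEvent could map to an Avro record schema. It is not FlowODM's implementation: it uses a stdlib dataclass instead of FlowBaseModel, and the names AVRO_TYPES, UserEventFields, avro_schema_for and the "example" namespace are hypothetical. The Avro type names and the timestamp-millis logical type come from the Avro specification.

```python
import dataclasses
import json
from datetime import datetime

# Illustrative subset of Python-to-Avro type mappings (assumption, not FlowODM's table).
AVRO_TYPES = {
    str: "string",
    int: "long",
    float: "double",
    bool: "boolean",
    bytes: "bytes",
    datetime: {"type": "long", "logicalType": "timestamp-millis"},
}


@dataclasses.dataclass
class UserEventFields:
    """Stand-in for the UserEvent model, kept stdlib-only."""
    user_id: str
    action: str
    timestamp: datetime


def avro_schema_for(cls, namespace="example"):
    """Build an Avro record schema dict from a dataclass's field annotations."""
    fields = [
        {"name": f.name, "type": AVRO_TYPES[f.type]}
        for f in dataclasses.fields(cls)
    ]
    return {
        "type": "record",
        "name": cls.__name__,
        "namespace": namespace,
        "fields": fields,
    }


schema = avro_schema_for(UserEventFields)
print(json.dumps(schema, indent=2))
```

The sketch handles only flat models with primitive fields; optional fields, nested records and unions would need extra cases.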

Produce Messages

# Sync
event = UserEvent(user_id="123", action="login", timestamp=datetime.now())
event.produce()

# Async
await event.aproduce()

# Batch
events = [
    UserEvent(user_id=str(i), action="login", timestamp=datetime.now())
    for i in range(100)
]
UserEvent.produce_many(events)

Consume Messages

# Single message
event = UserEvent.consume_one(timeout=5.0)

# Iterator
for event in UserEvent.consume_iter():
    print(f"User {event.user_id} performed {event.action}")

# Async iterator
async for event in UserEvent.aconsume_iter():
    await process(event)

Consumer Loop for Microservices

from flowodm import ConsumerLoop, LongRunningSettings

def process_event(event: UserEvent) -> None:
    # Your processing logic
    print(f"Processing {event.user_id}")

loop = ConsumerLoop(
    model=UserEvent,
    handler=process_event,
    settings=LongRunningSettings(),  # Tolerates long processing
    # Commit before handling: avoids duplicates across parallel pods,
    # but a message is not redelivered if its handler crashes.
    commit_strategy="before_processing",
)
loop.run()  # Blocking, handles SIGTERM gracefully
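
The trade-off behind a "before_processing" commit strategy can be shown with a small simulation. This is a model of general Kafka offset-commit behaviour, not of FlowODM's internals; consume, run_with_restart and the crash parameters are hypothetical names used only for illustration.

```python
def consume(messages, start, commit_before, crash_at=None, effect_done=False):
    """Process messages[start:] and return (side_effects, committed_offset).

    crash_at: offset at which the process dies while handling that message.
    effect_done: whether the handler's side effect landed before the crash.
    """
    effects = []
    committed = start  # offset a restarted consumer would resume from
    for offset in range(start, len(messages)):
        if commit_before:
            committed = offset + 1  # commit first, then handle
        if offset == crash_at:
            if effect_done:
                effects.append(messages[offset])
            return effects, committed  # the process dies here
        effects.append(messages[offset])
        if not commit_before:
            committed = offset + 1  # handle first, then commit
    return effects, committed


def run_with_restart(messages, commit_before, crash_at, effect_done):
    """Run until the crash, then restart from the last committed offset."""
    first, committed = consume(messages, 0, commit_before, crash_at, effect_done)
    second, _ = consume(messages, committed, commit_before)
    return first + second


messages = ["a", "b", "c"]

# Commit before processing: the crashed message is skipped after restart.
before = run_with_restart(messages, commit_before=True, crash_at=1, effect_done=False)

# Commit after processing: the crashed message is handled a second time.
after = run_with_restart(messages, commit_before=False, crash_at=1, effect_done=True)

print("commit before:", before)
print("commit after: ", after)
```

Committing first never repeats a message but can drop one; committing afterwards never drops a message but can repeat one, so handlers in that mode should be idempotent.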

Async Consumer Loop

from flowodm import AsyncConsumerLoop

async def process_event(event: UserEvent) -> None:
    await external_api.submit(event)

loop = AsyncConsumerLoop(
    model=UserEvent,
    handler=process_event,
    max_concurrent=20,  # Process up to 20 messages concurrently
)
await loop.run()
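
A common way to cap concurrent handlers, as max_concurrent suggests, is an asyncio.Semaphore. The sketch below shows that pattern with the standard library only; whether AsyncConsumerLoop is built this way is not stated in the source, and run_bounded and fake_handler are hypothetical names.

```python
import asyncio


async def run_bounded(items, handler, max_concurrent):
    """Run handler(item) for every item with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:
            return await handler(item)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(guarded(item) for item in items))


in_flight = 0
peak = 0


async def fake_handler(item):
    """Hypothetical handler that records how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # stands in for an external API call
    in_flight -= 1
    return item * 2


results = asyncio.run(run_bounded(range(10), fake_handler, max_concurrent=3))
print(results, "peak concurrency:", peak)
```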

Schema Registry Integration

Validate Models Against Registry

from flowodm import validate_against_registry

result = validate_against_registry(UserEvent, "user-events-value")
if not result.is_valid:
    print(f"Errors: {result.errors}")
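
As a rough picture of what such a validation might check, the sketch below compares a model's field types with a registered Avro record schema and collects mismatches. It is an assumption about the kind of check involved, not the logic of validate_against_registry; ValidationResult and compare_fields are hypothetical, and the schema is a hand-written dict rather than one fetched from a registry.

```python
from dataclasses import dataclass, field


@dataclass
class ValidationResult:
    """Hypothetical result type mirroring the is_valid / errors attributes."""
    errors: list = field(default_factory=list)

    @property
    def is_valid(self):
        return not self.errors


def compare_fields(model_fields, registered_schema):
    """Compare {name: avro_type} from a model with a registered record schema."""
    result = ValidationResult()
    registered = {f["name"]: f["type"] for f in registered_schema["fields"]}
    for name, avro_type in model_fields.items():
        if name not in registered:
            result.errors.append(f"field '{name}' is missing from the registered schema")
        elif registered[name] != avro_type:
            result.errors.append(
                f"field '{name}': model has {avro_type!r}, registry has {registered[name]!r}"
            )
    for name in sorted(registered.keys() - model_fields.keys()):
        result.errors.append(f"field '{name}' is registered but not defined on the model")
    return result


registered_schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": "long"},
    ],
}

# The model declares timestamp as a string, which disagrees with the registry.
model_fields = {"user_id": "string", "action": "string", "timestamp": "string"}

result = compare_fields(model_fields, registered_schema)
print(result.is_valid, result.errors)
```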

Generate Models from Schema

from flowodm import generate_model_from_registry

# Generate Pydantic model from registered schema
UserEvent = generate_model_from_registry(
    subject="user-events-value",
    topic="user-events",
)
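
Generating a class from a schema can be illustrated with the standard library alone. The sketch below builds a dataclass from an Avro record schema using dataclasses.make_dataclass; it is not how generate_model_from_registry works internally (that function returns a Pydantic model), and AVRO_TO_PYTHON and dataclass_from_schema are hypothetical names. Only primitive field types are handled.

```python
import dataclasses

# Illustrative mapping for Avro primitive types only.
AVRO_TO_PYTHON = {
    "string": str,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "boolean": bool,
    "bytes": bytes,
}


def dataclass_from_schema(schema):
    """Create a dataclass whose fields mirror an Avro record schema."""
    fields = [(f["name"], AVRO_TO_PYTHON[f["type"]]) for f in schema["fields"]]
    return dataclasses.make_dataclass(schema["name"], fields)


schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": "long"},
    ],
}

GeneratedUserEvent = dataclass_from_schema(schema)
event = GeneratedUserEvent(user_id="123", action="login", timestamp=1700000000000)
print(event)
```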

CLI Tools

# Validate models against Schema Registry
flowodm validate --models myapp.events --registry

# Upload schema to registry
flowodm upload-schema --avro schemas/user_event.avsc --subject user-events-value

# Check compatibility
flowodm check-compatibility --model myapp.events.UserEvent --level BACKWARD

# List all subjects
flowodm list-subjects
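
For readers unfamiliar with the BACKWARD level: it means a consumer using the new schema can still read data written with the previous one, so new fields need defaults while removed fields are acceptable. The sketch below checks just that rule on plain schema dicts. It is a simplification for illustration (it ignores Avro type promotions, unions and aliases) and is not the check the CLI or the registry performs; backward_compatible is a hypothetical name.

```python
def backward_compatible(old_schema, new_schema):
    """Return a list of problems that would stop new_schema reading old data.

    Simplified: added fields must carry a default, and fields present in both
    schemas must keep the same type.
    """
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    problems = []
    for new_field in new_schema["fields"]:
        old_field = old_fields.get(new_field["name"])
        if old_field is None:
            if "default" not in new_field:
                problems.append(f"added field '{new_field['name']}' has no default")
        elif old_field["type"] != new_field["type"]:
            problems.append(f"field '{new_field['name']}' changed type")
    return problems


old = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
    ],
}

# Adds a field with a default: old records can still be read.
new_ok = {**old, "fields": old["fields"] + [{"name": "source", "type": "string", "default": "web"}]}

# Adds a required field: old records have no value for it.
new_bad = {**old, "fields": old["fields"] + [{"name": "source", "type": "string"}]}

print(backward_compatible(old, new_ok))
print(backward_compatible(old, new_bad))
```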

Predefined Settings

FlowODM provides optimized settings for different use cases:

Profile                  Use Case
LongRunningSettings      ML inference, complex processing (10 min timeout)
BatchSettings            ETL jobs, data aggregation (large batches)
RealTimeSettings         Event-driven, notifications (low latency)
HighThroughputSettings   High-volume ingestion (max throughput)
ReliableSettings         Financial transactions (exactly-once)

from flowodm import LongRunningSettings, ConsumerLoop

loop = ConsumerLoop(
    model=MyModel,
    handler=my_handler,
    settings=LongRunningSettings(),
)
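
The source does not show which consumer properties each profile sets. As an assumption only, a 10-minute processing allowance would most naturally correspond to Kafka's max.poll.interval.ms consumer property, which bounds the time allowed between polls before the consumer is removed from its group. The sketch below shows what a profile of that shape could look like; ProfileSketch and its values are hypothetical and are not FlowODM's LongRunningSettings.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProfileSketch:
    """Hypothetical settings profile; not a FlowODM class."""
    max_poll_interval_ms: int
    enable_auto_commit: bool = False

    def to_consumer_config(self):
        """Render the profile as standard Kafka consumer properties."""
        return {
            "max.poll.interval.ms": self.max_poll_interval_ms,
            "enable.auto.commit": self.enable_auto_commit,
        }


# Assumed reading of "10 min timeout": ten minutes between polls.
long_running = ProfileSketch(max_poll_interval_ms=10 * 60 * 1000)
config = long_running.to_consumer_config()
print(config)
```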

Configuration

Configure via environment variables:

# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret

# Schema Registry
SCHEMA_REGISTRY_URL=https://your-registry.confluent.cloud
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=your-sr-api-key:your-sr-api-secret
# Or use separate key/secret:
# SCHEMA_REGISTRY_API_KEY=your-sr-api-key
# SCHEMA_REGISTRY_API_SECRET=your-sr-api-secret
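
To show how these variables fit together, the sketch below turns them into two client-style configuration dicts. The environment variable names are the ones listed above; the output keys (bootstrap.servers, sasl.username, basic.auth.user.info and so on) are the property names used by confluent-kafka clients. How FlowODM reads the environment internally is not shown in the source, and both the function name config_from_env and the rule that the combined variable wins over the separate key/secret are assumptions.

```python
import os


def config_from_env(env=os.environ):
    """Translate the documented environment variables into config dicts."""
    kafka = {}
    for var, key in (
        ("KAFKA_BOOTSTRAP_SERVERS", "bootstrap.servers"),
        ("KAFKA_SECURITY_PROTOCOL", "security.protocol"),
        ("KAFKA_SASL_MECHANISM", "sasl.mechanism"),
        ("KAFKA_SASL_USERNAME", "sasl.username"),
        ("KAFKA_SASL_PASSWORD", "sasl.password"),
    ):
        if var in env:
            kafka[key] = env[var]

    registry = {}
    if "SCHEMA_REGISTRY_URL" in env:
        registry["url"] = env["SCHEMA_REGISTRY_URL"]

    # Assumption: the combined "key:secret" form takes precedence if both are set.
    user_info = env.get("SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO")
    if user_info is None and "SCHEMA_REGISTRY_API_KEY" in env:
        secret = env.get("SCHEMA_REGISTRY_API_SECRET", "")
        user_info = f"{env['SCHEMA_REGISTRY_API_KEY']}:{secret}"
    if user_info:
        registry["basic.auth.user.info"] = user_info

    return kafka, registry


example_env = {
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
    "SCHEMA_REGISTRY_URL": "https://your-registry.confluent.cloud",
    "SCHEMA_REGISTRY_API_KEY": "your-sr-api-key",
    "SCHEMA_REGISTRY_API_SECRET": "your-sr-api-secret",
}

kafka_config, registry_config = config_from_env(example_env)
print(kafka_config)
print(registry_config)
```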

Or programmatically:

from flowodm import connect

connect(
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="api-key",
    sasl_password="api-secret",
    schema_registry_url="https://registry.confluent.cloud",
    # Option 1: Combined format
    schema_registry_basic_auth_user_info="sr-key:sr-secret",
    # Option 2: Separate key/secret
    # schema_registry_api_key="sr-key",
    # schema_registry_api_secret="sr-secret",
)

CI/CD Schema Validation

Add schema validation to your CI pipeline:

# .github/workflows/schema-validation.yml
- name: Validate schemas
  run: |
    pip install flowodm
    flowodm validate --models myapp.events --registry --strict

See the documentation for more CI/CD examples.

License

Apache License 2.0

Trademark Notice

Apache Kafka is a registered trademark of the Apache Software Foundation.
