# Kafka Framework
A FastAPI-inspired framework for building Kafka applications in Python with a focus on developer experience and robust features.
## Features
- FastAPI-style routing with decorators
- Dependency injection system
- Pluggable serialization (JSON, Protobuf, Avro)
- Priority-based message processing
- Configurable retry mechanism with exception filtering
- Dead Letter Queue (DLQ) support
- Async/await patterns using aiokafka
- Type hints throughout
## Installation

Basic installation:

```bash
pip install kafka-framework
```

With Avro support:

```bash
pip install "kafka-framework[avro]"
```

With all extras:

```bash
pip install "kafka-framework[all]"
```
## Quick Start

```python
from kafka_framework import KafkaApp, TopicRouter, Depends
from kafka_framework.serialization import JSONSerializer

# Create the app instance
app = KafkaApp(
    bootstrap_servers=["localhost:9092"],
    group_id="my-consumer-group",
    serializer=JSONSerializer(),
)

# Create a router
router = TopicRouter()

# Define dependencies
async def get_db():
    # Return a database connection
    return {"connection": "db"}

def get_config():
    return {"env": "production"}

# Define event handlers
@router.topic_event("orders", "order_created", priority=1)
async def handle_order_created(message, db=Depends(get_db), config=Depends(get_config)):
    order = message.value
    print(f"Processing order {order['id']} with config {config}")
    # Process the order...

@router.topic_event(
    "orders",
    "order_cancelled",
    priority=2,
    retry_attempts=3,
    dlq_topic="orders_dlq",
)
async def handle_order_cancelled(message):
    order = message.value
    print(f"Cancelling order {order['id']}")
    # Cancel the order...

# Include the router in the app
app.include_router(router)

# Run the app
async def main():
    async with app.lifespan():
        await app.start()

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
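Under the hood, the `Depends` pattern works much like FastAPI's: before a handler runs, each dependency callable is invoked (and awaited if it is async), and its result is passed in as the corresponding keyword argument. The following is a simplified, self-contained sketch of that resolution logic — the `Depends` class and `call_with_dependencies` here are illustrative stand-ins, not the framework's actual internals:

```python
import asyncio
import inspect

class Depends:
    """Stand-in for kafka_framework's Depends marker."""
    def __init__(self, dependency):
        self.dependency = dependency

async def call_with_dependencies(handler, message):
    """Resolve Depends(...) defaults, then invoke the handler with the results."""
    kwargs = {}
    for name, param in inspect.signature(handler).parameters.items():
        if isinstance(param.default, Depends):
            result = param.default.dependency()
            # Await async dependencies; use sync results directly
            kwargs[name] = await result if inspect.isawaitable(result) else result
    return await handler(message, **kwargs)

async def get_db():
    return {"connection": "db"}

def get_config():
    return {"env": "production"}

async def handler(message, db=Depends(get_db), config=Depends(get_config)):
    return (message, db["connection"], config["env"])

print(asyncio.run(call_with_dependencies(handler, "msg")))  # ('msg', 'db', 'production')
```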
## Advanced Features

### Priority-based Processing

Messages are processed based on handler priority, with higher numbers handled first:

```python
@router.topic_event("notifications", "high_priority", priority=10)
async def handle_high_priority(message):
    # Processed first
    pass

@router.topic_event("notifications", "low_priority", priority=1)
async def handle_low_priority(message):
    # Processed after high-priority messages
    pass
```
### Retry Mechanism

Configure retries with exponential backoff:

```python
from kafka_framework.kafka import RetryConfig
from kafka_framework.exceptions import RetryableError

retry_config = RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    max_delay=60.0,
    exponential_base=2.0,
    exceptions=[RetryableError],
)

@router.topic_event(
    "payments",
    "payment_processed",
    retry_attempts=3,
    retry_config=retry_config,
)
async def handle_payment(message):
    # Retried up to 3 times with exponential backoff
    pass
```
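With the configuration above, the delay before retry *n* is `min(initial_delay * exponential_base**(n-1), max_delay)`. That formula is an assumption about how `RetryConfig` combines its parameters, but it is the standard capped exponential-backoff schedule:

```python
def backoff_delays(max_attempts, initial_delay, exponential_base, max_delay):
    """Seconds to wait before each retry, on a capped exponential schedule."""
    return [
        min(initial_delay * exponential_base**attempt, max_delay)
        for attempt in range(max_attempts)
    ]

# The RetryConfig above would wait 1s, 2s, then 4s between attempts
print(backoff_delays(3, 1.0, 2.0, 60.0))  # [1.0, 2.0, 4.0]
```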
### Dead Letter Queue (DLQ)

Handle failed messages with a DLQ:

```python
@router.topic_event(
    "orders",
    "order_created",
    dlq_topic="orders_dlq",
)
async def handle_order(message):
    # Failed messages are sent to the orders_dlq topic
    pass
```
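Conceptually, the DLQ path engages only after all retry attempts are exhausted. Here is a framework-independent sketch of that control flow — the `send_to_dlq` callback stands in for producing to `orders_dlq`, which the real framework does asynchronously:

```python
def process_with_dlq(handler, message, max_attempts, send_to_dlq):
    """Run the handler; after max_attempts failures, route the message to the DLQ."""
    last_error = None
    for _ in range(max_attempts):
        try:
            return handler(message)
        except Exception as exc:
            last_error = exc
    # All attempts failed: hand the message (and last error) to the DLQ path
    send_to_dlq(message, last_error)

# Example: a handler that always fails ends up in the DLQ
dlq = []
def always_fails(msg):
    raise ValueError("downstream unavailable")

process_with_dlq(always_fails, {"id": "o-1"}, 3, lambda m, e: dlq.append(m))
print(dlq)  # [{'id': 'o-1'}]
```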
### Custom Serialization

Use Avro serialization (requires `kafka-framework[avro]`):

```python
import json

from kafka_framework.serialization import AvroSerializer

schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

app = KafkaApp(
    bootstrap_servers=["localhost:9092"],
    serializer=AvroSerializer(
        schema_registry_url="http://localhost:8081",
        schema_str=json.dumps(schema),
    ),
)
```
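At runtime the serializer rejects messages that don't conform to the schema. As a rough illustration of what the `Order` schema constrains, here is a hand-rolled type check — not the framework's or Avro's actual validation logic, which also handles nested records, unions, and logical types:

```python
schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

# Only the Avro primitive types used in the schema above
AVRO_TO_PY = {"string": str, "double": float}

def matches_schema(record, schema):
    """True if every schema field is present with the expected Python type."""
    for field in schema["fields"]:
        if not isinstance(record.get(field["name"]), AVRO_TO_PY[field["type"]]):
            return False
    return True

print(matches_schema({"id": "o-1", "amount": 9.99}, schema))    # True
print(matches_schema({"id": "o-1", "amount": "9.99"}, schema))  # False
```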
### Message Headers

Access message headers and metadata:

```python
@router.topic_event("orders", "order_created")
async def handle_order(message):
    # Access the message payload
    order_data = message.value

    # Access message headers
    print(f"Data version: {message.headers.data_version}")
    print(f"Timestamp: {message.headers.timestamp}")

    # Access retry information (present only when the message is being retried)
    if message.headers.retry:
        print(f"Retry count: {message.headers.retry.retry_count}")
        print(f"Last retry: {message.headers.retry.last_retried_timestamp}")
```
## Configuration

### Consumer Configuration

```python
app = KafkaApp(
    bootstrap_servers=["localhost:9092"],
    group_id="my-group",
    config={
        "consumer_config": {
            "auto_offset_reset": "earliest",
            "enable_auto_commit": True,
            "max_poll_records": 500,
        }
    },
)
```
### Producer Configuration

```python
app = KafkaApp(
    bootstrap_servers=["localhost:9092"],
    config={
        "producer_config": {
            "acks": "all",
            "compression_type": "gzip",
            "max_request_size": 1048576,
        }
    },
)
```
## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the MIT License - see the LICENSE file for details.