Kafka Smart Producer

Intelligent Kafka producer with real-time, lag-aware partition selection.
Kafka Smart Producer is a Python library that extends confluent-kafka-python with intelligent, real-time, lag-aware partition selection. It solves the "hot partition" problem by monitoring consumer health and routing messages to the healthiest partitions.
🚀 Key Features
- Intelligent Partition Selection: Routes messages to healthy partitions based on real-time consumer lag monitoring
- Performance Optimized: Sub-millisecond overhead with smart caching strategies
- Dual API Support: Both synchronous and asynchronous producer implementations
- Flexible Architecture: Protocol-based design with extensible lag collection and health calculation
- Graceful Degradation: Falls back to default partitioner when health data is unavailable
- Simple Configuration: Minimal setup with sensible defaults, advanced configuration when needed
📦 Installation
```bash
pip install kafka-smart-producer
```
Optional Dependencies
For Redis-based distributed caching:
```bash
pip install kafka-smart-producer[redis]
```
For development:
```bash
pip install kafka-smart-producer[dev]
```
🔧 Quick Start
Minimal Configuration
```python
from kafka_smart_producer import SmartProducer, SmartProducerConfig

# Simple setup - just specify Kafka, topics, and consumer group
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "consumer_group": "order-processors"  # Automatically enables health monitoring
})

# Create producer with automatic health monitoring
with SmartProducer(config) as producer:
    # Messages are automatically routed to healthy partitions
    producer.produce(
        topic="orders",
        key=b"customer-123",
        value=b"order-data"
    )

    # Manual flush for guaranteed delivery
    producer.flush()
```
Advanced Configuration
```python
# Full control over health monitoring and caching
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "health_manager": {
        "consumer_group": "order-processors",
        "health_threshold": 0.3,    # More sensitive to lag
        "refresh_interval": 3.0,    # Faster refresh
        "max_lag_for_health": 500,  # Lower lag threshold
    },
    "cache": {
        "local_max_size": 2000,
        "local_ttl_seconds": 600,
        "remote_enabled": True,     # Redis caching
        "redis_host": "localhost",
        "redis_port": 6379
    }
})

with SmartProducer(config) as producer:
    # Get health information
    healthy_partitions = producer.health_manager.get_healthy_partitions("orders")
    health_summary = producer.health_manager.get_health_summary()

    # Produce with smart partitioning
    producer.produce(topic="orders", key=b"key", value=b"value")
```
Async Producer
```python
from kafka_smart_producer import AsyncSmartProducer, SmartProducerConfig

async def main():
    config = SmartProducerConfig.from_dict({
        "bootstrap.servers": "localhost:9092",
        "topics": ["orders"],
        "consumer_group": "processors"
    })

    async with AsyncSmartProducer(config) as producer:
        await producer.produce(topic="orders", key=b"key", value=b"value")
        await producer.flush()

# Run with asyncio.run(main())
```
🏗️ Architecture
Core Components
- LagDataCollector Protocol: Fetches consumer lag data from various sources
  - KafkaAdminLagCollector: Uses Kafka AdminClient (default)
  - Extensible for custom data sources (Redis, Prometheus, etc.)
- HotPartitionCalculator Protocol: Transforms lag data into health scores
  - ThresholdHotPartitionCalculator: Basic threshold-based scoring (default)
  - Extensible for custom health algorithms
- HealthManager: Central coordinator for health monitoring
  - PartitionHealthMonitor: Sync implementation with threading
  - AsyncPartitionHealthMonitor: Async implementation with asyncio
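The protocol-based design above means a custom lag source only has to satisfy a small structural interface. The sketch below is illustrative: the protocol shape and method name (`collect`) are assumptions for the example, not the library's actual signatures, and `StaticLagCollector` is a hypothetical stand-in for a Redis- or Prometheus-backed collector.

```python
from typing import Protocol


class LagDataCollector(Protocol):
    """Assumed shape of the lag-collection protocol (method name is illustrative)."""

    def collect(self, topic: str) -> dict[int, int]:
        """Return per-partition consumer lag for a topic."""
        ...


class StaticLagCollector:
    """Toy collector serving lag numbers from a fixed mapping,
    standing in for a real Redis/Prometheus-backed implementation."""

    def __init__(self, lags: dict[str, dict[int, int]]) -> None:
        self._lags = lags

    def collect(self, topic: str) -> dict[int, int]:
        # Unknown topics report no lag data
        return self._lags.get(topic, {})


# Structural typing: StaticLagCollector satisfies the protocol without inheritance
collector: LagDataCollector = StaticLagCollector({"orders": {0: 12, 1: 950}})
print(collector.collect("orders"))  # {0: 12, 1: 950}
```

Because the protocol is structural, any object with a matching method can be plugged in without subclassing a library base class.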
Caching Strategy
- L1 Cache: In-memory LRU cache for sub-millisecond lookups
- L2 Cache: Optional Redis-based distributed cache
- Strategy: Read-through pattern with TTL-based invalidation
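The read-through pattern with TTL invalidation can be sketched as follows. This is a minimal illustration of the caching strategy described above, not the library's internal cache class; the name and constructor parameters are invented for the example.

```python
import time


class ReadThroughCache:
    """Minimal read-through cache with TTL-based invalidation (illustrative)."""

    def __init__(self, loader, ttl_seconds: float, max_size: int = 1000):
        self._loader = loader          # called on a miss, e.g. a lag collector
        self._ttl = ttl_seconds
        self._max_size = max_size
        self._store = {}               # key -> (value, expires_at)

    def get(self, key):
        entry = self._store.get(key)
        now = time.monotonic()
        if entry is not None and entry[1] > now:
            return entry[0]            # fresh hit: no backend call
        value = self._loader(key)      # miss or expired: read through
        if len(self._store) >= self._max_size:
            # Crude eviction of the oldest entry; a real L1 cache would use LRU
            self._store.pop(next(iter(self._store)))
        self._store[key] = (value, now + self._ttl)
        return value
```

The same read-through shape extends to an L2 tier: on an L1 miss, consult Redis before falling back to the loader.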
🔄 How It Works
- Health Monitoring: Background threads/tasks continuously monitor consumer lag for configured topics
- Health Scoring: Lag data is converted to health scores (0.0-1.0) using configurable algorithms
- Partition Selection: During message production, the producer selects partitions with health scores above the threshold
- Caching: Health data is cached to minimize latency impact on message production
- Fallback: If no healthy partitions are available, falls back to confluent-kafka's default partitioner
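The scoring and selection steps above can be sketched in a few lines. This is a simplified linear model assuming the `max_lag_for_health` and `health_threshold` semantics from the configuration tables; the library's actual ThresholdHotPartitionCalculator may score differently.

```python
def health_score(lag: int, max_lag_for_health: int = 1000) -> float:
    """Map consumer lag to a 0.0-1.0 score: zero lag scores 1.0, and lag at or
    above max_lag_for_health scores 0.0 (linear in between; illustrative)."""
    return max(0.0, 1.0 - lag / max_lag_for_health)


def pick_partitions(lags: dict[int, int], threshold: float = 0.5) -> list[int]:
    """Partitions whose score clears the threshold. An empty result means
    'fall back to the default partitioner'."""
    return [p for p, lag in lags.items() if health_score(lag) >= threshold]


# Partition 1 is lagging badly, so only 0 and 2 are eligible
print(pick_partitions({0: 100, 1: 800, 2: 300}))  # [0, 2]
```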
📊 Performance
- Overhead: <1ms additional latency per message
- Throughput: Minimal impact on producer throughput
- Memory: Efficient caching with configurable TTL and size limits
- Network: Optional Redis caching for distributed deployments
🔧 Configuration Options
SmartProducerConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
| kafka_config | dict | Required | Standard confluent-kafka producer config |
| topics | list[str] | Required | Topics for smart partitioning |
| consumer_group | str | None | Consumer group for health monitoring (simplified config) |
| health_manager | dict | None | Detailed health manager configuration |
| cache | dict | None | Caching configuration |
| smart_enabled | bool | True | Enable/disable smart partitioning |
| key_stickiness | bool | True | Enable partition stickiness for keys |
Health Manager Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| consumer_group | str | Required | Consumer group to monitor |
| health_threshold | float | 0.5 | Minimum health score for a partition to count as healthy |
| refresh_interval | float | 5.0 | Seconds between health data refreshes |
| max_lag_for_health | int | 1000 | Lag at which a partition's health score reaches 0.0 |
| timeout_seconds | float | 5.0 | Timeout for lag collection operations |
🧪 Testing
```bash
# Install with dev dependencies
pip install kafka-smart-producer[dev]

# Run tests
pytest

# Run with coverage
pytest --cov=kafka_smart_producer

# Type checking
mypy src/

# Linting
ruff check .
```
🤝 Contributing
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
📝 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙋‍♂️ Support
🔄 Version History
0.1.0 (Initial Release)
- Core smart partitioning functionality
- Sync and async producer implementations
- Health monitoring with threading/asyncio
- Flexible caching with local and Redis support
- Protocol-based extensible architecture
- Comprehensive test suite