Kafka Smart Producer

Intelligent Kafka producer with real-time, lag-aware partition selection

Kafka Smart Producer is a Python library that extends confluent-kafka-python with intelligent, real-time, lag-aware partition selection. It solves the "hot partition" problem by monitoring consumer health and routing messages to the healthiest partitions.

🚀 Key Features

  • Intelligent Partition Selection: Routes messages to healthy partitions based on real-time consumer lag monitoring
  • Performance Optimized: Sub-millisecond overhead with smart caching strategies
  • Dual API Support: Both synchronous and asynchronous producer implementations
  • Flexible Architecture: Protocol-based design with extensible lag collection and health calculation
  • Graceful Degradation: Falls back to default partitioner when health data is unavailable
  • Simple Configuration: Minimal setup with sensible defaults, advanced configuration when needed

📦 Installation

pip install kafka-smart-producer

Optional Dependencies

For Redis-based distributed caching:

pip install kafka-smart-producer[redis]

For development:

pip install kafka-smart-producer[dev]

🔧 Quick Start

Minimal Configuration

from kafka_smart_producer import SmartProducer, SmartProducerConfig

# Simple setup - just specify Kafka, topics, and consumer group
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "consumer_group": "order-processors"  # Automatically enables health monitoring
})

# Create producer with automatic health monitoring
with SmartProducer(config) as producer:
    # Messages are automatically routed to healthy partitions
    producer.produce(
        topic="orders",
        key=b"customer-123",
        value=b"order-data"
    )

    # Manual flush for guaranteed delivery
    producer.flush()

Advanced Configuration

# Full control over health monitoring and caching
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "health_manager": {
        "consumer_group": "order-processors",
        "health_threshold": 0.3,      # More sensitive to lag
        "refresh_interval": 3.0,      # Faster refresh
        "max_lag_for_health": 500,    # Lower lag threshold
    },
    "cache": {
        "local_max_size": 2000,
        "local_ttl_seconds": 600,
        "remote_enabled": True,       # Redis caching
        "redis_host": "localhost",
        "redis_port": 6379
    }
})

with SmartProducer(config) as producer:
    # Get health information
    healthy_partitions = producer.health_manager.get_healthy_partitions("orders")
    health_summary = producer.health_manager.get_health_summary()

    # Produce with smart partitioning
    producer.produce(topic="orders", key=b"key", value=b"value")

Async Producer

import asyncio

from kafka_smart_producer import AsyncSmartProducer, SmartProducerConfig

async def main():
    config = SmartProducerConfig.from_dict({
        "bootstrap.servers": "localhost:9092",
        "topics": ["orders"],
        "consumer_group": "processors"
    })

    async with AsyncSmartProducer(config) as producer:
        await producer.produce(topic="orders", key=b"key", value=b"value")
        await producer.flush()

# Run with asyncio.run(main())

🏗️ Architecture

Core Components

  1. LagDataCollector Protocol: Fetches consumer lag data from various sources

    • KafkaAdminLagCollector: Uses Kafka AdminClient (default)
    • Extensible for custom data sources (Redis, Prometheus, etc.)
  2. HotPartitionCalculator Protocol: Transforms lag data into health scores

    • ThresholdHotPartitionCalculator: Basic threshold-based scoring (default)
    • Extensible for custom health algorithms
  3. HealthManager: Central coordinator for health monitoring

    • PartitionHealthMonitor: Sync implementation with threading
    • AsyncPartitionHealthMonitor: Async implementation with asyncio
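The protocol-based design means custom lag sources can be plugged in. The library's actual protocol signature is not shown here; the sketch below assumes a single `get_lag_data(topic)` method returning per-partition lag, and the `StaticLagCollector` class is purely illustrative:

```python
from typing import Protocol


class LagDataCollector(Protocol):
    """Hypothetical protocol shape; the library's actual interface may differ."""

    def get_lag_data(self, topic: str) -> dict[int, int]:
        """Return consumer lag per partition for the given topic."""
        ...


class StaticLagCollector:
    """Toy collector returning fixed lag values (for tests/illustration only)."""

    def __init__(self, lags: dict[str, dict[int, int]]) -> None:
        self._lags = lags

    def get_lag_data(self, topic: str) -> dict[int, int]:
        return self._lags.get(topic, {})


collector = StaticLagCollector({"orders": {0: 120, 1: 4500, 2: 30}})
print(collector.get_lag_data("orders"))  # {0: 120, 1: 4500, 2: 30}
```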

Caching Strategy

  • L1 Cache: In-memory LRU cache for sub-millisecond lookups
  • L2 Cache: Optional Redis-based distributed cache
  • Strategy: Read-through pattern with TTL-based invalidation
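The L1 layer can be pictured as a small TTL-bounded LRU map. This is an illustrative sketch only, not the library's internal implementation:

```python
import time
from collections import OrderedDict


class TTLLRUCache:
    """Minimal in-memory cache with LRU eviction and TTL expiry (illustrative)."""

    def __init__(self, max_size: int = 1000, ttl: float = 300.0) -> None:
        self.max_size, self.ttl = max_size, ttl
        self._data: OrderedDict[str, tuple[float, object]] = OrderedDict()

    def get(self, key: str):
        item = self._data.get(key)
        if item is None or time.monotonic() - item[0] > self.ttl:
            self._data.pop(key, None)  # drop expired entries lazily
            return None
        self._data.move_to_end(key)  # mark as recently used
        return item[1]

    def put(self, key: str, value: object) -> None:
        self._data[key] = (time.monotonic(), value)
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict least recently used


cache = TTLLRUCache(max_size=2, ttl=600.0)
cache.put("orders", [0, 2])
print(cache.get("orders"))  # [0, 2]
```

In a read-through setup, a miss here would fall back to the L2 Redis cache and then to a fresh lag fetch, writing the result back on the way up.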

🔄 How It Works

  1. Health Monitoring: Background threads/tasks continuously monitor consumer lag for configured topics
  2. Health Scoring: Lag data is converted to health scores (0.0-1.0) using configurable algorithms
  3. Partition Selection: During message production, the producer selects partitions with health scores above the threshold
  4. Caching: Health data is cached to minimize latency impact on message production
  5. Fallback: If no healthy partitions are available, falls back to confluent-kafka's default partitioner

📊 Performance

  • Overhead: <1ms additional latency per message
  • Throughput: Minimal impact on producer throughput
  • Memory: Efficient caching with configurable TTL and size limits
  • Network: Optional Redis caching for distributed deployments

🔧 Configuration Options

SmartProducerConfig

Parameter        Type       Default   Description
kafka_config     dict       Required  Standard confluent-kafka producer config
topics           list[str]  Required  Topics for smart partitioning
consumer_group   str        None      Consumer group for health monitoring (simplified config)
health_manager   dict       None      Detailed health manager configuration
cache            dict       None      Caching configuration
smart_enabled    bool       True      Enable/disable smart partitioning
key_stickiness   bool       True      Enable partition stickiness for keys
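key_stickiness can be pictured like this. The `StickyKeyRouter` class below is hypothetical, not part of the library API; it assumes a key stays pinned to its chosen partition while that partition remains healthy:

```python
class StickyKeyRouter:
    """Toy illustration of key stickiness (not the library's implementation)."""

    def __init__(self) -> None:
        self._pins: dict[bytes, int] = {}

    def route(self, key: bytes, healthy: list[int]) -> int:
        pinned = self._pins.get(key)
        if pinned in healthy:
            return pinned  # keep per-key ordering while the partition is healthy
        partition = healthy[hash(key) % len(healthy)]  # re-pin to a healthy one
        self._pins[key] = partition
        return partition


router = StickyKeyRouter()
p1 = router.route(b"customer-123", [0, 2])
p2 = router.route(b"customer-123", [0, 2])
assert p1 == p2  # sticky while the pinned partition stays healthy
```

Stickiness preserves per-key ordering; the pin only moves when the pinned partition drops out of the healthy set.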

Health Manager Configuration

Parameter           Type    Default   Description
consumer_group      str     Required  Consumer group to monitor
health_threshold    float   0.5       Minimum health score for a partition to count as healthy
refresh_interval    float   5.0       Seconds between health data refreshes
max_lag_for_health  int     1000      Lag at or above which the health score drops to 0.0
timeout_seconds     float   5.0       Timeout for lag collection operations
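Assuming a linear lag-to-score mapping (an assumption about ThresholdHotPartitionCalculator; the actual formula may differ), max_lag_for_health and health_threshold interact like this:

```python
def health_score(lag: int, max_lag_for_health: int = 1000) -> float:
    # Assumed linear mapping: 0 lag -> 1.0, lag >= max_lag_for_health -> 0.0
    return max(0.0, 1.0 - lag / max_lag_for_health)


# With the defaults (health_threshold 0.5, max_lag_for_health 1000), a
# partition counts as healthy only while its lag is at most 500 messages.
print(health_score(500) >= 0.5)  # True
print(health_score(501) >= 0.5)  # False
```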

🧪 Testing

# Install with dev dependencies
pip install kafka-smart-producer[dev]

# Run tests
pytest

# Run with coverage
pytest --cov=kafka_smart_producer

# Type checking
mypy src/

# Linting
ruff check .

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📝 License

This project is licensed under the MIT License - see the LICENSE file for details.

🔄 Version History

0.1.0 (Initial Release)

  • Core smart partitioning functionality
  • Sync and async producer implementations
  • Health monitoring with threading/asyncio
  • Flexible caching with local and Redis support
  • Protocol-based extensible architecture
  • Comprehensive test suite
