cezzis-kafka is a lightweight Python library for working with Kafka topics and event streams. It simplifies producing, consuming, and managing messages, enabling seamless integration with distributed systems and real-time data pipelines. Designed for flexibility and ease of use, cezzis-kafka supports both synchronous and asynchronous workflows.
# 🚀 Cezzis Kafka

A lightweight, production-ready Python library for working with Apache Kafka. It simplifies message consumption and processing with built-in error handling, multi-process support, and structured logging.
## ✨ Features

- 🔄 Easy Consumer Management - Simple, intuitive API for Kafka message consumption
- 🏗️ Abstract Processor Interface - Clean separation of concerns with `IKafkaMessageProcessor`
- ⚡ Multi-Process Support - Built-in support for parallel consumer processes
- 🛡️ Robust Error Handling - Comprehensive error handling with automatic retries
- 📊 Structured Logging - Rich, contextual logging for observability
- 🔌 Confluent Kafka Integration - Built on the reliable `confluent-kafka` client
## 📦 Installation

### Using Poetry (Recommended)

```shell
poetry add cezzis-kafka
```

### Using pip

```shell
pip install cezzis-kafka
```
## 🚀 Quick Start

### 1. Create Your Message Processor

Implement the `IKafkaMessageProcessor` interface to define how messages should be processed:
```python
from cezzis_kafka import IKafkaMessageProcessor, KafkaConsumerSettings
from confluent_kafka import Consumer, Message


class MyMessageProcessor(IKafkaMessageProcessor):
    def __init__(self, settings: KafkaConsumerSettings):
        self._settings = settings

    @staticmethod
    def CreateNew(kafka_settings: KafkaConsumerSettings) -> "MyMessageProcessor":
        return MyMessageProcessor(kafka_settings)

    def kafka_settings(self) -> KafkaConsumerSettings:
        return self._settings

    def consumer_creating(self) -> None:
        """Handle actions when the consumer is being created."""
        print("Creating consumer...")

    def consumer_created(self, consumer: Consumer | None) -> None:
        """Handle actions when the consumer has been created."""
        print(f"Consumer created: {consumer}")

    def message_received(self, msg: Message) -> None:
        """Process a received Kafka message."""
        print(f"Processing: {msg.value().decode('utf-8')}")

    def message_error_received(self, msg: Message) -> None:
        """Handle message errors."""
        print(f"Error in message: {msg.error()}")

    def consumer_subscribed(self) -> None:
        """Handle actions when the consumer is subscribed."""
        print("Consumer subscribed to topic")

    def consumer_stopping(self) -> None:
        """Handle actions when the consumer is stopping."""
        print("Consumer stopping...")

    def message_partition_reached(self, msg: Message) -> None:
        """Handle partition EOF events."""
        print(f"Reached end of partition: {msg.partition()}")
```
### 2. Configure and Start the Consumer

```python
from multiprocessing import Event

from cezzis_kafka import KafkaConsumerSettings, start_consumer

# Configure Kafka settings
settings = KafkaConsumerSettings(
    consumer_id=1,
    bootstrap_servers="localhost:9092",
    consumer_group="my-consumer-group",
    topic_name="my-topic",
    num_consumers=1,
)

# Create a processor instance
processor = MyMessageProcessor.CreateNew(settings)

# Start consuming messages
stop_event = Event()
start_consumer(stop_event, processor)
```
### 3. Multi-Process Consumer

Run multiple consumer processes for better throughput using `spawn_consumers`:

```python
from multiprocessing import Event

from cezzis_kafka import spawn_consumers

# Create a stop event for graceful shutdown
stop_event = Event()

# Spawn 3 consumer processes
spawn_consumers(
    factory_type=MyMessageProcessor,
    num_consumers=3,
    stop_event=stop_event,
    bootstrap_servers="localhost:9092",
    consumer_group="my-consumer-group",
    topic_name="my-topic",
)
```

The `spawn_consumers` function will:

- Create the specified number of consumer processes
- Assign each a unique `consumer_id` (0, 1, 2, ...)
- Start all processes and wait for them to complete
- Handle process lifecycle and logging automatically
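Because `stop_event` is the shutdown signal shared by all spawned consumers, it is worth wiring it to SIGINT/SIGTERM so that Ctrl+C or an orchestrator stop drains the processes cleanly. A standard-library sketch, assuming (per the list above) that consumers exit once the event is set:

```python
import signal
from multiprocessing import Event

stop_event = Event()


def request_shutdown(signum: int, frame: object) -> None:
    """Signal handler: ask every consumer process to stop."""
    stop_event.set()


# Translate Ctrl+C and `kill` / `docker stop` into a graceful shutdown.
signal.signal(signal.SIGINT, request_shutdown)
signal.signal(signal.SIGTERM, request_shutdown)

# spawn_consumers(...) would then block here until the event is set
# and every consumer process has exited.
```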
## 📚 API Reference

### KafkaConsumerSettings

Configuration class for Kafka consumers.

Attributes:

- `consumer_id` (int): Unique identifier for the consumer instance
- `bootstrap_servers` (str): Comma-separated list of Kafka broker addresses
- `consumer_group` (str): Consumer group ID for coordinated consumption
- `topic_name` (str): Name of the Kafka topic to consume from
- `num_consumers` (int): Number of consumer processes to run
### IKafkaMessageProcessor

Abstract base class for implementing custom message processors.

Abstract Methods:

- `CreateNew(kafka_settings) -> IKafkaMessageProcessor` - Factory method for creating processor instances
- `kafka_settings() -> KafkaConsumerSettings` - Returns the Kafka consumer settings
- `consumer_creating() -> None` - Lifecycle hook called when the consumer is being created
- `consumer_created(consumer: Consumer | None) -> None` - Lifecycle hook called when the consumer has been created
- `message_received(msg: Message) -> None` - Process a received Kafka message
- `message_error_received(msg: Message) -> None` - Handle errors in received messages
- `consumer_subscribed() -> None` - Lifecycle hook called when the consumer subscribes to the topic
- `consumer_stopping() -> None` - Lifecycle hook called when the consumer is stopping
- `message_partition_reached(msg: Message) -> None` - Handle partition EOF events
### spawn_consumers(factory_type, num_consumers, stop_event, bootstrap_servers, consumer_group, topic_name)

Spawns multiple Kafka consumer processes under a single consumer group for parallel message processing.

Parameters:

- `factory_type` (Type[IKafkaMessageProcessor]): The processor class with a `CreateNew` factory method
- `num_consumers` (int): Number of consumer processes to spawn
- `stop_event` (Event): Multiprocessing event to signal consumer shutdown
- `bootstrap_servers` (str): Comma-separated list of Kafka broker addresses
- `consumer_group` (str): Consumer group ID for coordinated consumption
- `topic_name` (str): Name of the Kafka topic to consume from

Example:

```python
spawn_consumers(
    factory_type=MyMessageProcessor,
    num_consumers=3,
    stop_event=stop_event,
    bootstrap_servers="localhost:9092",
    consumer_group="my-group",
    topic_name="my-topic",
)
```
### start_consumer(stop_event, processor)

Starts a single Kafka consumer that polls for messages and processes them using the provided processor.

Parameters:

- `stop_event` (Event): Multiprocessing event to signal consumer shutdown
- `processor` (IKafkaMessageProcessor): Message processor implementation

Example:

```python
processor = MyMessageProcessor.CreateNew(settings)
start_consumer(stop_event, processor)
```
## 🛠️ Development

### Prerequisites

- Python 3.12+
- Poetry
- Docker (optional, for local Kafka)

### Setup Development Environment

```shell
# Clone the repository
git clone https://github.com/mtnvencenzo/cezzis-pycore.git
cd cezzis-pycore/kafka-packages

# Install dependencies
make install

# Activate the virtual environment
poetry shell
```

### Running Tests

```shell
# Run all tests
make test

# Run with coverage
pytest --cov=cezzis_kafka --cov-report=html
```

### Code Quality

```shell
# Run linting and formatting
make standards

# Run individually
make ruff-check   # Check code style
make ruff-format  # Format code
```

### Build Package

```shell
# Build distribution packages
poetry build
```
## 🧪 Testing with Local Kafka

### Using Docker

```shell
# Start a local single-node Kafka broker (KRaft mode, no ZooKeeper)
docker run -d \
  --name kafka \
  -p 9092:9092 \
  -e KAFKA_ENABLE_KRAFT=yes \
  -e KAFKA_CFG_PROCESS_ROLES=broker,controller \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  bitnami/kafka:latest

# Create a test topic
docker exec kafka kafka-topics.sh \
  --create \
  --topic test-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1
```
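With the topic in place, you can feed it test messages using the console producer that ships with the same Bitnami image (this requires the `kafka` container above to be running; type one message per line, then Ctrl+D to finish):

```shell
# Produce test messages interactively to the test topic
docker exec -it kafka kafka-console-producer.sh \
  --topic test-topic \
  --bootstrap-server localhost:9092
```

Any consumer started with `topic_name="test-topic"` and `bootstrap_servers="localhost:9092"` should then see these messages arrive in `message_received`.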
## 🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

### Development Workflow

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Make your changes
4. Run tests and linting (`make test && make standards`)
5. Commit your changes (`git commit -m 'feat: add amazing feature'`)
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Open a Pull Request
## 📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

## 🔗 Links

- Documentation: [Coming Soon]
- Issue Tracker: GitHub Issues
- Source Code: GitHub

## 📞 Support

- 📧 Email: rvecchi@gmail.com
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions

## 🙏 Acknowledgments

Built with:

- Confluent Kafka Python - The underlying Kafka client
- Poetry - Dependency management and packaging

Made with ❤️ by the Cezzis team