Kafka Integration for Muffin framework
Project description
Muffin-Kafka
Muffin-Kafka is an Apache Kafka integration plugin
for the Muffin web framework, built on top of aiokafka.
🚀 Features
- Async Kafka integration using
aiokafka - Single or batch message consumption — stream messages one-by-one or read in batches
- Per-topic task model — each topic is consumed in an isolated asyncio task
- Simple handler registration using
@plugin.handle_topics(...) - Manual or auto-commit support, custom group IDs
- Producer support via
send - Built-in monitoring with offsets, lag, and poll delay
- Healthcheck manage command for liveness probes and observability
- Optional error handler via
@plugin.handle_error(...)
✅ Requirements
- Python ≥ 3.10
- Muffin ≥ 0.71
- Kafka cluster or broker (local or cloud)
📦 Installation
pip install muffin-kafka
⚙️ Usage
from muffin import Application
from muffin_kafka import Kafka
app = Application("example")
# Initialize the plugin with config options
kafka = Kafka(app, bootstrap_servers="localhost:9092", produce=True, listen=True)
🧩 Registering Handlers
Use @kafka.handle_topics(...) to register a handler for specific Kafka topics:
@kafka.handle_topics("events.user", "events.auth")
async def handle_event(message):
data = message.value.decode()
print("Received:", data)
You can also register a global error handler:
@kafka.handle_error
async def on_error(exc):
print("Kafka error:", exc)
📤 Sending Messages
You can send messages to Kafka topics using the send method:
# Send a message
await kafka.send("events.user", {"action": "signup"}, key="user123")
🔄 Healthcheck
Run the kafka-healthcheck management command to check consumer lag:
python app.py kafka-healthcheck
Or programmatically via Muffin's manage command:
# Check specific topics
await app.manage.commands["kafka-healthcheck"]("events.user", max_lag=1000)
📊 Monitoring
If monitor=True is passed, the plugin will log:
- Committed offsets
- Latest offsets
- Poll timestamps
- Per-partition lag and delay
This data can be extended for Prometheus/Grafana metrics or alerting.
⚙️ Configuration Options
You can pass configuration options either as keyword arguments to the plugin:
kafka = Kafka(app, bootstrap_servers="localhost:9092", produce=True)
Or set them via Muffin's config system (e.g. .env, YAML):
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
"KAFKA_PRODUCE": True,
Supported Options
| Option | Type | Default | Description |
|---|---|---|---|
bootstrap_servers |
str |
"localhost:9092" |
Kafka broker connection string |
group_id |
str |
None |
Kafka consumer group ID |
client_id |
str |
"muffin" |
Kafka client ID |
produce |
bool |
False |
Enable Kafka producer |
listen |
bool |
True |
Enable consumers (message listening) |
monitor |
bool |
False |
Enable internal consumer monitor |
batch_size |
int |
None |
Read messages in batches (uses getmany()) |
monitor_interval |
int |
60 |
Monitor frequency in seconds |
auto_offset_reset |
str |
"earliest" |
Where to start if no committed offset |
enable_auto_commit |
bool |
False |
Auto-commit offsets after each message/batch. When False, the plugin commits manually after all handlers succeed. |
max_poll_records |
int |
None |
Max records to poll in one batch |
request_timeout_ms |
int |
30000 |
Request timeout |
retry_backoff_ms |
int |
1000 |
Retry interval on failure |
security_protocol |
str |
"PLAINTEXT" |
Kafka protocol (SSL, SASL_PLAINTEXT, …) |
sasl_mechanism |
str |
"PLAIN" |
SASL auth mechanism |
sasl_plain_username |
str |
None |
SASL auth user |
sasl_plain_password |
str |
None |
SASL auth password |
ssl_cafile |
str |
None |
Path to trusted CA certs |
🐞 Bug Tracker
Found a bug or have a feature request? Please open an issue at: https://github.com/klen/muffin-kafka/issues
🤝 Contributing
Pull requests are welcome! Development happens here: https://github.com/klen/muffin-kafka
🪪 License
MIT – See LICENSE for full details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file muffin_kafka-2.1.5.tar.gz.
File metadata
- Download URL: muffin_kafka-2.1.5.tar.gz
- Upload date:
- Size: 8.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.14 {"installer":{"name":"uv","version":"0.11.14","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
28f9981705bdef7a051169c4a11d634f455763b2bfc74041cd245bdfb7202a76
|
|
| MD5 |
249507c5b8f3b693e382825d4b1f896c
|
|
| BLAKE2b-256 |
d207838123eba332ebe628a278cf6149f47595af85602ca31c974f3ff27c1d74
|
File details
Details for the file muffin_kafka-2.1.5-py3-none-any.whl.
File metadata
- Download URL: muffin_kafka-2.1.5-py3-none-any.whl
- Upload date:
- Size: 11.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.14 {"installer":{"name":"uv","version":"0.11.14","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a6e39d7d571787de82d5f3be1678ae29637641848336027980c1e3dcfae3b274
|
|
| MD5 |
38de463fc781fd9433aa32a08440cbd1
|
|
| BLAKE2b-256 |
7c7dc2bc4e5a56dbf06fc9aea008654ae18c44937d51600bb25804941ccd8b44
|