A CLI tool to dump and replay Kafka messages using Parquet
kafka-replay-cli
A lightweight CLI tool for dumping and replaying Kafka messages using Parquet files. Built for observability, debugging, and safe testing of event streams.
Use it to:
- Safely test event streams without impacting production.
- Debug complex Kafka flows by replaying exact message sets.
- Apply filters, transformations, and throttling for precise control.
Features
- Dump Kafka topics into Parquet files
- Replay messages from Parquet back into Kafka
- Filter replays by timestamp range and key
- Optional throttling during replay
- Apply custom transform hooks to modify or skip messages
- Preview replays without sending messages using --dry-run
- Control output verbosity with --verbose and --quiet
- Query message dumps with DuckDB SQL
Installation
pip install kafka-replay-cli
Requires Python 3.8 or newer.
Kafka Broker Requirements
You must have access to a running Kafka broker.
By default, the CLI will attempt to connect to localhost:9092, but you can specify any broker using the --bootstrap-servers option:
--bootstrap-servers my.kafka.broker:9092
Usage
Dump messages from a topic to Parquet
kafka-replay-cli dump \
--topic test-topic \
--output test.parquet \
--bootstrap-servers localhost:9092 \
--max-messages 1000
Advanced Dump Settings
--fetch-max-bytes 1000000 # Max bytes to fetch from broker per poll
Replay messages from a Parquet file
kafka-replay-cli replay \
--input test.parquet \
--topic replayed-topic \
--bootstrap-servers localhost:9092 \
--throttle-ms 100
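As a rough mental model of throttling, the sketch below pauses between consecutive sends. It is not the tool's implementation: the README does not say where the delay is applied, so the per-message pause is an assumption, and `replay_throttled`, `send`, and `sleep` are hypothetical names used only for illustration.

```python
import time


def replay_throttled(messages, send, throttle_ms=0, sleep=time.sleep):
    """Call send(msg) for each message, pausing throttle_ms between sends.

    Assumption: the delay is applied between consecutive messages.
    """
    count = 0
    for msg in messages:
        # No pause before the first message.
        if count and throttle_ms > 0:
            sleep(throttle_ms / 1000.0)
        send(msg)
        count += 1
    return count


# Record sends and pauses instead of sleeping, to show the behaviour.
sent, pauses = [], []
total = replay_throttled(["a", "b", "c"], sent.append, 100, pauses.append)
```

Under this model, three messages at 100 ms incur two pauses, so a replay of N messages takes at least (N - 1) × throttle time.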
Advanced Producer Settings
You can fine-tune how the replay produces messages to Kafka:
--acks all # Wait for all replicas to acknowledge (0, 1, or all)
--compression-type gzip # Compress messages (gzip, snappy, lz4, zstd)
--linger-ms 10 # Delay to batch more messages (in milliseconds)
--producer-batch-size 5000 # Max batch size in bytes for Kafka producer
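These flags resemble the standard Kafka producer properties `acks`, `compression.type`, `linger.ms`, and `batch.size`. The sketch below shows one plausible mapping. Both the mapping and the `producer_config` helper are assumptions made for illustration; neither is taken from the kafka-replay-cli source.

```python
def producer_config(bootstrap_servers, acks=None, compression_type=None,
                    linger_ms=None, batch_size=None):
    """Build a producer config dict from CLI-style options.

    Hypothetical helper: the flag-to-property mapping is assumed.
    """
    config = {"bootstrap.servers": bootstrap_servers}
    optional = {
        "acks": acks,                          # --acks
        "compression.type": compression_type,  # --compression-type
        "linger.ms": linger_ms,                # --linger-ms
        "batch.size": batch_size,              # --producer-batch-size
    }
    # Only include settings the caller actually provided.
    config.update({k: v for k, v in optional.items() if v is not None})
    return config


config = producer_config(
    "localhost:9092",
    acks="all",
    compression_type="gzip",
    linger_ms=10,
    batch_size=5000,
)
```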
Preview messages without sending (--dry-run)
kafka-replay-cli replay \
--input test.parquet \
--topic replayed-topic \
--dry-run
By default, a dry run shows up to 5 sample messages that would be replayed.
Use --dry-run-limit to change the number of preview messages:
--dry-run-limit 10
Adjust verbosity
--verbose # Show detailed logs, including skipped messages
--quiet # Suppress all output except errors and final summary
Example:
kafka-replay-cli replay \
--input test.parquet \
--topic replayed-topic \
--dry-run \
--verbose
Add timestamp and key filters
kafka-replay-cli replay \
--input test.parquet \
--topic replayed-topic \
--start-ts "2024-01-01T00:00:00Z" \
--end-ts "2024-01-02T00:00:00Z" \
--key-filter "user-123"
Add partition and offset filters
--partition 1 # Only replay messages from partition 1
--offset-start 1000 # Replay offsets >= 1000
--offset-end 2000 # Replay offsets <= 2000
Example:
kafka-replay-cli replay \
--input test.parquet \
--topic replayed-topic \
--partition 0 \
--offset-start 100 \
--offset-end 200
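Both offset bounds are inclusive, as the flag comments above state. The predicate below is a minimal sketch of those semantics, not the tool's own code. The `matches` function and the sample messages are hypothetical.

```python
def matches(msg, partition=None, offset_start=None, offset_end=None):
    """Return True if msg passes the partition and offset filters."""
    if partition is not None and msg["partition"] != partition:
        return False
    if offset_start is not None and msg["offset"] < offset_start:
        return False  # below the inclusive lower bound
    if offset_end is not None and msg["offset"] > offset_end:
        return False  # above the inclusive upper bound
    return True


# Hypothetical sample: four messages on partition 0, one on partition 1.
messages = [{"partition": 0, "offset": o} for o in (99, 100, 200, 201)]
messages.append({"partition": 1, "offset": 150})

kept = [
    m["offset"]
    for m in messages
    if matches(m, partition=0, offset_start=100, offset_end=200)
]
```

Here `kept` contains offsets 100 and 200: the boundary offsets are replayed, while 99, 201, and the partition 1 message are dropped.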
Control batch size
--batch-size 500
Controls how many messages are processed per batch during replay.
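For intuition, batching can be pictured as slicing the message stream into fixed-size chunks, with a smaller final chunk. This is a generic sketch, assuming nothing about how the tool batches internally. The `batches` helper is hypothetical.

```python
from itertools import islice


def batches(messages, batch_size):
    """Yield lists of up to batch_size messages from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(messages)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# 1200 placeholder messages with a batch size of 500.
sizes = [len(batch) for batch in batches(range(1200), 500)]
```

With 1200 messages and a batch size of 500, this produces two full batches and a final batch of 200.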
Transform Messages Before Replay
You can modify, enrich, or skip Kafka messages during replay by passing a custom Python script that defines a transform(msg) function.
Basic Example
File: hooks/example_transform.py
def transform(msg):
    if msg["value"]:
        msg["value"] = msg["value"].upper()
    return msg
Run with:
kafka-replay-cli replay \
--input messages.parquet \
--topic replayed-topic \
--transform-script hooks/example_transform.py
Skip Messages
If your function returns None, the message will be skipped.
Use --verbose to see skip notices.
def transform(msg):
    # Skip messages with no value and anything that is not a login event.
    if not msg["value"] or b'"event":"login"' not in msg["value"]:
        return None
    return msg
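The same skip mechanism can express filters that the command-line flags may not cover. The sketch below keeps only messages whose key starts with a given prefix. The prefix `b"user-"` is a hypothetical value, and the null-key check reflects that Kafka messages can be produced without a key.

```python
KEY_PREFIX = b"user-"  # hypothetical prefix; adjust to your own keys


def transform(msg):
    key = msg["key"]
    # Kafka keys may be null; treat a missing key as a non-match.
    if key is None or not key.startswith(KEY_PREFIX):
        return None  # skipped
    return msg


# Quick local check with hand-built messages before running a replay.
kept = transform({"key": b"user-123", "value": b"{}", "partition": 0, "offset": 1})
skipped = transform({"key": None, "value": b"{}", "partition": 0, "offset": 2})
```

Calling the function on a few hand-built dictionaries like this is a cheap way to check a hook before pointing it at a real dump.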
Message Format
Each msg is a dictionary:
{
    "timestamp": datetime,
    "key": bytes,
    "value": bytes,
    "partition": int,
    "offset": int
}
You can modify key and value, or add additional fields.
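Because `value` is raw bytes, a hook that edits structured payloads has to decode and re-encode them itself. The sketch below assumes the values are UTF-8 JSON objects, which the tool does not require. The added field names `replayed` and `source_offset` are hypothetical.

```python
import json


def transform(msg):
    value = msg["value"]
    if value is None:
        return msg  # pass messages without a value through unchanged
    try:
        payload = json.loads(value)  # json.loads accepts bytes directly
    except ValueError:
        return None  # skip values that are not valid UTF-8 JSON
    if not isinstance(payload, dict):
        return None  # skip JSON that is not an object
    # Hypothetical enrichment fields, marking the message as a replay.
    payload["replayed"] = True
    payload["source_offset"] = msg["offset"]
    msg["value"] = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    return msg


sample = {"timestamp": None, "key": b"k", "value": b'{"event":"login"}',
          "partition": 0, "offset": 7}
result = transform(sample)
```

The value is written back as bytes so the message keeps the same shape it had when it was handed to the hook.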
Query Messages with DuckDB
Run SQL queries directly on dumped Parquet files:
kafka-replay-cli query \
--input test.parquet \
--sql "SELECT timestamp, CAST(key AS VARCHAR) FROM input WHERE CAST(value AS VARCHAR) LIKE '%login%'"
Note: Kafka key and value are stored as binary (BLOB) for fidelity.
To search or filter them, use CAST(... AS VARCHAR).
Output query results to file
kafka-replay-cli query \
--input test.parquet \
--sql "SELECT key FROM input" \
--output results.json
License
MIT
This project is not affiliated with or endorsed by the Apache Kafka project.
Maintainer
Konstantinas Mamonas
Feel free to fork, open issues, or suggest improvements.
Version
Use:
kafka-replay-cli version
to check the installed version.
Roadmap
See the ROADMAP for upcoming features and plans.