Skip to main content

A CLI tool to dump and replay Kafka messages using Parquet

Project description

kafka-replay-cli

A lightweight, CLI tool for dumping and replaying Kafka messages using Parquet files. Built for observability, debugging, and safe testing of event streams.


Features

  • Dump Kafka topics into Parquet files
  • Replay messages from Parquet back into Kafka
  • Filter replays by timestamp range and key
  • Optional throttling during replay
  • Apply custom transform hooks to modify or skip messages
  • Preview replays without sending messages using --dry-run
  • Control output verbosity with --verbose and --quiet
  • Query message dumps with DuckDB SQL

Installation

pip install kafka-replay-cli

Requires Python 3.8 or newer.


Kafka Broker Requirements

You must have access to a running Kafka broker.

By default, the CLI will attempt to connect to localhost:9092, but you can specify any broker using the --bootstrap-servers option:

--bootstrap-servers my.kafka.broker:9092

Usage

Dump messages from a topic to Parquet

kafka-replay-cli dump \
  --topic test-topic \
  --output test.parquet \
  --bootstrap-servers localhost:9092 \
  --max-messages 1000

Replay messages from a Parquet file

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --bootstrap-servers localhost:9092 \
  --throttle-ms 100

Preview messages without sending (--dry-run)

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --dry-run

By default, shows up to 5 sample messages that would be replayed.


Adjust verbosity

--verbose  # Show detailed logs, including skipped messages
--quiet    # Suppress all output except errors and final summary

Example:

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --dry-run \
  --verbose

Add timestamp and key filters

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --start-ts "2024-01-01T00:00:00Z" \
  --end-ts "2024-01-02T00:00:00Z" \
  --key-filter "user-123"

Transform Messages Before Replay

You can modify, enrich, or skip Kafka messages during replay by passing a custom Python script that defines a transform(msg) function.

Basic Example

File: hooks/example_transform.py

def transform(msg):
    if msg["value"]:
        msg["value"] = msg["value"].upper()
    return msg

Run with:

kafka-replay-cli replay \
  --input messages.parquet \
  --topic replayed-topic \
  --transform-script hooks/example_transform.py

Skip Messages

If your function returns None, the message will be skipped.
Use --verbose to see skip notices.

def transform(msg):
    if b'"event":"login"' not in msg["value"]:
        return None
    return msg

Message Format

Each msg is a dictionary:

{
  "timestamp": datetime,
  "key": bytes,
  "value": bytes,
  "partition": int,
  "offset": int
}

You can modify key and value, or add additional fields.


Query Messages with DuckDB

Run SQL queries directly on dumped Parquet files:

kafka-replay-cli query \
  --input test.parquet \
  --sql "SELECT timestamp, CAST(key AS VARCHAR) FROM input WHERE CAST(value AS VARCHAR) LIKE '%login%'"

Note: Kafka key and value are stored as binary (BLOB) for fidelity.
To search or filter them, use CAST(... AS VARCHAR).


Output query results to file

kafka-replay-cli query \
  --input test.parquet \
  --sql "SELECT key FROM input" \
  --output results.json

License

MIT

This project is not affiliated with or endorsed by the Apache Kafka project.


Maintainer

Konstantinas Mamonas

Feel free to fork, open issues, or suggest improvements.


Version

Use:

kafka-replay-cli version

to check the installed version.


Roadmap

See the ROADMAP for upcoming features and plans.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kafka_replay_cli-0.2.0.tar.gz (9.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kafka_replay_cli-0.2.0-py3-none-any.whl (8.9 kB view details)

Uploaded Python 3

File details

Details for the file kafka_replay_cli-0.2.0.tar.gz.

File metadata

  • Download URL: kafka_replay_cli-0.2.0.tar.gz
  • Upload date:
  • Size: 9.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.3

File hashes

Hashes for kafka_replay_cli-0.2.0.tar.gz
Algorithm Hash digest
SHA256 c1733df64fcea3434d8ea4582f4abef2875ae5cbb0481de9d4300e60cacbdeb8
MD5 2a0bf4b97e18360b03bd55ea1f8aeff6
BLAKE2b-256 5412a45fc8d365fcedc9842c67605688d09c743915709cf5d2a6987bedc15819

See more details on using hashes here.

File details

Details for the file kafka_replay_cli-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for kafka_replay_cli-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ad9e6e69a2dcfc3907dde9535be68fbb8c6dd24deba092eeea2046b5022d4eb5
MD5 136219bbf0d9c3afb61b3569d11d9821
BLAKE2b-256 c3261e0f11277a2e062b0ddfedbd1c75f88cc90931a0a2259986bbd3244acf2e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page