Skip to main content

A CLI tool to dump and replay Kafka messages using Parquet

Project description

kafka-replay-cli

A lightweight, CLI tool for dumping and replaying Kafka messages using Parquet files. Built for observability, debugging, and safe testing of event streams.


Features

  • Dump Kafka topics into Parquet files
  • Replay messages from Parquet back into Kafka
  • Filter replays by timestamp range and key
  • Optional throttling during replay
  • Apply custom transform hooks to modify or skip messages
  • Preview replays without sending messages using --dry-run
  • Control output verbosity with --verbose and --quiet
  • Query message dumps with DuckDB SQL

Installation

pip install kafka-replay-cli

Requires Python 3.8 or newer.


Kafka Broker Requirements

You must have access to a running Kafka broker.

By default, the CLI will attempt to connect to localhost:9092, but you can specify any broker using the --bootstrap-servers option:

--bootstrap-servers my.kafka.broker:9092

Usage

Dump messages from a topic to Parquet

kafka-replay-cli dump \
  --topic test-topic \
  --output test.parquet \
  --bootstrap-servers localhost:9092 \
  --max-messages 1000

Replay messages from a Parquet file

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --bootstrap-servers localhost:9092 \
  --throttle-ms 100

Preview messages without sending (--dry-run)

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --dry-run

By default, shows up to 5 sample messages that would be replayed. Use --dry-run-limit to adjust the number of preview messages.

--dry-run-limit 10

Adjust verbosity

--verbose  # Show detailed logs, including skipped messages
--quiet    # Suppress all output except errors and final summary

Example:

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --dry-run \
  --verbose

Add timestamp and key filters

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --start-ts "2024-01-01T00:00:00Z" \
  --end-ts "2024-01-02T00:00:00Z" \
  --key-filter "user-123"

Add partition and offset filters

--partition 1            # Only replay messages from partition 1
--offset-start 1000      # Replay offsets >= 1000
--offset-end 2000        # Replay offsets <= 2000

Example:

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --partition 0 \
  --offset-start 100 \
  --offset-end 200

Control batch size

--batch-size 500

Controls how many messages are processed per batch during replay.


Transform Messages Before Replay

You can modify, enrich, or skip Kafka messages during replay by passing a custom Python script that defines a transform(msg) function.

Basic Example

File: hooks/example_transform.py

def transform(msg):
    if msg["value"]:
        msg["value"] = msg["value"].upper()
    return msg

Run with:

kafka-replay-cli replay \
  --input messages.parquet \
  --topic replayed-topic \
  --transform-script hooks/example_transform.py

Skip Messages

If your function returns None, the message will be skipped.
Use --verbose to see skip notices.

def transform(msg):
    if b'"event":"login"' not in msg["value"]:
        return None
    return msg

Message Format

Each msg is a dictionary:

{
  "timestamp": datetime,
  "key": bytes,
  "value": bytes,
  "partition": int,
  "offset": int
}

You can modify key and value, or add additional fields.


Query Messages with DuckDB

Run SQL queries directly on dumped Parquet files:

kafka-replay-cli query \
  --input test.parquet \
  --sql "SELECT timestamp, CAST(key AS VARCHAR) FROM input WHERE CAST(value AS VARCHAR) LIKE '%login%'"

Note: Kafka key and value are stored as binary (BLOB) for fidelity.
To search or filter them, use CAST(... AS VARCHAR).


Output query results to file

kafka-replay-cli query \
  --input test.parquet \
  --sql "SELECT key FROM input" \
  --output results.json

License

MIT

This project is not affiliated with or endorsed by the Apache Kafka project.


Maintainer

Konstantinas Mamonas

Feel free to fork, open issues, or suggest improvements.


Version

Use:

kafka-replay-cli version

to check the installed version.


Roadmap

See the ROADMAP for upcoming features and plans.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kafka_replay_cli-0.3.0.tar.gz (13.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kafka_replay_cli-0.3.0-py3-none-any.whl (9.6 kB view details)

Uploaded Python 3

File details

Details for the file kafka_replay_cli-0.3.0.tar.gz.

File metadata

  • Download URL: kafka_replay_cli-0.3.0.tar.gz
  • Upload date:
  • Size: 13.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.3

File hashes

Hashes for kafka_replay_cli-0.3.0.tar.gz
Algorithm Hash digest
SHA256 c4c1bfb70d55789303d2b346315565020f2c9081d1a248dec1800be527c2b994
MD5 fa8d7ef120c57db8b8707bb59bc0925a
BLAKE2b-256 f46b5312426069481baa76e7509f1b31e8221eda52f48d5478ea678a146d67e7

See more details on using hashes here.

File details

Details for the file kafka_replay_cli-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for kafka_replay_cli-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 eceacc19c06d266c984a80a5c986131d1db62552633a65f6c77310a423ee3252
MD5 6289d480c26082bdda020e910a56943d
BLAKE2b-256 4acc01d10ea646ea69a32ba40251ed3435239118e88827b075adcae269b6be1b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page