A CLI tool to dump and replay Kafka messages using Parquet

kafka-replay-cli

A lightweight CLI tool for dumping Kafka messages to Parquet files and replaying them back into Kafka. Built for observability, debugging, and safe testing of event streams.

Use it to:

  • Safely test event streams without impacting production.
  • Debug complex Kafka flows by replaying exact message sets.
  • Apply filters, transformations, and throttling for precise control.

Features

  • Dump Kafka topics into Parquet files
  • Replay messages from Parquet back into Kafka
  • Filter replays by timestamp range and key
  • Optional throttling during replay
  • Apply custom transform hooks to modify or skip messages
  • Preview replays without sending messages using --dry-run
  • Control output verbosity with --verbose and --quiet
  • Query message dumps with DuckDB SQL

Installation

pip install kafka-replay-cli

Requires Python 3.8 or newer.


Kafka Broker Requirements

You must have access to a running Kafka broker.

By default, the CLI will attempt to connect to localhost:9092, but you can specify any broker using the --bootstrap-servers option:

--bootstrap-servers my.kafka.broker:9092

Usage

Dump messages from a topic to Parquet

kafka-replay-cli dump \
  --topic test-topic \
  --output test.parquet \
  --bootstrap-servers localhost:9092 \
  --max-messages 1000

Advanced Dump Settings

--fetch-max-bytes 1000000   # Max bytes to fetch from broker per poll

Replay messages from a Parquet file

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --bootstrap-servers localhost:9092 \
  --throttle-ms 100 
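
To make the effect of --throttle-ms concrete, here is a minimal Python sketch of per-message throttling. It is an illustration only, not the tool's implementation: the helper name throttled and the choice to pause between individual messages (rather than between batches) are assumptions.

```python
import time
from typing import Callable, Iterable, Iterator


def throttled(
    messages: Iterable[dict],
    throttle_ms: int,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[dict]:
    """Yield messages one at a time, pausing throttle_ms between them.

    Hypothetical helper; the CLI may throttle differently.
    """
    first = True
    for msg in messages:
        if not first and throttle_ms > 0:
            sleep(throttle_ms / 1000.0)  # milliseconds to seconds
        first = False
        yield msg


# Record the pauses instead of really sleeping, to show what would happen.
pauses = []
sent = list(
    throttled([{"offset": 0}, {"offset": 1}, {"offset": 2}], 100, sleep=pauses.append)
)
```

Under this assumption, replaying N messages with --throttle-ms 100 adds roughly (N - 1) * 0.1 seconds to the replay.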

Advanced Producer Settings

You can fine-tune how the replay produces messages to Kafka:

--acks all                 # Wait for all replicas to acknowledge (0, 1, or all)
--compression-type gzip    # Compress messages (gzip, snappy, lz4, zstd)
--linger-ms 10             # Delay to batch more messages (in milliseconds)
--producer-batch-size 5000 # Max batch size in bytes for Kafka producer
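
These flags correspond to the standard Kafka producer properties acks, compression.type, linger.ms, and batch.size. The sketch below shows one way such a mapping could look in Python. How the CLI actually forwards the flags is an assumption, and producer_config is a hypothetical helper, not part of the tool.

```python
from typing import Dict, Optional, Union


def producer_config(
    bootstrap_servers: str,
    acks: Optional[Union[str, int]] = None,
    compression_type: Optional[str] = None,
    linger_ms: Optional[int] = None,
    batch_size: Optional[int] = None,
) -> Dict[str, Union[str, int]]:
    """Build a Kafka producer config dict, including only the options that were set."""
    config: Dict[str, Union[str, int]] = {"bootstrap.servers": bootstrap_servers}
    if acks is not None:
        config["acks"] = str(acks)  # "0", "1", or "all"
    if compression_type is not None:
        config["compression.type"] = compression_type
    if linger_ms is not None:
        config["linger.ms"] = linger_ms
    if batch_size is not None:
        config["batch.size"] = batch_size  # bytes, not message count
    return config


config = producer_config(
    "localhost:9092",
    acks="all",
    compression_type="gzip",
    linger_ms=10,
    batch_size=5000,
)
```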

Preview messages without sending (--dry-run)

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --dry-run

By default, the preview shows up to 5 sample messages that would be replayed. Use --dry-run-limit to change how many messages are shown.

--dry-run-limit 10

Adjust verbosity

--verbose  # Show detailed logs, including skipped messages
--quiet    # Suppress all output except errors and final summary

Example:

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --dry-run \
  --verbose

Add timestamp and key filters

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --start-ts "2024-01-01T00:00:00Z" \
  --end-ts "2024-01-02T00:00:00Z" \
  --key-filter "user-123"
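
The filter semantics can be pictured as a predicate applied to each message. The Python sketch below is a rough illustration under stated assumptions, not the tool's code: it assumes both timestamp bounds are inclusive, that naive timestamps are UTC, and that the key filter is an exact match on the UTF-8 encoded key. The names parse_ts and matches are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_ts(text: str) -> datetime:
    """Parse an ISO-8601 string such as 2024-01-01T00:00:00Z into an aware datetime."""
    # datetime.fromisoformat rejects a trailing "Z" before Python 3.11.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return parsed


def matches(
    msg: dict,
    start_ts: Optional[datetime] = None,
    end_ts: Optional[datetime] = None,
    key_filter: Optional[str] = None,
) -> bool:
    """Return True if the message passes every filter that was supplied."""
    ts = msg["timestamp"]
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    if start_ts is not None and ts < start_ts:
        return False
    if end_ts is not None and ts > end_ts:  # assumption: inclusive upper bound
        return False
    if key_filter is not None and msg["key"] != key_filter.encode("utf-8"):
        return False  # assumption: exact key match
    return True


start = parse_ts("2024-01-01T00:00:00Z")
end = parse_ts("2024-01-02T00:00:00Z")
msg = {"timestamp": parse_ts("2024-01-01T12:00:00Z"), "key": b"user-123"}
keep = matches(msg, start, end, "user-123")
```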

Add partition and offset filters

--partition 1            # Only replay messages from partition 1
--offset-start 1000      # Replay offsets >= 1000
--offset-end 2000        # Replay offsets <= 2000

Example:

kafka-replay-cli replay \
  --input test.parquet \
  --topic replayed-topic \
  --partition 0 \
  --offset-start 100 \
  --offset-end 200

Control batch size

--batch-size 500

Controls how many messages are processed per batch during replay.
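
As a rough picture of batching, the sketch below splits a list of rows into consecutive chunks of at most batch_size. It is a hypothetical helper for illustration; how the CLI reads and groups rows internally is an assumption.

```python
from typing import Iterator, List, Sequence


def batches(rows: Sequence[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of rows, each holding at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


rows = [{"offset": i} for i in range(1200)]
sizes = [len(batch) for batch in batches(rows, 500)]
```

With 1200 rows and a batch size of 500, this yields two full batches and a final batch of 200.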


Transform Messages Before Replay

You can modify, enrich, or skip Kafka messages during replay by passing a custom Python script that defines a transform(msg) function.

Basic Example

File: hooks/example_transform.py

def transform(msg):
    if msg["value"]:
        msg["value"] = msg["value"].upper()
    return msg

Run with:

kafka-replay-cli replay \
  --input messages.parquet \
  --topic replayed-topic \
  --transform-script hooks/example_transform.py

Skip Messages

If your function returns None, the message will be skipped.
Use --verbose to see skip notices.

def transform(msg):
    # Treat a missing value as empty so the membership test cannot raise.
    value = msg["value"] or b""
    if b'"event":"login"' not in value:
        return None
    return msg

Message Format

Each msg is a dictionary:

{
  "timestamp": datetime,
  "key": bytes,
  "value": bytes,
  "partition": int,
  "offset": int
}

You can modify key and value, or add additional fields.
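
Because value arrives as bytes, a hook that works with JSON payloads has to decode and re-encode it. Here is a minimal sketch of such a hook, assuming the payloads are UTF-8 JSON objects. The replayed field is a made-up example, and skipping non-JSON messages is a design choice for this sketch, not something the tool requires.

```python
import json


def transform(msg):
    """Tag JSON object payloads as replayed; skip anything else."""
    value = msg.get("value")
    if not value:
        return None  # empty or missing payload: skip
    try:
        payload = json.loads(value)  # json.loads accepts bytes
    except ValueError:
        return None  # invalid JSON or invalid UTF-8: skip
    if not isinstance(payload, dict):
        return None  # valid JSON, but not an object: skip
    payload["replayed"] = True  # hypothetical marker field
    msg["value"] = json.dumps(payload).encode("utf-8")  # back to bytes
    return msg


sample = {
    "timestamp": None,
    "key": b"user-123",
    "value": b'{"event":"login"}',
    "partition": 0,
    "offset": 42,
}
result = transform(sample)
```

Returning bytes from the hook keeps the value in the same type it had on the way in.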


Query Messages with DuckDB

Run SQL queries directly on dumped Parquet files:

kafka-replay-cli query \
  --input test.parquet \
  --sql "SELECT timestamp, CAST(key AS VARCHAR) FROM input WHERE CAST(value AS VARCHAR) LIKE '%login%'"

Note: Kafka key and value are stored as binary (BLOB) for fidelity.
To search or filter them, use CAST(... AS VARCHAR).


Output query results to file

kafka-replay-cli query \
  --input test.parquet \
  --sql "SELECT key FROM input" \
  --output results.json

License

MIT

This project is not affiliated with or endorsed by the Apache Kafka project.


Maintainer

Konstantinas Mamonas

Feel free to fork, open issues, or suggest improvements.


Version

Use:

kafka-replay-cli version

to check the installed version.


Roadmap

See the ROADMAP for upcoming features and plans.
