
Reporting tool for Kafka topics: retentions, sizes, and earliest/latest message timestamps


A reporting tool for kafka topics.

Show any or all of the following attributes for each topic in a cluster:

  • sizes (DescribeLogDirsRequest)
  • retention policies
  • earliest/latest message timestamps (watermarks)

In larger teams and installations, misconfigured topics often waste significant resources. Auditing with this tool has helped.

Usage

All times should be UTC.

Configuration

Configuration is a dict passed through to confluent_kafka's consumer, so it follows the same format. The one exception is group.id, which is set automatically. See tests/helper_files/env.json for a sample conf file; run with -h for help.

[!NOTE] Tested only with SASL_SSL and SCRAM-SHA-512 username/password authentication on AWS MSK. See Addenda for details.
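For illustration, a conf file might look like the following. The broker address and credentials are hypothetical; any confluent_kafka consumer setting is accepted, and group.id should be omitted since it is set automatically:

```json
{
  "bootstrap.servers": "b-1.mycluster.kafka.us-east-1.amazonaws.com:9096",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "SCRAM-SHA-512",
  "sasl.username": "reporter",
  "sasl.password": "changeme"
}
```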

CLI

% kafkareport ~/aws.json --csv report.csv
+----------------+-------+----------------------------------+----------------------------------+--------------+-----------------+---------------------+
| topic          | bytes | earliest                         | latest                           | retention.ms | retention.bytes | delete.retention.ms |
+----------------+-------+----------------------------------+----------------------------------+--------------+-----------------+---------------------+
| kafkareportdue | 2330  | 2024-06-30 20:04:44.486000+00:00 | 2024-06-30 20:04:46.041000+00:00 | 16800001     | 16800001        | 16800001            |
| kafkareportuno | 2268  | 2024-06-30 20:04:42.880000+00:00 | 2024-06-30 20:04:44.431000+00:00 | 16800001     | 16800001        | 16800001            |
+----------------+-------+----------------------------------+----------------------------------+--------------+-----------------+---------------------+

% cat report.csv
topic,bytes,earliest,latest,retention.ms,retention.bytes,delete.retention.ms
kafkareportdue,2330,2024-06-30 20:04:44.486000+00:00,2024-06-30 20:04:46.041000+00:00,16800001,16800001,16800001
kafkareportuno,2268,2024-06-30 20:04:42.880000+00:00,2024-06-30 20:04:44.431000+00:00,16800001,16800001,16800001
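The CSV output is easy to post-process. As a sketch, a few lines of Python can flag suspicious topics from a report; the size threshold and the "unbounded retention" criterion here are made up for illustration:

```python
import csv
import io

# The CSV produced above, inlined so the sketch is self-contained
REPORT = """topic,bytes,earliest,latest,retention.ms,retention.bytes,delete.retention.ms
kafkareportdue,2330,2024-06-30 20:04:44.486000+00:00,2024-06-30 20:04:46.041000+00:00,16800001,16800001,16800001
kafkareportuno,2268,2024-06-30 20:04:42.880000+00:00,2024-06-30 20:04:44.431000+00:00,16800001,16800001,16800001
"""

rows = list(csv.DictReader(io.StringIO(REPORT)))

# Flag topics with unbounded byte retention, or larger than an arbitrary threshold
flagged = [
    r["topic"]
    for r in rows
    if int(r["retention.bytes"]) == -1 or int(r["bytes"]) > 2300
]
print(flagged)  # ['kafkareportdue']
```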

Lib

>>> from kafkareport import KafkaReport

>>> conf = {"bootstrap.servers": "localhost:9092", "ssl.endpoint.identification.algorithm": "none"}

>>> report = KafkaReport(conf)

>>> report.get_topicnames()
dict_keys(['kafkareportdue', 'kafkareportuno', '__consumer_offsets'])

>>> for topic in report.get_topicnames():
...     print(topic)
...     report.retentions(topic)
...     report.watermarks(topic)
...
kafkareportdue
{'retention.ms': 16800001, 'retention.bytes': 16800001, 'delete.retention.ms': 16800001}
{'earliest': datetime.datetime(2024, 6, 30, 20, 8, 57, 554000, tzinfo=datetime.timezone.utc), 'latest': datetime.datetime(2024, 6, 30, 20, 8, 59, 99000, tzinfo=datetime.timezone.utc)}
kafkareportuno
{'retention.ms': 16800001, 'retention.bytes': 16800001, 'delete.retention.ms': 16800001}
{'earliest': datetime.datetime(2024, 6, 30, 20, 8, 55, 975000, tzinfo=datetime.timezone.utc), 'latest': datetime.datetime(2024, 6, 30, 20, 8, 57, 503000, tzinfo=datetime.timezone.utc)}
__consumer_offsets
{'retention.ms': 604800000, 'retention.bytes': -1, 'delete.retention.ms': 86400000}
{'earliest': '', 'latest': ''}

>>> report.topic_sizes()
[{'topic': 'kafkareportuno', 'bytes': 2690}, {'topic': 'kafkareportdue', 'bytes': 2328}, {'topic': '__consumer_offsets', 'bytes': 0}]

>>> report.watermarks("kafkareportuno")
{'earliest': datetime.datetime(2024, 6, 30, 20, 8, 55, 975000, tzinfo=datetime.timezone.utc), 'latest': datetime.datetime(2024, 6, 30, 20, 8, 57, 503000, tzinfo=datetime.timezone.utc)}

Development

This is a Poetry project, so setup should be smooth once Poetry is installed. Install pre-commit to get black on commit and linting on push. pytype could not be wired into pre-commit.

Testing

Testing uses localstack, as the GitHub Actions show. docker-compose up should do the trick.

pytest accepts --conf=/some/file.json in place of the default localstack configuration. This will manipulate pytest.topics on the Kafka servers.

Testing Gotchas

  • Occasionally, e.g. on laptop wake, the kafka container ends up in a weird state. docker-compose down --remove-orphans && docker-compose up has always done the trick.

Addenda

  • Watermarks use a thread per topic partition, but the scan can still take a while. Pass -v for gory details on the CLI, or KafkaReport(debug=True) for the lib.
  • This project started with confluent_kafka. At the time of first writing, neither kafka-python nor confluent_kafka implemented DescribeLogDirsRequest, which is available in Java. A recent PR for kafka-python adds support, so that experimental code is used here. Not pretty, but it works.
    • As a result, there is a janky confluent_kafka-to-kafka-python AdminClient conf map in logdirs.doit(). It is only tested with SASL auth on AWS MSK.
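The conf map is straightforward in spirit: confluent_kafka uses dotted string keys, while kafka-python's admin client takes snake_case keyword arguments. A hypothetical sketch of the idea follows; the real map lives in logdirs.doit() and handles more cases:

```python
def to_kafka_python_conf(conf):
    """Translate a confluent_kafka-style conf dict into kwargs for
    kafka-python's KafkaAdminClient. Illustrative only: the key map here
    covers just the common SASL settings, not every possible option."""
    key_map = {
        "bootstrap.servers": "bootstrap_servers",
        "security.protocol": "security_protocol",
        "sasl.mechanism": "sasl_mechanism",
        "sasl.username": "sasl_plain_username",
        "sasl.password": "sasl_plain_password",
    }
    # Drop keys kafka-python does not understand (e.g. group.id)
    return {key_map[k]: v for k, v in conf.items() if k in key_map}

mapped = to_kafka_python_conf({
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "group.id": "ignored",
})
print(mapped["bootstrap_servers"])  # localhost:9092
```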
