A reporting tool for Kafka topics.

Show any or all of the following attributes for each topic in a cluster:

  • sizes (DescribeLogDirsRequest)
  • retention policies
  • earliest/latest message timestamps (watermarks)

With larger teams and installations, misconfiguration often leads to significant resource waste. Auditing with this tool has helped.

Installation

pip install kafkareport

Usage

All times are reported in UTC.

Configuration

The configuration is a JSON file containing a dict that is passed to confluent_kafka's Consumer, so it follows the same format. The one exception is group.id, which is set automatically. See tests/helper_files/env.json for a sample conf file.

[!NOTE] Tested only with SASL_SSL and SCRAM_SHA_512 username/password auth on AWS MSK. See Addenda for details.
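For illustration, a conf file for SASL_SSL with SCRAM might look like the following; the broker host, username, and password here are placeholders, and group.id is omitted because the tool sets it:

```json
{
  "bootstrap.servers": "b-1.example.kafka.us-east-1.amazonaws.com:9096",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "SCRAM-SHA-512",
  "sasl.username": "alice",
  "sasl.password": "changeme"
}
```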

CLI

Run with -h for help.

% kafkareport ~/aws.json --csv report.csv
+----------------+-------+----------------------------------+----------------------------------+--------------+-----------------+---------------------+
| topic          | bytes | earliest                         | latest                           | retention.ms | retention.bytes | delete.retention.ms |
+----------------+-------+----------------------------------+----------------------------------+--------------+-----------------+---------------------+
| kafkareportdue | 2330  | 2024-06-30 20:04:44.486000+00:00 | 2024-06-30 20:04:46.041000+00:00 | 16800001     | 16800001        | 16800001            |
| kafkareportuno | 2268  | 2024-06-30 20:04:42.880000+00:00 | 2024-06-30 20:04:44.431000+00:00 | 16800001     | 16800001        | 16800001            |
+----------------+-------+----------------------------------+----------------------------------+--------------+-----------------+---------------------+

% cat report.csv
topic,bytes,earliest,latest,retention.ms,retention.bytes,delete.retention.ms
kafkareportdue,2330,2024-06-30 20:04:44.486000+00:00,2024-06-30 20:04:46.041000+00:00,16800001,16800001,16800001
kafkareportuno,2268,2024-06-30 20:04:42.880000+00:00,2024-06-30 20:04:44.431000+00:00,16800001,16800001,16800001
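The CSV output is easy to post-process with the stdlib alone. As a small sketch (the helper name is mine, not part of the tool), here is how you might rank topics by on-disk size from a report file; the sample data is the report above:

```python
import csv
import io

def largest_topics(csv_text: str, n: int = 5):
    """Return the n largest topics from a kafkareport CSV, by on-disk bytes."""
    rows = csv.DictReader(io.StringIO(csv_text))
    topics = [(row["topic"], int(row["bytes"])) for row in rows]
    return sorted(topics, key=lambda t: t[1], reverse=True)[:n]

report = """\
topic,bytes,earliest,latest,retention.ms,retention.bytes,delete.retention.ms
kafkareportdue,2330,2024-06-30 20:04:44.486000+00:00,2024-06-30 20:04:46.041000+00:00,16800001,16800001,16800001
kafkareportuno,2268,2024-06-30 20:04:42.880000+00:00,2024-06-30 20:04:44.431000+00:00,16800001,16800001,16800001
"""

print(largest_topics(report))  # [('kafkareportdue', 2330), ('kafkareportuno', 2268)]
```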

Lib

Docs

>>> from kafkareport import KafkaReport

>>> conf = {"bootstrap.servers": "localhost:9092", "ssl.endpoint.identification.algorithm": "none"}

>>> report = KafkaReport(conf)

>>> report.get_topicnames()
dict_keys(['kafkareportdue', 'kafkareportuno', '__consumer_offsets'])

>>> for topic in report.get_topicnames():
...     print(topic)
...     report.retentions(topic)
...     report.watermarks(topic)
...
kafkareportdue
{'retention.ms': 16800001, 'retention.bytes': 16800001, 'delete.retention.ms': 16800001}
{'earliest': datetime.datetime(2024, 6, 30, 20, 8, 57, 554000, tzinfo=datetime.timezone.utc), 'latest': datetime.datetime(2024, 6, 30, 20, 8, 59, 99000, tzinfo=datetime.timezone.utc)}
kafkareportuno
{'retention.ms': 16800001, 'retention.bytes': 16800001, 'delete.retention.ms': 16800001}
{'earliest': datetime.datetime(2024, 6, 30, 20, 8, 55, 975000, tzinfo=datetime.timezone.utc), 'latest': datetime.datetime(2024, 6, 30, 20, 8, 57, 503000, tzinfo=datetime.timezone.utc)}
__consumer_offsets
{'retention.ms': 604800000, 'retention.bytes': -1, 'delete.retention.ms': 86400000}
{'earliest': '', 'latest': ''}

>>> report.topic_sizes()
[{'topic': 'kafkareportuno', 'bytes': 2690}, {'topic': 'kafkareportdue', 'bytes': 2328}, {'topic': '__consumer_offsets', 'bytes': 0}]

>>> report.watermarks("kafkareportuno")
{'earliest': datetime.datetime(2024, 6, 30, 20, 8, 55, 975000, tzinfo=datetime.timezone.utc), 'latest': datetime.datetime(2024, 6, 30, 20, 8, 57, 503000, tzinfo=datetime.timezone.utc)}

Development

This is a Poetry project, so setup should be smooth once you have Poetry sorted. Install pre-commit to get black on commit and lint on push. pytype could not be wired into pre-commit.

Run pre-commit install --hook-type pre-push to enable the lint hook on push.

Testing

Testing runs against Kafka/ZooKeeper containers, as you can see in the GitHub Actions. docker-compose up should do the trick.

pytest can take --conf=/some/file.json instead of the default local-stack config. The tests will create and manipulate the pytest topics on the target Kafka servers.

Testing Gotchas

  • Occasionally, e.g. on laptop wake, the Kafka container ends up in a weird state. docker-compose down --remove-orphans && docker-compose up always did the trick.

Addenda

  • Watermark lookups use a thread for each topic partition, but they can still take a while. Pass -v to the CLI for gory details along the way, or KafkaReport(debug=True) for the lib.
  • This project started with confluent_kafka. At the time of first writing, neither kafka-python nor confluent_kafka implemented DescribeLogDirsRequest, which is available in Java. A recent PR for kafka-python supports it, so this project uses that experimental code. Not pretty, but it works.
    • As a result, there is a janky confluent_kafka to kafka-python AdminClient conf map in logdirs.doit(). It's only tested with SASL auth on AWS MSK.

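The thread-per-partition approach mentioned above can be sketched with the stdlib alone. This is an illustration of the technique, not the library's actual code; fetch_watermark here is a stand-in for the real per-partition broker query:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_watermark(partition: int):
    """Stand-in for a per-partition watermark query against the broker."""
    # A real implementation would block on a network round-trip here.
    return partition, partition * 100  # pretend high watermark

def topic_watermarks(partitions: list):
    # One thread per partition: the queries are I/O-bound, so threads
    # overlap the round-trips instead of issuing them serially.
    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        return dict(pool.map(fetch_watermark, partitions))

print(topic_watermarks([0, 1, 2]))  # {0: 0, 1: 100, 2: 200}
```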