Materialize a Kafka topic into an in-memory, compacted dict — a GlobalKTable for asyncio Python.

ktables

Every process that opens a KafkaTable replays the topic from the beginning into a local read-only mapping, then keeps consuming for live updates; a KafkaTableWriter maintains the topic with keyed upserts and tombstones. Built for small, broadly-needed reference data — service registries, capability advertisements, feature flags, config maps — not for large or high-churn state.

Background

Kafka Streams (JVM) has two table abstractions over changelog topics: the partition-sharded KTable, where each application instance holds a slice of the keys, and GlobalKTable, where every instance bootstraps and maintains a full local copy — the right shape for lookup data that any instance may need at any moment. The Python ecosystem has several maintained stream-processing frameworks, but all of them implement only the sharded shape, with framework-owned changelog topics and their own process runtimes. ktables fills the gap with just the global-table piece, as a plain asyncio library over aiokafka: your topic, your message format, your event loop.

The Kafka semantics the implementation relies on (group-less consumers, catch-up gating against end offsets, compaction independence) are documented, with provenance, in the module docstring of kafka_table.py.
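As a rough illustration of catch-up gating against end offsets, the sketch below captures each partition's end offset once at start and opens the gate when the consumer's position has reached that captured offset everywhere. This is not the code in kafka_table.py: the function name caught_up and the plain-dict inputs are assumptions made for the example.

```python
from typing import Mapping


def caught_up(positions: Mapping[int, int], end_offsets: Mapping[int, int]) -> bool:
    """Return True once every partition has been read up to its captured end offset.

    positions:   partition -> offset of the next record the consumer will read
    end_offsets: partition -> end offset captured when the replay started
    """
    # An empty partition has end offset 0 and position 0, so it counts as caught up.
    return all(positions.get(p, 0) >= end for p, end in end_offsets.items())


# End offsets captured at start: partition 0 holds 5 records, partition 1 is empty.
end_at_start = {0: 5, 1: 0}

still_replaying = caught_up({0: 3, 1: 0}, end_at_start)   # False
reached_snapshot = caught_up({0: 5, 1: 0}, end_at_start)  # True
later_writes = caught_up({0: 9, 1: 2}, end_at_start)      # True
```

Comparing against offsets captured at start, rather than the live end of the log, lets the gate open even while writers keep appending.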

Install

ktables is not yet published to PyPI. Until the first release, vendor the package — copy the ktables/ directory into your project — and install its single runtime dependency:

$ pip install aiokafka

Requires Python 3.10+. Pydantic is not required: the .json() presets accept any class that provides pydantic v2's JSON methods, model_dump_json() and model_validate_json() (the SupportsJsonModel protocol).
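For readers who do not use pydantic, here is a minimal sketch of a plain dataclass offering the same two JSON methods. The class ServiceRecord and its fields are hypothetical, and the method signatures mirror pydantic v2 only loosely; check them against SupportsJsonModel before relying on this.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ServiceRecord:
    """Hypothetical registry value; not part of ktables."""

    url: str
    version: int = 1

    def model_dump_json(self) -> str:
        # Instance -> JSON text, with sorted keys so the output is stable.
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def model_validate_json(cls, json_data: "str | bytes | bytearray") -> "ServiceRecord":
        # JSON text or bytes -> instance; unknown fields raise TypeError.
        return cls(**json.loads(json_data))


record = ServiceRecord(url="http://billing:8080")
encoded = record.model_dump_json()
decoded = ServiceRecord.model_validate_json(encoded.encode("utf-8"))
```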

Usage

Maintain a registry from one service:

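import asyncio
from pydantic import BaseModel  # optional; any class with pydantic-v2's JSON methods works
from ktables import KafkaTableWriter

class ServiceRecord(BaseModel):  # illustrative model, not part of ktables
    url: str

async def main() -> None:
    writer = KafkaTableWriter.json(
        bootstrap_servers="localhost:9092", topic="my.registry", model=ServiceRecord
    )
    async with writer:
        record = ServiceRecord(url="http://billing:8080")
        await writer.set("billing", record)      # upsert (broker-acked)
        ...
        await writer.delete("billing")           # tombstone: removes the key

asyncio.run(main())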

Consume it from any other process:

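import asyncio
from ktables import KafkaTable

async def main() -> None:
    # ServiceRecord is the same model class the writer publishes
    table = KafkaTable.json(
        bootstrap_servers="localhost:9092", topic="my.registry", model=ServiceRecord
    )
    async with table:               # replays the topic; returns once caught up
        record = table.get("billing")     # None if the key is absent
        if table.status != "caught_up":   # "degraded": catch-up timed out
            ...

asyncio.run(main())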

Non-pydantic payloads: construct directly with your own codecs — KafkaTable(..., value_decoder=bytes_to_value) / KafkaTableWriter(..., value_encoder=value_to_bytes).
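As an example of such codecs, the sketch below encodes a small value type as UTF-8 "host:port" text. The Endpoint type is hypothetical, and the assumed codec shapes (encoder takes a value and returns bytes, decoder takes bytes and returns a value) are inferred from the parameter names rather than confirmed here.

```python
from typing import NamedTuple


class Endpoint(NamedTuple):
    """Hypothetical table value: where a service can be reached."""

    host: str
    port: int


def value_to_bytes(value: Endpoint) -> bytes:
    # Encoder: value -> payload bytes written to the topic.
    return f"{value.host}:{value.port}".encode("utf-8")


def bytes_to_value(raw: bytes) -> Endpoint:
    # Decoder: payload bytes -> value. Split on the last colon so the host
    # part may itself contain colons.
    host, sep, port = raw.decode("utf-8").rpartition(":")
    if not sep or not host:
        raise ValueError(f"malformed endpoint payload: {raw!r}")
    return Endpoint(host, int(port))


round_tripped = bytes_to_value(value_to_bytes(Endpoint("billing.internal", 8443)))
```

These two functions would then be passed as value_encoder and value_decoder in the constructor calls shown above.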

Removing a key on clean shutdown

There is deliberately no delete_on_close option (shutdown-time deletion is application policy, and no library can promise it on a crash). Compose it:

async with writer:
    await writer.set(my_key, my_record)
    try:
        ...  # serve
    finally:
        await writer.delete(my_key)   # acked before the producer stops
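If several call sites need this pattern, it can be wrapped in a small async context manager. The helper name registered and the FakeWriter stand-in are assumptions for illustration; a real KafkaTableWriter would take FakeWriter's place, and the helper would have to be used inside async with writer so that the delete is sent before the producer stops.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def registered(writer, key, value):
    """Upsert `key` on entry and tombstone it on exit, including exit via an exception."""
    await writer.set(key, value)
    try:
        yield
    finally:
        await writer.delete(key)


class FakeWriter:
    """Stand-in that records calls instead of talking to a broker."""

    def __init__(self):
        self.calls = []

    async def set(self, key, value):
        self.calls.append(("set", key))

    async def delete(self, key):
        self.calls.append(("delete", key))


async def demo():
    writer = FakeWriter()
    try:
        async with registered(writer, "billing", {"url": "http://billing:8080"}):
            raise RuntimeError("service failed while serving")
    except RuntimeError:
        pass  # the tombstone was still published on the way out
    return writer.calls
```

As the text above notes, this only covers clean exits and handled exceptions; a killed process never reaches the finally block.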

Locked-down clusters

Both classes ensure their topic exists at start (idempotent create, compacted). If the application lacks topic-create ACLs, pass ensure_topic=False and create the topic out-of-band (the module-level ensure_topic() function is the deploy-time primitive).

Consistency contract

KafkaTable is eventually consistent. Precisely:

  1. When start() / async with returns, contents are complete as of the topic's end offsets at start time — unless status == "degraded" (catch-up timed out; data may be partial).
  2. Thereafter, updates appear within milliseconds of the broker write, but there is no read-your-own-writes guarantee: after await writer.set(k, v), a table in the same process may briefly still return the old value.
  3. Contents are stable between your awaits (single event loop; only the reader task mutates). Use snapshot() for a copy held across awaits.
  4. Correctness does not depend on broker-side compaction: last-write-wins over the full log yields the same dict; compaction only bounds replay time.
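Point 4 can be checked on a toy log. The sketch below is illustrative only: materialize and compact are hypothetical helper names, and compact models an idealised compaction pass rather than the broker's actual log cleaner.

```python
from typing import Iterable, Optional

Record = tuple[str, Optional[bytes]]  # (key, value); a None value is a tombstone


def materialize(log: Iterable[Record]) -> dict[str, bytes]:
    """Replay a changelog in offset order with last-write-wins semantics."""
    table: dict[str, bytes] = {}
    for key, value in log:
        if value is None:
            table.pop(key, None)  # tombstone: drop the key if present
        else:
            table[key] = value    # upsert; b"" is ordinary data
    return table


def compact(log: list[Record]) -> list[Record]:
    """Keep only the latest record per key, preserving relative order."""
    last_index = {key: i for i, (key, _) in enumerate(log)}
    return [rec for i, rec in enumerate(log) if last_index[rec[0]] == i]


full_log: list[Record] = [
    ("billing", b"v1"),
    ("search", b"v1"),
    ("billing", b"v2"),
    ("search", None),  # tombstone
    ("flags", b""),    # empty value, still data
]

from_full = materialize(full_log)
from_compacted = materialize(compact(full_log))
# Both are {"billing": b"v2", "flags": b""}
```

The compacted log is shorter (three records instead of five), which is the replay-time benefit; the resulting dict is unchanged.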

A tombstone is a record with a null value; an empty value (b"") is data, not a tombstone.

If the background reader dies from a non-retriable error (for example, an authorization failure), contents freeze at the last applied state: status becomes "failed" and failure holds the exception. Gate liveness decisions on status, never on reads alone. Transient broker outages do not kill the reader; it resumes.
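One way to gate reads on status is a small lookup wrapper. This is a sketch under assumptions: lookup_live, StaleTableError, and the FakeTable stand-in are hypothetical names, and the only table members it uses are status, failure, and get as described in this document.

```python
class StaleTableError(RuntimeError):
    """Raised when the table can no longer be trusted to reflect the topic."""


def lookup_live(table, key, *, allow_degraded=False):
    """Read `key` only if the table's status says its contents are trustworthy."""
    trusted = {"caught_up", "degraded"} if allow_degraded else {"caught_up"}
    if table.status not in trusted:
        # Chain the reader's exception (if any) so callers can see the root cause.
        raise StaleTableError(f"table status is {table.status!r}") from table.failure
    return table.get(key)


class FakeTable(dict):
    """Stand-in exposing the status / failure attributes described above."""

    status = "caught_up"
    failure = None


table = FakeTable(billing="http://billing:8080")
live_value = lookup_live(table, "billing")
```

Treating "degraded" as untrusted by default is a design choice for this sketch: a partially loaded table can report a key as absent when it merely has not been replayed yet.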

API

KafkaTable[V] — read-only Mapping[str, V]

| Member | Description |
| --- | --- |
| KafkaTable(*, bootstrap_servers, topic, value_decoder, key_decoder=utf-8, catchup_timeout=30.0, poll_timeout_ms=200, ensure_topic=True, topic_configs=None) | Construct (does not connect). |
| KafkaTable.json(*, bootstrap_servers, topic, model, **kwargs) | Preset wiring model.model_validate_json as the decoder. |
| start() / stop() / async with | Lifecycle. start() raises on double-start, missing topic, or reader death during catch-up; on catch-up timeout it serves degraded. |
| table[key], key in table, iter, len, .get(key, default=None) | Mapping reads. Raise RuntimeError before start(). |
| snapshot() | Shallow-copy dict, safe to hold across awaits. |
| status | One of "unstarted", "loading", "caught_up", "degraded", "failed". |
| failure | Exception that killed the reader, else None. |
| is_caught_up / wait_until_caught_up(timeout=None) | Catch-up gate; the wait returns False on timeout or reader death. |
| stats | Frozen ViewStats snapshot (see below). |

Equality is identity and instances are hashable: a running table is a resource handle, not a value.

KafkaTableWriter[V]

| Member | Description |
| --- | --- |
| KafkaTableWriter(*, bootstrap_servers, topic, value_encoder, key_encoder=utf-8, ensure_topic=True, topic_configs=None, enable_idempotence=True) | Construct. Idempotence implies acks=all (registry-grade durability); opt out for throwaway data. |
| KafkaTableWriter.json(*, bootstrap_servers, topic, model=None, **kwargs) | Preset encoding via model_dump_json() (model is typing-only). |
| set(key, value) | Keyed upsert; awaits broker ack. Re-set periodically as a heartbeat. |
| delete(key) | Publishes a null-value tombstone; awaits broker ack. |
| start() / stop() / async with | Lifecycle; set/delete before start raise RuntimeError. |
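The heartbeat use of set() could look like the loop below. This is a sketch, not a library feature: heartbeat, make_value, and CountingWriter are hypothetical names, and putting a sequence number or timestamp in the value so readers can judge freshness is an assumed convention that the application would have to define itself.

```python
import asyncio
from typing import Any, Callable, Optional


async def heartbeat(writer: Any, key: str, make_value: Callable[[], Any], *,
                    interval: float, beats: Optional[int] = None) -> None:
    """Re-publish `key` every `interval` seconds; beats=None runs until cancelled."""
    sent = 0
    while beats is None or sent < beats:
        await writer.set(key, make_value())  # each beat is a fresh upsert
        sent += 1
        await asyncio.sleep(interval)


class CountingWriter:
    """Stand-in for a started KafkaTableWriter; records what was set."""

    def __init__(self) -> None:
        self.sets: list = []

    async def set(self, key: str, value: Any) -> None:
        self.sets.append((key, value))


async def demo() -> list:
    writer = CountingWriter()
    counter = iter(range(100))
    await heartbeat(writer, "billing", lambda: {"seq": next(counter)},
                    interval=0.0, beats=3)
    return writer.sets
```

In a service, the loop would typically run as a background task created with asyncio.create_task and cancelled during shutdown, before the writer is stopped.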

The key encoder must be deterministic and stable across processes — on a multi-partition topic, per-key ordering holds only if a key always hashes to the same partition.
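To make the determinism requirement concrete, the sketch below shows two spellings of the same visible string that encode to different bytes unless the encoder normalises them first. encode_key and toy_partition are hypothetical names, and the CRC32 partitioner is only a stand-in (Kafka's Java client hashes key bytes with murmur2).

```python
import unicodedata
import zlib


def encode_key(key: str) -> bytes:
    """Deterministic key encoder: normalise to NFC, then encode as UTF-8."""
    return unicodedata.normalize("NFC", key).encode("utf-8")


def toy_partition(key_bytes: bytes, num_partitions: int) -> int:
    """Stand-in for a hash partitioner: equal bytes always give the same partition."""
    return zlib.crc32(key_bytes) % num_partitions


composed = "caf\u00e9"     # 'é' as a single code point
decomposed = "cafe\u0301"  # 'e' followed by a combining acute accent

# Plain UTF-8 encoding yields different bytes for the two spellings ...
raw_bytes_differ = composed.encode("utf-8") != decomposed.encode("utf-8")
# ... while the normalising encoder maps both to the same bytes and partition.
same_bytes = encode_key(composed) == encode_key(decomposed)
same_partition = (toy_partition(encode_key(composed), 6)
                  == toy_partition(encode_key(decomposed), 6))
```

With an encoder like this, readers see the normalised form of the key, so a matching key_decoder and lookup convention would be needed on the reading side.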

Module level

| Member | Description |
| --- | --- |
| ensure_topic(bootstrap_servers, topic, *, num_partitions=1, replication_factor=1, topic_configs=None) -> bool | Idempotent explicit create; True if this call created it. Defaults are dev-grade; production registries want RF≥3, min.insync.replicas=2, acks=all. |
| DEFAULT_TOPIC_CONFIGS | {"cleanup.policy": "compact"} (read-only mapping). |
| ViewStats | Frozen counters: records_applied, tombstones_applied, keyless_records, key_decode_errors, value_decode_errors, catch_up_seconds, replayed_at_catch_up. |
| SupportsJsonModel | Protocol the .json() presets require (model_dump_json / model_validate_json). |
| TableStatus | The status literal type. |

Contributing

Questions and bug reports are welcome as issues, and PRs are accepted. Please run the test suite before submitting:

$ pytest tests

Unit tests always run; integration tests need a Kafka broker on localhost:9092 and skip otherwise (docker run -d -p 9092:9092 apache/kafka:3.9.0).

License

MIT
