ktables
Materialize a Kafka topic into an in-memory, compacted dict — a GlobalKTable for asyncio Python.
Every process that opens a KafkaTable replays the topic from the beginning
into a local read-only mapping, then keeps consuming for live updates; a
KafkaTableWriter maintains the topic with keyed upserts and tombstones.
Built for small, broadly-needed reference data — service registries,
capability advertisements, feature flags, config maps — not for large or
high-churn state.
Background
Kafka Streams (JVM) has two table abstractions over changelog topics: the
partition-sharded KTable, where each application instance holds a slice of
the keys, and GlobalKTable, where every instance bootstraps and maintains a
full local copy — the right shape for lookup data that any instance may need
at any moment. The Python ecosystem has several maintained stream-processing
frameworks, but all of them implement only the sharded shape, with
framework-owned changelog topics and their own process runtimes. ktables fills
the gap with just the global-table piece, as a plain asyncio library over
aiokafka: your topic, your message format, your event loop.
The Kafka semantics the implementation relies on (group-less consumers,
catch-up gating against end offsets, compaction independence) are documented,
with provenance, in the module docstring of
kafka_table.py.
Install
ktables is not yet published to PyPI. Until the first release, vendor the
package — copy the ktables/ directory into your project — and install its
single runtime dependency:
$ pip install aiokafka
Requires Python 3.10+. Pydantic is not required — the .json() presets
accept any class with pydantic-v2's JSON methods.
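Because the presets only look for pydantic-v2's JSON methods, a plain dataclass can stand in for a pydantic model. The sketch below assumes the required surface is an instance method `model_dump_json()` returning a JSON string and a classmethod `model_validate_json()` accepting `str` or `bytes`, as in pydantic v2. `ServiceRecord` and its fields are hypothetical, not part of ktables.

```python
import json
from dataclasses import asdict, dataclass
from typing import Union


@dataclass(frozen=True)
class ServiceRecord:
    """Hypothetical registry entry; not part of ktables."""

    host: str
    port: int

    def model_dump_json(self) -> str:
        # Mirrors pydantic v2: serialize this instance to a JSON string.
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def model_validate_json(cls, json_data: Union[str, bytes]) -> "ServiceRecord":
        # Mirrors pydantic v2: parse JSON text (str or bytes) into an instance.
        return cls(**json.loads(json_data))


# Round trip: encode as a writer would, decode as a table would.
record = ServiceRecord(host="10.0.0.7", port=8443)
encoded = record.model_dump_json()
decoded = ServiceRecord.model_validate_json(encoded.encode("utf-8"))
```

A class shaped like this should be usable as the `model` argument of the `.json()` presets, provided the presets need nothing beyond these two methods.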
Usage
Maintain a registry from one service:

    from ktables import KafkaTableWriter

    writer = KafkaTableWriter.json(
        bootstrap_servers="localhost:9092", topic="my.registry", model=ServiceRecord
    )
    async with writer:
        await writer.set("billing", record)  # upsert (broker-acked)
        ...
        await writer.delete("billing")  # tombstone: removes the key

Consume it from any other process:

    from ktables import KafkaTable

    table = KafkaTable.json(
        bootstrap_servers="localhost:9092", topic="my.registry", model=ServiceRecord
    )
    async with table:  # replays the topic; returns once caught up
        record = table.get("billing")
        if table.status != "caught_up":  # "degraded": catch-up timed out
            ...

Non-pydantic payloads: construct directly with your own codecs —
KafkaTable(..., value_decoder=bytes_to_value) /
KafkaTableWriter(..., value_encoder=value_to_bytes).
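As an illustration of a custom codec pair, here is a minimal sketch using a fixed binary layout. It assumes the decoder is called with the raw record value as `bytes` and the encoder must return `bytes`, as the names above suggest. The `Endpoint` type and the layout are hypothetical.

```python
import struct
from typing import NamedTuple


class Endpoint(NamedTuple):
    """Hypothetical value type: an IPv4 address and a port."""

    address: str
    port: int


# Network byte order: four address octets followed by an unsigned 16-bit port.
_LAYOUT = struct.Struct("!4BH")


def value_to_bytes(value: Endpoint) -> bytes:
    # Encoder: split the dotted address into octets and pack with the port.
    octets = [int(part) for part in value.address.split(".")]
    return _LAYOUT.pack(*octets, value.port)


def bytes_to_value(raw: bytes) -> Endpoint:
    # Decoder: the exact inverse of value_to_bytes.
    *octets, port = _LAYOUT.unpack(raw)
    return Endpoint(".".join(str(octet) for octet in octets), port)


payload = value_to_bytes(Endpoint("10.0.0.7", 8443))
restored = bytes_to_value(payload)
```

Whatever format you choose, the two functions should be exact inverses, since every reader process replays bytes produced by some writer.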
Removing a key on clean shutdown
There is deliberately no delete_on_close option (shutdown-time deletion is
application policy, and no library can promise it on a crash). Compose it:

    async with writer:
        await writer.set(my_key, my_record)
        try:
            ...  # serve
        finally:
            await writer.delete(my_key)  # acked before the producer stops

Locked-down clusters
Both classes ensure their topic exists at start (idempotent create,
compacted). If the application lacks topic-create ACLs, pass
ensure_topic=False and create the topic out-of-band (the module-level
ensure_topic() function is the deploy-time primitive).
Consistency contract
KafkaTable is eventually consistent. Precisely:
- When `start()` / `async with` returns, contents are complete as of the topic's end offsets at start time, unless `status == "degraded"` (catch-up timed out; data may be partial).
- Thereafter, updates appear within milliseconds of the broker write, but there is no read-your-own-writes: after `await writer.set(k, v)`, a table in the same process may briefly still return the old value.
- Contents are stable between your awaits (single event loop; only the reader task mutates). Use `snapshot()` for a copy held across awaits.
- Correctness does not depend on broker-side compaction: last-write-wins over the full log yields the same dict; compaction only bounds replay time.
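Code that writes and then reads in the same process may need to wait for its own write to come back through the topic. The sketch below is one possible approach, not part of ktables: a hypothetical `wait_for_value` helper that polls any mapping through `.get()` until a key holds the expected value. A plain dict stands in for the table so the example runs without a broker.

```python
import asyncio
from typing import Any, Mapping


async def wait_for_value(
    table: Mapping[str, Any],
    key: str,
    expected: Any,
    *,
    timeout: float = 2.0,
    interval: float = 0.01,
) -> bool:
    """Poll until table.get(key) == expected; return False on timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while table.get(key) != expected:
        if loop.time() >= deadline:
            return False
        # Yield to the event loop so a reader task can apply new records.
        await asyncio.sleep(interval)
    return True


async def demo() -> tuple[bool, bool]:
    # A plain dict stands in for a KafkaTable; a timer simulates the reader.
    table: dict[str, str] = {"billing": "v1"}
    asyncio.get_running_loop().call_later(0.05, table.__setitem__, "billing", "v2")
    seen = await wait_for_value(table, "billing", "v2")
    missed = await wait_for_value(table, "billing", "v3", timeout=0.05)
    return seen, missed


seen, missed = asyncio.run(demo())
```

Polling is the simplest option here. Comparing against the expected value only works when you know what you wrote, which is the case right after your own `set`.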
A tombstone is a record with a null value (b"" is data, not a tombstone).
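The last-write-wins and tombstone rules can be modelled as a fold over the log. This is a sketch of the semantics described above, not the library's reader code; the record shape and the sample logs are made up for illustration.

```python
from typing import Iterable, Optional

# (key, value) pairs in log order; a value of None is a tombstone.
Record = tuple[str, Optional[bytes]]


def materialize(log: Iterable[Record]) -> dict[str, bytes]:
    """Replay a log into a dict: the last write per key wins."""
    table: dict[str, bytes] = {}
    for key, value in log:
        if value is None:
            table.pop(key, None)  # tombstone: remove the key if present
        else:
            table[key] = value  # upsert; b"" is ordinary data
    return table


full_log: list[Record] = [
    ("billing", b"v1"),
    ("search", b"v1"),
    ("billing", b"v2"),
    ("search", None),
    ("flags", b""),
]

# What a compacted topic might retain: only the latest record per key.
compacted_log: list[Record] = [
    ("billing", b"v2"),
    ("search", None),
    ("flags", b""),
]

from_full = materialize(full_log)
from_compacted = materialize(compacted_log)
```

Both replays produce the same dict, which is why compaction affects only how long the replay takes.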
If the background reader dies (non-retriable error, e.g. authorization),
contents freeze at the last applied state: status becomes "failed" and
failure holds the exception — gate liveness decisions on status, never on
reads alone. Transient broker outages do not kill the reader; it resumes.
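A readiness check built on `status` might look like the following sketch. `is_ready` and the stub class are hypothetical; the check relies only on the `status` and `failure` attributes described above, so any object exposing them works.

```python
from typing import Optional, Protocol


class HasStatus(Protocol):
    """The two attributes this check relies on."""

    status: str
    failure: Optional[BaseException]


def is_ready(table: HasStatus, *, allow_degraded: bool = False) -> bool:
    """True only when reads can be trusted for liveness decisions."""
    if table.status == "caught_up":
        return True
    # "degraded" means catch-up timed out, so contents may be partial.
    return allow_degraded and table.status == "degraded"


class FakeTable:
    """Stub standing in for a KafkaTable so the sketch runs without a broker."""

    def __init__(self, status: str, failure: Optional[BaseException] = None) -> None:
        self.status = status
        self.failure = failure


healthy = is_ready(FakeTable("caught_up"))
frozen = is_ready(FakeTable("failed", RuntimeError("authorization failed")))
partial = is_ready(FakeTable("degraded"), allow_degraded=True)
```

Whether a degraded table counts as ready is an application decision, so the sketch leaves it as an explicit opt-in.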
API
KafkaTable[V] — read-only Mapping[str, V]
| Member | Description |
|---|---|
| `KafkaTable(*, bootstrap_servers, topic, value_decoder, key_decoder=utf-8, catchup_timeout=30.0, poll_timeout_ms=200, ensure_topic=True, topic_configs=None)` | Construct (does not connect). |
| `KafkaTable.json(*, bootstrap_servers, topic, model, **kwargs)` | Preset wiring `model.model_validate_json` as the decoder. |
| `start()` / `stop()` / `async with` | Lifecycle. `start()` raises on double-start, missing topic, or reader death during catch-up; on catch-up timeout it serves degraded. |
| `table[key]`, `key in table`, `iter`, `len`, `.get(key, default=None)` | Mapping reads. Raise `RuntimeError` before `start()`. |
| `snapshot()` | Shallow-copy dict, safe to hold across awaits. |
| `status` | `"unstarted"` \| `"loading"` \| `"caught_up"` \| `"degraded"` \| `"failed"`. |
| `failure` | Exception that killed the reader, else `None`. |
| `is_caught_up` / `wait_until_caught_up(timeout=None)` | Catch-up gate; the wait returns `False` on timeout or reader death. |
| `stats` | Frozen `ViewStats` snapshot (see below). |
Equality is identity and instances are hashable: a running table is a resource handle, not a value.
KafkaTableWriter[V]
| Member | Description |
|---|---|
| `KafkaTableWriter(*, bootstrap_servers, topic, value_encoder, key_encoder=utf-8, ensure_topic=True, topic_configs=None, enable_idempotence=True)` | Construct. Idempotence implies `acks=all` (registry-grade durability); opt out for throwaway data. |
| `KafkaTableWriter.json(*, bootstrap_servers, topic, model=None, **kwargs)` | Preset encoding via `model_dump_json()` (`model` is typing-only). |
| `set(key, value)` | Keyed upsert; awaits broker ack. Re-set periodically as a heartbeat. |
| `delete(key)` | Publishes a null-value tombstone; awaits broker ack. |
| `start()` / `stop()` / `async with` | Lifecycle; `set`/`delete` before start raise `RuntimeError`. |
The key encoder must be deterministic and stable across processes — on a multi-partition topic, per-key ordering holds only if a key always hashes to the same partition.
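To see why encoder stability matters, consider the sketch below. `partition_for` uses `zlib.crc32` purely as a stand-in hash for illustration; real Kafka clients apply their own partitioner, and the partition count here is hypothetical. The point is only that the partition is a function of the encoded key bytes.

```python
import zlib

NUM_PARTITIONS = 6  # hypothetical topic size


def encode_key(key: str) -> bytes:
    # Deterministic: the same key yields the same bytes in every process.
    return key.encode("utf-8")


def unstable_encode_key(key: str) -> bytes:
    # Anti-pattern: str hashes are salted per process (PYTHONHASHSEED), so
    # two processes would usually produce different bytes for the same key.
    return str(hash(key)).encode("ascii")


def partition_for(key_bytes: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in for a client partitioner: hash the key bytes, then take the modulo.
    return zlib.crc32(key_bytes) % num_partitions


first = partition_for(encode_key("billing"))
second = partition_for(encode_key("billing"))
```

With `encode_key`, every writer sends `"billing"` to the same partition, so its upserts and tombstones stay ordered. With `unstable_encode_key`, writers in different processes could scatter one logical key across partitions and lose that ordering.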
Module level
| Member | Description |
|---|---|
| `ensure_topic(bootstrap_servers, topic, *, num_partitions=1, replication_factor=1, topic_configs=None) -> bool` | Idempotent explicit create; `True` if this call created it. Defaults are dev-grade; production registries want RF≥3, `min.insync.replicas=2`, `acks=all`. |
| `DEFAULT_TOPIC_CONFIGS` | `{"cleanup.policy": "compact"}` (read-only mapping). |
| `ViewStats` | Frozen counters: `records_applied`, `tombstones_applied`, `keyless_records`, `key_decode_errors`, `value_decode_errors`, `catch_up_seconds`, `replayed_at_catch_up`. |
| `SupportsJsonModel` | Protocol the `.json()` presets require (`model_dump_json` / `model_validate_json`). |
| `TableStatus` | The status literal type. |
Contributing
Questions and bug reports are welcome as issues, and PRs are accepted. Please run the test suite before submitting:
$ pytest tests
Unit tests always run; integration tests need a Kafka broker on
localhost:9092 and skip otherwise
(docker run -d -p 9092:9092 apache/kafka:3.9.0).
License