aiokpl
Pure-Python async Kinesis producer. KPL-equivalent without a native daemon.
v0.2 available: full pipeline, vendor-neutral metrics with first-party CloudWatch / OpenTelemetry / Datadog sinks, and a thread-safe
`SyncProducer` for non-async callers. All exercised end-to-end against `kinesis-mock`. See the Roadmap.
A library that respects the shard as the unit of optimization, measures time before size, and treats failures as user-visible information — not noise to hide.
Status
v0.2 — feature-complete. Full async pipeline (Producer →
Aggregator → Limiter → Collector → Sender → Retrier), opt-in
vendor-neutral metrics with first-party sinks, and a thread-safe
SyncProducer for non-async callers. Not on PyPI yet — install via the
repo URL.
| Phase | Scope | Status |
|---|---|---|
| 0 | Repo scaffolding, design docs | ✅ Done |
| 1 | Aggregation codec (KPL wire format) | ✅ Done |
| 2 | ShardMap + prediction | ✅ Done |
| 3 | Reducer, Aggregator, Collector | ✅ Done |
| 4 | Limiter + TokenBucket | ✅ Done |
| 5 | Sender + Retrier | ✅ Done |
| 6 | Producer + lifecycle (first usable release: v0.1) | ✅ Done |
| 7 | CloudWatch metrics (opt-in, default off) | ✅ Done |
| 8 | Sync bridge (`SyncProducer`) | ✅ Done |
Why this exists
AWS ships the official Kinesis Producer Library as a native C++ binary
(amazon-kinesis-producer) wrapped in Java/.NET sidecars. The Python ecosystem
has never had a real KPL — only:
- `aws-kinesis-agg`: Python codec for the aggregation format. Useful, but not a producer. You still call `boto3.put_records` yourself.
- `kiner`, `kinesis-python`, `kinesis-producer` (ludia): abandoned community attempts. All thin batchers over boto3.
aiokpl is a clean-room reimplementation in idiomatic async Python — built
on anyio so the same code runs on both asyncio and trio — that
preserves what's worth preserving from the C++ KPL (shard-aware pipeline,
deadline-driven batching, smart retry classification, byte-exact aggregation)
and drops what was an accident of C++ (IPC, named pipes, child process,
custom spinlocks, static binaries, packaging hell).
It is not a wrapper around the C++ binary. It is a reimplementation of its ideas in a language where you don't need a daemon.
Core principles
- The shard is the unit of optimization, not the stream.
- Predict before asking — the sharding algorithm is deterministic.
- Batching is governed by deadlines, not sizes.
- Each stage has one responsibility and one downstream callback.
- Failures are data, not exceptions.
- Bounded latency beats maximum throughput.
The full design rationale lives in CLAUDE.md.
Features
What v0.2 ships
- Backend-agnostic core via `anyio`: codec, ShardMap, Reducer, Aggregator, Collector, Limiter, and TokenBucket all run on either `asyncio` or `trio`. The `Producer`/`SyncProducer`/Sender/Retrier edge is asyncio-only because `aiobotocore` is asyncio-only.
- Async-first API built on `anyio` and (for the network layer) `aiobotocore`.
- Byte-exact KPL aggregation on the wire: KCL consumers deaggregate transparently.
- Shard prediction via `md5(partition_key)` + cached `ListShards`, O(log N) lookup with `bisect`.
- Per-shard rate limiting with a multi-stream token bucket (1000 records/s + 1 MiB/s, matching Kinesis hard limits).
- Deadline-driven batching at two levels: UserRecord → AggregatedRecord and AggregatedRecord → `PutRecords` batch.
- Smart retry classification distinguishing throttle, transient, wrong-shard (with split detection), and expired.
- Per-record attempt history returned to the caller: every retry is visible.
- Bounded backpressure via `max_outstanding_records`.
- Graceful shutdown via `async with` + `flush()`.
- Vendor-neutral metrics behind a `MetricsSink` Protocol. Default `NullSink` is zero overhead. First-party sinks: `InMemorySink` for tests, `CloudWatchSink` bundled (since `aiobotocore` is already a dep), plus `OpenTelemetrySink` and `DatadogSink` behind the `aiokpl[otel]` / `aiokpl[datadog]` extras. Metric names match the C++ KPL constants verbatim.
- Synchronous bridge: `SyncProducer` wraps the async core behind `anyio.from_thread.start_blocking_portal()` so Flask/Django/Jupyter callers can submit records without an event loop. Thread-safe `put_record`, bounded `wait(timeout=)` and `flush(timeout=)`.
See the Non-goals section below for what aiokpl
deliberately doesn't do.
Intended usage
```python
import anyio
from aiokpl import Producer, Config


async def main():
    cfg = Config(
        region="us-east-1",
        aggregation_enabled=True,
        record_max_buffered_time_ms=100,
        record_ttl_ms=30_000,
        fail_if_throttled=False,
    )
    async with Producer(cfg) as producer:
        outcome = await producer.put_record(
            stream="my-stream",
            partition_key="user-123",
            data=b"hello",
        )
        # Wait until the record reaches a terminal state.
        result = await outcome.wait()
        if result.success:
            print(result.shard_id, result.sequence_number)
        else:
            print("failed:", result.attempts[-1].error_code)


# asyncio is anyio's default backend; the Producer edge is asyncio-only.
anyio.run(main)
```
`put_record()` returns an awaitable outcome handle. Awaiting `outcome.wait()`
resolves when the record reaches a terminal state (success or final failure),
and the `attempts` list always carries the full retry history.
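To make the "failures are data" idea concrete, here is a minimal sketch of what a result object with attempt history could look like. The class names `Attempt` and `RecordResult`, the `error_message` field, and the `summarize` helper are assumptions for illustration; only `success`, `shard_id`, `sequence_number`, `attempts`, and `error_code` appear in the usage example above, and aiokpl's real types may differ.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(frozen=True)
class Attempt:
    """One try at delivering a record (hypothetical shape)."""

    error_code: Optional[str] = None  # None means this attempt succeeded
    error_message: Optional[str] = None  # assumed field, not from the README


@dataclass
class RecordResult:
    """Terminal state of a record plus every attempt made (hypothetical shape)."""

    attempts: List[Attempt] = field(default_factory=list)
    shard_id: Optional[str] = None
    sequence_number: Optional[str] = None

    @property
    def success(self) -> bool:
        # A record succeeded if its last attempt carries no error code.
        return bool(self.attempts) and self.attempts[-1].error_code is None


def summarize(result: RecordResult) -> str:
    """Render a result without raising: failures are inspected, not thrown."""
    if result.success:
        return f"ok shard={result.shard_id} retries={len(result.attempts) - 1}"
    codes = [a.error_code for a in result.attempts]
    return f"failed after {len(codes)} attempts: {codes}"
```

Because every attempt is kept, a caller can tell a record that succeeded on the first try from one that succeeded after being throttled, without any exception handling.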
Sync usage
For callers without an async event loop (Flask/Django handlers, scripts,
Jupyter), use SyncProducer:
```python
from aiokpl import Config, SyncProducer

with SyncProducer(Config(region="us-east-1")) as producer:
    outcome = producer.put_record(
        stream="my-stream",
        partition_key="user-123",
        data=b"hello",
    )
    result = outcome.wait(timeout=5.0)
```
Under the hood it spins up an anyio event loop on a background thread via
anyio.from_thread.start_blocking_portal() and runs the async Producer
there. put_record is thread-safe; wait() and flush() accept timeouts.
See Phase 8 docs.
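The general shape of such a bridge can be sketched with the standard library alone. Note that this sketch deliberately swaps in plain `asyncio` and `asyncio.run_coroutine_threadsafe` instead of anyio's blocking portal, so it is an analogy rather than aiokpl's implementation. `LoopThread`, `fake_put_record`, and `send_blocking` are hypothetical names.

```python
import asyncio
import threading
from concurrent.futures import Future


class LoopThread:
    """Runs an asyncio event loop on a daemon thread (illustrative only)."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)

    def __enter__(self) -> "LoopThread":
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> None:
        # Ask the loop to stop from a foreign thread, then wait for it to exit.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def submit(self, coro) -> Future:
        """Thread-safe: schedule a coroutine and return a blocking future."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop)


async def fake_put_record(data: bytes) -> int:
    # Hypothetical stand-in for the async producer call.
    await asyncio.sleep(0.01)
    return len(data)


def send_blocking(data: bytes, timeout: float = 5.0) -> int:
    """Submit from blocking code and wait for the outcome with a timeout."""
    with LoopThread() as bridge:
        return bridge.submit(fake_put_record(data)).result(timeout=timeout)
```

The blocking caller only ever touches a `concurrent.futures.Future`, which is what makes a bounded `wait(timeout=)` straightforward to offer.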
Architecture (target)
```
UserRecord
    │  producer.put_record()
    ▼
Aggregator ──► AggregatedRecord (per predicted shard, deadline-driven)
    ▼
Limiter    ──► throttled to 1000 rec/s + 1 MiB/s per shard
    ▼
Collector  ──► PutRecordsBatch (500 records / 5 MiB / 256 KiB-per-shard)
    ▼
Sender     ──► aiobotocore.put_records (async)
    ▼
Retrier    ──► classify outcome (throttle / transient / wrong-shard / expired)
    ▼
finish_user_record → resolves the user's awaitable future
```
Same pipeline as the C++ KPL, in idiomatic anyio primitives. See
CLAUDE.md for the C++↔Python translation
table.
Roadmap
Phased on purpose. Each phase ships something testable on its own.
Phase 1 — Aggregation codec ✅
- `aiokpl/aggregation.py`: encode/decode the KPL aggregated record format.
- `aiokpl/hashing.py`: partition-key → 128-bit hash, explicit hash key parsing.
- Conformance tests against `aws-kinesis-agg` and golden bytes captured from the C++ KPL.
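For orientation, the outer envelope of a KPL aggregated record is a 4-byte magic prefix, a protobuf-encoded body, and a trailing 16-byte MD5 of that body. The sketch below covers only that envelope and treats the protobuf body as opaque bytes; the function names `frame` and `unframe` are illustrative and not taken from `aiokpl/aggregation.py`.

```python
import hashlib
from typing import Optional

MAGIC = b"\xf3\x89\x9a\xc2"  # 4-byte prefix marking a KPL aggregated record
DIGEST_SIZE = hashlib.md5().digest_size  # 16 bytes


def frame(protobuf_body: bytes) -> bytes:
    """Wrap an already-serialized protobuf body in the aggregation envelope."""
    return MAGIC + protobuf_body + hashlib.md5(protobuf_body).digest()


def unframe(blob: bytes) -> Optional[bytes]:
    """Return the protobuf body, or None if blob is not a valid envelope."""
    if len(blob) < len(MAGIC) + DIGEST_SIZE or not blob.startswith(MAGIC):
        return None
    body = blob[len(MAGIC):-DIGEST_SIZE]
    checksum = blob[-DIGEST_SIZE:]
    if hashlib.md5(body).digest() != checksum:
        return None  # corrupted or not actually aggregated
    return body
```

A consumer that sees no magic prefix, or a checksum mismatch, treats the payload as a plain non-aggregated record, which is why `unframe` returns `None` instead of raising.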
Phase 2 — ShardMap ✅
- Async refresh, state machine, `bisect_left` lookup, `invalidate()` from the retrier, exponential backoff (1s → 30s), background cleanup of closed shards after 60s.
- Tests with `moto` for paginated `ListShards`.
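The prediction step can be sketched in a few lines: Kinesis maps a partition key to a 128-bit integer with MD5, and each open shard owns a contiguous hash key range, so a sorted list of ending hash keys plus `bisect_left` finds the owning shard. `ShardIndex` and its method names are hypothetical, and the real ShardMap also handles refresh, invalidation, and closed shards, which this sketch omits.

```python
import bisect
import hashlib
from typing import List, Optional, Tuple


def partition_key_hash(partition_key: str) -> int:
    """MD5 of the UTF-8 partition key, read as a big-endian 128-bit integer."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")


class ShardIndex:
    """Sorted ending hash keys of open shards, searched with bisect_left."""

    def __init__(self, shards: List[Tuple[str, int]]) -> None:
        # Each entry is (shard_id, ending_hash_key); ranges are assumed contiguous.
        ordered = sorted(shards, key=lambda shard: shard[1])
        self._ids = [shard_id for shard_id, _ in ordered]
        self._ends = [end for _, end in ordered]

    def predict(self, hash_key: int) -> Optional[str]:
        # First shard whose (inclusive) ending hash key is >= hash_key.
        i = bisect.bisect_left(self._ends, hash_key)
        return self._ids[i] if i < len(self._ids) else None
```

Usage: `ShardIndex(shards).predict(partition_key_hash("user-123"))` returns a shard ID without any network call, which is what lets aggregation group records per shard before sending.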
Phase 3 — Reducer, Aggregator, Collector ✅
- Generic deadline-driven batcher (`reducer.py`): the core abstraction reused twice.
- `aggregator.py` produces aggregated records per predicted shard, falling back to single-record mode when the ShardMap isn't ready.
- `collector.py` produces `PutRecords` batches with the 256 KiB/shard short-circuit.
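A deadline-driven batcher can be sketched as follows. This is a simplified, synchronous illustration with an injected clock, not the contents of `reducer.py`: the class name `DeadlineBatcher`, the `max_items` cap, and the `add`/`poll` methods are all assumptions. The batch flushes when its earliest deadline passes; the count limit acts only as a safety cap.

```python
from typing import Generic, List, Optional, TypeVar

T = TypeVar("T")


class DeadlineBatcher(Generic[T]):
    """Accumulates items; flushes on a count cap or the earliest deadline."""

    def __init__(self, max_items: int) -> None:
        self._max_items = max_items
        self._items: List[T] = []
        self._deadline: Optional[float] = None  # earliest deadline in the batch

    def add(self, item: T, deadline: float) -> Optional[List[T]]:
        """Add an item; return a full batch if the count cap was reached."""
        self._items.append(item)
        if self._deadline is None or deadline < self._deadline:
            self._deadline = deadline
        if len(self._items) >= self._max_items:
            return self._drain()
        return None

    def poll(self, now: float) -> Optional[List[T]]:
        """Return the pending batch once its earliest deadline has passed."""
        if self._deadline is not None and now >= self._deadline:
            return self._drain()
        return None

    def _drain(self) -> List[T]:
        batch, self._items, self._deadline = self._items, [], None
        return batch
```

Tracking the earliest deadline rather than the newest is what bounds latency: one urgent record pulls the whole batch forward.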
Phase 4 — Limiter + TokenBucket ✅
- `token_bucket.py`: multi-stream, query-on-demand growth, no sleep.
- `limiter.py`: per-shard `ShardLimiter` with a 25 ms drain loop.
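One way to read "multi-stream, query-on-demand growth, no sleep" is a bucket with several token streams that are topped up lazily whenever the bucket is queried, and that either grants a request on all streams or on none. The sketch below follows that reading with an injected clock; the names `TokenBucket`, `try_take`, and `shard_bucket` are illustrative and the real `token_bucket.py` may differ.

```python
from typing import Dict, Tuple


class TokenBucket:
    """Several token streams that must all have capacity at the same time."""

    def __init__(self, limits: Dict[str, Tuple[float, float]], now: float) -> None:
        # limits maps stream name -> (refill rate per second, maximum tokens)
        self._limits = limits
        self._tokens = {name: cap for name, (_, cap) in limits.items()}
        self._last = now

    def _grow(self, now: float) -> None:
        # Tokens are topped up lazily on query; nothing sleeps or ticks.
        elapsed = max(0.0, now - self._last)
        for name, (rate, cap) in self._limits.items():
            self._tokens[name] = min(cap, self._tokens[name] + rate * elapsed)
        self._last = max(self._last, now)

    def try_take(self, now: float, **amounts: float) -> bool:
        """Take from every named stream, or take nothing at all."""
        self._grow(now)
        if any(self._tokens[name] < amount for name, amount in amounts.items()):
            return False
        for name, amount in amounts.items():
            self._tokens[name] -= amount
        return True


def shard_bucket(now: float) -> TokenBucket:
    """Per-shard limits from the README: 1000 records/s and 1 MiB/s."""
    one_mib = 1024.0 * 1024.0
    return TokenBucket({"records": (1000.0, 1000.0), "bytes": (one_mib, one_mib)}, now)
```

A caller such as a drain loop would call `try_take(now, records=1, bytes=len(payload))` and leave the record queued when it returns `False`.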
Phase 5 — Sender + Retrier ✅
- Glue to `aiobotocore.put_records`.
- The full classification table: every row covered in unit tests, including wrong-shard-after-split.
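The four categories can be illustrated with a small classifier. This is a guess at the overall shape, not the project's classification table (which lives in CLAUDE.md): the `Outcome` enum, the `classify` signature, and the ordering of the checks are assumptions. The two error codes are the per-record codes that `PutRecords` reports.

```python
from enum import Enum
from typing import Optional

THROTTLE_CODE = "ProvisionedThroughputExceededException"


class Outcome(Enum):
    SUCCESS = "success"
    THROTTLE = "throttle"
    TRANSIENT = "transient"
    WRONG_SHARD = "wrong-shard"
    EXPIRED = "expired"


def classify(
    error_code: Optional[str],
    predicted_shard: Optional[str],
    actual_shard: Optional[str],
    now: float,
    expires_at: float,
) -> Outcome:
    """Classify one per-record result (assumed rules, for illustration)."""
    if error_code is None:
        # Delivered, but possibly to a shard other than the predicted one,
        # which hints that the shard map is stale (for example after a split).
        if predicted_shard and actual_shard and predicted_shard != actual_shard:
            return Outcome.WRONG_SHARD
        return Outcome.SUCCESS
    if now >= expires_at:
        return Outcome.EXPIRED  # past its TTL: stop retrying
    if error_code == THROTTLE_CODE:
        return Outcome.THROTTLE
    return Outcome.TRANSIENT  # e.g. InternalFailure: retry
```

Keeping the classifier a pure function of its inputs is what makes it practical to cover every row of a classification table in unit tests.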
Phase 6 — Producer + lifecycle → v0.1 release ✅
- Per-stream pipeline wiring, graceful shutdown, backpressure semaphore, configurable knobs.
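A backpressure semaphore can be sketched with `asyncio.Semaphore`: each record takes a slot on entry and returns it when it reaches a terminal state, so callers wait instead of growing an unbounded buffer. `BoundedSubmitter`, its counters, and the sleep that stands in for the pipeline are hypothetical; only the `max_outstanding_records` name comes from this README.

```python
import asyncio


class BoundedSubmitter:
    """Caps in-flight records; put() waits while the cap is reached."""

    def __init__(self, max_outstanding_records: int) -> None:
        self._slots = asyncio.Semaphore(max_outstanding_records)
        self.in_flight = 0
        self.peak = 0  # highest concurrency observed, for demonstration

    async def put(self, data: bytes) -> int:
        # Waiting here is the backpressure: the caller is slowed down
        # instead of records piling up in memory.
        async with self._slots:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                await asyncio.sleep(0.001)  # stand-in for the real pipeline
                return len(data)
            finally:
                self.in_flight -= 1


async def demo() -> int:
    submitter = BoundedSubmitter(max_outstanding_records=3)
    await asyncio.gather(*(submitter.put(b"x") for _ in range(10)))
    return submitter.peak
```

Running `asyncio.run(demo())` submits ten records concurrently while never holding more than three in flight.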
Phase 7 — CloudWatch metrics ✅
- In-process counters per metric / stream / shard / error code, rolling 60 s window, periodic upload via `aiobotocore`. Opt-in via `Config.metrics_level`; default `NONE` is a zero-overhead no-op.
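A sink abstraction along these lines can be expressed with `typing.Protocol`. The sketch below is an assumption about the general shape only: the `emit` method and its signature are invented for illustration, the classes reuse the sink names mentioned in this README without claiming to match their real interfaces, and the metric and dimension names are borrowed from the KPL's published metrics.

```python
from collections import defaultdict
from typing import Dict, Mapping, Protocol, Tuple

MetricKey = Tuple[str, Tuple[Tuple[str, str], ...]]


class MetricsSink(Protocol):
    """Anything with a matching emit() satisfies the protocol structurally."""

    def emit(self, name: str, value: float, dimensions: Mapping[str, str]) -> None:
        ...


class NullSink:
    """Default sink: drops everything, so disabled metrics cost almost nothing."""

    def emit(self, name: str, value: float, dimensions: Mapping[str, str]) -> None:
        pass


class InMemorySink:
    """Accumulates totals per (metric, dimensions) so tests can assert on them."""

    def __init__(self) -> None:
        self.totals: Dict[MetricKey, float] = defaultdict(float)

    def emit(self, name: str, value: float, dimensions: Mapping[str, str]) -> None:
        key = (name, tuple(sorted(dimensions.items())))
        self.totals[key] += value


def record_put(sink: MetricsSink, stream: str, shard: str) -> None:
    # Hypothetical call site inside the pipeline.
    sink.emit("UserRecordsPut", 1.0, {"StreamName": stream, "ShardId": shard})
```

Because the protocol is structural, a vendor-specific sink needs no base class; it only has to provide a compatible `emit`.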
Phase 8 — Sync bridge ✅
- `SyncProducer` wraps the async `Producer` behind `anyio.from_thread.start_blocking_portal()` so plain blocking code (Flask/Django/Jupyter/scripts) can submit records and wait for outcomes without an async event loop. Thread-safe `put_record`, timeouts on `wait()` and `flush()`.
Non-goals
- Wrapping the C++ KPL daemon. A solved problem, solved differently.
- Compatibility with the KPL IPC protobuf (`messages.proto`). We only match the on-the-wire aggregation format Kinesis sees.
- KCL / consumer side. Producer only.
- Python < 3.10.
- Sync-first API. Sync support exists (`SyncProducer`) but is a bridge over the async core, not the primary surface.
Development
Requirements: Python 3.10+ and uv (or pip).
```shell
git clone <repo>
cd aiokpl
uv venv && source .venv/bin/activate
uv pip install -e ".[dev]"
pytest
```
CI matrix: Python 3.10 / 3.11 / 3.12 / 3.13. ruff + mypy + pytest.
Reference
Implementation context — including the philosophy, the C++↔Python mapping, the
retrier classification table, and the resume guide — lives in
CLAUDE.md.
Original C++ KPL source for cross-referencing: https://github.com/awslabs/amazon-kinesis-producer.
License
Apache-2.0.