Pure-Python async Kinesis producer. KPL-equivalent without a native daemon.

aiokpl

v0.2 available — full pipeline, vendor-neutral metrics with first-party CloudWatch / OpenTelemetry / Datadog sinks, and a thread-safe SyncProducer for non-async callers. All exercised end-to-end against kinesis-mock. See the Roadmap.

A library that respects the shard as the unit of optimization, measures time before size, and treats failures as user-visible information — not noise to hide.


Status

v0.2 is feature-complete. Full async pipeline (Producer → Aggregator → Limiter → Collector → Sender → Retrier), opt-in vendor-neutral metrics with first-party sinks, and a thread-safe SyncProducer for non-async callers. Not on PyPI yet; install via the repo URL.

Phase  Scope                                               Status
0      Repo scaffolding, design docs                       ✅ Done
1      Aggregation codec (KPL wire format)                 ✅ Done
2      ShardMap + prediction                               ✅ Done
3      Reducer, Aggregator, Collector                      ✅ Done
4      Limiter + TokenBucket                               ✅ Done
5      Sender + Retrier                                    ✅ Done
6      Producer + lifecycle (first usable release: v0.1)   ✅ Done
7      CloudWatch metrics (opt-in, default off)            ✅ Done
8      Sync bridge (SyncProducer)                          ✅ Done

Why this exists

AWS ships the official Kinesis Producer Library as a native C++ binary (amazon-kinesis-producer) wrapped in Java/.NET sidecars. The Python ecosystem has never had a real KPL — only:

  • aws-kinesis-agg — Python codec for the aggregation format. Useful, but not a producer. You still call boto3.put_records yourself.
  • kiner, kinesis-python, kinesis-producer (ludia) — abandoned community attempts. All thin batchers over boto3.

aiokpl is a clean-room reimplementation in idiomatic async Python. Its core is built on anyio, so the same pipeline code runs on both asyncio and trio (the aiobotocore-backed network edge is asyncio-only). It preserves what's worth preserving from the C++ KPL (shard-aware pipeline, deadline-driven batching, smart retry classification, byte-exact aggregation) and drops what was an accident of C++ (IPC, named pipes, child process, custom spinlocks, static binaries, packaging hell).

It is not a wrapper around the C++ binary. It is a reimplementation of its ideas in a language where you don't need a daemon.


Core principles

  1. The shard is the unit of optimization, not the stream.
  2. Predict before asking — the sharding algorithm is deterministic.
  3. Batching is governed by deadlines, not sizes.
  4. Each stage has one responsibility and one downstream callback.
  5. Failures are data, not exceptions.
  6. Bounded latency beats maximum throughput.

The full design rationale lives in CLAUDE.md.


Features

What v0.2 ships

  • Backend-agnostic core via anyio — codec, ShardMap, Reducer, Aggregator, Collector, Limiter, and TokenBucket all run on either asyncio or trio. The Producer/SyncProducer/Sender/Retrier edge is asyncio-only because aiobotocore is asyncio-only.
  • Async-first API built on anyio and (for the network layer) aiobotocore.
  • Byte-exact KPL aggregation on the wire — KCL consumers deaggregate transparently.
  • Shard prediction via md5(partition_key) + cached ListShards, O(log N) lookup with bisect.
  • Per-shard rate limiting with a multi-stream token bucket (1000 records/s + 1 MiB/s, matching Kinesis hard limits).
  • Deadline-driven batching at two levels: UserRecord → AggregatedRecord and AggregatedRecord → PutRecords batch.
  • Smart retry classification distinguishing throttle, transient, wrong-shard (with split detection), and expired.
  • Per-record attempt history returned to the caller — every retry is visible.
  • Bounded backpressure via max_outstanding_records.
  • Graceful shutdown via async with + flush().
  • Vendor-neutral metrics behind a MetricsSink Protocol. Default NullSink is zero overhead. First-party sinks: InMemorySink for tests, CloudWatchSink bundled (since aiobotocore is already a dep), plus OpenTelemetrySink and DatadogSink behind the aiokpl[otel] / aiokpl[datadog] extras. Metric names match the C++ KPL constants verbatim.
  • Synchronous bridge: SyncProducer wraps the async core behind anyio.from_thread.start_blocking_portal() so Flask/Django/Jupyter callers can submit records without an event loop. Thread-safe put_record, bounded wait(timeout=) and flush(timeout=).
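
The sink abstraction in the list above can be pictured as a structural Protocol. This is a minimal sketch, not aiokpl's actual interface: the method name `incr`, its parameters, and the `record_put` helper are assumptions made for illustration, and the metric name is only an example.

```python
from collections import defaultdict
from typing import Dict, Protocol, Tuple


class MetricsSink(Protocol):
    """Hypothetical sink interface; aiokpl's real method names may differ."""

    def incr(self, name: str, value: float = 1.0, **dimensions: str) -> None: ...


class NullSink:
    """Discards every data point, so disabled metrics cost one no-op call."""

    def incr(self, name: str, value: float = 1.0, **dimensions: str) -> None:
        return None


class InMemorySink:
    """Accumulates totals in a dict so tests can assert on them."""

    def __init__(self) -> None:
        self.totals: Dict[Tuple[str, tuple], float] = defaultdict(float)

    def incr(self, name: str, value: float = 1.0, **dimensions: str) -> None:
        # Dimensions are sorted so the same labels always map to the same key.
        key = (name, tuple(sorted(dimensions.items())))
        self.totals[key] += value


def record_put(sink: MetricsSink, stream: str) -> None:
    # Pipeline code depends only on the Protocol, never on a concrete sink.
    sink.incr("UserRecordsPut", stream=stream)
```

Because the Protocol is structural, a sink needs no base class; any object with a matching `incr` method satisfies it.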

See the Non-goals section below for what aiokpl deliberately doesn't do.


Intended usage

import anyio
from aiokpl import Producer, Config

async def main():
    cfg = Config(
        region="us-east-1",
        aggregation_enabled=True,
        record_max_buffered_time_ms=100,
        record_ttl_ms=30_000,
        fail_if_throttled=False,
    )
    async with Producer(cfg) as producer:
        outcome = await producer.put_record(
            stream="my-stream",
            partition_key="user-123",
            data=b"hello",
        )
        result = await outcome.wait()
        if result.success:
            print(result.shard_id, result.sequence_number)
        else:
            print("failed:", result.attempts[-1].error_code)

anyio.run(main)  # asyncio is anyio's default backend; Producer requires it because aiobotocore is asyncio-only

put_record() returns an awaitable future. It resolves when the record reaches a terminal state — success or final failure — and the attempts list always carries the full retry history.
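
The result shape implied by the example above could be modelled roughly as follows. The field names `success`, `shard_id`, `sequence_number`, `attempts`, and `error_code` come from that example; the class names and the `error_message` field are assumptions, and the real types may carry more information.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(frozen=True)
class Attempt:
    """One delivery attempt; error_code is None when the attempt succeeded."""

    error_code: Optional[str] = None
    error_message: Optional[str] = None  # assumed field, not shown in the README


@dataclass
class RecordResult:
    """Terminal state of one user record, including every attempt made."""

    attempts: List[Attempt] = field(default_factory=list)
    shard_id: Optional[str] = None
    sequence_number: Optional[str] = None

    @property
    def success(self) -> bool:
        # Success means the final attempt finished without an error code.
        return bool(self.attempts) and self.attempts[-1].error_code is None
```

Keeping failed attempts in the same list as the final one is what makes a retried-then-successful record distinguishable from one that succeeded on the first try.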

Sync usage

For callers without an async event loop (Flask/Django handlers, scripts, Jupyter), use SyncProducer:

from aiokpl import Config, SyncProducer

with SyncProducer(Config(region="us-east-1")) as producer:
    outcome = producer.put_record(
        stream="my-stream",
        partition_key="user-123",
        data=b"hello",
    )
    result = outcome.wait(timeout=5.0)

Under the hood it spins up an anyio event loop on a background thread via anyio.from_thread.start_blocking_portal() and runs the async Producer there. put_record is thread-safe; wait() and flush() accept timeouts. See Phase 8 docs.
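
The bridging pattern can be illustrated with the standard library alone. The sketch below is a stdlib analogue (an asyncio loop on a background thread plus `asyncio.run_coroutine_threadsafe`), not the anyio blocking portal that SyncProducer is described as using; `LoopThread` and `fake_put` are hypothetical names.

```python
import asyncio
import threading
from concurrent.futures import Future


class LoopThread:
    """Runs an asyncio event loop on a background thread (stand-in for a portal)."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)

    def __enter__(self) -> "LoopThread":
        self._thread.start()
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Ask the loop to stop from outside its thread, then wait for it.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def submit(self, coro) -> Future:
        # Thread-safe: schedules the coroutine on the loop thread.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)


async def fake_put(data: bytes) -> int:
    await asyncio.sleep(0.01)  # stands in for network I/O
    return len(data)


with LoopThread() as bridge:
    future = bridge.submit(fake_put(b"hello"))
    size = future.result(timeout=5.0)  # bounded wait, similar to wait(timeout=)
```

The caller never touches the event loop directly; it only holds a `concurrent.futures.Future`, which is safe to wait on from any thread.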


Architecture (target)

UserRecord
   │  producer.put_record()
   ▼
Aggregator   ──► AggregatedRecord  (per predicted shard, deadline-driven)
   ▼
Limiter      ──► throttled to 1000 rec/s + 1 MiB/s per shard
   ▼
Collector    ──► PutRecordsBatch   (500 records / 5 MiB / 256 KiB-per-shard)
   ▼
Sender       ──► aiobotocore.put_records (async)
   ▼
Retrier      ──► classify outcome (throttle / transient / wrong-shard / expired)
   ▼
finish_user_record  →  resolves the user's awaitable future

Same pipeline as the C++ KPL, in idiomatic anyio primitives. See CLAUDE.md for the C++↔Python translation table.


Roadmap

Phased on purpose. Each phase ships something testable on its own.

Phase 1 — Aggregation codec ✅

  • aiokpl/aggregation.py: encode/decode the KPL aggregated record format.
  • aiokpl/hashing.py: partition-key → 128-bit hash, explicit hash key parsing.
  • Conformance tests against aws-kinesis-agg and golden bytes captured from the C++ KPL.
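
The hashing step can be sketched in a few lines. Kinesis maps a partition key to a 128-bit integer with MD5, and an explicit hash key is passed as a decimal string; the function name and error message below are illustrative, not the contents of aiokpl/hashing.py.

```python
import hashlib
from typing import Optional

MAX_HASH_KEY = 2**128 - 1


def hash_key_for(partition_key: str, explicit_hash_key: Optional[str] = None) -> int:
    """Return the 128-bit hash key Kinesis would use to route a record."""
    if explicit_hash_key is not None:
        # An explicit hash key is a decimal string and overrides the partition key.
        value = int(explicit_hash_key)
        if not 0 <= value <= MAX_HASH_KEY:
            raise ValueError("explicit hash key must fit in 128 bits")
        return value
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")
```

Since the mapping is deterministic, the producer can compute the destination shard locally instead of learning it from the PutRecords response.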

Phase 2 — ShardMap ✅

  • Async refresh, state machine, bisect_left lookup, invalidate() from the retrier, exponential backoff (1s → 30s), background cleanup of closed shards after 60s.
  • Tests with moto for paginated ListShards.
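
The bisect_left lookup can be sketched as follows, assuming open shards partition the hash-key space contiguously and each shard is identified by its inclusive ending hash key. `Shard` and `ShardIndex` are hypothetical names; refresh, invalidation, and backoff are left out.

```python
from bisect import bisect_left
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Shard:
    shard_id: str
    ending_hash_key: int  # inclusive upper bound of the shard's range


class ShardIndex:
    """Immutable snapshot of open shards, sorted for O(log N) lookup."""

    def __init__(self, shards: Iterable[Shard]) -> None:
        self._shards = sorted(shards, key=lambda s: s.ending_hash_key)
        self._ends = [s.ending_hash_key for s in self._shards]

    def shard_for(self, hash_key: int) -> str:
        # bisect_left returns the first shard whose ending key is >= hash_key.
        i = bisect_left(self._ends, hash_key)
        if i == len(self._ends):
            raise LookupError("hash key is beyond the last known shard")
        return self._shards[i].shard_id
```

For a stream with two shards that split the space in half, a hash key of `2**127 - 1` resolves to the first shard and `2**127` to the second.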

Phase 3 — Reducer, Aggregator, Collector ✅

  • Generic deadline-driven batcher (reducer.py) — the core abstraction reused twice.
  • aggregator.py produces aggregated records per predicted shard, falling back to single-record mode when the ShardMap isn't ready.
  • collector.py produces PutRecords batches with the 256 KiB/shard short-circuit.
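
A deadline-driven batcher of the kind described above might look like this. It is a simplified, synchronous sketch with an injected clock: the class shape, parameter names, and the `poll` method are assumptions, and the real reducer.py is presumably driven by anyio timers rather than explicit polling.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class Reducer(Generic[T]):
    """Buffers items and flushes on the earliest deadline or a size limit."""

    def __init__(
        self,
        max_count: int,
        max_bytes: int,
        size_of: Callable[[T], int],
        on_flush: Callable[[List[T]], None],
    ) -> None:
        self._max_count = max_count
        self._max_bytes = max_bytes
        self._size_of = size_of
        self._on_flush = on_flush
        self._items: List[T] = []
        self._bytes = 0
        self._deadline: Optional[float] = None

    def add(self, item: T, deadline: float) -> None:
        self._items.append(item)
        self._bytes += self._size_of(item)
        # The batch inherits the most urgent deadline of any item in it.
        self._deadline = deadline if self._deadline is None else min(self._deadline, deadline)
        if len(self._items) >= self._max_count or self._bytes >= self._max_bytes:
            self.flush()

    def poll(self, now: float) -> None:
        # Time is checked independently of size, so a quiet stream still drains.
        if self._deadline is not None and now >= self._deadline:
            self.flush()

    def flush(self) -> None:
        if not self._items:
            return
        batch, self._items = self._items, []
        self._bytes = 0
        self._deadline = None
        self._on_flush(batch)
```

The same class can serve both batching levels by swapping `size_of` and the limits: user records into an aggregated record, and aggregated records into a PutRecords batch.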

Phase 4 — Limiter + TokenBucket ✅

  • token_bucket.py: multi-stream, query-on-demand growth, no sleep.
  • limiter.py: per-shard ShardLimiter with a 25 ms drain loop.
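
The "query-on-demand growth, no sleep" idea can be sketched with a bucket that refills only when asked. This is an illustration under assumptions: the class shape is hypothetical, capacity is taken to be one second of burst per stream, and the clock is passed in explicitly.

```python
from typing import List, Sequence


class TokenBucket:
    """Multi-stream bucket: a take succeeds only if every stream has enough tokens."""

    def __init__(self, rates_per_second: Sequence[float], now: float) -> None:
        self._rates = list(rates_per_second)
        self._tokens: List[float] = list(rates_per_second)  # start full
        self._last = now

    def _grow(self, now: float) -> None:
        # Tokens accrue lazily from elapsed time; no timer or sleep is involved.
        elapsed = max(0.0, now - self._last)
        self._last = max(self._last, now)
        self._tokens = [
            min(rate, tokens + rate * elapsed)
            for rate, tokens in zip(self._rates, self._tokens)
        ]

    def try_take(self, amounts: Sequence[float], now: float) -> bool:
        self._grow(now)
        if any(tokens < amount for tokens, amount in zip(self._tokens, amounts)):
            return False  # caller keeps the record queued and retries later
        self._tokens = [t - a for t, a in zip(self._tokens, amounts)]
        return True
```

With the per-shard limits quoted earlier, a bucket would be built as `TokenBucket([1000.0, 1024.0 * 1024.0], now=...)`, one stream for records and one for bytes.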

Phase 5 — Sender + Retrier ✅

  • Glue to aiobotocore.put_records.
  • The full classification table — every row covered in unit tests, including wrong-shard-after-split.
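
The four retry categories named in this README could be distinguished along these lines. The full classification table lives in CLAUDE.md and is not reproduced here, so the rules below are a simplified guess: the function signature, the ordering of checks, and the treatment of every non-throttle error as transient are all assumptions.

```python
from enum import Enum
from typing import Optional


class Outcome(Enum):
    SUCCESS = "success"
    THROTTLE = "throttle"
    TRANSIENT = "transient"
    WRONG_SHARD = "wrong-shard"
    EXPIRED = "expired"


# Error code Kinesis reports for a throttled record.
THROTTLE_CODE = "ProvisionedThroughputExceededException"


def classify(
    error_code: Optional[str],
    predicted_shard: Optional[str],
    actual_shard: Optional[str],
    now: float,
    expires_at: float,
) -> Outcome:
    landed_where_predicted = predicted_shard is None or predicted_shard == actual_shard
    if error_code is None and landed_where_predicted:
        return Outcome.SUCCESS
    # Anything past this point would be retried, so the TTL is checked first.
    if now >= expires_at:
        return Outcome.EXPIRED
    if error_code is None:
        # Accepted, but by a different shard than predicted (e.g. after a split).
        return Outcome.WRONG_SHARD
    if error_code == THROTTLE_CODE:
        return Outcome.THROTTLE
    return Outcome.TRANSIENT
```

A wrong-shard result is also the natural trigger for invalidating the cached shard map, since it signals that the prediction is stale.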

Phase 6 — Producer + lifecycle → v0.1 release

  • Per-stream pipeline wiring, graceful shutdown, backpressure semaphore, configurable knobs.
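
The backpressure semaphore can be illustrated with plain asyncio. This sketch is not the Producer's implementation: `BoundedSubmitter`, `send`, and the `peak` counter are hypothetical, and asyncio is used instead of anyio only to keep the example dependency-free.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class BoundedSubmitter:
    """Caps the number of records in flight; extra callers wait for a free slot."""

    def __init__(self, max_outstanding_records: int) -> None:
        self._slots = asyncio.Semaphore(max_outstanding_records)
        self.in_flight = 0
        self.peak = 0  # highest concurrency observed, for demonstration only

    async def submit(self, send: Callable[[bytes], Awaitable[int]], record: bytes) -> int:
        async with self._slots:  # blocks here once the limit is reached
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                return await send(record)
            finally:
                self.in_flight -= 1


async def demo() -> Tuple[List[int], int]:
    submitter = BoundedSubmitter(max_outstanding_records=2)

    async def send(record: bytes) -> int:
        await asyncio.sleep(0.01)  # stands in for the pipeline and network
        return len(record)

    results = await asyncio.gather(
        *(submitter.submit(send, b"x" * n) for n in range(1, 6))
    )
    return list(results), submitter.peak


results, peak = asyncio.run(demo())
```

Five records are submitted at once, but no more than two are ever in flight, which bounds memory use when the producer outpaces the stream.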

Phase 7 — CloudWatch metrics ✅

  • In-process counters per metric / stream / shard / error code, rolling 60 s window, periodic upload via aiobotocore. Opt-in via Config.metrics_level; default NONE is a zero-overhead no-op.
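
A rolling 60 s window can be kept with one bucket per second. The sketch below assumes timestamps never go backwards and uses a hypothetical `RollingCounter` class; the actual counters are additionally keyed by metric, stream, shard, and error code.

```python
from collections import deque
from typing import Deque, List


class RollingCounter:
    """Sums values over the most recent window_s seconds using per-second buckets."""

    def __init__(self, window_s: int = 60) -> None:
        self._window = window_s
        self._buckets: Deque[List[float]] = deque()  # each entry is [second, total]

    def add(self, value: float, now: float) -> None:
        second = int(now)
        if self._buckets and self._buckets[-1][0] == second:
            self._buckets[-1][1] += value
        else:
            self._buckets.append([second, value])
        self._evict(second)

    def total(self, now: float) -> float:
        self._evict(int(now))
        return sum(bucket[1] for bucket in self._buckets)

    def _evict(self, second: int) -> None:
        # Drop buckets that have fallen out of the window.
        while self._buckets and self._buckets[0][0] <= second - self._window:
            self._buckets.popleft()
```

Memory stays bounded at one bucket per second of window, regardless of how many data points arrive.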

Phase 8 — Sync bridge ✅

  • SyncProducer wraps the async Producer behind anyio.from_thread.start_blocking_portal() so plain blocking code (Flask/Django/Jupyter/scripts) can submit records and wait for outcomes without an async event loop. Thread-safe put_record, timeouts on wait() and flush().

Non-goals

  • Wrapping the C++ KPL daemon. That is a solved problem; aiokpl solves it differently.
  • Compatibility with the KPL IPC protobuf (messages.proto). We only match the on-the-wire aggregation format Kinesis sees.
  • KCL / consumer side. Producer only.
  • Python < 3.10.
  • Sync-first API. Sync support exists (SyncProducer) but is a bridge over the async core, not the primary surface.

Development

Requirements: Python 3.10+ and uv (or pip).

git clone <repo>
cd aiokpl
uv venv && source .venv/bin/activate
uv pip install -e ".[dev]"
pytest

CI matrix: Python 3.10 / 3.11 / 3.12 / 3.13. ruff + mypy + pytest.


Reference

Implementation context — including the philosophy, the C++↔Python mapping, the retrier classification table, and the resume guide — lives in CLAUDE.md.

Original C++ KPL source for cross-referencing: https://github.com/awslabs/amazon-kinesis-producer.


License

Apache-2.0.
