
AsyncIO Kinesis Library


async-kinesis


Features

  • uses queues for both producer and consumer
    • the producer flushes with put_records() once it has enough records to flush, or after "buffer_time" is reached
    • the consumer iterates over the message queue independently of the shard readers
  • configurable to handle sharding limits, and will throttle/retry if required
    • ie when multiple independent clients are saturating the shards
  • checkpointing with heartbeats
    • deadlock detection + reallocation of shards if a checkpoint fails to heartbeat within "session_timeout"

Consumer Design

(Bears some explanation; it's kinda complex~)

  • fetch() gets called periodically (every 0.2 sec, ie at most 5x per second, matching the limit on shard get_records())
    • iterate over the list of shards (set on startup; resharding is not currently detected)
      • assign the shard if it is not in use and we are below the "max_shard_consumers" limit, otherwise ignore/continue
      • ignore/continue if this shard is still fetching
      • process records if the shard is done fetching
        • put records on the queue
        • add a checkpoint record to the queue
        • assign the NextShardIterator
      • create the (get_records()) task again

Note that get_records() is throttled via "shard_fetch_rate=5" (ie the same 0.2 sec / 5x limit).

This pattern seemed like the easiest way to maintain a pool of consumers without needing to think too hard about starting its next job or handling new shards, etc.
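The loop above can be sketched in plain asyncio. Everything below (the get_records() stub, the checkpoint marker tuple, the task bookkeeping) is an illustrative stand-in, not the library's actual internals:

```python
import asyncio

async def get_records(shard):
    # stand-in for the real shard get_records() call (returns fake records)
    await asyncio.sleep(0.01)
    return [f"{shard}-record-{i}" for i in range(2)]

async def fetch(shards, queue, tasks, max_shard_consumers=None):
    # one pass of the periodic fetch() described above
    for shard in shards:
        task = tasks.get(shard)
        if task is None:
            # assign shard only if we are below the "max_shard_consumers" limit
            if max_shard_consumers and len(tasks) >= max_shard_consumers:
                continue
        elif not task.done():
            # shard is still fetching, skip it this round
            continue
        else:
            # shard is done fetching: put records on the queue,
            # followed by a checkpoint marker for this shard
            for record in task.result():
                await queue.put(record)
            await queue.put(("checkpoint", shard))
        # (re)create the get_records() task
        tasks[shard] = asyncio.ensure_future(get_records(shard))

async def main():
    queue, tasks = asyncio.Queue(), {}
    for _ in range(3):  # three fetch() rounds, 0.2 sec apart
        await fetch(["shard-0", "shard-1"], queue, tasks)
        await asyncio.sleep(0.2)
    return queue.qsize()

print(asyncio.run(main()))  # 2 drained rounds x 2 shards x 3 items = 12
```

The key property is that a slow shard simply stays in the "still fetching" branch and never blocks the other shards or the consumer draining the queue.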

Not Implemented

  • resharding
  • client rebalancing (ie share the shards between consumers)

See also

https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

Producer

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

  • region_name

    AWS Region

  • buffer_time=0.5

    Buffer time in seconds before auto flushing records

  • put_rate_limit_per_shard=1000

    "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes

  • batch_size=500

    "Each PutRecords request can support up to 500 records"

  • max_queue_size=10000

    The put() method will block when the queue is at max

  • after_flush_fun

    Async function to call after a flush (ie put_records()) call
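To illustrate how buffer_time, batch_size and after_flush_fun interact, here is a toy buffered producer. The class and its fake flush are hypothetical stand-ins for the library's put_records() path, not its actual API:

```python
import asyncio, time

class BufferedProducer:
    # toy sketch, not the library's Producer class
    def __init__(self, buffer_time=0.5, batch_size=500, after_flush_fun=None):
        self.buffer_time = buffer_time
        self.batch_size = batch_size
        self.after_flush_fun = after_flush_fun
        self.buffer = []
        self.last_flush = time.monotonic()
        self.sent = []  # stand-in for records sent to Kinesis

    async def put(self, item):
        self.buffer.append(item)
        # flush when the batch is full or "buffer_time" has elapsed
        if (len(self.buffer) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.buffer_time):
            await self.flush()

    async def flush(self):
        batch, self.buffer = self.buffer, []
        self.sent.extend(batch)  # real code would call put_records(Records=batch)
        self.last_flush = time.monotonic()
        if self.after_flush_fun:
            await self.after_flush_fun(batch)

async def main():
    flush_sizes = []

    async def on_flush(batch):  # after_flush_fun callback
        flush_sizes.append(len(batch))

    producer = BufferedProducer(batch_size=3, after_flush_fun=on_flush)
    for i in range(7):
        await producer.put(i)
    await producer.flush()  # final flush, as on context-manager exit
    return flush_sizes

print(asyncio.run(main()))  # [3, 3, 1]
```

With batch_size=3, seven puts produce two full batches plus a final partial flush, and after_flush_fun sees each batch as it goes out.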

Consumer

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

  • region_name

    AWS Region

  • max_queue_size=1000

    Max size of the record queue that the fetch() tasks fill; fetching backs off while the queue is full

  • max_shard_consumers=None

    Max number of shards to use. None = all

  • record_limit=10000

    Number of records to fetch with get_records()

  • sleep_time_no_records=2

    Number of seconds to sleep when caught up

  • iterator_type="TRIM_HORIZON"

    Default shard iterator type for new/unknown shards (ie start from the beginning of the stream). The alternative is "LATEST" (ie the end of the stream)

  • shard_fetch_rate=5

    Number of fetches per second (max = 5)

  • checkpointer=None

    Checkpointer to use
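The shard_fetch_rate limit can be pictured as a simple per-shard rate limiter that delays each call until the minimum interval has passed. The Throttler class below is a hypothetical sketch, not the library's implementation:

```python
import asyncio, time

class Throttler:
    # minimal rate-limiter sketch (hypothetical class)
    def __init__(self, rate_per_sec=5):
        self.interval = 1.0 / rate_per_sec
        self.last_call = time.monotonic() - self.interval  # allow an immediate first call

    async def __aenter__(self):
        # sleep just long enough to keep calls at most rate_per_sec apart
        wait = self.interval - (time.monotonic() - self.last_call)
        if wait > 0:
            await asyncio.sleep(wait)
        self.last_call = time.monotonic()

    async def __aexit__(self, *exc):
        return False

async def main():
    throttle = Throttler(rate_per_sec=5)  # shard_fetch_rate=5
    start = time.monotonic()
    for _ in range(3):
        async with throttle:
            pass  # the get_records() call would go here
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # roughly 0.4s: calls 2 and 3 each waited ~0.2s
```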

Checkpointers

  • memory
  • redis
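The heartbeat / "session_timeout" behaviour described under Features might look roughly like this in-memory sketch. The class name and method signatures here are hypothetical, not the library's checkpointer API:

```python
import time

class MemoryCheckPointerSketch:
    # illustrative in-memory checkpointer with heartbeat expiry
    def __init__(self, session_timeout=60):
        self.session_timeout = session_timeout
        self.shards = {}  # shard_id -> {"owner", "seq", "heartbeat"}

    def allocate(self, shard_id, owner, now=None):
        now = time.monotonic() if now is None else now
        entry = self.shards.get(shard_id)
        # refuse only if another owner holds the shard and is still heartbeating;
        # a stale heartbeat means the shard can be reallocated
        if (entry and entry["owner"] != owner
                and now - entry["heartbeat"] < self.session_timeout):
            return False
        seq = entry["seq"] if entry else None  # resume from the last checkpoint
        self.shards[shard_id] = {"owner": owner, "seq": seq, "heartbeat": now}
        return True

    def heartbeat(self, shard_id, owner, now=None):
        entry = self.shards.get(shard_id)
        if entry and entry["owner"] == owner:
            entry["heartbeat"] = time.monotonic() if now is None else now

    def checkpoint(self, shard_id, owner, seq):
        entry = self.shards.get(shard_id)
        if entry and entry["owner"] == owner:
            entry["seq"] = seq

cp = MemoryCheckPointerSketch(session_timeout=60)
print(cp.allocate("shard-0", "client-a", now=0.0))    # True: shard is free
print(cp.allocate("shard-0", "client-b", now=10.0))   # False: still owned and heartbeating
print(cp.allocate("shard-0", "client-b", now=100.0))  # True: heartbeat expired, reallocated
```

The Redis variant would store the same owner/sequence/heartbeat state in Redis so that independent consumer processes can coordinate.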

Yet another Python Kinesis Library?

Sadly I had issues with every other library I could find :(

(Actually I only found this one recently; it might be an ok alternative?)



Built Distribution

py_kinesis-0.0.1-py3.7.egg (22.9 kB)
