Skip to main content

AsyncIO Kinesis Library

Project description

async-kinesis

Code style: black

Features

  • uses queues for both producer and consumer
    • producer flushes with put_records() if has enough to flush or after "buffer_time" reached
    • consumer iterates over msg queue independent of shard readers
  • Configurable to handle Sharding limits but will throttle/retry if required
    • ie multiple independent clients are saturating the Shards
  • Checkpointing with heartbeats
    • deadlock + reallocation of shards if checkpoint fails to heartbeat within "session_timeout"

Consumer Design

(Bears some explanation, kinda complex~)

  • fetch() gets called periodically (0.2 sec (ie max 5x per second as is the limit on shard get_records()))
    • iterate over the list of shards (set on startup, does not currently detect resharding)
      • assign shard if not in use and not at "max_shard_consumers" limit otherwise ignore/continue
      • ignore/continue if this shard is still fetching
      • process records if shard is done fetching
        • put records on queue
        • add checkpoint record to queue
        • assign NextShardIterator
      • create (get_records()) task again

Note that get_records() is throttled via "shard_fetch_rate=5" (ie the same 0.2 sec/ 5x limit)

This pattern seemed like the easiest way to maintain a pool of consumers without needing to think too hard about starting it's next job or handling new shards etc.

Not Implemented

  • resharding
  • client rebalancing (ie share the shards between consumers)

See also

https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

Producer

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

  • region_name

    AWS Region

  • buffer_time=0.5

    Buffer time in seconds before auto flushing records

  • put_rate_limit_per_shard=1000

    "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes

  • batch_size=500

    "Each PutRecords request can support up to 500 records"

  • max_queue_size=10000

    put() method will block when queue is at max

  • after_flush_fun

    async function to call after doing a flush (err put_records()) call

Consumer

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

  • region_name

    AWS Region

  • max_queue_size=1000

    the fetch() task shard

  • max_shard_consumers=None

    Max number of shards to use. None = all

  • record_limit=10000

    Number of records to fetch with get_records()

  • sleep_time_no_records=2

    No of seconds to sleep when caught up

  • iterator_type="TRIM_HORIZON"

    Default shard iterator type for new/unknown shards (ie start from start of stream) Alternative is "LATEST" (ie end of stream)

  • shard_fetch_rate=5

    No of fetches per second (max = 5)

  • checkpointer=None

    Checkpointer to use

Checkpointers

  • memory
  • redis

Yet another Python Kinesis Library?

Sadly I had issues with every other library I could find :(

(Actually I only found this one recently, might be ok alternative?)

Project details


Release history Release notifications

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for py-kinesis, version 0.0.1
Filename, size File type Python version Upload date Hashes
Filename, size py_kinesis-0.0.1-py3.7.egg (22.9 kB) File type Egg Python version 3.7 Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page