
AsyncIO Kinesis Library


async-kinesis


Features

  • uses queues for both producer and consumer
    • producer flushes with put_records() once it has enough records for a batch or after "buffer_time" is reached
    • consumer iterates over the msg queue independently of the shard readers
  • Configurable to handle sharding limits, but will throttle/retry if required
    • ie when multiple independent clients are saturating the shards
  • Checkpointing with heartbeats
    • shards are treated as dead + reallocated if a checkpoint fails to heartbeat within "session_timeout"

Consumer Design

(Bears some explanation, kinda complex~)

  • fetch() gets called periodically (every 0.2 sec, ie max 5x per second, the limit on shard get_records())
    • iterate over the list of shards (set on startup; resharding is not currently detected)
      • assign a shard if it is not in use and the "max_shard_consumers" limit is not reached, otherwise ignore/continue
      • ignore/continue if this shard is still fetching
      • process records if the shard is done fetching
        • put records on the queue
        • add a checkpoint record to the queue
        • assign the NextShardIterator
      • create the (get_records()) task again

Note that get_records() is throttled via "shard_fetch_rate=5" (ie the same 0.2 sec / 5x limit)

This pattern seemed like the easiest way to maintain a pool of consumers without needing to think too hard about starting its next job or handling new shards, etc.
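The loop above can be sketched as a small self-contained simulation (ShardReader, the record lists, and every name here are illustrative stand-ins, not the library's actual API):

```python
import asyncio

class ShardReader:
    def __init__(self, name, records):
        self.name = name
        self.pending = list(records)  # stand-in for records in the Kinesis shard
        self.task = None              # in-flight get_records() task

    async def get_records(self, limit=2):
        await asyncio.sleep(0)        # simulate the network round trip
        batch, self.pending = self.pending[:limit], self.pending[limit:]
        return batch

async def fetch(shards, queue, max_shard_consumers=None):
    active = sum(1 for s in shards if s.task is not None)
    for shard in shards:
        if shard.task is None:
            # assign shard if not at the max_shard_consumers limit
            if max_shard_consumers is not None and active >= max_shard_consumers:
                continue
            shard.task = asyncio.ensure_future(shard.get_records())
            active += 1
        elif shard.task.done():
            # shard is done fetching: queue its records plus a checkpoint
            # marker, then create the next get_records() task
            for record in shard.task.result():
                await queue.put(record)
            await queue.put(("checkpoint", shard.name))
            shard.task = asyncio.ensure_future(shard.get_records())
        # else: shard is still fetching -> ignore/continue

async def main():
    queue = asyncio.Queue()
    shards = [ShardReader("shard-0", ["a", "b", "c"])]
    for _ in range(3):                 # stands in for the periodic 0.2 sec timer
        await fetch(shards, queue)
        await asyncio.sleep(0)         # let in-flight tasks progress
    for shard in shards:               # tidy up the last in-flight task
        if shard.task:
            shard.task.cancel()
    return [queue.get_nowait() for _ in range(queue.qsize())]

print(asyncio.run(main()))             # ['a', 'b', ('checkpoint', 'shard-0')]
```

The key property the sketch shows is that fetch() never blocks on a slow shard: a shard whose task is still running is simply skipped until the next pass.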

Not Implemented

  • resharding
  • client rebalancing (ie share the shards between consumers)

See also

https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

Producer

from kinesis import Producer  # assumed import path

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

  • region_name

    AWS Region

  • buffer_time=0.5

    Buffer time in seconds before auto flushing records

  • put_rate_limit_per_shard=1000

"A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes"

  • batch_size=500

    "Each PutRecords request can support up to 500 records"

  • max_queue_size=10000

    put() method will block when queue is at max

  • after_flush_fun

Async function to call after each flush (ie each put_records() call)
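The flush rule those options describe (flush when batch_size records have accumulated, or when buffer_time elapses, whichever comes first) can be sketched as a self-contained simulation; FlushingBuffer and all names here are illustrative, not the library's internals:

```python
import asyncio
import time

class FlushingBuffer:
    """Illustrative sketch of batch_size / buffer_time flushing."""

    def __init__(self, batch_size=500, buffer_time=0.5, flush_fun=None):
        self.batch_size = batch_size
        self.buffer_time = buffer_time
        self.flush_fun = flush_fun          # stands in for after_flush_fun
        self.records = []
        self.last_flush = time.monotonic()
        self.flushed_batches = []

    async def put(self, record):
        self.records.append(record)
        if len(self.records) >= self.batch_size:
            await self.flush()

    async def tick(self):
        # called periodically; mirrors the buffer_time auto-flush
        if self.records and time.monotonic() - self.last_flush >= self.buffer_time:
            await self.flush()

    async def flush(self):
        batch, self.records = self.records, []
        self.flushed_batches.append(batch)  # real code would call put_records(batch)
        self.last_flush = time.monotonic()
        if self.flush_fun:
            await self.flush_fun(batch)

async def demo():
    buf = FlushingBuffer(batch_size=3, buffer_time=0.05)
    for i in range(4):
        await buf.put(i)       # flushes [0, 1, 2] on the third put
    await asyncio.sleep(0.06)
    await buf.tick()           # flushes [3] once buffer_time has elapsed
    return buf.flushed_batches

print(asyncio.run(demo()))     # [[0, 1, 2], [3]]
```

Buffering this way keeps each put() cheap while still bounding how long a record can sit unflushed.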

Consumer

from kinesis import Consumer  # assumed import path

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

  • region_name

    AWS Region

  • max_queue_size=1000

Max size of the record queue that the shard fetch() tasks feed into

  • max_shard_consumers=None

    Max number of shards to use. None = all

  • record_limit=10000

    Number of records to fetch with get_records()

  • sleep_time_no_records=2

Number of seconds to sleep when caught up

  • iterator_type="TRIM_HORIZON"

Default shard iterator type for new/unknown shards (ie start from the start of the stream). The alternative is "LATEST" (ie the end of the stream)

  • shard_fetch_rate=5

Number of fetches per second (max = 5)

  • checkpointer=None

    Checkpointer to use
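Taken together, a consumer configured with the options above might look like this (the values are illustrative, and the import path is an assumption, not confirmed by this page):

```python
from kinesis import Consumer  # assumed import path

async with Consumer(
    stream_name="test",
    max_queue_size=1000,          # the defaults listed above
    max_shard_consumers=None,
    record_limit=10000,
    sleep_time_no_records=2,
    iterator_type="TRIM_HORIZON",
    shard_fetch_rate=5,
) as consumer:
    async for item in consumer:
        print(item)
```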

Checkpointers

  • memory
  • redis
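As a rough illustration of the heartbeat / "session_timeout" behaviour mentioned under Features, a memory checkpointer might look something like this (class and method names are hypothetical, not the library's API):

```python
import time

class MemoryCheckPointer:
    """Hypothetical sketch; not the library's actual API."""

    def __init__(self, session_timeout=60):
        self.session_timeout = session_timeout
        self.shards = {}  # shard_id -> {"owner", "heartbeat", "sequence"}

    def allocate(self, shard_id, owner):
        entry = self.shards.get(shard_id)
        now = time.monotonic()
        # A shard is up for grabs if unclaimed, or if its owner has not
        # heartbeated within session_timeout (the reallocation case).
        if entry is None or now - entry["heartbeat"] > self.session_timeout:
            sequence = entry["sequence"] if entry else None
            self.shards[shard_id] = {"owner": owner, "heartbeat": now,
                                     "sequence": sequence}
            return True
        return entry["owner"] == owner

    def heartbeat(self, shard_id, owner):
        entry = self.shards.get(shard_id)
        if entry and entry["owner"] == owner:
            entry["heartbeat"] = time.monotonic()

    def checkpoint(self, shard_id, owner, sequence):
        entry = self.shards.get(shard_id)
        if entry and entry["owner"] == owner:
            entry["sequence"] = sequence

cp = MemoryCheckPointer(session_timeout=0.1)
assert cp.allocate("shard-0", "consumer-a")        # fresh shard: claimed
assert not cp.allocate("shard-0", "consumer-b")    # already held
cp.checkpoint("shard-0", "consumer-a", "seq-42")
time.sleep(0.2)                                    # consumer-a goes quiet
assert cp.allocate("shard-0", "consumer-b")        # reallocated after timeout
print(cp.shards["shard-0"]["sequence"])            # seq-42 (checkpoint survives)
```

A redis checkpointer would follow the same shape, with the shard table held in redis so multiple processes can share it.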

Yet another Python Kinesis Library?

Sadly I had issues with every other library I could find :(

(Actually I only found this one recently; it might be an ok alternative?)
