AsyncIO Kinesis Library
async-kinesis
Features
- Uses queues for both producer and consumer
  - The producer flushes with put_records() once it has enough records to flush or after "buffer_time" has elapsed
  - The consumer iterates over the message queue independently of the shard readers
- Configurable to respect shard limits, but will throttle/retry if required
  - e.g. when multiple independent clients are saturating the shards
- Checkpointing with heartbeats
  - Deadlock detection and reallocation of shards if a checkpoint fails to heartbeat within "session_timeout"
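The producer's flush rule (flush on a full batch or once "buffer_time" has elapsed) can be sketched roughly as below. This is a minimal illustration and not the library's code: `BufferedFlusher`, `send_batch` and `demo` are hypothetical names, and `send_batch` stands in for the real put_records() call.

```python
import asyncio
import time


class BufferedFlusher:
    """Hypothetical sketch of queue-based buffering; not the library's code."""

    def __init__(self, send_batch, batch_size=500, buffer_time=0.5):
        self.send_batch = send_batch    # async callable standing in for put_records()
        self.batch_size = batch_size
        self.buffer_time = buffer_time
        self.queue = asyncio.Queue()

    async def put(self, item):
        await self.queue.put(item)

    async def run(self):
        """Flush when a full batch is buffered or buffer_time has elapsed."""
        batch = []
        deadline = time.monotonic() + self.buffer_time
        while True:
            timeout = max(0.0, deadline - time.monotonic())
            try:
                batch.append(await asyncio.wait_for(self.queue.get(), timeout))
            except asyncio.TimeoutError:
                pass                    # nothing arrived before the deadline
            if len(batch) >= self.batch_size or time.monotonic() >= deadline:
                if batch:
                    await self.send_batch(batch)
                    batch = []
                deadline = time.monotonic() + self.buffer_time


async def demo():
    sent = []

    async def fake_put_records(batch):  # records batches instead of calling AWS
        sent.append(list(batch))

    flusher = BufferedFlusher(fake_put_records, batch_size=3, buffer_time=0.05)
    task = asyncio.create_task(flusher.run())
    for i in range(7):
        await flusher.put(i)
    await asyncio.sleep(0.2)            # give the flusher time to drain the queue
    task.cancel()
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Full batches go out as soon as they fill up, and the leftover item is flushed when the timer expires.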
Consumer Design
(Bears some explanation, kinda complex~)
- fetch() is called periodically, every 0.2 sec (i.e. at most 5x per second, which is the limit on get_records() per shard)
  - iterate over the list of shards (set on startup, does not currently detect resharding)
    - assign the shard if it is not in use and we are not at the "max_shard_consumers" limit, otherwise ignore/continue
    - ignore/continue if this shard is still fetching
    - process records if the shard is done fetching
      - put records on queue
      - add checkpoint record to queue
      - assign NextShardIterator
      - create the get_records() task again
Note that get_records() is throttled via "shard_fetch_rate=5" (i.e. the same 0.2 sec / 5x per second limit).
This pattern seemed like the easiest way to maintain a pool of consumers without needing to think too hard about starting each one's next job, handling new shards, etc.
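The fetch() pass described above might look roughly like the sketch below. It runs against in-memory fake shards rather than Kinesis; `FakeShard`, `fake_get_records` and the checkpoint record format are assumptions made for illustration, not the library's internals.

```python
import asyncio


class FakeShard:
    """Hypothetical in-memory stand-in for a Kinesis shard."""

    def __init__(self, shard_id, records):
        self.shard_id = shard_id
        self.records = records
        self.position = 0       # plays the role of the shard iterator
        self.assigned = False
        self.task = None        # in-flight get_records() task, if any


async def fake_get_records(shard, limit=2):
    await asyncio.sleep(0.01)   # pretend network latency
    chunk = shard.records[shard.position:shard.position + limit]
    return chunk, shard.position + len(chunk)


async def fetch(shards, queue, max_shard_consumers=None):
    """One pass over the shard list, following the steps listed above."""
    for shard in shards:
        if not shard.assigned:
            in_use = sum(s.assigned for s in shards)
            if max_shard_consumers is not None and in_use >= max_shard_consumers:
                continue                    # at the limit: ignore this shard
            shard.assigned = True
            shard.task = asyncio.create_task(fake_get_records(shard))
            continue
        if not shard.task.done():
            continue                        # still fetching
        records, next_position = shard.task.result()
        for record in records:
            await queue.put(record)
        if records:
            await queue.put({"checkpoint": (shard.shard_id, next_position)})
        shard.position = next_position      # NextShardIterator equivalent
        shard.task = asyncio.create_task(fake_get_records(shard))


async def demo():
    shards = [FakeShard("shard-0", ["a", "b", "c"]), FakeShard("shard-1", ["x"])]
    queue = asyncio.Queue()
    for _ in range(10):
        await fetch(shards, queue)
        await asyncio.sleep(0.02)           # the library polls every 0.2 sec
    for shard in shards:
        shard.task.cancel()
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because each shard owns at most one pending task, the loop never has to track which consumer should start which job; it only checks whether the previous fetch has finished.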
Not Implemented
- resharding
- client rebalancing (ie share the shards between consumers)
See also
Producer
    async with Producer(stream_name="test") as producer:
        # Put item onto queue to be flushed via put_records()
        await producer.put({'my': 'data'})
Options:
(comments in quotes are Kinesis Limits as per AWS Docs)
- region_name
  AWS Region
- buffer_time=0.5
  Buffer time in seconds before auto flushing records
- put_rate_limit_per_shard=1000
  "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes"
- batch_size=500
  "Each PutRecords request can support up to 500 records"
- max_queue_size=10000
  put() method will block when queue is at max
- after_flush_fun
  async function to call after doing a flush (i.e. a put_records() call)
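The blocking behaviour of put() at "max_queue_size" can be illustrated with a plain asyncio.Queue. This is a sketch that assumes the producer's queue behaves like a bounded asyncio.Queue; `demo` is a hypothetical helper and the tiny queue size is only there to keep the example short.

```python
import asyncio


async def demo(max_queue_size=2):
    queue = asyncio.Queue(maxsize=max_queue_size)
    for i in range(max_queue_size):
        await queue.put(i)              # fills the queue without blocking
    try:
        await asyncio.wait_for(queue.put("overflow"), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True                  # put() had to wait: the queue is full
    queue.get_nowait()                  # a flush draining one item frees a slot
    await asyncio.wait_for(queue.put("overflow"), timeout=0.05)
    return blocked, queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In practice this means a caller that produces faster than records can be flushed is slowed down instead of growing memory without bound.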
Consumer
    async with Consumer(stream_name="test") as consumer:
        while True:
            async for item in consumer:
                print(item)
            # caught up.. take a breather~
Options:
(comments in quotes are Kinesis Limits as per AWS Docs)
- region_name
  AWS Region
- max_queue_size=1000
  Max size of the queue that the fetch() task fills with shard records
- max_shard_consumers=None
  Max number of shards to use. None = all
- record_limit=10000
  Number of records to fetch with get_records()
- sleep_time_no_records=2
  Number of seconds to sleep when caught up
- iterator_type="TRIM_HORIZON"
  Default shard iterator type for new/unknown shards (i.e. start from the start of the stream). The alternative is "LATEST" (i.e. the end of the stream)
- shard_fetch_rate=5
  Number of fetches per second (max = 5)
- checkpointer=None
  Checkpointer to use
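A per-shard throttle in the spirit of "shard_fetch_rate" could be sketched as below. The `Throttle` class is hypothetical and only shows one way to space calls at least 1/rate seconds apart; the library may do this differently.

```python
import asyncio
import time


class Throttle:
    """Hypothetical helper: allows at most rate_per_second calls per second."""

    def __init__(self, rate_per_second):
        self.min_interval = 1.0 / rate_per_second
        self._next_allowed = 0.0

    async def wait(self):
        now = time.monotonic()
        delay = self._next_allowed - now
        if delay > 0:
            await asyncio.sleep(delay)  # too early: wait for the next slot
        # Schedule the next slot one interval after this one.
        self._next_allowed = max(now, self._next_allowed) + self.min_interval


async def demo(rate_per_second=50, calls=4):
    throttle = Throttle(rate_per_second)
    start = time.monotonic()
    for _ in range(calls):
        await throttle.wait()           # a get_records() call would follow here
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"{asyncio.run(demo()):.3f} seconds for 4 throttled calls")
```

With shard_fetch_rate=5 the interval would be 0.2 sec, matching the get_records() limit quoted above.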
Checkpointers
- memory
- redis
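To make the heartbeat and "session_timeout" idea concrete, here is a rough sketch of what an in-memory checkpointer could look like. The class and its method names (`allocate`, `checkpoint`, `heartbeat`) are hypothetical and are not the library's checkpointer API.

```python
import time


class MemoryCheckpointer:
    """Hypothetical in-memory checkpointer with heartbeat-based shard takeover."""

    def __init__(self, session_timeout=60, clock=time.monotonic):
        self.session_timeout = session_timeout
        self.clock = clock
        self._shards = {}   # shard_id -> {"owner", "sequence", "heartbeat"}

    def allocate(self, shard_id, owner):
        """Try to claim a shard; returns (success, last checkpointed sequence)."""
        entry = self._shards.get(shard_id)
        now = self.clock()
        if entry and entry["owner"] != owner:
            if now - entry["heartbeat"] < self.session_timeout:
                return False, None      # still owned by a live consumer
        sequence = entry["sequence"] if entry else None
        self._shards[shard_id] = {"owner": owner, "sequence": sequence, "heartbeat": now}
        return True, sequence

    def heartbeat(self, shard_id, owner):
        """Refresh ownership so other consumers do not take the shard over."""
        entry = self._shards[shard_id]
        if entry["owner"] != owner:
            raise RuntimeError(f"{shard_id} is owned by {entry['owner']}")
        entry["heartbeat"] = self.clock()

    def checkpoint(self, shard_id, owner, sequence):
        """Record progress; a checkpoint also counts as a heartbeat."""
        self.heartbeat(shard_id, owner)
        self._shards[shard_id]["sequence"] = sequence


if __name__ == "__main__":
    now = [0.0]                          # fake clock so the example runs instantly
    cp = MemoryCheckpointer(session_timeout=60, clock=lambda: now[0])
    print(cp.allocate("shard-0", "consumer-a"))
    print(cp.allocate("shard-0", "consumer-b"))
    cp.checkpoint("shard-0", "consumer-a", "seq-42")
    now[0] = 61.0                        # consumer-a stops heartbeating
    print(cp.allocate("shard-0", "consumer-b"))
```

A redis-backed variant would keep the same record per shard in redis so that consumers in different processes can see each other's heartbeats.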
Yet another Python Kinesis Library?
Sadly I had issues with every other library I could find :(
- https://github.com/NerdWalletOSS/kinesis-python
  - pro:
    - kinda works
  - con:
    - threaded
    - Outstanding PR to fix some issues
    - checkpoints on every record on main thread
- https://github.com/ungikim/kinsumer
  - pro:
    - handles shard changes
  - con:
    - consumer only (no producer)
    - no redis checkpointer/heartbeat
    - threaded/seems kinda complicated~
- https://github.com/bufferapp/kiner
  - pro:
    - Batching
  - con:
    - Producer only
- https://github.com/niklio/aiokinesis
  - pro:
    - asyncio
  - con:
    - no checkpointing
    - limited to 1 shard / too simplistic
- https://github.com/ticketea/pynesis
  - pro:
    - checkpoints
  - con:
    - hasn't been updated for 1 year
    - doesn't use put_records()
    - single threaded / reads shards round robin
- https://github.com/whale2/async-kinesis-client
  - pro:
    - checkpoints
    - asyncio
  - con:
    - ?
  (Actually, I only found this one recently; it might be an OK alternative?)