AsyncIO Kinesis Library
async-kinesis
Features
- uses queues for both producer and consumer
- producer flushes with put_records() if it has enough records to flush or after "buffer_time" is reached
- consumer iterates over msg queue independent of shard readers
- Configurable to handle sharding limits, but will throttle/retry if required
  - ie when multiple independent clients are saturating the shards
- Checkpointing with heartbeats
  - deadlock + reallocation of shards if a checkpoint fails to heartbeat within "session_timeout"
- aggregators
  - json line delimited
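Line-delimited JSON aggregation packs many small items into one Kinesis record and splits them apart again on the consumer side. The sketch below assumes that approach and uses the 1 MiB per-record size mentioned in the aggregator option below. The helper names `aggregate_json_lines` and `deaggregate_json_lines` are hypothetical. They are not this library's aggregator classes.

```python
import json
from typing import Iterable, Iterator, List

# Assumed per-record size cap (1 MiB), as quoted for Kinesis records.
MAX_RECORD_BYTES = 1024 * 1024


def aggregate_json_lines(items: Iterable[dict], max_bytes: int = MAX_RECORD_BYTES) -> List[bytes]:
    """Pack dicts into newline-delimited JSON blobs of at most max_bytes each."""
    blobs: List[bytes] = []
    current = bytearray()
    for item in items:
        # json.dumps escapes embedded newlines, so "\n" safely separates items.
        line = json.dumps(item).encode("utf-8") + b"\n"
        if len(line) > max_bytes:
            raise ValueError("single item exceeds the record size limit")
        # Start a new blob if this line would overflow the current one.
        if current and len(current) + len(line) > max_bytes:
            blobs.append(bytes(current))
            current = bytearray()
        current += line
    if current:
        blobs.append(bytes(current))
    return blobs


def deaggregate_json_lines(blob: bytes) -> Iterator[dict]:
    """Split a newline-delimited JSON blob back into the original dicts."""
    for line in blob.splitlines():
        if line:
            yield json.loads(line)
```

Each producer-side blob would become one record. The consumer must use the matching split function, which is why the producer's and consumer's aggregator settings have to agree.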
Consumer Design
(This bears some explanation, as it's kinda complex~)
- fetch() gets called periodically (every 0.2 sec, ie max 5x per second, as that is the limit on shard get_records())
  - iterate over the list of shards (set on startup, does not currently detect resharding)
    - assign the shard if it is not in use and not at the "max_shard_consumers" limit, otherwise ignore/continue
    - ignore/continue if this shard is still fetching
    - process records if the shard is done fetching
      - put records on the queue
      - add a checkpoint record to the queue
      - assign NextShardIterator
      - create the get_records() task again
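The fetch() steps above can be sketched as one asyncio loop that visits every shard on each tick and restarts a shard's task when the previous one has finished. This sketch rests on several assumptions. `get_records` is a hypothetical coroutine that takes a shard iterator and returns `(records, next_iterator)`. The `("checkpoint", shard_id)` tuple stands in for the checkpoint record. The `rounds` argument exists only so the demo terminates. Real AWS calls, throttling and error handling are left out.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Tuple

# Hypothetical fetcher: takes a shard iterator and returns
# (records, next_iterator); next_iterator is None once the shard is closed.
Fetcher = Callable[[str], Awaitable[Tuple[List[str], Optional[str]]]]


async def fetch_loop(
    shard_iterators: Dict[str, Optional[str]],
    get_records: Fetcher,
    queue: asyncio.Queue,
    max_shard_consumers: Optional[int] = None,
    interval: float = 0.2,
    rounds: int = 10,
) -> None:
    """Each tick, visit every shard and (re)start its get_records() task."""
    tasks: Dict[str, asyncio.Task] = {}
    for _ in range(rounds):  # a real consumer would loop until stopped
        for shard_id, iterator in list(shard_iterators.items()):
            task = tasks.get(shard_id)
            if task is None:
                if iterator is None:
                    continue  # shard closed, nothing left to read
                if max_shard_consumers is not None and len(tasks) >= max_shard_consumers:
                    continue  # at the consumer limit, skip this shard for now
                tasks[shard_id] = asyncio.create_task(get_records(iterator))
                continue
            if not task.done():
                continue  # still fetching, check again next tick
            records, next_iterator = task.result()
            for record in records:
                await queue.put(record)
            # Marker telling the consumer side this batch can be checkpointed.
            await queue.put(("checkpoint", shard_id))
            shard_iterators[shard_id] = next_iterator
            if next_iterator is None:
                del tasks[shard_id]
            else:
                tasks[shard_id] = asyncio.create_task(get_records(next_iterator))
        await asyncio.sleep(interval)
    for task in tasks.values():
        task.cancel()
    await asyncio.gather(*tasks.values(), return_exceptions=True)
```

With `max_shard_consumers=1`, the second shard is only picked up once the first has closed. This matches the "otherwise ignore/continue" rule: a skipped shard is simply retried on the next tick.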
Note that get_records() is throttled via "shard_fetch_rate=5" (ie the same 0.2 sec/ 5x limit)
This pattern seemed like the easiest way to maintain a pool of consumers without needing to think too hard about starting its next job or handling new shards etc.
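A "shard_fetch_rate=5" style limit can be enforced with a per-shard throttler that spaces calls at least 1/rate seconds apart. The class below is a hypothetical sketch of that idea, not the library's throttle. The injectable `clock` is an assumption added only to make the behaviour easy to check.

```python
import asyncio
import time
from typing import Callable


class Throttler:
    """Allow at most `rate` calls per second by spacing them 1/rate apart."""

    def __init__(self, rate: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = 1.0 / rate
        self.clock = clock
        self.next_allowed = 0.0

    def delay(self) -> float:
        """Reserve the next slot and return how long to wait before using it."""
        now = self.clock()
        start = max(now, self.next_allowed)
        self.next_allowed = start + self.interval
        return start - now

    async def __aenter__(self) -> "Throttler":
        await asyncio.sleep(self.delay())
        return self

    async def __aexit__(self, *exc) -> None:
        return None
```

Usage would be one `Throttler(5)` per shard, with each fetch wrapped in `async with throttler:`. At rate 5, back-to-back calls end up 0.2 sec apart.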
Not Implemented
- resharding
- client rebalancing (ie share the shards between consumers)
See also
Producer
from kinesis import Producer

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})
Options:
(comments in quotes are Kinesis Limits as per AWS Docs)
- region_name
  AWS Region
- buffer_time=0.5
  Buffer time in seconds before auto-flushing records
- put_rate_limit_per_shard=1000
  "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes"
- batch_size=500
  "Each PutRecords request can support up to 500 records"
- max_queue_size=10000
  put() method will block when the queue is at max
- after_flush_fun
  async function to call after doing a flush (ie put_records()) call
- aggregator=JsonWithoutAggregation()
  Record aggregator. Default is JSON without aggregation. Note this is highly inefficient, as each record can be up to 1 MiB
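The flush rule behind these options can be sketched with an asyncio.Queue: flush when batch_size items are buffered or when buffer_time has passed since the first one. `BufferingProducer`, its `flush` callback and the `_STOP` sentinel are hypothetical names for illustration. Per-shard rate limiting and after_flush_fun are deliberately left out.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

_STOP = object()  # sentinel telling the flush loop to drain and exit


class BufferingProducer:
    """Hypothetical sketch: flush when batch_size items are buffered or buffer_time passes."""

    def __init__(
        self,
        flush: Callable[[List[dict]], Awaitable[None]],
        batch_size: int = 500,
        buffer_time: float = 0.5,
        max_queue_size: int = 10000,
    ) -> None:
        self.flush = flush  # stands in for a put_records() call
        self.batch_size = batch_size
        self.buffer_time = buffer_time
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._task: Optional[asyncio.Task] = None

    async def put(self, item: dict) -> None:
        await self.queue.put(item)  # blocks while the queue is full

    async def _run(self) -> None:
        loop = asyncio.get_running_loop()
        stopping = False
        while not stopping:
            first = await self.queue.get()  # wait for the first buffered item
            if first is _STOP:
                return
            batch = [first]
            deadline = loop.time() + self.buffer_time
            while len(batch) < self.batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), remaining)
                except asyncio.TimeoutError:
                    break  # buffer_time reached, flush what we have
                if item is _STOP:
                    stopping = True
                    break
                batch.append(item)
            await self.flush(batch)

    async def __aenter__(self) -> "BufferingProducer":
        self._task = asyncio.create_task(self._run())
        return self

    async def __aexit__(self, *exc) -> None:
        await self.queue.put(_STOP)  # flush anything still buffered, then stop
        await self._task
```

Using a sentinel instead of cancelling the task means a partly filled batch is still flushed when the `async with` block exits.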
Consumer
from kinesis import Consumer

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~
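The "caught up" comment implies that the inner `async for` ends once every record fetched so far has been consumed, and the outer `while True` then starts it again. One way to get that behaviour, assuming a plain asyncio.Queue sits between the shard readers and the consumer, is an async iterator that stops as soon as the queue is empty. `CaughtUpQueueIterator` is a hypothetical name, not the library's code.

```python
import asyncio
from typing import Any


class CaughtUpQueueIterator:
    """Hypothetical sketch: yield queued items until the queue is empty, then stop."""

    def __init__(self, queue: "asyncio.Queue[Any]") -> None:
        self.queue = queue

    def __aiter__(self) -> "CaughtUpQueueIterator":
        return self

    async def __anext__(self) -> Any:
        try:
            return self.queue.get_nowait()
        except asyncio.QueueEmpty:
            # Caught up: end this `async for` so the caller can pause.
            raise StopAsyncIteration
```

Between passes, the outer loop would pause for something like sleep_time_no_records while the shard readers refill the queue in the background.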
Options:
(comments in quotes are Kinesis Limits as per AWS Docs)
- region_name
  AWS Region
- max_queue_size=1000
  the fetch() task for a shard will block when the queue is at max
- max_shard_consumers=None
  Max number of shards to use. None = all
- record_limit=10000
  Max number of records to fetch per get_records() call
- sleep_time_no_records=2
  Number of seconds to sleep when caught up
- iterator_type="TRIM_HORIZON"
  Default shard iterator type for new/unknown shards (ie start from the start of the stream). Alternative is "LATEST" (ie the end of the stream)
- shard_fetch_rate=5
  Number of fetches per second (max = 5)
- checkpointer=MemoryCheckPointer()
  Checkpointer to use
- aggregator=JsonWithoutAggregation()
  Record aggregator. Must match the aggregator used by Producer()
Checkpointers
- memory
- redis
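Checkpointing with heartbeats means a consumer holds a lease on each shard. The lease is kept alive by heartbeating, and another consumer may take the shard over if no heartbeat arrives within session_timeout. The in-memory sketch below assumes that model. `SketchMemoryCheckpointer` and its `allocate`/`heartbeat`/`checkpoint` methods are hypothetical and are not the library's MemoryCheckPointer API. The shared `store` dict stands in for a shared backend such as Redis.

```python
import time
from typing import Callable, Dict, Optional, Tuple

# shard_id -> (owner, last_heartbeat, sequence_number)
Store = Dict[str, Tuple[str, float, Optional[str]]]


class SketchMemoryCheckpointer:
    """Hypothetical in-memory checkpointer with heartbeat-based shard leases."""

    def __init__(self, name: str, session_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic,
                 store: Optional[Store] = None) -> None:
        self.name = name
        self.session_timeout = session_timeout
        self.clock = clock
        # Passing the same dict to several instances simulates several consumers.
        self.store: Store = store if store is not None else {}

    def allocate(self, shard_id: str) -> Tuple[bool, Optional[str]]:
        """Claim a shard if it is free, already ours, or its owner stopped heartbeating."""
        now = self.clock()
        entry = self.store.get(shard_id)
        sequence: Optional[str] = None
        if entry is not None:
            owner, last_seen, sequence = entry
            if owner != self.name and now - last_seen < self.session_timeout:
                return False, None  # still leased by a live consumer
        self.store[shard_id] = (self.name, now, sequence)
        return True, sequence  # resume from the last checkpoint, if any

    def heartbeat(self, shard_id: str) -> bool:
        """Refresh our lease; returns False if another consumer has taken over."""
        owner, _, sequence = self.store[shard_id]
        if owner != self.name:
            return False
        self.store[shard_id] = (self.name, self.clock(), sequence)
        return True

    def checkpoint(self, shard_id: str, sequence: str) -> None:
        """Record progress (this also counts as a heartbeat)."""
        owner, _, _ = self.store[shard_id]
        if owner != self.name:
            raise RuntimeError(f"{self.name} no longer owns {shard_id}")
        self.store[shard_id] = (self.name, self.clock(), sequence)
```

When a lease expires, the new owner gets back the last checkpointed sequence number, so it can continue reading from that point instead of from iterator_type.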
Yet another Python Kinesis Library?
Sadly I had issues with every other library I could find :(
- https://github.com/NerdWalletOSS/kinesis-python
  - pro:
    - kinda works
  - con:
    - threaded
    - Outstanding PR to fix some issues
    - checkpoints on every record on the main thread
- https://github.com/ungikim/kinsumer
  - pro:
    - handles shard changes
  - con:
    - consumer only (no producer)
    - no redis checkpointer/heartbeat
    - threaded / seems kinda complicated~
- https://github.com/bufferapp/kiner
  - pro:
    - Batching
  - con:
    - Producer only
- https://github.com/niklio/aiokinesis
  - pro:
    - asyncio
  - con:
    - no checkpointing
    - limited to 1 shard / too simplistic
- https://github.com/ticketea/pynesis
  - pro:
    - checkpoints
  - con:
    - hasn't been updated for 1 year
    - doesn't use put_records()
    - single threaded / reads shards round-robin
- https://github.com/whale2/async-kinesis-client
  - pro:
    - checkpoints
    - asyncio
  - con:
    - ?
  (Actually I only found this one recently; it might be an ok alternative?)