Miscellaneous utilities for AWS Kinesis.
Install the latest release with pip:

    pip install kinesisutils

To install the development version:

    pip install git+https://github.com/findhotel/kinesisutils
At the moment there is only one utility implemented: a Python generator that reads records from a Kinesis stream. You can use it like this:

    import json

    from kinesisutils.kinesisutils import KinesisGenerator

    # Hit Kinesis with at most 10 requests per second, using json.loads to
    # deserialize the Kinesis records (the default). You can deactivate record
    # deserialization by setting des=None.
    kg = KinesisGenerator("stream_name", rqs=10, des=json.loads)
    for rec in kg:
        print(rec)

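How kinesisutils implements `rqs` and `des` internally is not described here, so the following is only a rough sketch of how a rate-limited reader in that spirit might work. It assumes a boto3-style Kinesis client exposing `get_records(ShardIterator=...)`. The names `poll_shard`, `max_requests` and `FakeKinesis` are hypothetical and are not part of kinesisutils.

```python
import json
import time
from typing import Any, Callable, Iterator, List, Optional


def poll_shard(client: Any, shard_iterator: str, rqs: float = 10,
               des: Optional[Callable[[bytes], Any]] = json.loads,
               max_requests: int = 5) -> Iterator[Any]:
    """Hypothetical sketch: yield records from one shard, issuing at most
    `rqs` GetRecords calls per second. `max_requests` is an illustrative
    stop condition, not a kinesisutils option."""
    min_interval = 1.0 / rqs
    last_call = float("-inf")
    for _ in range(max_requests):
        # Sleep just long enough to stay within the requests-per-second budget.
        wait = min_interval - (time.monotonic() - last_call)
        if wait > 0:
            time.sleep(wait)
        last_call = time.monotonic()
        resp = client.get_records(ShardIterator=shard_iterator)
        for record in resp["Records"]:
            data = record["Data"]
            # Deserialize unless des=None, in which case raw bytes are yielded.
            yield des(data) if des is not None else data
        shard_iterator = resp.get("NextShardIterator")
        if shard_iterator is None:  # the shard has been closed
            break


class FakeKinesis:
    """Hypothetical stand-in for a boto3 Kinesis client, for local testing."""

    def __init__(self, batches: List[List[bytes]]):
        self.batches = list(batches)

    def get_records(self, ShardIterator: str) -> dict:
        data = self.batches.pop(0) if self.batches else []
        return {"Records": [{"Data": d} for d in data],
                "NextShardIterator": "next"}


fake = FakeKinesis([[b'{"a": 1}'], [b'{"b": 2}']])
print(list(poll_shard(fake, "shard-iterator", rqs=100, max_requests=3)))
# [{'a': 1}, {'b': 2}]
```

Spacing calls with a monotonic clock, rather than sleeping a fixed amount after each call, keeps the request rate bounded even when a call itself takes a noticeable fraction of the interval.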
By default, the generator keeps pulling records from Kinesis for 60 seconds. You can change this with the `timeout` argument:

    from kinesisutils.kinesisutils import KinesisGenerator

    kg = KinesisGenerator("stream_name", timeout=30)
    for rec in kg:
        print(rec)

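A timeout like this could be implemented with a deadline measured on a monotonic clock. This is again a hypothetical sketch, not the library's code: it assumes a boto3-style client, and `read_until_timeout`, `poll_interval` and `FakeKinesis` are illustrative names.

```python
import time
from typing import Any, Iterator, List, Optional


def read_until_timeout(client: Any, shard_iterator: Optional[str],
                       timeout: float = 60.0,
                       poll_interval: float = 0.1) -> Iterator[bytes]:
    """Hypothetical sketch: keep polling one shard until `timeout` seconds
    have elapsed or the shard is closed, yielding raw record data."""
    deadline = time.monotonic() + timeout
    while shard_iterator is not None and time.monotonic() < deadline:
        resp = client.get_records(ShardIterator=shard_iterator)
        for record in resp["Records"]:
            yield record["Data"]
        shard_iterator = resp.get("NextShardIterator")
        # Pause between polls so an idle stream is not hammered with requests.
        time.sleep(poll_interval)


class FakeKinesis:
    """Hypothetical stand-in for a boto3 Kinesis client, for local testing."""

    def __init__(self, batches: List[List[bytes]]):
        self.batches = list(batches)

    def get_records(self, ShardIterator: str) -> dict:
        data = self.batches.pop(0) if self.batches else []
        return {"Records": [{"Data": d} for d in data],
                "NextShardIterator": "next"}


records = list(read_until_timeout(FakeKinesis([[b"x"], [b"y"]]), "it",
                                  timeout=0.2, poll_interval=0.01))
print(records)  # [b'x', b'y']
```

In this sketch the deadline is only checked between `GetRecords` calls, so iteration can run slightly past `timeout`, for example while the consumer is still processing the last batch.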
If you use the Kinesis generator to read CloudWatch Logs events that are forwarded to a Kinesis stream through a [logs subscription][logsubs], you need to decompress the records before deserializing them, because CloudWatch Logs delivers them gzip-compressed:
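
    import gzip
    import json

    from kinesisutils.kinesisutils import KinesisGenerator

    # Decompress each record first (preprocess), then deserialize it (des).
    kg = KinesisGenerator("stream_name", rqs=10, des=json.loads,
                          preprocess=gzip.decompress)
    for rec in kg:
        print(rec)
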
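After decompression and deserialization, each record is a CloudWatch Logs subscription payload: a dict with a `messageType` and a `logEvents` list whose entries carry `id`, `timestamp` and `message`. CloudWatch Logs also sends `CONTROL_MESSAGE` records to check that the destination is reachable. The helper below, `log_messages`, is a hypothetical illustration (it is not part of kinesisutils) that extracts the individual log lines and skips control messages. It is exercised on a synthetic payload built locally, so no AWS access is needed.

```python
import gzip
import json
from typing import Any, Dict, Iterator


def log_messages(rec: Dict[str, Any]) -> Iterator[str]:
    """Hypothetical helper: yield the log lines in one decoded subscription record."""
    # Control messages only test reachability and carry no log data.
    if rec.get("messageType") != "DATA_MESSAGE":
        return
    for event in rec.get("logEvents", []):
        yield event["message"]


# Synthetic record shaped like a CloudWatch Logs subscription payload.
payload = gzip.compress(json.dumps({
    "messageType": "DATA_MESSAGE",
    "logGroup": "my-log-group",
    "logStream": "my-log-stream",
    "logEvents": [
        {"id": "1", "timestamp": 1500000000000, "message": "hello"},
        {"id": "2", "timestamp": 1500000000001, "message": "world"},
    ],
}).encode("utf-8"))

# Same decompress-then-deserialize order as preprocess/des in the example above.
rec = json.loads(gzip.decompress(payload))
print(list(log_messages(rec)))  # ['hello', 'world']
```

With a real stream, you would call `log_messages(rec)` on each `rec` produced by the generator and iterate over the resulting lines.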
If you have questions, bug reports, suggestions, etc. please create an issue on the GitHub project page. PRs are also welcome.