Miscellaneous utilities for AWS Kinesis.
    pip install kinesisutils
To install the development version:
    pip install git+https://github.com/findhotel/kinesisutils
At the moment there is only one utility implemented: a Python generator that reads records from a Kinesis stream. You can use it like this:
    import json

    from kinesisutils.kinesisutils import KinesisGenerator

    # Hit Kinesis with at most 10 requests per second, using json.loads to
    # deserialize the Kinesis records (the default). You could deactivate record
    # deserialization by setting des=None.
    kg = KinesisGenerator("stream_name", rqs=10, des=json.loads)
    for rec in kg:
        print(rec)
By default the generator will keep pulling records from Kinesis for 60 seconds. You can customize this timeout if you want:
    from kinesisutils.kinesisutils import KinesisGenerator

    kg = KinesisGenerator("stream_name", timeout=30)
    for rec in kg:
        print(rec)
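To make the interplay between `rqs` and `timeout` more concrete, here is a minimal sketch of a generator that is both rate limited and bounded by a timeout. It is not the implementation used by kinesisutils: `throttled_records`, `fetch_batch`, `clock` and `sleep` are hypothetical names introduced only for this illustration, and the real generator may behave differently in its details.

```python
import time


def throttled_records(fetch_batch, rqs=10, timeout=60,
                      clock=time.monotonic, sleep=time.sleep):
    """Yield records from fetch_batch() until `timeout` seconds have elapsed.

    fetch_batch is a hypothetical callable that returns a list of records.
    It is called at most `rqs` times per second.
    """
    min_interval = 1.0 / rqs
    deadline = clock() + timeout
    while True:
        started = clock()
        if started >= deadline:
            break
        for rec in fetch_batch():
            yield rec
        # Sleep off whatever is left of this request's time slot.
        remaining = min_interval - (clock() - started)
        if remaining > 0:
            sleep(remaining)
```

Passing `clock` and `sleep` as arguments is only a convenience so that the sketch can be exercised without waiting in real time.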
If you are using the Kinesis generator to read CloudWatch log events that are forwarded to a Kinesis stream through a [logs subscription][logsubs], you will need to decompress the CloudWatch records before deserializing them:
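    import gzip
    import json

    from kinesisutils.kinesisutils import KinesisGenerator

    # Decompress each record with gzip before deserializing it with json.loads.
    kg = KinesisGenerator("stream_name", rqs=10, des=json.loads,
                          preprocess=gzip.decompress)
    for rec in kg:
        print(rec)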
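The decompress-then-deserialize order can be tried out without a Kinesis stream. The sketch below builds a made-up payload that mimics the general shape of CloudWatch Logs subscription data (a JSON document with a `logEvents` list, gzip-compressed) and decodes it in the same two steps. `decode_record` is a hypothetical helper written for this example and is not part of kinesisutils; all field values are invented.

```python
import gzip
import json

# Invented sample data in the general shape of a CloudWatch Logs
# subscription payload.
payload = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "example-group",
    "logStream": "example-stream",
    "logEvents": [
        {"id": "1", "timestamp": 1500000000000, "message": "first event"},
        {"id": "2", "timestamp": 1500000001000, "message": "second event"},
    ],
}
raw_record = gzip.compress(json.dumps(payload).encode("utf-8"))


def decode_record(data, preprocess=gzip.decompress, des=json.loads):
    """Apply the preprocessing step first, then the deserializer.

    Hypothetical helper: either step can be skipped by passing None.
    """
    if preprocess is not None:
        data = preprocess(data)
    if des is not None:
        data = des(data)
    return data


rec = decode_record(raw_record)
messages = [event["message"] for event in rec["logEvents"]]
print(messages)
```

Skipping the decompression step would hand gzip bytes straight to `json.loads`, which fails, so the preprocessing has to run first.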
If you have questions, bug reports, suggestions, etc., please create an issue on the GitHub project page. PRs are also welcome.
This software is licensed under the MIT license.
See License file.
© 2017 German Gomez-Herrero, Find Hotel and others.