Kinesis Consumer in Python

A Kinesis consumer written purely in Python. It is a lightweight wrapper on top of boto3, the AWS Python library. You can also consume records from a Kinesis Data Stream (KDS) via:

  • Lambda function: I have a demo, kinesis-lambda-sqs-demo, that shows how to consume records in a serverless, real-time way.
  • Kinesis Firehose: This is an AWS managed service that saves records into different sinks, such as S3, Elasticsearch, and Redshift.
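
For context on what a boto3 wrapper has to do, here is a minimal sketch of reading a stream with the plain Kinesis client calls `list_shards`, `get_shard_iterator`, and `get_records`. The function names `read_shard` and `read_stream` are hypothetical and this is not kcpy's actual implementation; it assumes a small stream (no pagination of the shard list) and stops once each shard is caught up.

```python
def read_shard(client, stream_name, shard_id, limit=100):
    """Yield records from one shard, oldest first, until caught up."""
    # TRIM_HORIZON starts at the oldest record still retained in the shard.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='TRIM_HORIZON',
    )['ShardIterator']
    while iterator:
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        for record in response['Records']:
            yield record
        # MillisBehindLatest == 0 means the reader is at the tip of the shard.
        if response.get('MillisBehindLatest', 0) == 0:
            break
        iterator = response.get('NextShardIterator')


def read_stream(client, stream_name):
    """Yield records from every shard of the stream, one shard at a time."""
    # Assumption: the stream is small enough that list_shards needs no paging.
    for shard in client.list_shards(StreamName=stream_name)['Shards']:
        for record in read_shard(client, stream_name, shard['ShardId']):
            yield record

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   for record in read_stream(boto3.client('kinesis'), 'my_stream_name'):
#       print(record)
```

A long-running consumer would keep polling instead of stopping at the tip, and would sleep between calls to stay within the per-shard read limits.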

Installation

Install the package via pip:

pip install kcpy

Getting started

from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name')
for record in consumer:
    print(record)

The output would look like:

{
    'ApproximateArrivalTimestamp': datetime.datetime(2018, 11, 13, 11, 57, 55, 117807), 
    'Data': b'Jessica Walter', 
    'PartitionKey': 'Jessica Walter', 
    'SequenceNumber': '1'
}
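
As the output shows, `Data` arrives as raw bytes. A minimal sketch of decoding it, assuming the producer wrote UTF-8 text (the helper name `decode_record` is hypothetical and not part of kcpy):

```python
def decode_record(record, encoding='utf-8'):
    """Return a copy of a record with 'Data' decoded from bytes to text."""
    decoded = dict(record)  # shallow copy so the original record is untouched
    decoded['Data'] = record['Data'].decode(encoding)
    return decoded


sample = {
    'Data': b'Jessica Walter',
    'PartitionKey': 'Jessica Walter',
    'SequenceNumber': '1',
}
print(decode_record(sample)['Data'])
```

If the producer writes JSON or another format, replace the `decode` call with the matching parser.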

Or, you can consume stream data with checkpointing:

from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name', consumer_name='my_consumer', checkpoint=True)
for record in consumer:
    print(record)
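
In Kinesis terms, resuming from a checkpoint usually means requesting a shard iterator positioned just after the last sequence number that was processed. The sketch below builds the keyword arguments for boto3's `get_shard_iterator`; the helper name `shard_iterator_args` is hypothetical, and starting from `TRIM_HORIZON` when no checkpoint exists is an assumption, not documented kcpy behaviour.

```python
def shard_iterator_args(stream_name, shard_id, last_seq_no=None):
    """Build get_shard_iterator kwargs, resuming after a checkpoint if any."""
    args = {'StreamName': stream_name, 'ShardId': shard_id}
    if last_seq_no is None:
        # No checkpoint yet: start from the oldest retained record (assumed).
        args['ShardIteratorType'] = 'TRIM_HORIZON'
    else:
        # Resume with the first record after the checkpointed one.
        args['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER'
        args['StartingSequenceNumber'] = str(last_seq_no)
    return args

# Usage (requires boto3 and AWS credentials):
#   client.get_shard_iterator(**shard_iterator_args('stream_1', 'shard_1', '5'))
```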

Checkpointing

The diagram below shows the checkpointing schema:

                                                                   producer
[stream_1]                                                            |
+---------------+---+---+---+---+---+---+---+---+                     |
| shard_1       | 1 | 2 | 3 | 4 | 5 | 6 | 7 |...| <-------------------+
+---------------+---+---+---+---+---+---+---+---+                     |
| shard_2       | 1 | 2 | 3 | 4 | 5 |...| <---------------------------+
+---------------+---+---+---+---+---+---+---+---+---+                 |
| shard_3       | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |...| <---------------+
+---------------+---+---+---+---+---+---+---+---+---+
                          ^                   ^
                          |                   |
                      consumer_1          consumer_2
                          |                   |
                          |                   +---------+
                          |                             |
                          +------------------+          |
                                             |          |
                                             v          |
+---------------+-------------+----------+--------+     |
| consumer_name | stream_name | shard_id | seq_no |     |
+---------------+-------------+----------+--------+     |
| consumer_1    | stream_1    | shard_1  |   5    |     |
| consumer_1    | stream_1    | shard_2  |   15   |     |
| consumer_1    | stream_1    | ...      |   15   |     |
| consumer_1    | stream_1    | shard_N  |   XX   |     |
| consumer_2    | stream_1    | shard_1  |   6    | <---+
+---------------+-------------+----------+--------+
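
The table in the diagram can be sketched as a small SQLite store with one row per (consumer, stream, shard). This is an illustration of the schema only, not kcpy's actual storage code; the class name `CheckpointStore`, the table name `checkpoints`, and the column types are assumptions.

```python
import sqlite3


class CheckpointStore:
    """Keep the last processed sequence number per consumer, stream, shard."""

    def __init__(self, path=':memory:'):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            """CREATE TABLE IF NOT EXISTS checkpoints (
                   consumer_name TEXT NOT NULL,
                   stream_name   TEXT NOT NULL,
                   shard_id      TEXT NOT NULL,
                   seq_no        TEXT NOT NULL,
                   PRIMARY KEY (consumer_name, stream_name, shard_id)
               )"""
        )

    def save(self, consumer_name, stream_name, shard_id, seq_no):
        # The primary key makes INSERT OR REPLACE overwrite the previous row.
        with self.conn:
            self.conn.execute(
                "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
                (consumer_name, stream_name, shard_id, str(seq_no)),
            )

    def load(self, consumer_name, stream_name, shard_id):
        row = self.conn.execute(
            "SELECT seq_no FROM checkpoints "
            "WHERE consumer_name = ? AND stream_name = ? AND shard_id = ?",
            (consumer_name, stream_name, shard_id),
        ).fetchone()
        return row[0] if row else None  # None means no checkpoint yet


store = CheckpointStore()
store.save('consumer_1', 'stream_1', 'shard_1', '5')
store.save('consumer_2', 'stream_1', 'shard_1', '6')
print(store.load('consumer_1', 'stream_1', 'shard_1'))
```

Because each consumer name has its own rows, two consumers can read the same shard at different positions, as `consumer_1` and `consumer_2` do in the diagram. Sequence numbers are stored as text because real Kinesis sequence numbers are long numeric strings.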

Features

  • Read records from a stream with multiple shards
  • Save a checkpoint per consumer for each shard of a stream

Todo

  • Add type checking with mypy
  • Add tox for automating multiple testing environments
  • Add the config for travis CI
  • Support other storage solutions (MySQL, DynamoDB, Redis, etc.) for checkpointing
  • Rebalance when the number of shards changes
  • Allow kcpy to run on multiple machines

Changelog

0.1.7

  • Add Travis CI config and drop Python 3.5 support.

0.1.6

  • Fix some issues in setup.py.

0.1.5

  • Add consumer checkpointing with a simple sqlite storage solution.

0.1.4

  • Pass AWS configuration directly to the boto3 client.

0.1.3

  • Update the README.

0.1.2

  • Add markdown support for long description.

0.1.1

  • Add a long description.

0.1.0

  • First version of kcpy.

License

Copyright (c) 2018 Hengfeng Li. It is free software, and may be redistributed under the terms specified in the LICENSE file.
