Kinesis stream consumer (reader) written in Python.

Project description

kinesis-stream-consumer

A Kinesis stream consumer that channels records through Redis and uses an auto-refreshable AWS session.

Usage

Requirements

  • python >= 3.0
  • boto3 >= 1.13.5
  • kinesis-python >= 0.2.1
  • redis >= 3.5.0

Installation

Install with:

pip install kinesis-stream-consumer

Or, if you're using a development version cloned from this repository:

git clone https://github.com/harshittrivedi78/kinesis-stream-consumer.git
cd kinesis-stream-consumer
python setup.py install

This also installs the dependencies: boto3 >= 1.13.5, kinesis-python >= 0.2.1, and redis >= 3.5.0.
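
To confirm that the installed versions meet the minimums listed under Requirements, a check along the following lines could be used. This is only a sketch built on the standard library (`importlib.metadata`, available from Python 3.8); the helper names `parse_version` and `check_requirements` are illustrative and not part of this package, and the comparison looks only at the first three numeric components, so pre-release suffixes are ignored.

```python
import re
from importlib import metadata

# Minimum versions taken from the Requirements list.
MINIMUMS = {"boto3": "1.13.5", "kinesis-python": "0.2.1", "redis": "3.5.0"}


def parse_version(text, width=3):
    """Turn '1.13.5' into (1, 13, 5), padding short versions with zeros."""
    numbers = [int(n) for n in re.findall(r"\d+", text)[:width]]
    return tuple(numbers + [0] * (width - len(numbers)))


def check_requirements(minimums, lookup=metadata.version):
    """Return {package: problem} for every missing or too-old package."""
    problems = {}
    for name, minimum in minimums.items():
        try:
            installed = lookup(name)
        except metadata.PackageNotFoundError:
            problems[name] = "not installed"
            continue
        if parse_version(installed) < parse_version(minimum):
            problems[name] = f"{installed} < {minimum}"
    return problems


if __name__ == "__main__":
    # Prints one line per problem; prints nothing when everything is fine.
    for name, problem in check_requirements(MINIMUMS).items():
        print(f"{name}: {problem}")
```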

How to use it?

Two consumers have to run in parallel: the Kinesis consumer and the record queue consumer (Redis). I have added an example.py file to this code base, which can be used to check and test the code.

import threading

from kinesis_stream.consumer import KinesisConsumer
from kinesis_stream.record_queue import RecordQueueConsumer
from kinesis_stream.redis_wrapper import get_redis_conn

redis_conn = get_redis_conn(host="localhost", port=6379, db="0")

stream_name = "test-kinesis-stream"
region = "eu-west-1"

kinesis_consumer = KinesisConsumer(stream_name, region, redis_conn)
record_queue_consumer = RecordQueueConsumer(stream_name, redis_conn)

kinesis_consumer_thread = threading.Thread(name='kinesis_consumer', target=kinesis_consumer.start)
kinesis_consumer_thread.start()

record_queue_consumer_thread = threading.Thread(name='record_queue_consumer', target=record_queue_consumer.start)
record_queue_consumer_thread.start()
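
The arrangement above is a producer/consumer pattern: one thread pushes records onto a queue and the other pops and handles them. The following self-contained sketch illustrates that pattern only. It substitutes the standard library's `queue.Queue` for Redis and a fixed list for the Kinesis stream, and the names `produce`, `consume`, `run` and `STOP` are hypothetical, not part of this package.

```python
import queue
import threading

STOP = object()  # sentinel telling the consumer that no more records will come


def produce(records, record_queue):
    """Stand-in for the Kinesis consumer: push each record onto the queue."""
    for record in records:
        record_queue.put(record)
    record_queue.put(STOP)


def consume(record_queue, handle_message):
    """Stand-in for the record queue consumer: pop records until STOP."""
    while True:
        record = record_queue.get()  # blocks until a record is available
        if record is STOP:
            break
        handle_message(record)


def run(records, handle_message):
    """Start both sides in their own threads and wait for them to finish."""
    record_queue = queue.Queue()
    producer = threading.Thread(
        name="producer", target=produce, args=(records, record_queue)
    )
    consumer = threading.Thread(
        name="consumer", target=consume, args=(record_queue, handle_message)
    )
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()


if __name__ == "__main__":
    run(["record-1", "record-2"], print)
```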

Override the handle_message method to process the Kinesis messages.

from kinesis_stream.record_queue import RecordQueueConsumer as BaseRecordQueueConsumer

class RecordQueueConsumer(BaseRecordQueueConsumer):
    def handle_message(self, message):
        # your code
        print(message)
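
As an illustration of what `handle_message` might do, the sketch below decodes each message as JSON and collects the result. It assumes messages arrive as JSON-encoded `str` or `bytes`, which this README does not state. `BaseRecordQueueConsumer` is redefined here as a minimal local stand-in so the example runs without the package or Redis, and `JsonRecordConsumer` is a hypothetical name.

```python
import json


class BaseRecordQueueConsumer:
    """Local stand-in for kinesis_stream.record_queue.RecordQueueConsumer.

    Only the handle_message hook is modelled; the real class is assumed to
    call it once per record.
    """

    def handle_message(self, message):
        raise NotImplementedError


class JsonRecordConsumer(BaseRecordQueueConsumer):
    def __init__(self):
        # With the real base class, also call
        # super().__init__(stream_name, redis_conn).
        self.decoded = []   # successfully parsed payloads
        self.rejected = []  # raw messages that could not be parsed

    def handle_message(self, message):
        # Assumption: the payload is JSON text; adjust for your own format.
        try:
            text = message.decode("utf-8") if isinstance(message, bytes) else message
            self.decoded.append(json.loads(text))
        except ValueError:  # covers invalid UTF-8 and invalid JSON
            self.rejected.append(message)
```

Keeping rejected messages in a separate list, rather than raising, is one way to stop a single malformed record from halting the consumer thread.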

Download files

Source Distribution

kinesis-stream-consumer-1.0.1.tar.gz (2.3 kB)

Uploaded Source

Built Distribution

kinesis_stream_consumer-1.0.1-py2.py3-none-any.whl (14.4 kB)

Uploaded Python 2 Python 3

File details

Details for the file kinesis-stream-consumer-1.0.1.tar.gz.

File metadata

  • Download URL: kinesis-stream-consumer-1.0.1.tar.gz
  • Size: 2.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.7.5

File hashes

Hashes for kinesis-stream-consumer-1.0.1.tar.gz
Algorithm Hash digest
SHA256 937f464953ee2dac36efa8afb9f81924b4a8f1cfe8c8ad682337d996598cb6e5
MD5 fc296f5f009c7e1a0a5121b9cce97dd2
BLAKE2b-256 0a70945a4b85e3ec0b694d7248e1525ead788578963855da8feaa75388694af5

File details

Details for the file kinesis_stream_consumer-1.0.1-py2.py3-none-any.whl.

File metadata

  • Download URL: kinesis_stream_consumer-1.0.1-py2.py3-none-any.whl
  • Size: 14.4 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.7.5

File hashes

Hashes for kinesis_stream_consumer-1.0.1-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 56589eb73b60654e68286364039f84314726829d63d6cad2d1fc05524a42297b
MD5 791add128666279f581712f53f0420e1
BLAKE2b-256 0a507931a78570fca9d1396e15b3d3b74f896c938a5b22d82b3f7d711f5687b7
