Kinesis stream consumer (reader) written in Python.
kinesis-stream-consumer
A Kinesis stream consumer that channels records through Redis and uses an auto-refreshing AWS session.
Usage
Requirements
- python >= 3.0
- boto3 >= 1.13.5
- kinesis-python >= 0.2.1
- redis >= 3.5.0
Installation
Install with:
pip install kinesis-stream-consumer
Or, if you're using a development version cloned from this repository:
git clone https://github.com/harshittrivedi78/kinesis-stream-consumer.git
cd kinesis-stream-consumer
python setup.py install
This also installs the dependencies: boto3 >= 1.13.5, kinesis-python >= 0.2.1, and redis >= 3.5.0.
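To confirm that the dependencies listed above are present after installation, a small check like the following may help. It is only a sketch that uses the standard library's importlib.metadata (Python 3.8 or later); the helper name installed_versions is hypothetical and not part of this package.

```python
from importlib import metadata

# Distribution names taken from the requirements list above.
REQUIRED = ("boto3", "kinesis-python", "redis")


def installed_versions(names=REQUIRED):
    """Return {distribution name: version string, or None if not installed}."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name}: {version or 'not installed'}")
```

Compare the printed versions against the minimums in the Requirements list.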
How to use it?
Two consumers have to run in parallel: the Kinesis consumer and the records queue consumer (Redis). An example.py file is included in this code base and can be used to check and test the code.
import threading

from kinesis_stream.consumer import KinesisConsumer
from kinesis_stream.record_queue import RecordQueueConsumer
from kinesis_stream.redis_wrapper import get_redis_conn

# Redis connection shared by both consumers
redis_conn = get_redis_conn(host="localhost", port=6379, db="0")

stream_name = "test-kinesis-stream"
region = "eu-west-1"

kinesis_consumer = KinesisConsumer(stream_name, region, redis_conn)
record_queue_consumer = RecordQueueConsumer(stream_name, redis_conn)

# Run each consumer in its own thread so that both work in parallel
kinesis_consumer_thread = threading.Thread(name='kinesis_consumer', target=kinesis_consumer.start)
kinesis_consumer_thread.start()

record_queue_consumer_thread = threading.Thread(name='record_queue_consumer', target=record_queue_consumer.start)
record_queue_consumer_thread.start()
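The two-thread arrangement above follows a producer/consumer pattern: one thread reads records and queues them, the other takes them off the queue and handles them. The package's internals are not shown in this README, so the following is only an illustration of that pattern, with the standard library's queue.Queue standing in for Redis. All names in it (produce, consume, run_pipeline) are hypothetical.

```python
import queue
import threading

_STOP = object()  # sentinel telling the handler thread to exit


def produce(records, record_queue):
    """Stand-in for the Kinesis consumer: push each record onto the queue."""
    for record in records:
        record_queue.put(record)
    record_queue.put(_STOP)


def consume(record_queue, handle_message):
    """Stand-in for the record queue consumer: pop records and handle them."""
    while True:
        record = record_queue.get()
        if record is _STOP:
            break
        handle_message(record)


def run_pipeline(records, handle_message):
    """Run producer and handler in parallel threads and wait for both."""
    record_queue = queue.Queue()
    threads = [
        threading.Thread(name="producer", target=produce, args=(records, record_queue)),
        threading.Thread(name="handler", target=consume, args=(record_queue, handle_message)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    run_pipeline(["a", "b", "c"], print)
```

Decoupling the reader from the handler through a queue means a slow handle_message does not block reading from the stream. Unlike this sketch, a real stream has no end, so the actual consumers run until stopped.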
Override the handle_message method to process the Kinesis messages.
from kinesis_stream.record_queue import RecordQueueConsumer as BaseRecordQueueConsumer


class RecordQueueConsumer(BaseRecordQueueConsumer):
    def handle_message(self, message):
        # your code
        print(message)
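As a slightly fuller sketch of such an override, the handler below tries to parse each message as JSON before processing it. The README does not specify the type or format of the message passed to handle_message, so the handling of bytes, str, and other types here is an assumption. The base class in this block is a stand-in so the sketch runs without the package installed, and decode_payload and JsonRecordQueueConsumer are hypothetical names.

```python
import json


class BaseRecordQueueConsumer:
    """Stand-in so this sketch runs without the package installed.

    In real use, import the base class from kinesis_stream.record_queue.
    """

    def handle_message(self, message):
        raise NotImplementedError


def decode_payload(message):
    """Decode bytes to text and parse JSON, falling back to the input as-is."""
    text = message.decode("utf-8") if isinstance(message, bytes) else message
    try:
        return json.loads(text)
    except (TypeError, ValueError):
        # Not JSON (or not a string at all): hand back the value unchanged.
        return text


class JsonRecordQueueConsumer(BaseRecordQueueConsumer):
    def handle_message(self, message):
        payload = decode_payload(message)
        print(payload)  # replace with your own processing
```

Keeping the decoding in a separate function makes it easy to test without a running Redis or Kinesis stream.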