Kinesis stream consumer (reader) written in Python.
Project description
kinesis-stream-consumer
A Kinesis stream consumer that channels records through Redis and uses an auto-refreshable AWS session.
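An "auto-refreshable" session generally means that temporary AWS credentials are renewed shortly before they expire, so a long-running consumer does not fail when they lapse. The sketch below illustrates that idea in plain Python. It is not this package's implementation: `RefreshingCredentials`, the `fetch` callable, the `expires_at` key, and the five-minute margin are all hypothetical names and values chosen for illustration.

```python
import time
from typing import Callable, Dict, Optional


class RefreshingCredentials:
    """Cache temporary credentials and refetch them shortly before expiry.

    Hypothetical helper for illustration; not part of kinesis-stream-consumer.
    """

    def __init__(
        self,
        fetch: Callable[[], Dict[str, object]],
        margin_seconds: float = 300.0,
        clock: Callable[[], float] = time.time,
    ):
        # fetch() is assumed to return a dict with an "expires_at" epoch timestamp.
        self._fetch = fetch
        self._margin = margin_seconds
        self._clock = clock
        self._current: Optional[Dict[str, object]] = None

    def get(self) -> Dict[str, object]:
        """Return cached credentials, refreshing them when they are close to expiring."""
        if self._current is None or self._needs_refresh():
            self._current = self._fetch()
        return self._current

    def _needs_refresh(self) -> bool:
        # Refresh once the clock is within `margin_seconds` of the expiry time.
        expires_at = float(self._current["expires_at"])
        return self._clock() >= expires_at - self._margin
```

In a real AWS setup, `fetch` would typically request new temporary credentials (for example by assuming a role), and botocore provides its own `RefreshableCredentials` class for this purpose. Whether this package uses it is not stated here.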
Usage
Requirements
- python >= 3.0
- boto3 >= 1.13.5
- kinesis-python >= 0.2.1
- redis >= 3.5.0
Installation
Install with:
pip install kinesis-stream-consumer
Or, if you're using a development version cloned from this repository:
git clone https://github.com/harshittrivedi78/kinesis-stream-consumer.git
python kinesis-stream-consumer/setup.py install
Either method also installs the dependencies: boto3 >= 1.13.5, kinesis-python >= 0.2.1, and redis >= 3.5.0.
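To confirm which versions of the dependencies ended up in the environment, a small check along these lines may help. It is a sketch that assumes Python 3.8 or newer (for `importlib.metadata`); the helper name `installed_versions` is hypothetical, and the script only reports versions rather than comparing them against the minimums listed above.

```python
from importlib import metadata


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Distribution names taken from the Requirements list.
    for name, version in installed_versions(["boto3", "kinesis-python", "redis"]).items():
        print(f"{name}: {version or 'not installed'}")
```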
How to use it?
Two consumers must run in parallel: the Kinesis consumer and the records queue consumer (Redis). The repository includes an example.py file that can be used to check and test the code.
import threading
from kinesis_stream.consumer import KinesisConsumer
from kinesis_stream.record_queue import RecordQueueConsumer
from kinesis_stream.redis_wrapper import get_redis_conn
redis_conn = get_redis_conn(host="localhost", port=6379, db="0")
stream_name = "test-kinesis-stream"
region = "eu-west-1"
kinesis_consumer = KinesisConsumer(stream_name, region, redis_conn)
record_queue_consumer = RecordQueueConsumer(stream_name, redis_conn)
kinesis_consumer_thread = threading.Thread(name='kinesis_consumer', target=kinesis_consumer.start)
kinesis_consumer_thread.start()
record_queue_consumer_thread = threading.Thread(name='record_queue_consumer', target=record_queue_consumer.start)
record_queue_consumer_thread.start()
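To see the two-thread hand-off in isolation, without AWS or Redis, the pattern can be sketched with the standard library. This is an illustration under stated assumptions, not the package's code: `queue.Queue` stands in for the Redis-backed record queue, and `produce`, `consume`, `run_pipeline`, and the sentinel are hypothetical names.

```python
import queue
import threading

SENTINEL = object()  # marks the end of the stream in this sketch


def produce(records, channel):
    """Stand-in for the Kinesis reader: push each record onto the channel."""
    for record in records:
        channel.put(record)
    channel.put(SENTINEL)


def consume(channel, handled):
    """Stand-in for the record queue consumer: pop records until the sentinel."""
    while True:
        record = channel.get()
        if record is SENTINEL:
            break
        handled.append(record)


def run_pipeline(records):
    """Run producer and consumer on separate threads and return the handled records."""
    channel = queue.Queue()  # plays the role Redis has in the real package
    handled = []
    threads = [
        threading.Thread(name="producer", target=produce, args=(records, channel)),
        threading.Thread(name="consumer", target=consume, args=(channel, handled)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return handled
```

Unlike this sketch, a real stream has no natural end, so the threads in the example above run until the process is stopped.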
Override the handle_message method to process the Kinesis messages.
from kinesis_stream.record_queue import RecordQueueConsumer as BaseRecordQueueConsumer

class RecordQueueConsumer(BaseRecordQueueConsumer):
    def handle_message(self, message):
        # your code
        print(message)
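As a sketch of what an override might do, the example below decodes each message as JSON and sets aside anything it cannot parse. It assumes messages arrive as JSON text in either bytes or str form, which this README does not confirm. `BaseConsumerStub` is a hypothetical stand-in for `RecordQueueConsumer` so the example runs without Redis or AWS, and `JsonRecordConsumer` is likewise an illustrative name.

```python
import json


class BaseConsumerStub:
    """Hypothetical stand-in for kinesis_stream.record_queue.RecordQueueConsumer."""

    def handle_message(self, message):
        raise NotImplementedError


class JsonRecordConsumer(BaseConsumerStub):
    """Decode each message as JSON; keep undecodable payloads aside instead of crashing."""

    def __init__(self):
        self.decoded = []
        self.rejected = []

    def handle_message(self, message):
        # Assumption: payloads are JSON text, delivered as bytes or str.
        try:
            text = message.decode("utf-8") if isinstance(message, bytes) else message
            self.decoded.append(json.loads(text))
        except (UnicodeDecodeError, ValueError, TypeError):
            self.rejected.append(message)
```

Catching the decode errors inside the handler keeps one malformed record from stopping the consumer loop. In real use the class would inherit from the package's `RecordQueueConsumer` and call its constructor with the stream name and Redis connection.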
Download files
File details
Details for the file kinesis-stream-consumer-1.0.1.tar.gz.
File metadata
- Download URL: kinesis-stream-consumer-1.0.1.tar.gz
- Upload date:
- Size: 2.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.7.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 937f464953ee2dac36efa8afb9f81924b4a8f1cfe8c8ad682337d996598cb6e5
MD5 | fc296f5f009c7e1a0a5121b9cce97dd2
BLAKE2b-256 | 0a70945a4b85e3ec0b694d7248e1525ead788578963855da8feaa75388694af5
File details
Details for the file kinesis_stream_consumer-1.0.1-py2.py3-none-any.whl.
File metadata
- Download URL: kinesis_stream_consumer-1.0.1-py2.py3-none-any.whl
- Upload date:
- Size: 14.4 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.7.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 56589eb73b60654e68286364039f84314726829d63d6cad2d1fc05524a42297b
MD5 | 791add128666279f581712f53f0420e1
BLAKE2b-256 | 0a507931a78570fca9d1396e15b3d3b74f896c938a5b22d82b3f7d711f5687b7