Easily write your kafka producers and consumers in flask
Project description
Flask And Kafka
Easily write your kafka producers and consumers in flask.
This plugin was developed using confluent-kafka to help you use your producers and consumers alongside your Flask project as easily as possible. Also, kafka-and-consumer logs all messages in producer and consumer
Installation
Install it with the following command:
pip install flask-and-kafka
Usage
Using consumer:
from flask import Flask
from flask_and_kafka import FlaskKafkaConsumer
from flask_and_kafka import close_kafka
app = Flask(__name__)
app.config['KAFKA_CONSUMER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_CONSUMER_LOG_PATH'] = "logs/kafka_consumer.log"
kafka_consumer = FlaskKafkaConsumer(app)
@kafka_consumer.handle_message(topic='test-topic', group_id='group1')
def handle_logistic_message(msg):
print(msg.value())
if __name__ == '__main__':
kafka_consumer.start()
close_kafka(consumer=kafka_consumer)
app.run()
You can also write your own consumers in separate modules and use them using the register_consumers method
consumers/test_consumer.py
@kafka_consumer.handle_message(topic='test-topic', group_id='group1')
def handle_logistic_message(msg):
print(msg.value())
app.py
from flask import Flask
from flask_and_kafka import FlaskKafkaConsumer
from flask_and_kafka import close_kafka
app = Flask(__name__)
app.config['KAFKA_CONSUMER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_CONSUMER_LOG_PATH'] = "logs/kafka_consumer.log"
kafka_consumer = FlaskKafkaConsumer(app)
kafka_consumer.register_consumers(['consumers.test_consumer'])
if __name__ == '__main__':
kafka_consumer.start()
close_kafka(consumer=kafka_consumer)
app.run()
Using producer:
from flask import Flask
from flask_and_kafka import FlaskKafkaProducer
from flask_and_kafka import close_kafka
app = Flask(__name__)
app.config['KAFKA_PRODUCER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_PRODUCER_LOG_PATH'] = "logs/kafka_producer.log"
kafka_producer = FlaskKafkaProducer(app)
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
if __name__ == '__main__':
close_kafka(producer=kafka_producer)
app.run()
Note that you must use close_kafka to close consumer and producer.
FlaskKafkaConsumer - handle_message decorator:
A decorator that registers a message handler function for the given topic and group ID.
Args:
- topic (str): The Kafka topic to subscribe to.
- group_id (str): The Kafka consumer group ID to use.
- num_consumers (int, optional): The number of Kafka consumer threads to spawn (default is 1).
- app_context (bool, optional): Whether to run the message handler function inside a Flask application context (default is False).
- **kwargs: Additional arguments to pass to the Kafka consumer constructor.
Returns: Callable: A decorator function that wraps the message handler function.
FlaskKafkaProducer - send_message method:
Send a message to the specified Kafka topic with the given key and value.
Args:
- topic (str): The Kafka topic to send the message to.
- value (any): The message value to send.
- key (str, optional): The message key to use (default: None).
- flush (bool, optional): Whether to flush the producer's message buffer immediately after sending the message (default: False).
- poll (bool, optional): Whether to wait for any outstanding messages to be sent before returning (default: True).
- poll_timeout (float, optional): The maximum amount of time to wait for outstanding messages to be sent, in seconds (default: 1).
- **kwargs: Additional keyword arguments to pass to the underlying Kafka producer.
Returns: None
Raises: KafkaError: If there is an error producing the message.
Note:
- If
flush
is True, any outstanding messages in the producer's buffer will be sent immediately after the current message is sent. - If
poll
is True, the producer will wait for any outstanding messages to be sent before returning, up to the specifiedpoll_timeout
. - The
poll
argument is only relevant ifflush
is False, since the producer always waits for outstanding messages to be sent before flushing.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file flask-and-kafka-0.0.4.tar.gz
.
File metadata
- Download URL: flask-and-kafka-0.0.4.tar.gz
- Upload date:
- Size: 6.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.2
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 95a4ead48eca6dce660e0d21b0dd1cc124948613946a3800c5a210dc981f9b95 |
|
MD5 | 83446466833063015bc0d060223827cf |
|
BLAKE2b-256 | 1df3500d68e2b6cf877683c5b51074e2a763b6a635d971565fea3490831f3825 |
File details
Details for the file flask_and_kafka-0.0.4-py3-none-any.whl
.
File metadata
- Download URL: flask_and_kafka-0.0.4-py3-none-any.whl
- Upload date:
- Size: 8.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.2
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4d2c0d5f1c313b6a4d8fc176ee844e42fc952f508b2ef4757be1fa5ba962bd13 |
|
MD5 | 4b4b6349f83546b73591d7a7f429d3a6 |
|
BLAKE2b-256 | 34f0c936a5159b46b8b8958974bcbcd0f79cba054a7598c29f3e07118b125503 |