Flask And Kafka

Easily write your kafka producers and consumers in flask.

This plugin was developed using confluent-kafka to help you use Kafka producers and consumers alongside your Flask project as easily as possible. flask-and-kafka also logs every message sent by producers and received by consumers.

Installation

Install it with the following command:

pip install flask-and-kafka

Usage

Using consumer:

from flask import Flask
from flask_and_kafka import FlaskKafkaConsumer
from flask_and_kafka import close_kafka


app = Flask(__name__)
app.config['KAFKA_CONSUMER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_CONSUMER_LOG_PATH'] = "logs/kafka_consumer.log"

kafka_consumer = FlaskKafkaConsumer(app)

# called for every message received on 'test-topic' by consumer group 'group1'
@kafka_consumer.handle_message(topic='test-topic', group_id='group1')
def handle_logistic_message(msg):
    print(msg.value())

if __name__ == '__main__':
    kafka_consumer.start()  # start the consumer threads
    close_kafka(consumer=kafka_consumer)  # ensure the consumer is closed on shutdown (see note below)
    app.run()
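
Because confluent-kafka delivers message values as raw bytes, handlers usually decode them before use. A minimal sketch of a handler for UTF-8 encoded JSON values (the topic, group, and handler name here are only illustrative):

import json

@kafka_consumer.handle_message(topic='test-topic', group_id='group2')
def handle_json_message(msg):
    # msg.value() returns the raw message bytes
    payload = json.loads(msg.value().decode('utf-8'))
    print(payload)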

You can also write your consumers in separate modules and register them with the register_consumers method:

consumers/test_consumer.py

@kafka_consumer.handle_message(topic='test-topic', group_id='group1')
def handle_logistic_message(msg):
    print(msg.value())

app.py

from flask import Flask
from flask_and_kafka import FlaskKafkaConsumer
from flask_and_kafka import close_kafka


app = Flask(__name__)
app.config['KAFKA_CONSUMER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_CONSUMER_LOG_PATH'] = "logs/kafka_consumer.log"

kafka_consumer = FlaskKafkaConsumer(app)
kafka_consumer.register_consumers(['consumers.test_consumer'])

if __name__ == '__main__':
    kafka_consumer.start()
    close_kafka(consumer=kafka_consumer)
    app.run()
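
A project layout along these lines would match the dotted module path passed to register_consumers above (the names are only illustrative):

.
├── app.py
└── consumers/
    ├── __init__.py
    └── test_consumer.py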

Using producer:

from flask import Flask
from flask_and_kafka import FlaskKafkaProducer
from flask_and_kafka import close_kafka


app = Flask(__name__)
app.config['KAFKA_PRODUCER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_PRODUCER_LOG_PATH'] = "logs/kafka_producer.log"
kafka_producer = FlaskKafkaProducer(app)

kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")

if __name__ == '__main__':
    close_kafka(producer=kafka_producer)  # ensure the producer is closed on shutdown (see note below)
    app.run()
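
send_message also accepts an optional key. A hedged sketch sending a keyed JSON payload (the key and payload are made up, and it assumes the value is passed through to confluent-kafka unchanged, so a JSON string works):

import json

kafka_producer.send_message(
    topic='test-topic',
    key='user-42',  # messages with the same key are routed to the same partition
    value=json.dumps({'event': 'signup', 'user_id': 42}),
)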

Note that you must use close_kafka to close the consumer and producer.


FlaskKafkaConsumer - handle_message decorator:

A decorator that registers a message handler function for the given topic and group ID.

Args:

  • topic (str): The Kafka topic to subscribe to.
  • group_id (str): The Kafka consumer group ID to use.
  • num_consumers (int, optional): The number of Kafka consumer threads to spawn (default is 1).
  • app_context (bool, optional): Whether to run the message handler function inside a Flask application context (default is False).
  • **kwargs: Additional arguments to pass to the Kafka consumer constructor.

Returns: Callable: A decorator function that wraps the message handler function.
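
For example, the decorator can spawn several consumer threads and run the handler inside the Flask application context (the topic, group, and handler below are illustrative):

@kafka_consumer.handle_message(topic='test-topic', group_id='group1',
                               num_consumers=3, app_context=True)
def handle_with_context(msg):
    # app_context=True runs the handler inside an application context,
    # so current_app and application-bound extensions are available
    print(msg.value())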

FlaskKafkaProducer - send_message method:

Send a message to the specified Kafka topic with the given key and value.

Args:

  • topic (str): The Kafka topic to send the message to.
  • value (any): The message value to send.
  • key (str, optional): The message key to use (default: None).
  • flush (bool, optional): Whether to flush the producer's message buffer immediately after sending the message (default: False).
  • poll (bool, optional): Whether to wait for any outstanding messages to be sent before returning (default: True).
  • poll_timeout (float, optional): The maximum amount of time to wait for outstanding messages to be sent, in seconds (default: 1).
  • **kwargs: Additional keyword arguments to pass to the underlying Kafka producer.

Returns: None

Raises: KafkaError: If there is an error producing the message.

Note:

  • If flush is True, any outstanding messages in the producer's buffer will be sent immediately after the current message is sent.
  • If poll is True, the producer will wait for any outstanding messages to be sent before returning, up to the specified poll_timeout.
  • The poll argument is only relevant if flush is False, since the producer always waits for outstanding messages to be sent before flushing.
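
For instance, a routine message can rely on the default polling, while a message that must be delivered before the code continues can be sent with flush=True (a sketch using only the arguments documented above):

# default (poll=True): wait up to poll_timeout seconds for outstanding messages
kafka_producer.send_message(topic='test-topic', value='routine event')

# flush=True: flush the producer's buffer before continuing
kafka_producer.send_message(topic='test-topic', value='critical event', flush=True)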

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

  • flask-and-kafka-0.0.4.tar.gz (6.6 kB, Source)

Built Distribution

  • flask_and_kafka-0.0.4-py3-none-any.whl (8.1 kB, Python 3)

File details

Details for the file flask-and-kafka-0.0.4.tar.gz.

File metadata

  • Download URL: flask-and-kafka-0.0.4.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.2

File hashes

Hashes for flask-and-kafka-0.0.4.tar.gz

  • SHA256: 95a4ead48eca6dce660e0d21b0dd1cc124948613946a3800c5a210dc981f9b95
  • MD5: 83446466833063015bc0d060223827cf
  • BLAKE2b-256: 1df3500d68e2b6cf877683c5b51074e2a763b6a635d971565fea3490831f3825


File details

Details for the file flask_and_kafka-0.0.4-py3-none-any.whl.

File metadata

File hashes

Hashes for flask_and_kafka-0.0.4-py3-none-any.whl

  • SHA256: 4d2c0d5f1c313b6a4d8fc176ee844e42fc952f508b2ef4757be1fa5ba962bd13
  • MD5: 4b4b6349f83546b73591d7a7f429d3a6
  • BLAKE2b-256: 34f0c936a5159b46b8b8958974bcbcd0f79cba054a7598c29f3e07118b125503

