Running kombu consumers with support of liveness probe for kubernetes
Project description
Kube Kombu
This project wraps the kombu consumer of python for the use with writing consumer for RabbitMQ custom pubsub. Since Kombu Consumer doesn't come with liveness check this package provides liveness check on the top of kombu consumers. This package exposes a TCP port which can be added to kubernetes liveness probe.
Logic for liveness probe is as follows:-
- Open an asnycio server TCP port in the same process as Consumer.
- On Message Received of TCP checks for liveness of the threads and connection of the Kombu Consumers.
- If the rabbit consumers are found to be inactive this closes the TCP port.
- Once the port is closed liveness checks will fail the next time leading to restart of pod
Setup and Running Kombu consumers
Installation Steps:
- Install python 3.9 or greater on your system using pyenv
- Activate your project's virtual environment for installing this library
$ source <virtualenv-path>/bin/activate
- Install consumer library by running
$ pip install kube_kombu
Implementation Steps
If you are using django you'll need to setup the django project before running the start_consumer
.
Example :-
import django
from django.conf import settings
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
django.setup()
There are three variables that can be defined in your django settings file or in environment variables or as constants in your project:
RABBITMQ
: This can be a dictionary containing rabbitmq related variables ####Example:
RABBITMQ = {
"URL": "<RABBIT_URL>",
"EXCHANGE": "<RABBIT_EXCHANGE>",
"EXCHANGE_TYPE": "<RABBIT_EXCHANGE_TYPE>",
"ROUTING_KEY": "<RABBIT_ROUTING_KEY>",
"QUEUE": "<RABBIT_QUEUE>"
}
Once you have defined the rabbit config you need to define consumer_config :-
from kube_kombu.consumer_config import ConsumerConfig
consumer_config = ConsumerConfig(
"URL",
"EXCHANGE",
"EXCHANGE_TYPE",
"ROUTING_KEY",
"QUEUE",
)
AbstractConsumerAdapter
defines the abstract method callback
which you need to extend your class with and implement your own adapter on what you want to do on receiving the message.
Sample Adapter Can be written as :-
import json
import logging
from kube_kombu.abstract_consumer_adapter import AbstractConsumerAdapter
LOGGER = logging.getLogger(__name__)
class SampleConsumerAdapter(AbstractConsumerAdapter):
@classmethod
def handle_event1(cls, data):
pass
@classmethod
def handle_event2(cls, data):
pass
def callback(self, data, message):
try:
if isinstance(data, str):
data = json.loads(data)
if data["event"] == "event1":
SampleConsumerAdapter.handle_event1(data["payload"])
elif data["event"] == "event2":
SampleConsumerAdapter.handle_event2(data["payload"])
except Exception as e:
LOGGER.exception(f"Exception in callback, Error: {e}")
message.ack()
DONOT FORGET TO ACK THE MESSAGE at the end of callback
Once you have implemented the Adapters and config of your own you will now need to start the consumer which can be done as follows:-
from kube_kombu.start_consumer import start_consumer
start_consumer(
consumer_config,
SampleConsumerAdapter,
host,
port
)
At the end if you want to run the Kombu Consumers on Pod you can implement the __main__
as follows:-
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Setup Host and Port for Kube Liveness check')
parser.add_argument('--port', type=int, metavar='path', default=8988,
help='Post to start TCP healthCheck server on. Default is 8988')
parser.add_argument('--host', metavar='path', default="0.0.0.0",
help='IP host to start health check server on. Default is 0.0.0.0')
args = parser.parse_args()
main(args.host, args.port)
This will help you to pass the post and host from docker RUN
command instead.
HEALTHCHECK_HOST
: Host on which the consumer thread will open a port fot liveness check. Keep it0.0.0.0
for use with Kubernetes liveness check.HEALTHCHECK_PORT
: Port which will be opened by consumer thread for liveness check. Use the same port as you would use withEXPOSE
command in docker
Example
Sample scripts are written in the sample
directory for defining and initializing the consumer
check_liveness_probe.py
file is for testing the socket connection locally
You should now be able to use tcp liveness probe in kubernetes for liveness check of the pod.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file kube_kombu-2.0.0.tar.gz
.
File metadata
- Download URL: kube_kombu-2.0.0.tar.gz
- Upload date:
- Size: 6.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.15.0 pkginfo/1.8.3 requests/2.27.1 setuptools/41.2.0 requests-toolbelt/1.0.0 tqdm/4.64.1 CPython/2.7.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6be4c6a1d671829f8c666ea5202f298bf0be0385f5b743187f73a28884bf7fca |
|
MD5 | 39dd99d796833d1021748b73fbf243a1 |
|
BLAKE2b-256 | 0ec87cdd3ff2c4efea3caef7cdb59f3ec96f191c5fb4890b2008d208f7f2cb19 |