Skip to main content

Kafka Job Queue for Python

Project description

KQ: Kafka Job Queue for Python

Build CodeQL codecov PyPI version GitHub license Python version

KQ (Kafka Queue) is a lightweight Python library which lets you enqueue and execute jobs asynchronously using Apache Kafka. It uses kafka-python under the hood.


  • Support for Python 3.5 will be dropped from KQ version 3.0.0.
  • See releases for latest updates.



Install using pip:

pip install kq

Getting Started

Start your Kafka instance. Example using Docker:

docker run -p 9092:9092 -e ADV_HOST= lensesio/fast-data-dev

Define your KQ module:

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
stream_handler = logging.StreamHandler()
logger = logging.getLogger("kq.worker")

# Set up a Kafka consumer.
consumer = KafkaConsumer(

# Set up a worker.
worker = Worker(topic="topic", consumer=consumer)

Start your worker:

[INFO] Starting Worker(hosts= topic=topic, group=group) ...

Enqueue a function call:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers="")

# Set up a queue.
queue = Queue(topic="topic", producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, "")

# You can also specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b"foo", partition=0).enqueue(requests.get, "")

The worker executes the job in the background:

[INFO] Starting Worker(hosts=, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get("")
[INFO] Job c7bf2359 returned: <Response [200]>

See the documentation for more information.

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for kq, version 2.2.0
Filename, size File type Python version Upload date Hashes
Filename, size kq-2.2.0-py3-none-any.whl (11.5 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size kq-2.2.0.tar.gz (18.3 kB) File type Source Python version None Upload date Hashes View

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring DigiCert DigiCert EV certificate Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page