
KQ: Kafka Job Queue for Python


KQ (Kafka Queue) is a lightweight Python library which lets you queue and execute jobs asynchronously using Apache Kafka. It is built on top of kafka-python.

Announcements

  • Python 3.5 support will be dropped in kq v3.0.0.
  • See releases for latest updates.

Requirements

  • Apache Kafka 0.9+
  • Python 3.5+ (support for 3.5 ends with kq v3.0.0)

Installation

Install using pip:

pip install kq

Usage

Start your Kafka instance, for example with the lensesio/fast-data-dev Docker image:

docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev

Define your KQ worker module in my_worker.py:

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group',
    auto_offset_reset='latest'
)

# Set up a worker.
worker = Worker(topic='topic', consumer=consumer)
worker.start()

Start your worker:

python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
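The worker can also report job outcomes to you. A sketch of a callback, based on the signature documented for kq.Worker (status, message, job, result, exception, stacktrace) — check the Worker docs for the exact contract in your KQ version:

```python
def callback(status, message, job, result, exception, stacktrace):
    """Invoked by the worker after each job finishes."""
    if status == 'success':
        return f'Job succeeded with result: {result!r}'
    elif status == 'failure':
        return f'Job raised: {exception!r}'
    elif status == 'timeout':
        return 'Job timed out'
    return f'Job finished with status: {status}'

# The callback is passed to the worker at construction time:
# worker = Worker(topic='topic', consumer=consumer, callback=callback)
```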

Enqueue a function call:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='topic', producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, 'https://google.com')

# You can also specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://google.com')
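You are not limited to library functions like requests.get. A sketch of a task module (my_tasks.py is a hypothetical name): KQ serializes the function and its arguments into the Kafka message, so the worker process must be able to resolve whatever you enqueue, and plain top-level functions are the safest choice:

```python
# my_tasks.py -- job functions the worker can import and execute.

def add(a, b):
    """A trivial job: return the sum of two numbers."""
    return a + b

# On the producer side, enqueue it like any other callable:
# job = queue.enqueue(add, 1, 2)
```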

The worker executes the job in the background:

python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get('https://google.com')
[INFO] Job c7bf2359 returned: <Response [200]>
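Under the hood, KQ serializes jobs with dill by default. A hypothetical round-trip showing the contract a custom (de)serializer pair must satisfy — serializer(obj) -> bytes and deserializer(bytes) -> obj — using stdlib pickle as a stand-in; whether Queue and Worker accept such callables in your version is an assumption to verify against the KQ docs:

```python
import pickle

# If your KQ version exposes serializer/deserializer parameters, the pair
# would plug in like this (hypothetical usage, shown for illustration):
#   queue  = Queue(topic='topic', producer=producer, serializer=pickle.dumps)
#   worker = Worker(topic='topic', consumer=consumer, deserializer=pickle.loads)

# The essential property: deserializer(serializer(obj)) == obj.
payload = pickle.dumps({'func': 'my_tasks.add', 'args': (1, 2)})
restored = pickle.loads(payload)
```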

See documentation for more information.

Contributing

Set up dev environment:

cd ~/your/kq/clone          # Activate venv if you have one
pip install -e .[dev]       # Install dev dependencies (black, mypy, pre-commit etc.)
pre-commit install          # Install git pre-commit hooks

Run unit tests (Kafka must be running):

docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev  # Start Kafka docker
py.test

