KQ: Kafka Job Queue for Python

KQ (Kafka Queue) is a lightweight Python library that lets you enqueue and execute jobs asynchronously using Apache Kafka. It uses kafka-python under the hood.

Announcements

  • Support for Python 3.5 will be dropped from KQ version 3.0.0.
  • See releases for the latest updates.

Requirements

Installation

Install using pip:

pip install kq

Getting Started

Start your Kafka instance. Example using Docker:

docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev

Define your KQ worker module, my_worker.py:

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger("kq.worker")
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers="127.0.0.1:9092",
    group_id="group",
    auto_offset_reset="latest"
)

# Set up a worker.
worker = Worker(topic="topic", consumer=consumer)
worker.start()

Start your worker:

python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...

Enqueue a function call:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")

# Set up a queue.
queue = Queue(topic="topic", producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, "https://google.com")

# You can also specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b"foo", partition=0).enqueue(requests.get, "https://google.com")
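
Conceptually, enqueueing a function call means packing the callable and its arguments into a message that a worker can later unpack and run. The following is a minimal sketch of that idea using only the standard library. The helper names serialize_call and execute_call are hypothetical, and the message layout is an assumption for illustration, not KQ's actual wire format:

```python
import operator
import pickle


def serialize_call(func, *args, **kwargs):
    """Pack a function call into bytes.

    Hypothetical helper for illustration only. Stdlib pickle stores
    importable functions by reference (module + name), so the worker
    process must be able to import the same function.
    """
    return pickle.dumps({"func": func, "args": args, "kwargs": kwargs})


def execute_call(payload):
    """Unpack the bytes produced by serialize_call and run the call."""
    call = pickle.loads(payload)
    return call["func"](*call["args"], **call["kwargs"])


# "Enqueue" side: turn the call into a message body.
message = serialize_call(operator.mul, 6, 7)

# "Worker" side: rebuild the call and execute it.
result = execute_call(message)
print(result)
```

Because unpickling can execute arbitrary code, a design like this is only appropriate when every producer writing to the topic is trusted.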

The worker executes the job in the background:

python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get("https://google.com")
[INFO] Job c7bf2359 returned: <Response [200]>
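
To illustrate what "executing the job" with a timeout can look like on the worker side, here is a rough sketch that drains an in-memory queue in place of a Kafka topic. The run_job helper, the status strings and the tuple layout are all assumptions made for this example; KQ's own timeout mechanism and result handling may differ:

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_job(func, args, timeout=None):
    """Run func(*args) and return a (status, value) pair.

    Hypothetical helper. The call runs in a helper thread so the caller
    can stop waiting after `timeout` seconds. The helper thread is not
    killed on timeout; it simply finishes on its own later.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args)
    try:
        return "success", future.result(timeout=timeout)
    except FutureTimeout:
        return "timeout", None
    except Exception as exc:  # the job itself raised
        return "failure", exc
    finally:
        pool.shutdown(wait=False)


# In-memory stand-in for a topic: (function, args, timeout) tuples.
jobs = queue.Queue()
jobs.put((pow, (2, 10), None))        # finishes normally
jobs.put((time.sleep, (0.3,), 0.05))  # exceeds its timeout
jobs.put((int, ("x",), None))         # raises ValueError

results = []
while not jobs.empty():
    func, args, timeout = jobs.get()
    results.append(run_job(func, args, timeout))

for status, value in results:
    print(status, value)
```

Returning a status instead of re-raising keeps one failing job from stopping the loop, which is usually what a long-running worker needs.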

See the documentation for more information.
