KQ: Kafka Job Queue for Python
KQ (Kafka Queue) is a lightweight Python library that lets you enqueue and execute jobs asynchronously using Apache Kafka. It uses kafka-python under the hood.
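To make "enqueue and execute jobs" concrete, here is a minimal sketch of the general idea: a function call is packed into bytes (which could travel as a Kafka message value) and is unpacked and executed on the other side. This is not KQ's actual wire format. The helper names `serialize_job` and `execute_job` are hypothetical, and the standard-library `pickle` module is used only for illustration.

```python
import math
import pickle


def serialize_job(func, *args, **kwargs):
    """Pack a function call into bytes (hypothetical helper, not part of KQ)."""
    return pickle.dumps({"func": func, "args": args, "kwargs": kwargs})


def execute_job(payload):
    """Unpack the bytes and run the call, as a worker process might."""
    job = pickle.loads(payload)
    return job["func"](*job["args"], **job["kwargs"])


# Producer side: turn the call math.hypot(3, 4) into a message payload.
payload = serialize_job(math.hypot, 3, 4)

# Worker side: rebuild the call and execute it.
result = execute_job(payload)
print(result)  # 5.0
```

Functions are pickled by reference, so the worker must be able to import the same function. Unpickling untrusted data can execute arbitrary code, so a queue like this should only be used with trusted producers.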
Announcements
- Support for Python 3.5 will be dropped from KQ version 3.0.0.
- See releases for latest updates.
Requirements
- Apache Kafka 0.9+
- Python 3.6+
Installation
Install using pip:
pip install kq
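After installing, one way to confirm that the package is visible to your interpreter is to query its installed version. The sketch below uses only the standard library (`importlib.metadata`, available in Python 3.8+); the helper name `installed_version` is hypothetical.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Prints a version string if kq is installed, otherwise None.
print(installed_version("kq"))
```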
Getting Started
Start your Kafka instance. Example using Docker:
docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
Define your KQ worker module, my_worker.py:
import logging
from kafka import KafkaConsumer
from kq import Worker
# Set up logging.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger("kq.worker")
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)
# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers="127.0.0.1:9092",
    group_id="group",
    auto_offset_reset="latest"
)
# Set up a worker.
worker = Worker(topic="topic", consumer=consumer)
worker.start()
Start your worker:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
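The `[INFO]` prefix in the worker output comes from the logging formatter configured in the worker module. The sketch below reproduces that formatter against an in-memory stream so the effect can be seen without Kafka. The logger name `kq.demo` and the helper `build_logger` are made up for this illustration.

```python
import io
import logging


def build_logger(name, stream):
    """Create a logger that uses the same format as the worker module."""
    formatter = logging.Formatter("[%(levelname)s] %(message)s")
    handler = logging.StreamHandler(stream)
    handler.setFormatter(formatter)
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    logger.propagate = False  # keep the demo output out of the root logger
    return logger


buffer = io.StringIO()
demo_logger = build_logger("kq.demo", buffer)
demo_logger.info("Starting worker ...")
demo_logger.debug("A debug message")

captured = buffer.getvalue()
print(captured, end="")
```

Raising the level to `logging.INFO` would hide the debug line while keeping the informational messages.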
Enqueue a function call:
import requests
from kafka import KafkaProducer
from kq import Queue
# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
# Set up a queue.
queue = Queue(topic="topic", producer=producer)
# Enqueue a function call.
job = queue.enqueue(requests.get, "https://google.com")
# You can also specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b"foo", partition=0).enqueue(requests.get, "https://google.com")
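To illustrate what a job timeout means in practice, the following sketch runs a function with a time limit using only the standard library. It is not how KQ enforces timeouts internally. The helper `run_with_timeout` and the status labels it returns are assumptions made for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_timeout(func, args=(), kwargs=None, timeout=None):
    """Run func in a helper thread and give up waiting after `timeout` seconds.

    Returns a (status, value) pair. The statuses are illustrative labels.
    """
    kwargs = kwargs or {}
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return "success", future.result(timeout=timeout)
    except FutureTimeout:
        return "timeout", None
    except Exception as exc:  # the job itself raised
        return "failure", exc
    finally:
        # Do not block on a job that is still running in the background.
        executor.shutdown(wait=False)


print(run_with_timeout(sum, args=([1, 2, 3],), timeout=5))  # ('success', 6)
print(run_with_timeout(time.sleep, args=(0.3,), timeout=0.05)[0])  # timeout
```

A thread cannot be forcibly stopped in Python, so the timed-out call keeps running until it finishes on its own; this sketch only stops waiting for it.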
The worker executes the job in the background:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get("https://google.com")
[INFO] Job c7bf2359 returned: <Response [200]>
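If you want to react to job outcomes beyond the log lines above, a callback function is one option. The sketch below assumes that `Worker` accepts a `callback` keyword argument and calls it with `(status, message, job, result, exception, stacktrace)`, where `status` is one of `"invalid"`, `"success"`, `"timeout"` or `"failure"`. That signature is recalled from the KQ documentation and should be verified against the version you install. The function itself is plain Python and can be exercised without Kafka.

```python
import logging

logger = logging.getLogger("kq.callback")


def callback(status, message, job, result, exception, stacktrace):
    """Summarize the outcome of a processed message and return the summary."""
    if status == "success":
        summary = f"job succeeded with result {result!r}"
    elif status == "timeout":
        summary = "job timed out"
    elif status == "failure":
        summary = f"job failed: {exception!r}"
    else:
        summary = f"message could not be processed (status={status})"
    logger.info(summary)
    return summary


# Hypothetical hookup, assuming Worker accepts a `callback` keyword:
# worker = Worker(topic="topic", consumer=consumer, callback=callback)

# The callback can be tried out directly with placeholder values.
print(callback("success", None, None, 200, None, None))
```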
See the documentation for more information.
Download files
Source Distribution
kq-2.2.1.tar.gz (18.2 kB)
Built Distribution
kq-2.2.1-py3-none-any.whl (11.6 kB)
File details
Details for the file kq-2.2.1.tar.gz.
File metadata
- Download URL: kq-2.2.1.tar.gz
- Upload date:
- Size: 18.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/32.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.3 importlib-metadata/4.10.1 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.10.2
File hashes
Algorithm | Hash digest
---|---
SHA256 | 5f5e9827cd9f223fb3cac4202d58549f10488256cd699ca4a22f63ecda9a3e8a
MD5 | 574c8ba00d6bafa1dd96e5da357f67b3
BLAKE2b-256 | 204e3f90c51ed89ad9dd9bff0a99f9331350f008137e78b1eb72f74ca628b6d8
File details
Details for the file kq-2.2.1-py3-none-any.whl.
File metadata
- Download URL: kq-2.2.1-py3-none-any.whl
- Upload date:
- Size: 11.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/32.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.3 importlib-metadata/4.10.1 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.10.2
File hashes
Algorithm | Hash digest
---|---
SHA256 | 57bdbf0fb36735d503ebfee6a52c10947f04b10312bfeb6bc1868705ee6e41bc
MD5 | b1a7300816b0a0a32ff40f44b222b88a
BLAKE2b-256 | 509af0a70d1da4a41c91a54ff41eee8913a39ba43f5f35f966b7e218011edd55