KQ: Kafka Job Queue for Python
KQ (Kafka Queue) is a lightweight Python library that lets you enqueue and execute jobs asynchronously using Apache Kafka. It uses kafka-python under the hood.
Announcements
- Support for Python 3.5 will be dropped from KQ version 3.0.0.
- See releases for latest updates.
Requirements
- Apache Kafka 0.9+
- Python 3.6+
Installation
Install using pip:
pip install kq
Getting Started
Start your Kafka instance. Example using Docker:
docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
Define your KQ worker module, my_worker.py:
import logging
from kafka import KafkaConsumer
from kq import Worker
# Set up logging.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger("kq.worker")
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)
# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers="127.0.0.1:9092",
    group_id="group",
    auto_offset_reset="latest",
)
# Set up a worker.
worker = Worker(topic="topic", consumer=consumer)
worker.start()
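Conceptually, a worker is a loop that reads messages from a consumer and runs the function call each message carries. The following is a minimal broker-free sketch of that idea, not KQ's implementation: `Message`, `encode_call` and `run_worker` are hypothetical names, a plain list stands in for the Kafka consumer, and `pickle` is assumed as the serializer (KQ's actual wire format may differ).

```python
import math
import pickle
from collections import namedtuple

# Hypothetical stand-in for the records a Kafka consumer yields.
Message = namedtuple("Message", ["topic", "partition", "offset", "value"])


def encode_call(func, *args, **kwargs):
    """Serialize a function call into bytes (the message value)."""
    return pickle.dumps((func, args, kwargs))


def run_worker(messages):
    """Process messages in order and collect (offset, result) pairs."""
    results = []
    for message in messages:
        # Recover the call from the message bytes, then execute it.
        func, args, kwargs = pickle.loads(message.value)
        results.append((message.offset, func(*args, **kwargs)))
    return results


# A list of messages plays the role of the consumer in this sketch.
fake_consumer = [
    Message("topic", 0, 0, encode_call(math.hypot, 3, 4)),
    Message("topic", 0, 1, encode_call(max, 2, 7, 5)),
]
results = run_worker(fake_consumer)
print(results)
```

Because the loop only iterates over its input, the same function could in principle be pointed at any iterable of records that expose a `value` attribute.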
Start your worker:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
Enqueue a function call:
import requests
from kafka import KafkaProducer
from kq import Queue
# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
# Set up a queue.
queue = Queue(topic="topic", producer=producer)
# Enqueue a function call.
job = queue.enqueue(requests.get, "https://google.com")
# You can also specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b"foo", partition=0).enqueue(requests.get, "https://google.com")
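The `using(...)` call above reads as a per-call override: it yields a queue-like object carrying the given timeout, key and partition, on which `enqueue` is then called. A minimal sketch of that chaining pattern follows. `QueueSketch` is a hypothetical class written for illustration; it only builds job records as dictionaries and sends nothing to Kafka, and the record layout is an assumption rather than KQ's actual job format.

```python
import uuid


class QueueSketch:
    """Hypothetical stand-in for a queue; builds job records, sends nothing."""

    def __init__(self, topic, timeout=None, key=None, partition=None):
        self.topic = topic
        self.timeout = timeout
        self.key = key
        self.partition = partition

    def using(self, timeout=None, key=None, partition=None):
        """Return a new queue with per-call overrides; self is unchanged."""
        return QueueSketch(
            self.topic,
            timeout=self.timeout if timeout is None else timeout,
            key=self.key if key is None else key,
            partition=self.partition if partition is None else partition,
        )

    def enqueue(self, func, *args, **kwargs):
        """Build the record a real queue would serialize and publish."""
        return {
            "id": uuid.uuid4().hex,
            "topic": self.topic,
            "func": func,
            "args": args,
            "kwargs": kwargs,
            "timeout": self.timeout,
            "key": self.key,
            "partition": self.partition,
        }


queue_sketch = QueueSketch(topic="topic")
plain_job = queue_sketch.enqueue(len, "hello")
tuned_job = queue_sketch.using(timeout=5, key=b"foo", partition=0).enqueue(len, "hello")
```

Returning a new object from `using` keeps the base queue's defaults intact, so one queue can be shared while individual calls override options.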
The worker executes the job in the background:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get("https://google.com")
[INFO] Job c7bf2359 returned: <Response [200]>
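A job timeout, as mentioned earlier, implies that the worker bounds how long a call may run. One way to sketch that with the standard library is shown below. This is an illustration under stated assumptions, not KQ's mechanism: `execute_with_timeout` and its status strings are hypothetical, and `timeout=None` is assumed to mean "no limit". Note that a thread-based timeout stops waiting for the call but cannot forcibly stop it.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def execute_with_timeout(func, args=(), kwargs=None, timeout=None):
    """Run func; return ("success", result), ("timeout", None) or ("failure", exc)."""
    kwargs = kwargs or {}
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return "success", future.result(timeout=timeout)
    except FutureTimeout:
        # The call is still running in its thread; we only stop waiting for it.
        return "timeout", None
    except Exception as exc:
        return "failure", exc
    finally:
        # Do not block on a call that may still be running.
        executor.shutdown(wait=False)


status_ok, value = execute_with_timeout(pow, args=(2, 10), timeout=1)
status_slow, _ = execute_with_timeout(time.sleep, args=(0.3,), timeout=0.05)
status_bad, error = execute_with_timeout(int, args=("not a number",))
print(status_ok, value, status_slow, status_bad)
```

Reporting a status instead of raising keeps the worker loop alive when a single job fails or overruns.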
See the documentation for more information.