Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.
Project description
Readme
🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀
📚 Documentation: Explore the Docs 📖
🔍 Source Code: View on GitHub 💾
💬 Join the Discussion: Discord Community
PGQueuer
PGQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PGQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.
Features
- Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
- Efficient Concurrency Handling: Utilizes PostgreSQL's
FOR UPDATE SKIP LOCKED
for reliable and concurrent job processing. - Real-time Notifications: Leverages
LISTEN
andNOTIFY
for real-time updates on job status changes.
Installation
To install PGQueuer, simply install with pip the following command:
pip install pgqueuer
Example Usage
Here's how you can use PGQueuer in a typical scenario processing incoming data messages:
Write and run a consumer
Start a long-lived consumer that will begin processing jobs as soon as they are enqueued by another process. In this case we want to be a bit more carefull as we want gracefull shutdowns, pgqueuer run
will setup signals to
ensure this.
from __future__ import annotations
import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager
async def main() -> QueueManager:
connection = await asyncpg.connect(dsn())
driver = AsyncpgDriver(connection)
qm = QueueManager(driver)
# Setup the 'fetch' entrypoint
@qm.entrypoint("fetch")
async def process_message(job: Job) -> None:
print(f"Processed message: {job}")
return qm
python3 -m pgqueuer run tools.consumer.main
Write and run a producer
Start a short-lived producer that will enqueue 10,000 jobs.
from __future__ import annotations
import asyncio
import sys
import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
async def main(N: int) -> None:
connection = await asyncpg.connect()
driver = AsyncpgDriver(connection)
queries = Queries(driver)
await queries.enqueue(
["fetch"] * N,
[f"this is from me: {n}".encode() for n in range(1, N+1)],
[0] * N,
)
if __name__ == "__main__":
print(sys.argv)
N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
asyncio.run(main(N))
python3 tools/producer.py 10000
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file pgqueuer-0.12.0.tar.gz
.
File metadata
- Download URL: pgqueuer-0.12.0.tar.gz
- Upload date:
- Size: 188.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6ad8747f53d07f86ed39d4222799ac255b8f50ae6383844482cbf1780f628e27 |
|
MD5 | 6d46459f8b57e0fe6c00e6740accb860 |
|
BLAKE2b-256 | 2c1011b42ad2811cfdc24fea0fb6637c995306cbd619ad560d7e94543317872e |
File details
Details for the file pgqueuer-0.12.0-py3-none-any.whl
.
File metadata
- Download URL: pgqueuer-0.12.0-py3-none-any.whl
- Upload date:
- Size: 35.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0ae614b09bff46f1bf69f64243d62b35d3c147c38d95ff5c18e69e40521083d9 |
|
MD5 | b7dbdb6ad59971af3211cada8b07d86d |
|
BLAKE2b-256 | 7d119eac817a8af8883fc1b094916a83ebfc3f9a5b99ef5bc2bb5bab166789ce |