Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.
Project description
🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀
- 📚 Documentation: Explore the Docs
- 🔍 Source Code: View on GitHub
- 💬 Join the Discussion: Discord Community
PGQueuer is a minimalist, high-performance job queue library for Python, leveraging PostgreSQL's robustness. Designed with simplicity and efficiency in mind, PGQueuer offers real-time, high-throughput processing for background jobs using PostgreSQL's LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED
mechanisms.
Features
- 💡 Simple Integration: Seamlessly integrates with Python applications using PostgreSQL, providing a clean and lightweight interface.
- ⚛️ Efficient Concurrency Handling: Supports
FOR UPDATE SKIP LOCKED
to ensure reliable concurrency control and smooth job processing without contention. - 🚧 Real-time Notifications: Uses PostgreSQL's
LISTEN
andNOTIFY
commands for real-time job status updates. - 👨🎓 Batch Processing: Supports large job batches, optimizing enqueueing and dequeuing with minimal overhead.
- ⏳ Graceful Shutdowns: Built-in signal handling ensures safe job processing shutdown without data loss.
- ⌛ Recurring Job Scheduling: Register and manage recurring tasks using cron-like expressions for periodic execution.
Installation
Install PGQueuer via pip:
pip install pgqueuer
Quick Start
Below is a minimal example of how to use PGQueuer to process data.
Step 1: Write a consumer
from __future__ import annotations
from datetime import datetime
import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job, Schedule
async def main() -> PgQueuer:
connection = await asyncpg.connect()
driver = AsyncpgDriver(connection)
pgq = PgQueuer(driver)
# Entrypoint for jobs whos entrypoint is named 'fetch'.
@pgq.entrypoint("fetch")
async def process_message(job: Job) -> None:
print(f"Processed message: {job!r}")
# Define and register recurring tasks using cron expressions
# The cron expression "* * * * *" means the task will run every minute
@pgq.schedule("scheduled_every_minute", "* * * * *")
async def scheduled_every_minute(schedule: Schedule) -> None:
print(f"Executed every minute {schedule!r} {datetime.now()!r}")
return pgq
The above example is located in the examples folder, and can be run by using the pgq
cli.
pgq run examples.consumer.main
Step 2: Write a producer
from __future__ import annotations
import asyncio
import sys
import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
async def main(N: int) -> None:
connection = await asyncpg.connect()
driver = AsyncpgDriver(connection)
queries = Queries(driver)
await queries.enqueue(
["fetch"] * N,
[f"this is from me: {n}".encode() for n in range(1, N + 1)],
[0] * N,
)
if __name__ == "__main__":
N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
asyncio.run(main(N))
Run the producer:
python3 examples/producer.py 10000
Dashboard
Monitor job processing statistics in real-time using the built-in dashboard:
pgq dashboard --interval 10 --tail 25 --table-format grid
This provides a real-time, refreshing view of job queues and their status.
Example output:
+---------------------------+-------+------------+--------------------------+------------+----------+
| Created | Count | Entrypoint | Time in Queue (HH:MM:SS) | Status | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 | 49 | sync | 0:00:01 | successful | 0 |
...
+---------------------------+-------+------------+--------------------------+------------+----------+
Why Choose PGQueuer?
- Built for Scale: Handles thousands of jobs per second, making it ideal for high-throughput applications.
- PostgreSQL Native: Utilizes advanced PostgreSQL features for robust job handling.
- Flexible Concurrency: Offers rate and concurrency limiting to cater to different use-cases, from bursty workloads to critical resource-bound tasks.
License
PGQueuer is MIT licensed. See LICENSE for more information.
Ready to supercharge your workflows? Install PGQueuer today and take your job management to the next level!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file pgqueuer-0.16.2.tar.gz
.
File metadata
- Download URL: pgqueuer-0.16.2.tar.gz
- Upload date:
- Size: 202.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d03efebd2cd6e13d8e7d3e1c537a2b60ba04b3aeddbc409047b67f767e858fde |
|
MD5 | fc3d80b46cd121465eac36506c1f2deb |
|
BLAKE2b-256 | d022b5f65f014fcf93e4df95acb58cf1466dbf48e049936ab439a9e9fe2d4642 |
Provenance
The following attestation bundles were made for pgqueuer-0.16.2.tar.gz
:
Publisher:
release.yml
on janbjorge/pgqueuer
-
Statement type:
https://in-toto.io/Statement/v1
- Predicate type:
https://docs.pypi.org/attestations/publish/v1
- Subject name:
pgqueuer-0.16.2.tar.gz
- Subject digest:
d03efebd2cd6e13d8e7d3e1c537a2b60ba04b3aeddbc409047b67f767e858fde
- Sigstore transparency entry: 149346932
- Sigstore integration time:
- Predicate type:
File details
Details for the file pgqueuer-0.16.2-py3-none-any.whl
.
File metadata
- Download URL: pgqueuer-0.16.2-py3-none-any.whl
- Upload date:
- Size: 42.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d4866050693237730c44f91cd8ac579323cffc5d6c5ce0f2c0af568eb651942d |
|
MD5 | 46ffe6d6c6552d10eb7fb8ae35ec5eeb |
|
BLAKE2b-256 | a83a31707951b9c5bae93d0b50a7c6896b26747a434e712374c34ab44685c2bd |
Provenance
The following attestation bundles were made for pgqueuer-0.16.2-py3-none-any.whl
:
Publisher:
release.yml
on janbjorge/pgqueuer
-
Statement type:
https://in-toto.io/Statement/v1
- Predicate type:
https://docs.pypi.org/attestations/publish/v1
- Subject name:
pgqueuer-0.16.2-py3-none-any.whl
- Subject digest:
d4866050693237730c44f91cd8ac579323cffc5d6c5ce0f2c0af568eb651942d
- Sigstore transparency entry: 149346933
- Sigstore integration time:
- Predicate type: