Skip to main content

Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.

Project description

🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀

CI pypi downloads versions



PGQueuer is a minimalist, high-performance job queue library for Python, leveraging PostgreSQL's robustness. Designed with simplicity and efficiency in mind, PGQueuer offers real-time, high-throughput processing for background jobs using PostgreSQL's LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED mechanisms.

Features

  • 💡 Simple Integration: Seamlessly integrates with Python applications using PostgreSQL, providing a clean and lightweight interface.
  • ⚛️ Efficient Concurrency Handling: Supports FOR UPDATE SKIP LOCKED to ensure reliable concurrency control and smooth job processing without contention.
  • 🚧 Real-time Notifications: Uses PostgreSQL's LISTEN and NOTIFY commands for real-time job status updates.
  • 👨‍🎓 Batch Processing: Supports large job batches, optimizing enqueueing and dequeuing with minimal overhead.
  • ⏳ Graceful Shutdowns: Built-in signal handling ensures safe job processing shutdown without data loss.
  • ⌛ Recurring Job Scheduling: Register and manage recurring tasks using cron-like expressions for periodic execution.

Installation

Install PGQueuer via pip:

pip install pgqueuer

Quick Start

Below is a minimal example of how to use PGQueuer to process data.

Step 1: Write a consumer

from __future__ import annotations

from datetime import datetime

import asyncpg

from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job, Schedule


async def main() -> PgQueuer:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    pgq = PgQueuer(driver)

    # Entrypoint for jobs whos entrypoint is named 'fetch'.
    @pgq.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job!r}")

    # Define and register recurring tasks using cron expressions
    # The cron expression "* * * * *" means the task will run every minute
    @pgq.schedule("scheduled_every_minute", "* * * * *")
    async def scheduled_every_minute(schedule: Schedule) -> None:
        print(f"Executed every minute {schedule!r} {datetime.now()!r}")

    return pgq

The above example is located in the examples folder, and can be run by using the pgq cli.

pgq run examples.consumer.main

Step 2: Write a producer

from __future__ import annotations

import asyncio
import sys

import asyncpg

from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main(N: int) -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
    await queries.enqueue(
        ["fetch"] * N,
        [f"this is from me: {n}".encode() for n in range(1, N + 1)],
        [0] * N,
    )


if __name__ == "__main__":
    N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
    asyncio.run(main(N))

Run the producer:

python3 examples/producer.py 10000

Dashboard

Monitor job processing statistics in real-time using the built-in dashboard:

pgq dashboard --interval 10 --tail 25 --table-format grid

This provides a real-time, refreshing view of job queues and their status.

Example output:

+---------------------------+-------+------------+--------------------------+------------+----------+
|          Created          | Count | Entrypoint | Time in Queue (HH:MM:SS) |   Status   | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 |  49   |    sync    |         0:00:01          | successful |    0     |
...
+---------------------------+-------+------------+--------------------------+------------+----------+

Why Choose PGQueuer?

  • Built for Scale: Handles thousands of jobs per second, making it ideal for high-throughput applications.
  • PostgreSQL Native: Utilizes advanced PostgreSQL features for robust job handling.
  • Flexible Concurrency: Offers rate and concurrency limiting to cater to different use-cases, from bursty workloads to critical resource-bound tasks.

License

PGQueuer is MIT licensed. See LICENSE for more information.


Ready to supercharge your workflows? Install PGQueuer today and take your job management to the next level!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgqueuer-0.16.2.tar.gz (202.1 kB view details)

Uploaded Source

Built Distribution

pgqueuer-0.16.2-py3-none-any.whl (42.3 kB view details)

Uploaded Python 3

File details

Details for the file pgqueuer-0.16.2.tar.gz.

File metadata

  • Download URL: pgqueuer-0.16.2.tar.gz
  • Upload date:
  • Size: 202.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.16.2.tar.gz
Algorithm Hash digest
SHA256 d03efebd2cd6e13d8e7d3e1c537a2b60ba04b3aeddbc409047b67f767e858fde
MD5 fc3d80b46cd121465eac36506c1f2deb
BLAKE2b-256 d022b5f65f014fcf93e4df95acb58cf1466dbf48e049936ab439a9e9fe2d4642

See more details on using hashes here.

Provenance

The following attestation bundles were made for pgqueuer-0.16.2.tar.gz:

Publisher: release.yml on janbjorge/pgqueuer

Attestations:

File details

Details for the file pgqueuer-0.16.2-py3-none-any.whl.

File metadata

  • Download URL: pgqueuer-0.16.2-py3-none-any.whl
  • Upload date:
  • Size: 42.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.16.2-py3-none-any.whl
Algorithm Hash digest
SHA256 d4866050693237730c44f91cd8ac579323cffc5d6c5ce0f2c0af568eb651942d
MD5 46ffe6d6c6552d10eb7fb8ae35ec5eeb
BLAKE2b-256 a83a31707951b9c5bae93d0b50a7c6896b26747a434e712374c34ab44685c2bd

See more details on using hashes here.

Provenance

The following attestation bundles were made for pgqueuer-0.16.2-py3-none-any.whl:

Publisher: release.yml on janbjorge/pgqueuer

Attestations:

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page