
PGQueuer is a Python library leveraging PostgreSQL for efficient job queuing.

Project description

🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀




PGQueuer is a minimalist, high-performance job queue library for Python, leveraging PostgreSQL's robustness. Designed with simplicity and efficiency in mind, PGQueuer offers real-time, high-throughput processing for background jobs using PostgreSQL's LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED mechanisms.
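To make the LISTEN/NOTIFY idea concrete, here is a minimal in-process sketch. It is not PGQueuer's implementation: a hypothetical InMemoryQueue stands in for the jobs table, and an asyncio.Event plays the role of a NOTIFY on a channel. An idle worker sleeps until it is notified instead of polling in a tight loop, and the poll_timeout fallback (also an assumption of this sketch) wakes it anyway in case a notification is missed.

```python
from __future__ import annotations

import asyncio


class InMemoryQueue:
    """Hypothetical stand-in for a jobs table plus a NOTIFY channel."""

    def __init__(self) -> None:
        self.jobs: list[str] = []
        self.notified = asyncio.Event()

    def enqueue(self, payload: str) -> None:
        # In PostgreSQL this would be an INSERT followed by a NOTIFY.
        self.jobs.append(payload)
        self.notified.set()

    def dequeue(self) -> str | None:
        return self.jobs.pop(0) if self.jobs else None


async def worker(queue: InMemoryQueue, expected: int, poll_timeout: float = 1.0) -> list[str]:
    """Process `expected` jobs, sleeping on notifications while the queue is empty."""
    processed: list[str] = []
    while len(processed) < expected:
        # Clear first, then check, so a notification sent after the check is not lost.
        queue.notified.clear()
        job = queue.dequeue()
        if job is not None:
            processed.append(job)
            continue
        try:
            # Wake on the next notification, or after poll_timeout as a fallback.
            await asyncio.wait_for(queue.notified.wait(), timeout=poll_timeout)
        except asyncio.TimeoutError:
            pass
    return processed


async def demo() -> list[str]:
    queue = InMemoryQueue()
    task = asyncio.create_task(worker(queue, expected=2))
    await asyncio.sleep(0.01)  # the worker is now idle, waiting for a notification
    queue.enqueue("job-1")
    queue.enqueue("job-2")
    return await task


print(asyncio.run(demo()))  # ['job-1', 'job-2']
```

Clearing the event before checking for work means a job enqueued between the check and the wait still wakes the worker.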

Features

  • 💡 Simple Integration: PGQueuer seamlessly integrates with any Python application using PostgreSQL, providing a clean and lightweight interface.
  • ⚛️ Efficient Concurrency Handling: Uses FOR UPDATE SKIP LOCKED so that concurrent workers skip jobs another worker has already locked instead of waiting on them, avoiding contention and duplicate processing.
  • 🚧 Real-time Notifications: Uses PostgreSQL's LISTEN and NOTIFY commands to deliver job and status updates in real time rather than relying on polling alone.
  • 👨‍🎓 Batch Processing: Supports large job batches, optimizing enqueueing and dequeuing with minimal overhead.
  • ⏳ Graceful Shutdowns: Built-in signal handling shuts job processing down safely, without data loss.
  • ⌛ Recurring Job Scheduling: Register and manage recurring tasks using cron-like expressions for periodic execution.
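The SKIP LOCKED behaviour can be illustrated without a database. The sketch below is an in-memory analogy, not PGQueuer code: each hypothetical Job carries a threading.Lock standing in for a row lock, and claim_next acquires it without blocking, skipping any job another worker holds, roughly as a SELECT ... FOR UPDATE SKIP LOCKED query does. Unlike a real transaction, the lock here is held only while the status is flipped.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor


class Job:
    """Hypothetical job row with a status and a lock standing in for a row lock."""

    def __init__(self, job_id: int) -> None:
        self.id = job_id
        self.status = "queued"
        self.lock = threading.Lock()


def claim_next(jobs: list[Job]) -> Job | None:
    """Claim the first queued job, skipping any job whose lock is already held."""
    for job in jobs:
        if job.status != "queued":
            continue
        # Non-blocking acquire: a locked job is skipped rather than waited on.
        if not job.lock.acquire(blocking=False):
            continue
        try:
            # Re-check under the lock; another worker may have claimed it already.
            if job.status == "queued":
                job.status = "picked"
                return job
        finally:
            job.lock.release()
    return None


def run_worker(jobs: list[Job], results: list[int], results_lock: threading.Lock) -> None:
    """Keep claiming and 'processing' jobs until none are left."""
    while (job := claim_next(jobs)) is not None:
        with results_lock:
            results.append(job.id)


jobs = [Job(i) for i in range(1000)]
results: list[int] = []
results_lock = threading.Lock()
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(8):
        pool.submit(run_worker, jobs, results, results_lock)

print(len(results), len(set(results)))  # 1000 1000
```

Eight workers share the same list, yet every job is claimed exactly once because a worker that finds a job locked moves on instead of blocking.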

Installation

Install PGQueuer via pip:

pip install pgqueuer

Quick Start

Below is a minimal example of how to use PGQueuer to process data.

Step 1: Consumer - Run the Worker

Start a consumer to process incoming jobs as soon as they are enqueued. PGQueuer ensures graceful shutdowns using pre-configured signal handlers.

import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager

async def main() -> QueueManager:
    connection = await asyncpg.connect(dsn())
    driver = AsyncpgDriver(connection)
    qm = QueueManager(driver)

    @qm.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job}")

    return qm

Run the consumer:

pgq run examples.consumer.main
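Step 1 notes that PGQueuer shuts down gracefully using pre-configured signal handlers. The general pattern is sketched below with plain asyncio; it illustrates the idea under assumptions and is not PGQueuer's internals. On SIGINT or SIGTERM the hypothetical worker stops taking new jobs but always finishes the one it is running. loop.add_signal_handler is Unix-only and must be called from the main thread, so the sketch skips installation where it is unsupported.

```python
import asyncio
import signal


def install_shutdown_handlers(shutdown: asyncio.Event) -> None:
    """Set `shutdown` on SIGINT/SIGTERM instead of letting the signal end the process."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, shutdown.set)
        except (NotImplementedError, RuntimeError):
            # Not supported by Windows event loops or outside the main thread.
            pass


async def worker(queue: asyncio.Queue, shutdown: asyncio.Event, done: list) -> None:
    """Take jobs until shutdown is requested; a job that has started always completes."""
    while not shutdown.is_set():
        get_job = asyncio.create_task(queue.get())
        stop = asyncio.create_task(shutdown.wait())
        finished, _ = await asyncio.wait({get_job, stop}, return_when=asyncio.FIRST_COMPLETED)
        stop.cancel()
        if get_job not in finished:
            get_job.cancel()  # shutdown arrived while idle: stop waiting for new jobs
            break
        job = get_job.result()
        await asyncio.sleep(0.01)  # simulated work; shutdown does not interrupt it
        done.append(job)
        queue.task_done()


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    shutdown = asyncio.Event()
    install_shutdown_handlers(shutdown)
    done: list = []
    task = asyncio.create_task(worker(queue, shutdown, done))
    for name in ("a", "b", "c"):
        queue.put_nowait(name)
    await queue.join()  # wait until every queued job has been processed
    shutdown.set()  # same effect as the process receiving SIGINT or SIGTERM
    await task
    return done


print(asyncio.run(demo()))  # ['a', 'b', 'c']
```

The key design choice is that the shutdown flag is only checked between jobs, so a signal never cuts a job off halfway.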

Step 2: Producer - Add Jobs to Queue

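Now, produce jobs that will be processed by the consumer. Below is a simple script that enqueues a batch of jobs: 10,000 by default, or as many as given on the command line.

import asyncio
import sys

import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.queries import Queries

async def main(N: int) -> None:
    # Connect with the same dsn() helper the consumer uses.
    connection = await asyncpg.connect(dsn())
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
    # Batch-enqueue N jobs; the three lists are entrypoints, payloads and priorities.
    await queries.enqueue(
        ["fetch"] * N,
        [f"this is from me: {n}".encode() for n in range(1, N + 1)],
        [0] * N,
    )
    await connection.close()

if __name__ == "__main__":
    # Read the job count from the command line, defaulting to 10,000.
    N = int(sys.argv[1]) if len(sys.argv) > 1 else 10_000
    asyncio.run(main(N))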

Run the producer:

python3 examples/producer.py 10000
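The batch enqueue call in the producer passes three parallel lists (entrypoints, payloads, priorities), where position i of each list describes the same job. When jobs come from several sources, it can help to build them as records first and split them into lists at the last moment. The PendingJob dataclass and to_enqueue_args helper below are hypothetical conveniences, not part of PGQueuer.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PendingJob:
    """Hypothetical record describing one job before it is enqueued."""

    entrypoint: str
    payload: bytes
    priority: int = 0


def to_enqueue_args(jobs: list[PendingJob]) -> tuple[list[str], list[bytes], list[int]]:
    """Split records into parallel lists; index i in each list refers to the same job."""
    entrypoints = [job.entrypoint for job in jobs]
    payloads = [job.payload for job in jobs]
    priorities = [job.priority for job in jobs]
    return entrypoints, payloads, priorities


jobs = [PendingJob("fetch", f"this is from me: {n}".encode()) for n in range(1, 4)]
jobs.append(PendingJob("fetch", b"another job", priority=10))
entrypoints, payloads, priorities = to_enqueue_args(jobs)
print(entrypoints)  # ['fetch', 'fetch', 'fetch', 'fetch']
print(priorities)   # [0, 0, 0, 10]
# With a Queries instance as in the producer script:
# await queries.enqueue(entrypoints, payloads, priorities)
```

Keeping each job as one record until the final call makes it impossible for the three lists to drift out of alignment.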

Step 3: Scheduler - Recurring Jobs

PGQueuer also supports recurring job scheduling, allowing you to register tasks that run periodically based on cron-like expressions.

Here is a minimal example of how to use the scheduling feature to run tasks periodically:

import asyncio
import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.scheduler import Scheduler
from pgqueuer.models import Schedule

async def create_scheduler() -> Scheduler:
    connection = await asyncpg.connect("postgresql://user:password@localhost:5432/yourdatabase")
    driver = AsyncpgDriver(connection)
    scheduler = Scheduler(driver)

    # Define and register recurring tasks using cron expressions
    # The cron expression "* * * * *" means the task will run every minute
    @scheduler.schedule("update_product_catalog", "* * * * *")
    async def update_product_catalog(schedule: Schedule) -> None:
        print(f"Running update_product_catalog task: {schedule}")
        await asyncio.sleep(0.1)
        print("update_product_catalog task completed.")

    # The cron expression "0 0 * * *" means the task will run every day at midnight
    @scheduler.schedule("clean_expired_tokens", "0 0 * * *")
    async def clean_expired_tokens(schedule: Schedule) -> None:
        print(f"Running clean_expired_tokens task: {schedule}")
        await asyncio.sleep(0.2)
        print("clean_expired_tokens task completed.")

    return scheduler

async def main():
    # Create and run the scheduler
    scheduler = await create_scheduler()
    await scheduler.run()

if __name__ == "__main__":
    asyncio.run(main())

Alternatively, run the scheduler through the pgq CLI by passing the import path of its factory function:

pgq run myapp.create_scheduler

This example shows how the scheduling feature can automate recurring tasks such as data synchronization or cleanup jobs.
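The cron expressions above have five fields: minute, hour, day of month, month, and day of week. To show how such an expression is matched against a point in time, here is a deliberately tiny matcher that supports only * and single numbers. It is an illustration, not PGQueuer's parser: real cron syntax also has ranges, lists, steps, and special rules when both day fields are restricted, which this sketch ignores by simply requiring every field to match.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Match one cron field; this sketch supports only '*' and a single number."""
    return field == "*" or int(field) == value


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` (minute precision) satisfies a five-field cron expression."""
    minute, hour, day, month, weekday = expression.split()
    # Cron numbers weekdays 0-6 from Sunday; datetime.weekday() numbers 0-6 from Monday.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day)
        and field_matches(month, moment.month)
        and field_matches(weekday, cron_weekday)
    )


print(cron_matches("* * * * *", datetime(2024, 5, 5, 16, 44)))  # True
print(cron_matches("0 0 * * *", datetime(2024, 5, 5, 0, 0)))    # True
print(cron_matches("0 0 * * *", datetime(2024, 5, 5, 0, 1)))    # False
```

A scheduler built on this idea checks the expression once per minute; "* * * * *" matches every minute, while "0 0 * * *" matches only at midnight.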

Dashboard

Monitor job processing statistics in real time using the built-in dashboard:

pgq dashboard --interval 10 --tail 25 --table-format grid

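This provides a continuously refreshing view of job queues and their status. In the command above, --interval 10 refreshes the view every 10 seconds and --tail 25 limits it to the 25 most recent entries.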

Example output:

+---------------------------+-------+------------+--------------------------+------------+----------+
|          Created          | Count | Entrypoint | Time in Queue (HH:MM:SS) |   Status   | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 |  49   |    sync    |         0:00:01          | successful |    0     |
...
+---------------------------+-------+------------+--------------------------+------------+----------+
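Each dashboard row reports a Count for jobs sharing the same column values. As a rough illustration of that kind of grouping, here is an in-memory sketch over hypothetical JobRecord tuples. The real dashboard reads its data from PostgreSQL, and its exact grouping (including the Created and Time in Queue columns) is not reproduced here.

```python
from collections import Counter
from typing import List, NamedTuple, Tuple


class JobRecord(NamedTuple):
    """Hypothetical record of one processed job."""

    entrypoint: str
    status: str
    priority: int


def summarize(records: List[JobRecord]) -> List[Tuple[str, str, int, int]]:
    """Count jobs per (entrypoint, status, priority), largest groups first."""
    counts = Counter((r.entrypoint, r.status, r.priority) for r in records)
    return [(entrypoint, status, priority, n) for (entrypoint, status, priority), n in counts.most_common()]


records = [JobRecord("sync", "successful", 0)] * 49 + [JobRecord("fetch", "successful", 0)] * 3
for row in summarize(records):
    print(row)
# ('sync', 'successful', 0, 49)
# ('fetch', 'successful', 0, 3)
```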

Why Choose PGQueuer?

  • Built for Scale: Handles thousands of jobs per second, making it ideal for high-throughput applications.
  • PostgreSQL Native: Utilizes advanced PostgreSQL features for robust job handling.
  • Flexible Concurrency: Offers rate and concurrency limiting to suit different use cases, from bursty workloads to critical resource-bound tasks.
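Rate and concurrency limits can be pictured with standard asyncio primitives. The sketch below is a generic illustration, not PGQueuer's implementation or configuration API: a hypothetical Limiter caps how many jobs run at once with an asyncio.Semaphore and spaces job starts at least 1/rate seconds apart.

```python
import asyncio
import time


class Limiter:
    """Hypothetical limiter: at most `concurrency` jobs at once, at most `rate` starts per second."""

    def __init__(self, concurrency: int, rate: float) -> None:
        self._semaphore = asyncio.Semaphore(concurrency)
        self._interval = 1.0 / rate
        self._next_start = 0.0
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> None:
        await self._semaphore.acquire()  # concurrency limit
        try:
            async with self._lock:  # rate limit: one start decision at a time
                delay = self._next_start - time.monotonic()
                if delay > 0:
                    await asyncio.sleep(delay)
                self._next_start = time.monotonic() + self._interval
        except BaseException:
            self._semaphore.release()  # do not leak a slot if cancelled while waiting
            raise

    async def __aexit__(self, *exc_info: object) -> None:
        self._semaphore.release()


async def demo() -> tuple:
    limiter = Limiter(concurrency=2, rate=50.0)
    running = 0
    peak = 0
    starts = []

    async def job() -> None:
        nonlocal running, peak
        async with limiter:
            starts.append(time.monotonic())
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.05)  # simulated work
            running -= 1

    await asyncio.gather(*(job() for _ in range(6)))
    return peak, starts


peak, starts = asyncio.run(demo())
print(peak)  # 2
```

The semaphore bounds parallelism, which suits resource-bound tasks, while the start spacing smooths out bursty workloads; the two limits are independent and can be tuned separately.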

License

PGQueuer is MIT licensed. See LICENSE for more information.


Ready to supercharge your workflows? Install PGQueuer today and take your job management to the next level!



Download files


Source Distribution

pgqueuer-0.16.0.tar.gz (200.7 kB)

Uploaded Source

Built Distribution

pgqueuer-0.16.0-py3-none-any.whl (41.6 kB)

Uploaded Python 3

File details

Details for the file pgqueuer-0.16.0.tar.gz.

File metadata

  • Download URL: pgqueuer-0.16.0.tar.gz
  • Size: 200.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.16.0.tar.gz
Algorithm Hash digest
SHA256 8602818a9a7d8b7b452e2607277162850ff9c1ef205f5aefa9b8c70c55c6287a
MD5 32adb80133864596eca1e99e3c6581ad
BLAKE2b-256 8364c81de3db22cdabe8d7768ae9feee57523bee9758b0ba8e4a8bcd12bdd92d


Provenance

The following attestation bundles were made for pgqueuer-0.16.0.tar.gz:

Publisher: release.yml on janbjorge/pgqueuer


File details

Details for the file pgqueuer-0.16.0-py3-none-any.whl.

File metadata

  • Download URL: pgqueuer-0.16.0-py3-none-any.whl
  • Size: 41.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.16.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2c1ad307908323abab83da9505c3fc02c481d085c625690dc547c457bcee3446
MD5 91ef6e59d7c280c5ec0c05e73fad595e
BLAKE2b-256 e8ec49521e2154055f9c7a819288fb69e840d7ece3a3318f761a0721cd2af059


Provenance

The following attestation bundles were made for pgqueuer-0.16.0-py3-none-any.whl:

Publisher: release.yml on janbjorge/pgqueuer

