PGQueuer is a Python library that uses PostgreSQL for efficient job queuing.

🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀

📚 Documentation: Explore the Docs 📖

🔍 Source Code: View on GitHub 💾

💬 Join the Discussion: Discord Community


PGQueuer

PGQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PGQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.
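To make the LISTEN/NOTIFY idea concrete, here is a minimal sketch of how a worker might sleep until the database announces new work. It is an illustration under assumptions, not PGQueuer's internal code: the channel name `example_jobs_channel` and the helpers `make_wakeup_listener` and `wait_for_work` are hypothetical. The callback signature `(connection, pid, channel, payload)` matches what asyncpg passes to listeners registered with `Connection.add_listener`.

```python
from __future__ import annotations

import asyncio
from typing import Any, Callable

# Hypothetical channel name, chosen for illustration only.
CHANNEL = "example_jobs_channel"


def make_wakeup_listener(event: asyncio.Event) -> Callable[[Any, int, str, str], None]:
    """Build a callback that wakes the worker whenever a NOTIFY arrives."""

    def on_notify(connection: Any, pid: int, channel: str, payload: str) -> None:
        # The notification only signals "something changed"; the worker
        # still has to query the table to find out what.
        event.set()

    return on_notify


async def wait_for_work(event: asyncio.Event, poll_interval: float) -> bool:
    """Return True if woken by a notification, False if the timeout elapsed."""
    try:
        await asyncio.wait_for(event.wait(), timeout=poll_interval)
    except asyncio.TimeoutError:
        # Fall back to polling in case a notification was missed.
        return False
    event.clear()
    return True
```

With asyncpg, the listener would be registered as `await connection.add_listener(CHANNEL, make_wakeup_listener(event))`. The timeout keeps the worker polling occasionally, which guards against notifications sent while no listener was connected.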

Features

  • Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
  • Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable and concurrent job processing.
  • Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.
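The FOR UPDATE SKIP LOCKED pattern named in the feature list can be sketched as a single query that claims a batch of rows while ignoring rows another worker has already locked. This is an assumption-laden illustration: the table `example_jobs`, its columns, and the helper `claim_jobs` are hypothetical and do not reflect PGQueuer's actual schema or queries. The `connection` argument only needs a `fetch(query, *args)` coroutine, which is the shape of asyncpg's `Connection.fetch`.

```python
from __future__ import annotations

from typing import Any, Protocol

# Hypothetical table and column names, for illustration only.
DEQUEUE_SQL = """
WITH next_jobs AS (
    SELECT id
    FROM example_jobs
    WHERE status = 'queued'
    ORDER BY priority DESC, id
    LIMIT $1
    FOR UPDATE SKIP LOCKED  -- rows locked by other workers are skipped, not waited on
)
UPDATE example_jobs
SET status = 'picked'
FROM next_jobs
WHERE example_jobs.id = next_jobs.id
RETURNING example_jobs.id, example_jobs.payload
"""


class Connection(Protocol):
    """Minimal interface the sketch needs from a database connection."""

    async def fetch(self, query: str, *args: Any) -> list[Any]: ...


async def claim_jobs(connection: Connection, batch_size: int) -> list[Any]:
    """Atomically mark up to batch_size queued jobs as picked and return them."""
    return await connection.fetch(DEQUEUE_SQL, batch_size)
```

Because the select and the update happen in one statement, two workers running `claim_jobs` at the same time should receive disjoint sets of jobs rather than blocking on each other.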

Installation

To install PGQueuer, run the following pip command:

pip install pgqueuer

Example Usage

Here's how you can use PGQueuer in a typical scenario processing incoming data messages:

Write and run a consumer

Start a long-lived consumer that will begin processing jobs as soon as they are enqueued by another process. A long-lived process needs more care because it should shut down gracefully; the pgqueuer run command sets up signal handlers to ensure this.

from __future__ import annotations

import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


async def main() -> QueueManager:
    connection = await asyncpg.connect(dsn())
    driver = AsyncpgDriver(connection)
    qm = QueueManager(driver)

    # Setup the 'fetch' entrypoint
    @qm.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job}")

    return qm

Run the consumer from the command line:

python3 -m pgqueuer run tools.consumer.main
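The graceful shutdown mentioned for the consumer can be pictured with a small standalone sketch. It is not PGQueuer's implementation: the helpers `install_shutdown_handlers` and `run_until_shutdown` are hypothetical, and an in-memory `asyncio.Queue` stands in for the database. The idea is that a signal only sets a flag, so a job that is already being handled finishes before the loop exits.

```python
from __future__ import annotations

import asyncio
import signal


def install_shutdown_handlers(shutdown: asyncio.Event) -> None:
    """Set the shutdown flag on SIGINT or SIGTERM (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)


async def run_until_shutdown(
    queue: asyncio.Queue[str],
    shutdown: asyncio.Event,
) -> list[str]:
    """Process items until the shutdown flag is set, never abandoning one midway."""
    processed: list[str] = []
    while not shutdown.is_set():
        try:
            # Wake up periodically so the flag is rechecked even when idle.
            item = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        # The flag is only checked between items, so this one always completes.
        processed.append(item)
    return processed
```

A caller would invoke `install_shutdown_handlers(shutdown)` once inside the running event loop and then await `run_until_shutdown(queue, shutdown)`.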

Write and run a producer

Start a short-lived producer that will enqueue 10,000 jobs.

from __future__ import annotations

import asyncio
import sys

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main(N: int) -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
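    # Enqueue N jobs in one call using three parallel lists:
    # entrypoint names, payloads (bytes), and priorities.
    await queries.enqueue(
        ["fetch"] * N,
        [f"this is from me: {n}".encode() for n in range(1, N + 1)],
        [0] * N,
    )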


if __name__ == "__main__":
    print(sys.argv)
    N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
    asyncio.run(main(N))

Run the producer from the command line:

python3 tools/producer.py 10000
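The producer passes three parallel lists to enqueue, and its payloads are byte strings. The sketch below isolates that batch-building step and shows how a consumer might turn a payload back into text. The helpers `build_batch` and `decode_payload` are hypothetical names introduced for illustration, and treating a missing payload as an empty string is an assumption, not documented PGQueuer behavior.

```python
from __future__ import annotations


def build_batch(n: int, entrypoint: str = "fetch") -> tuple[list[str], list[bytes], list[int]]:
    """Build the three equal-length lists used by the producer example."""
    entrypoints = [entrypoint] * n
    payloads = [f"this is from me: {i}".encode() for i in range(1, n + 1)]
    priorities = [0] * n
    return entrypoints, payloads, priorities


def decode_payload(payload: bytes | None) -> str:
    """Decode a job payload to text, treating a missing payload as empty."""
    return "" if payload is None else payload.decode()


entrypoints, payloads, priorities = build_batch(3)
# All three lists must line up index by index, one entry per job.
assert len(entrypoints) == len(payloads) == len(priorities) == 3
```

The three lists could then be passed to `queries.enqueue(entrypoints, payloads, priorities)` as in the producer above, and a handler could call `decode_payload` on the payload it receives.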

