PGQueuer is a Python library leveraging PostgreSQL for efficient job queuing.

Project description

🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀


📚 Documentation: Explore the Docs 📖

🔍 Source Code: View on GitHub 💾

💬 Join the Discussion: Discord Community


PGQueuer

PGQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PGQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.

Features

  • Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
  • Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable, concurrent job processing (a generic sketch of this pattern follows this list).
  • Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.
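
Under the hood, this concurrency model is built on standard PostgreSQL features. The sketch below is a generic illustration of the FOR UPDATE SKIP LOCKED pattern, not PGQueuer's actual schema or query; the jobs table, its columns, and the claim_one_job helper are made up for this example:

from __future__ import annotations

import asyncpg

# Generic SKIP LOCKED dequeue pattern (illustrative schema, not PGQueuer's).
DEQUEUE_SQL = """
SELECT id, payload
FROM jobs
WHERE status = 'queued'
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED
"""


async def claim_one_job(connection: asyncpg.Connection) -> asyncpg.Record | None:
    # The transaction holds the row lock; concurrent workers skip locked rows
    # instead of blocking, so each job is picked by exactly one worker.
    async with connection.transaction():
        row = await connection.fetchrow(DEQUEUE_SQL)
        if row is not None:
            await connection.execute(
                "UPDATE jobs SET status = 'picked' WHERE id = $1", row["id"]
            )
        return row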

Installation

To install PGQueuer, run the following pip command:

pip install pgqueuer
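
The examples below assume a PostgreSQL instance you can connect to. If you don't want to rely on the standard PG* environment variables, asyncpg also accepts an explicit DSN; here is a minimal connectivity check with placeholder credentials (the check_connection helper is not part of PGQueuer):

from __future__ import annotations

import asyncio

import asyncpg


async def check_connection() -> None:
    # Placeholder DSN; replace user, password, host and database with your own.
    connection = await asyncpg.connect("postgresql://postgres:postgres@localhost:5432/postgres")
    print(await connection.fetchval("SELECT version()"))
    await connection.close()


asyncio.run(check_connection())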

Example Usage

Here's how you can use PGQueuer in a typical scenario: processing incoming data messages.

Write and run a consumer

Start a long-lived consumer that begins processing jobs as soon as they are enqueued by another process. Here we want to be a bit more careful, because we want graceful shutdowns; pgqueuer run sets up signal handlers to ensure this.

from __future__ import annotations

import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


async def main() -> QueueManager:
    connection = await asyncpg.connect(dsn())
    driver = AsyncpgDriver(connection)
    qm = QueueManager(driver)

    # Set up the 'fetch' entrypoint
    @qm.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job}")

    return qm

Save the consumer as tools/consumer.py and start it with the PGQueuer CLI, which imports the factory at the dotted path you pass and installs the signal handlers for graceful shutdown:

python3 -m pgqueuer run tools.consumer.main
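
If you would rather embed the consumer in your own process instead of using the CLI, the QueueManager returned by main() can be driven directly. The qm.run() call below is an assumption based on PGQueuer's documented API; check the docs for your installed version, and note that you then have to arrange graceful shutdown yourself:

from __future__ import annotations

import asyncio

from tools.consumer import main


async def run_consumer() -> None:
    qm = await main()
    # Assumed API: QueueManager.run() starts the processing loop.
    await qm.run()


asyncio.run(run_consumer())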

Write and run a producer

Start a short-lived producer that will enqueue 10,000 jobs.

from __future__ import annotations

import asyncio
import sys

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main(N: int) -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
    # Enqueue N jobs in one call: parallel lists of entrypoint names,
    # byte payloads, and per-job priorities.
    await queries.enqueue(
        ["fetch"] * N,
        [f"this is from me: {n}".encode() for n in range(1, N + 1)],
        [0] * N,
    )


if __name__ == "__main__":
    print(sys.argv)
    N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
    asyncio.run(main(N))

Run the producer, passing the number of jobs to enqueue:

python3 tools/producer.py 10000
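
For a quick ad-hoc test you can enqueue a single job with the same Queries.enqueue call used above; the third list appears to carry the per-job priority (0 in the producer example):

from __future__ import annotations

import asyncio

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def enqueue_one() -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
    # Same call shape as the producer: entrypoints, payloads, priorities.
    await queries.enqueue(["fetch"], [b"one-off test message"], [0])


asyncio.run(enqueue_one())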

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgqueuer-0.11.0.tar.gz (188.6 kB)

Uploaded Source

Built Distribution

pgqueuer-0.11.0-py3-none-any.whl (34.9 kB)

Uploaded Python 3

File details

Details for the file pgqueuer-0.11.0.tar.gz.

File metadata

  • Download URL: pgqueuer-0.11.0.tar.gz
  • Upload date:
  • Size: 188.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.11.0.tar.gz

  • SHA256: deb270a1944b6a7a2ceb439c6e2745b850534c60dcbc18b6abc560ab724bf0c2
  • MD5: ea58cc3b7e8a220108b60aaef8f98a44
  • BLAKE2b-256: 48144ad6e491ff8486876c588bb0127cb737bef8aaf6f9b3a76567ad4955f92b

See more details on using hashes here.
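
If you download the sdist by hand, you can check its SHA256 against the value above with nothing but the standard library; the file path below is a placeholder for wherever you saved the archive:

import hashlib
from pathlib import Path

# Placeholder path to the downloaded archive.
archive = Path("pgqueuer-0.11.0.tar.gz")

expected = "deb270a1944b6a7a2ceb439c6e2745b850534c60dcbc18b6abc560ab724bf0c2"
digest = hashlib.sha256(archive.read_bytes()).hexdigest()
print("OK" if digest == expected else "MISMATCH")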

File details

Details for the file pgqueuer-0.11.0-py3-none-any.whl.

File metadata

  • Download URL: pgqueuer-0.11.0-py3-none-any.whl
  • Upload date:
  • Size: 34.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.11.0-py3-none-any.whl

  • SHA256: f3d9497ab737a54f92b3c3abe1b79a07ae7dac0c5ad812456d2091e6e93e2b2b
  • MD5: b9f640222cbd292ef592033313cbabc4
  • BLAKE2b-256: 493523538781898e3f3e31618655d6b147ae0b09a3043c690ed49695b62f1298

See more details on using hashes here.
