Skip to main content

Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.

Project description

Readme

🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀

CI pypi downloads versions


📚 Documentation: Explore the Docs 📖

🔍 Source Code: View on GitHub 💾

💬 Join the Discussion: Discord Community


PGQueuer

PGQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PGQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.

Features

  • Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
  • Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable and concurrent job processing.
  • Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.

Installation

To install PGQueuer, simply install with pip the following command:

pip install pgqueuer

Example Usage

Here's how you can use PGQueuer in a typical scenario processing incoming data messages:

Write and run a consumer

Start a long-lived consumer that will begin processing jobs as soon as they are enqueued by another process. In this case we want to be a bit more carefull as we want gracefull shutdowns, pgqueuer run will setup signals to ensure this.

from __future__ import annotations

import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


async def main() -> QueueManager:
    connection = await asyncpg.connect(dsn())
    driver = AsyncpgDriver(connection)
    qm = QueueManager(driver)

    # Setup the 'fetch' entrypoint
    @qm.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job}")

    return qm
python3 -m pgqueuer run tools.consumer.main

Write and run a producer

Start a short-lived producer that will enqueue 10,000 jobs.

from __future__ import annotations

import asyncio
import sys

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main(N: int) -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
    await queries.enqueue(
        ["fetch"] * N,
        [f"this is from me: {n}".encode() for n in range(1, N+1)],
        [0] * N,
    )


if __name__ == "__main__":
    print(sys.argv)
    N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
    asyncio.run(main(N))
python3 tools/producer.py 10000

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgqueuer-0.9.2.tar.gz (184.4 kB view details)

Uploaded Source

Built Distribution

pgqueuer-0.9.2-py3-none-any.whl (34.1 kB view details)

Uploaded Python 3

File details

Details for the file pgqueuer-0.9.2.tar.gz.

File metadata

  • Download URL: pgqueuer-0.9.2.tar.gz
  • Upload date:
  • Size: 184.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.9.2.tar.gz
Algorithm Hash digest
SHA256 76a92ab37abc58ad612dd558b1dcab8ead85968c26ffee909878ab0bb56fa905
MD5 24426eb8836b2c54650a88ceeebd8170
BLAKE2b-256 597fb8f4f7c1509609790289a4c9b9ea48e115e9c1b1d2c370c839bf80013699

See more details on using hashes here.

File details

Details for the file pgqueuer-0.9.2-py3-none-any.whl.

File metadata

  • Download URL: pgqueuer-0.9.2-py3-none-any.whl
  • Upload date:
  • Size: 34.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.9.2-py3-none-any.whl
Algorithm Hash digest
SHA256 9efd58399615bc5105495d46ac22d214b94550a98c3b6b659ff20f2d5d226431
MD5 6fcd41ffebad14e6bbb671b30d603c82
BLAKE2b-256 1264b29693d6469363e73bf6fcadffff9b09a4ee82cac013b1f6feebbeac9c16

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page