Skip to main content

Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.

Project description

Readme

🚀 PGQueuer - Building Smoother Workflows One Queue at a Time 🚀

CI pypi downloads versions


📚 Documentation: Explore the Docs 📖

🔍 Source Code: View on GitHub 💾

💬 Join the Discussion: Discord Community


PGQueuer

PGQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PGQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.

Features

  • Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
  • Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable and concurrent job processing.
  • Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.

Installation

To install PGQueuer, simply install with pip the following command:

pip install pgqueuer

Example Usage

Here's how you can use PGQueuer in a typical scenario processing incoming data messages:

Write and run a consumer

Start a long-lived consumer that will begin processing jobs as soon as they are enqueued by another process. In this case we want to be a bit more carefull as we want gracefull shutdowns, pgqueuer run will setup signals to ensure this.

from __future__ import annotations

import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


async def main() -> QueueManager:
    connection = await asyncpg.connect(dsn())
    driver = AsyncpgDriver(connection)
    qm = QueueManager(driver)

    # Setup the 'fetch' entrypoint
    @qm.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job}")

    return qm
python3 -m pgqueuer run tools.consumer.main

Write and run a producer

Start a short-lived producer that will enqueue 10,000 jobs.

from __future__ import annotations

import asyncio
import sys

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main(N: int) -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
    await queries.enqueue(
        ["fetch"] * N,
        [f"this is from me: {n}".encode() for n in range(1, N+1)],
        [0] * N,
    )


if __name__ == "__main__":
    print(sys.argv)
    N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
    asyncio.run(main(N))
python3 tools/producer.py 10000

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgqueuer-0.10.0.tar.gz (186.1 kB view details)

Uploaded Source

Built Distribution

pgqueuer-0.10.0-py3-none-any.whl (34.4 kB view details)

Uploaded Python 3

File details

Details for the file pgqueuer-0.10.0.tar.gz.

File metadata

  • Download URL: pgqueuer-0.10.0.tar.gz
  • Upload date:
  • Size: 186.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.10.0.tar.gz
Algorithm Hash digest
SHA256 a09a71da33ec5fc6fc2bec01e37a6ee331d9d7662b2548de99f084d663fff2fe
MD5 cf0c2a4be5cc620bfd0a41659e4e5a9c
BLAKE2b-256 637626f07cee74c9b3f30f41ae6a26e9c115b7e6dda8e2eadbc207d372c7c260

See more details on using hashes here.

File details

Details for the file pgqueuer-0.10.0-py3-none-any.whl.

File metadata

  • Download URL: pgqueuer-0.10.0-py3-none-any.whl
  • Upload date:
  • Size: 34.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pgqueuer-0.10.0-py3-none-any.whl
Algorithm Hash digest
SHA256 aa2a011ebc84c3d569e6fbaa68fd87d3f478bd8a49c6c625b44d0cffdee51c7f
MD5 714df977bbc466c37e5231086f5bed3f
BLAKE2b-256 03b3f8328ad2fa0b0a05f39b957ea107c4b2d4fbc5a71319d0cca2f71ccbb017

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page