Skip to main content

Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.

Project description

๐Ÿš€ PgQueuer: PostgreSQL-powered job queues for Python

CI pypi downloads stars versions

๐Ÿ“š Docs ยท ๐Ÿ’ป Source ยท ๐Ÿ’ฌ Discord

Your PostgreSQL database is already a job queue.

PgQueuer turns PostgreSQL into a fast, reliable background job processor. Jobs live in the same database as your application data. One stack, full ACID guarantees, and no separate message broker to run.

Features

  • ๐Ÿ’ก Minimal footprint: one pip install; bring your existing PostgreSQL connection and start enqueueing
  • ๐Ÿ” Transactional enqueue: commit a job in the same transaction as your data; no dual-write drift
  • โš›๏ธ Safe concurrency: workers claim jobs with FOR UPDATE SKIP LOCKED (never double-processed), with per-entrypoint limits and serialized dispatch when you need them
  • ๐Ÿš€ Instant dispatch: LISTEN/NOTIFY wakes workers the moment a job lands (with a polling fallback)
  • โฐ Scheduling & deferral: cron-style recurring tasks and execute_after, no separate scheduler process
  • ๐Ÿ“Š Observability: completion tracking, Prometheus metrics, tracing (Logfire/Sentry), and a live dashboard
  • ๐Ÿงช In-memory mode: run the whole queue without Postgres for tests and prototyping

Why PostgreSQL?

If you already run PostgreSQL, it can do double duty as your job queue. That means one fewer service to operate, and your queue and data stay consistent because they share the same database and transactions.

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  enqueue   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  NOTIFY   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Your App โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚            โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚ Worker 1 โ”‚โ”€โ”€โ”
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜            โ”‚            โ”‚           โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
                        โ”‚ PostgreSQL โ”‚  NOTIFY   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
                        โ”‚            โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚ Worker 2 โ”‚โ”€โ”€โ”ค
                        โ”‚            โ”‚           โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
                        โ”‚            โ”‚  NOTIFY   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
                        โ”‚            โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚ Worker N โ”‚โ”€โ”€โ”ค
                        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜           โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
                              โ–ฒ  FOR UPDATE SKIP LOCKED         โ”‚
                              โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Installation

PgQueuer targets Python 3.11+ and PostgreSQL 12+:

pip install pgqueuer
pgq install        # create tables and functions in your database

The CLI reads PGHOST, PGUSER, PGDATABASE and related environment variables. Use pgq install --dry-run to preview SQL, --prefix myapp_ to namespace tables, or pgq uninstall to remove the schema.

Quick Start

PgQueuer pairs consumers (workers that process jobs) with producers (code that enqueues jobs).

1. Define a consumer

Each entrypoint is a job handler. Run it with the CLI: pgq run examples.consumer:main.

import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job

async def main() -> PgQueuer:
    connection = await asyncpg.connect()
    pgq = PgQueuer(AsyncpgDriver(connection))

    @pgq.entrypoint("fetch")
    async def process(job: Job) -> None:
        print(f"Processed: {job!r}")

    return pgq

2. Enqueue jobs

From your web app, script, or anywhere else with a database connection:

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries

async def main() -> None:
    connection = await asyncpg.connect()
    queries = Queries(AsyncpgDriver(connection))
    await queries.enqueue("fetch", b"hello world")

The job arrives instantly via LISTEN/NOTIFY, and your consumer's process handler picks it up.

Enqueue inside a transaction

This is what a database-backed queue buys you: the job and your business data commit together, or not at all.

order_id = 42

async with connection.transaction():
    await connection.execute(
        "INSERT INTO orders (id, status) VALUES ($1, 'paid')", order_id
    )
    await queries.enqueue("send_receipt", str(order_id).encode())
    # If the transaction rolls back, the job is never enqueued.

Run without a database

PgQueuer.in_memory() is a drop-in replacement that implements the same ports as the real backend, so your handlers stay identical. Good for unit tests and prototyping.

import asyncio
from pgqueuer import PgQueuer
from pgqueuer.models import Job
from pgqueuer.domain.types import QueueExecutionMode

async def main() -> None:
    pq = PgQueuer.in_memory()

    @pq.entrypoint("send_email")
    async def send_email(job: Job) -> None:
        print(f"Sending: {job.payload!r}")

    await pq.qm.queries.enqueue(["send_email"], [b"alice"], [0])
    await pq.qm.run(mode=QueueExecutionMode.drain)

asyncio.run(main())

The in-memory adapter has no durability or multi-process coordination, so use the PostgreSQL backend for production. See the in-memory reference.

Documentation

Topic What's inside
Core concepts Consumers, producers, entrypoints, the job lifecycle
Scheduling Cron-style recurring tasks and deferred execution
Concurrency control Per-entrypoint limits and serialized dispatch
Completion tracking Wait for jobs to finish with CompletionWatcher
Shared resources Inject DB pools, HTTP clients, and models into handlers
Custom executors Retry strategies and exponential backoff
Drivers asyncpg, psycopg async/sync: choosing and configuring
Architecture Ports & adapters, SKIP LOCKED, design decisions
Observability Prometheus metrics, tracing, and the dashboard
Framework integration FastAPI (example) and Flask (example)

Monitor your queues

Launch the interactive dashboard to watch queue activity in real time:

pgq dashboard --interval 10 --tail 25
+---------------------------+-------+------------+--------------------------+------------+----------+
|          Created          | Count | Entrypoint | Time in Queue (HH:MM:SS) |   Status   | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 |  49   |    sync    |         0:00:01          | successful |    0     |
| 2024-05-05 16:44:27+00:00 |  12   |   fetch    |         0:00:03          | queued     |    0     |
| 2024-05-05 16:44:28+00:00 |   3   |  api_call  |         0:00:00          | picked     |    5     |
+---------------------------+-------+------------+--------------------------+------------+----------+

Development

PgQueuer uses Testcontainers to spin up an ephemeral PostgreSQL instance for the test suite. Just have Docker running.

uv sync --all-extras      # install dependencies
make check                # lint, type-check, and run the test suite

License

PgQueuer is MIT licensed. See LICENSE for details.

Project details


Release history Release notifications | RSS feed

This version

1.1.0

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgqueuer-1.1.0.tar.gz (367.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pgqueuer-1.1.0-py3-none-any.whl (86.4 kB view details)

Uploaded Python 3

File details

Details for the file pgqueuer-1.1.0.tar.gz.

File metadata

  • Download URL: pgqueuer-1.1.0.tar.gz
  • Upload date:
  • Size: 367.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pgqueuer-1.1.0.tar.gz
Algorithm Hash digest
SHA256 38a409ed8975b5e50f7b7629c1f230021ae3b262cea03ecaf503f6c95759475e
MD5 2483fb3b34fa6c9c159e7c70eb9e2394
BLAKE2b-256 4194db31147ce12b13d4f1820cb6b6fedaa71ce844c913f2b33ab12ad661e078

See more details on using hashes here.

Provenance

The following attestation bundles were made for pgqueuer-1.1.0.tar.gz:

Publisher: release.yml on janbjorge/pgqueuer

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pgqueuer-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: pgqueuer-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 86.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pgqueuer-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 3c58f21324b6a1d59d43ec21fa17aa0f7d0b25ed08d8a36148890e7dace63a67
MD5 a329884f03374df1f08918058bb3adcf
BLAKE2b-256 8345bb520d6e85850371a4c62612d7d3fe85e5480b678f0fc74cc8eec34e32a3

See more details on using hashes here.

Provenance

The following attestation bundles were made for pgqueuer-1.1.0-py3-none-any.whl:

Publisher: release.yml on janbjorge/pgqueuer

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page