PgQueuer is a Python library that uses PostgreSQL for efficient job queuing.
PgQueuer: PostgreSQL-powered job queues for Python
Docs · Source · Discord
Your PostgreSQL database is already a job queue.
PgQueuer turns PostgreSQL into a fast, reliable background job processor. Jobs live in the same database as your application data. One stack, full ACID guarantees, and no separate message broker to run.
Features
- Minimal footprint: one pip install; bring your existing PostgreSQL connection and start enqueueing
- Transactional enqueue: commit a job in the same transaction as your data; no dual-write drift
- Safe concurrency: workers claim jobs with FOR UPDATE SKIP LOCKED (never double-processed), with per-entrypoint limits and serialized dispatch when you need them
- Instant dispatch: LISTEN/NOTIFY wakes workers the moment a job lands (with a polling fallback)
- Scheduling & deferral: cron-style recurring tasks and execute_after, no separate scheduler process
- Observability: completion tracking, Prometheus metrics, tracing (Logfire/Sentry), and a live dashboard
- In-memory mode: run the whole queue without Postgres for tests and prototyping
Why PostgreSQL?
If you already run PostgreSQL, it can do double duty as your job queue. That means one fewer service to operate, and your queue and data stay consistent because they share the same database and transactions.
┌──────────┐   enqueue   ┌────────────┐   NOTIFY   ┌──────────┐
│ Your App │────────────▶│            │───────────▶│ Worker 1 │───┐
└──────────┘             │            │            └──────────┘   │
                         │ PostgreSQL │   NOTIFY   ┌──────────┐   │
                         │            │───────────▶│ Worker 2 │───┤
                         │            │            └──────────┘   │
                         │            │   NOTIFY   ┌──────────┐   │
                         │            │───────────▶│ Worker N │───┤
                         └────────────┘            └──────────┘   │
                               ▲      FOR UPDATE SKIP LOCKED      │
                               └──────────────────────────────────┘
Installation
PgQueuer targets Python 3.11+ and PostgreSQL 12+:
pip install pgqueuer
pgq install # create tables and functions in your database
The CLI reads PGHOST, PGUSER, PGDATABASE and related environment variables. Use pgq install --dry-run to preview SQL, --prefix myapp_ to namespace tables, or pgq uninstall to remove the schema.
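If you want to see which database those variables point at before running the CLI, a small helper can assemble them into a connection URL. This is a sketch, not part of PgQueuer: dsn_from_env is a hypothetical name, and the fallback values (postgres, localhost, 5432) are assumptions chosen for illustration rather than the CLI's actual defaults.

```python
import os
from collections.abc import Mapping
from urllib.parse import quote


def dsn_from_env(env: Mapping[str, str] = os.environ) -> str:
    """Build a postgresql:// URL from the standard PG* environment variables."""
    user = env.get("PGUSER", "postgres")  # assumed fallback
    host = env.get("PGHOST", "localhost")  # assumed fallback
    port = env.get("PGPORT", "5432")  # assumed fallback
    database = env.get("PGDATABASE", user)  # assumed fallback: same as the user name
    password = env.get("PGPASSWORD")

    # Percent-encode credentials so characters like "@" cannot break the URL.
    auth = quote(user, safe="")
    if password is not None:
        auth += ":" + quote(password, safe="")
    return f"postgresql://{auth}@{host}:{port}/{quote(database, safe='')}"
```

Avoid printing or logging the result when PGPASSWORD is set, since the password is embedded in the URL.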
Quick Start
PgQueuer pairs consumers (workers that process jobs) with producers (code that enqueues jobs).
1. Define a consumer
Each entrypoint is a job handler. Run it with the CLI: pgq run examples.consumer:main.
import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job


async def main() -> PgQueuer:
    # With no arguments, asyncpg reads connection settings from the PG* environment variables.
    connection = await asyncpg.connect()
    pgq = PgQueuer(AsyncpgDriver(connection))

    # Register a handler for jobs enqueued under the "fetch" entrypoint.
    @pgq.entrypoint("fetch")
    async def process(job: Job) -> None:
        print(f"Processed: {job!r}")

    return pgq
2. Enqueue jobs
From your web app, script, or anywhere else with a database connection:
import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main() -> None:
    connection = await asyncpg.connect()
    queries = Queries(AsyncpgDriver(connection))
    await queries.enqueue("fetch", b"hello world")
The job arrives instantly via LISTEN/NOTIFY, and your consumer's process handler picks it up.
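The wake-up behaviour, a notification with a polling fallback, can be modelled without a database. The sketch below is a toy, not PgQueuer's code: wait_for_work and demo_wakeups are hypothetical names, and an asyncio.Event stands in for a NOTIFY arriving on a listening connection. It shows a worker waking as soon as it is signalled, or after the poll interval if no signal arrives.

```python
import asyncio


async def wait_for_work(notified: asyncio.Event, poll_interval: float) -> str:
    """Block until a notification arrives or the poll interval elapses."""
    try:
        await asyncio.wait_for(notified.wait(), timeout=poll_interval)
    except asyncio.TimeoutError:
        return "poll"  # no notification arrived: check the queue anyway
    notified.clear()  # reset so the next wait blocks again
    return "notify"  # woken by the producer's signal


async def demo_wakeups() -> list[str]:
    """Show both wake-up paths: one signalled, one timed out."""
    notified = asyncio.Event()
    reasons = []

    # The producer signals first, so the worker wakes without waiting for the interval.
    notified.set()
    reasons.append(await wait_for_work(notified, poll_interval=5.0))

    # Nobody signals this time, so the worker wakes on the timeout instead.
    reasons.append(await wait_for_work(notified, poll_interval=0.01))
    return reasons
```

The fallback matters because a notification can be missed, for example while a worker is reconnecting; a periodic check bounds how long a job can sit unnoticed.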
Enqueue inside a transaction
This is what a database-backed queue buys you: the job and your business data commit together, or not at all.
order_id = 42

# `queries` must wrap this same `connection` so the enqueue joins the transaction.
async with connection.transaction():
    await connection.execute(
        "INSERT INTO orders (id, status) VALUES ($1, 'paid')", order_id
    )
    await queries.enqueue("send_receipt", str(order_id).encode())
# If the transaction rolls back, the job is never enqueued.
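The all-or-nothing behaviour can be tried without PostgreSQL. The sketch below swaps in the standard-library sqlite3 module as a stand-in database, so it illustrates only the transactional idea, not PgQueuer's schema or API: the orders and jobs tables and the place_order, counts, and demo_atomic_enqueue functions are hypothetical.

```python
import sqlite3


def place_order(conn: sqlite3.Connection, order_id: int, fail: bool = False) -> None:
    """Insert an order and its receipt job in a single transaction."""
    with conn:  # commits on success, rolls back if the block raises
        conn.execute(
            "INSERT INTO orders (id, status) VALUES (?, 'paid')", (order_id,)
        )
        conn.execute(
            "INSERT INTO jobs (entrypoint, payload) VALUES ('send_receipt', ?)",
            (str(order_id).encode(),),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")


def counts(conn: sqlite3.Connection) -> tuple[int, int]:
    """Return (number of orders, number of jobs)."""
    orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    jobs = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
    return orders, jobs


def demo_atomic_enqueue() -> list[tuple[int, int]]:
    """Run one successful and one failing order; record the row counts after each."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
    conn.execute("CREATE TABLE jobs (entrypoint TEXT, payload BLOB)")
    conn.commit()

    results = []
    place_order(conn, 42)  # both rows commit together
    results.append(counts(conn))
    try:
        place_order(conn, 43, fail=True)  # both rows roll back together
    except RuntimeError:
        pass
    results.append(counts(conn))
    return results
```

After the failed second order the counts are unchanged: there is no order 43 and no orphaned receipt job for it, which is the dual-write problem a separate message broker would leave you to solve yourself.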
Run without a database
PgQueuer.in_memory() is a drop-in replacement that implements the same ports as the real backend, so your handlers stay identical. Good for unit tests and prototyping.
import asyncio
from pgqueuer import PgQueuer
from pgqueuer.models import Job
from pgqueuer.domain.types import QueueExecutionMode


async def main() -> None:
    pq = PgQueuer.in_memory()

    @pq.entrypoint("send_email")
    async def send_email(job: Job) -> None:
        print(f"Sending: {job.payload!r}")

    # Batch form: parallel lists of entrypoints, payloads, and priorities.
    await pq.qm.queries.enqueue(["send_email"], [b"alice"], [0])
    # Drain mode: process the queued jobs, then return instead of waiting for more.
    await pq.qm.run(mode=QueueExecutionMode.drain)


asyncio.run(main())
The in-memory adapter has no durability or multi-process coordination, so use the PostgreSQL backend for production. See the in-memory reference.
Documentation
| Topic | What's inside |
|---|---|
| Core concepts | Consumers, producers, entrypoints, the job lifecycle |
| Scheduling | Cron-style recurring tasks and deferred execution |
| Concurrency control | Per-entrypoint limits and serialized dispatch |
| Completion tracking | Wait for jobs to finish with CompletionWatcher |
| Shared resources | Inject DB pools, HTTP clients, and models into handlers |
| Custom executors | Retry strategies and exponential backoff |
| Drivers | asyncpg, psycopg async/sync: choosing and configuring |
| Architecture | Ports & adapters, SKIP LOCKED, design decisions |
| Observability | Prometheus metrics, tracing, and the dashboard |
| Framework integration | FastAPI (example) and Flask (example) |
Monitor your queues
Launch the interactive dashboard to watch queue activity in real time:
pgq dashboard --interval 10 --tail 25
+---------------------------+-------+------------+--------------------------+------------+----------+
| Created | Count | Entrypoint | Time in Queue (HH:MM:SS) | Status | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 | 49 | sync | 0:00:01 | successful | 0 |
| 2024-05-05 16:44:27+00:00 | 12 | fetch | 0:00:03 | queued | 0 |
| 2024-05-05 16:44:28+00:00 | 3 | api_call | 0:00:00 | picked | 5 |
+---------------------------+-------+------------+--------------------------+------------+----------+
Development
PgQueuer uses Testcontainers to spin up an ephemeral PostgreSQL instance for the test suite. Just have Docker running.
uv sync --all-extras # install dependencies
make check # lint, type-check, and run the test suite
License
PgQueuer is MIT licensed. See LICENSE for details.