Python client for the PGMQ Postgres extension.
What is PGMQ?
A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
PGMQ (Postgres Message Queue) is a message queue built on Postgres. It provides reliable, transactional message processing with the familiarity of SQL. The pgmq Python library exposes a clean, unified API for interacting with PGMQ across four different database backends.
Prerequisites
A running PostgreSQL instance with the PGMQ extension installed.
Docker (recommended)
The fastest way to get started is with the pre-built Docker image:
docker run -d --name pgmq-postgres \
-e POSTGRES_PASSWORD=postgres \
-p 5432:5432 \
ghcr.io/pgmq/pg18-pgmq:latest
Then connect and enable PGMQ:
psql postgres://postgres:postgres@localhost:5432/postgres -c "CREATE EXTENSION pgmq;"
SQL Only
You can also install PGMQ's objects directly into the pgmq schema. Use this on hosted Postgres services that do not support custom extensions.
git clone https://github.com/pgmq/pgmq.git
cd pgmq
psql -f pgmq-extension/sql/pgmq.sql postgres://postgres:postgres@localhost:5432/postgres
Installation
pip install pgmq
Optional backends:
| Extra | Backend |
|---|---|
| pgmq[async] | asyncpg |
| pgmq[sqlalchemy] | SQLAlchemy (sync) |
| pgmq[sqlalchemy-async] | SQLAlchemy (async) |
Features
Lightweight — No background workers or external dependencies. Just Postgres SQL objects.
Exactly-once delivery — Guaranteed delivery to a single consumer within a visibility timeout.
Four identical APIs — Swap between sync (psycopg), async (asyncpg), sync SQLAlchemy, and async SQLAlchemy with minimal changes.
Queue management — Create, drop, list, purge, and partition queues.
Message operations — Send, read, archive, delete, pop. Batch operations for high throughput.
FIFO queues — Ordered processing with message group keys.
Topic routing — Pattern-based bindings for publish-subscribe and content-based routing.
Visibility timeouts — Control how long a message stays hidden after reading.
Notifications — PostgreSQL NOTIFY/LISTEN for real-time message arrival events.
Transactions — Decorators and manual connection injection for complex workflows.
Structured logging — stdlib logging with optional loguru backend.
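To make the visibility-timeout behaviour listed above concrete, here is a toy in-memory model. It is only an illustration of the semantics, not how PGMQ or this library is implemented: the `ToyQueue` class and its explicit `now` argument are hypothetical and exist so the example runs without a database.

```python
import itertools


class ToyQueue:
    """Hypothetical in-memory stand-in that mimics visibility timeouts only."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._rows = {}  # msg_id -> [visible_at, payload]

    def send(self, payload, now=0.0):
        # A new message is visible immediately.
        msg_id = next(self._ids)
        self._rows[msg_id] = [now, payload]
        return msg_id

    def read(self, vt, now):
        # Return the oldest visible message and hide it for `vt` seconds
        # by pushing its visible_at timestamp into the future.
        for msg_id in sorted(self._rows):
            visible_at, payload = self._rows[msg_id]
            if visible_at <= now:
                self._rows[msg_id][0] = now + vt
                return msg_id, payload
        return None

    def delete(self, msg_id):
        # Removing the row is what stops the message from being redelivered.
        return self._rows.pop(msg_id, None) is not None
```

In this model a message read at `now=0` with `vt=30` is invisible to a second reader at `now=10` and becomes readable again at `now=30` unless it was deleted in the meantime. That is why a consumer should archive or delete a message once it has finished processing it.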
Documentation
- Getting Started — Installation, Docker setup, and first messages
- Configuration — Environment variables and connection strings
- Clients — Choosing and initializing backends
- Transactions — Transaction decorators and manual connections
- Topic Routing — Pattern-based message routing
- Notifications — Real-time NOTIFY/LISTEN listeners
Quick Start
Sync (psycopg):
from pgmq import PGMQueue
queue = PGMQueue() # reads PG_* env vars by default
# Create a queue
queue.create_queue("my_queue")
# Send a message
msg_id = queue.send("my_queue", {"hello": "world"})
# Send a batch
batch_ids = queue.send_batch("my_queue", [{"foo": "bar"}, {"baz": "qux"}])
# Read with 30s visibility timeout
msg = queue.read("my_queue", vt=30)
print(msg.message) # {'hello': 'world'}
# Archive when done
queue.archive("my_queue", msg.msg_id)
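The read-then-archive steps above can be wrapped in a small consumer loop. The sketch below is a hedged example, not part of the library: `drain` and `handler` are hypothetical names, it relies only on the `read` and `archive` calls shown above, and it assumes that `read` returns `None` when no message is visible.

```python
import logging

logger = logging.getLogger(__name__)


def drain(queue, queue_name, handler, vt=30):
    """Read until no message is visible; archive each one the handler accepts."""
    handled = 0
    while True:
        msg = queue.read(queue_name, vt=vt)
        if msg is None:  # assumption: an empty read returns None
            return handled
        try:
            handler(msg.message)
        except Exception:
            # Not archived: the message should become visible again
            # once its visibility timeout expires.
            logger.exception("handler failed for msg_id=%s", msg.msg_id)
            continue
        queue.archive(queue_name, msg.msg_id)
        handled += 1
```

With the client from the example above, a call might look like `drain(queue, "my_queue", print)`. Choosing `vt` longer than the slowest expected handler run avoids a second consumer picking up a message that is still being processed.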
Async (asyncpg):
import asyncio
from pgmq import AsyncPGMQueue

async def main():
    queue = AsyncPGMQueue()
    await queue.init()
    # Create a queue
    await queue.create_queue("my_queue")
    # Send a message
    msg_id = await queue.send("my_queue", {"hello": "world"})
    # Send a batch
    batch_ids = await queue.send_batch("my_queue", [{"foo": "bar"}, {"baz": "qux"}])
    # Read with 30s visibility timeout
    msg = await queue.read("my_queue", vt=30)
    print(msg.message) # {'hello': 'world'}
    # Archive when done
    await queue.archive("my_queue", msg.msg_id)

# await is only valid inside a coroutine, so run everything through asyncio.run
asyncio.run(main())
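For large payload lists it can help to split the work into several `send_batch` calls. The helper below is a sketch under stated assumptions: `send_in_chunks` and `chunk_size` are hypothetical names, and the code assumes that `send_batch` returns an iterable of message IDs, as the `batch_ids` variable above suggests.

```python
async def send_in_chunks(queue, queue_name, messages, chunk_size=100):
    """Send `messages` through queue.send_batch in slices of `chunk_size`."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    msg_ids = []
    for start in range(0, len(messages), chunk_size):
        chunk = messages[start:start + chunk_size]
        # Assumption: send_batch returns the IDs of the messages it enqueued.
        msg_ids.extend(await queue.send_batch(queue_name, chunk))
    return msg_ids
```

Inside a coroutine this could be called as `ids = await send_in_chunks(queue, "my_queue", payloads, chunk_size=500)`. The right chunk size depends on message size and database load, so treat the default here as a placeholder rather than a recommendation.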
Development
# Install dependencies
uv sync --all-groups --all-extras
# Run tests (spins up Docker Postgres automatically)
make test
# Run lints
make lint
# Serve docs locally
make docs-serve
License
Apache-2.0
Download files
File details
Details for the file pgmq-1.1.1.tar.gz.
File metadata
- Download URL: pgmq-1.1.1.tar.gz
- Upload date:
- Size: 29.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.9 {"installer":{"name":"uv","version":"0.11.9","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 289e1ca69e4adaf2110d53ca06d3c4f1a90577ae855129c43989007bc6918eb8 |
| MD5 | de3acb2aa8c01f9281a7b283d06b9422 |
| BLAKE2b-256 | 7a4a101963903575cf9eb449bbaf5041b0463a972e0a548947fce9b566129656 |
File details
Details for the file pgmq-1.1.1-py3-none-any.whl.
File metadata
- Download URL: pgmq-1.1.1-py3-none-any.whl
- Upload date:
- Size: 41.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.9 {"installer":{"name":"uv","version":"0.11.9","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f69f9a3800dd669095f5657da34cb467f5083ab2bc58f1b41ddb13f9fef55ce1 |
| MD5 | ffdf504bda65d90a77add8c24c89eceb |
| BLAKE2b-256 | 4f0d289b1210f4ff67c25a732efb44284fccc1c75dda1a35d442beb3ee02910b |