Python client for PgQue -- PgQ Universal Edition
pgque-py
Python client for PgQue — the PgQ-based
universal PostgreSQL queue. Thin wrapper over pgque-api SQL functions:
send, receive, ack, nack, force_next_tick, plus a polling
Consumer with LISTEN/NOTIFY wakeup.
Install
After the first Python client release:
pip install pgque-py
Requires Python 3.10+ and PostgreSQL 14+ with the PgQue schema installed
(\i pgque.sql — no extension required).
Database permissions
The connecting database role needs pgque_reader to consume (receive, ack, nack, subscribe, unsubscribe) and pgque_writer to produce (send, send_batch). The two are siblings — neither inherits the other. An app that both produces and consumes (the typical case for code using this client) must be granted both roles:
grant pgque_reader to your_app_user;
grant pgque_writer to your_app_user;
See docs/reference.md — Roles and grants for the full role table.
Quickstart
import pgque

with pgque.connect("postgresql://localhost/mydb") as client:
    # one-time setup (typically in a migration)
    client.conn.execute("select pgque.subscribe('orders', 'order_worker')")
    client.conn.commit()

    # producer: commit once to publish both calls atomically
    event_id = client.send("orders", {"order_id": 42}, type="order.created")
    batch_ids = client.send_batch("orders", "order.created", [
        {"order_id": 43},
        {"order_id": 44},
    ])
    client.conn.commit()
    print(event_id, batch_ids)

# consumer (separate process / thread)
consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
)

@consumer.on("order.created")
def handle_order(msg: pgque.Message) -> None:
    print(f"got {msg.type}: {msg.payload}")

# Optional: catch-all handler for types with no specific handler.
# Without it, messages with unhandled types are nacked by default
# (sent to retry_queue, or to the dead-letter queue once
# queue_max_retries is exhausted). Register a "*" handler to take
# explicit control.
@consumer.on("*")
def handle_unknown(msg: pgque.Message) -> None:
    print(f"unhandled type {msg.type!r}: {msg.payload}")

consumer.start()  # blocks until SIGTERM / SIGINT
Consumer options
Consumer(..., max_messages=...) controls the per-receive limit.
The default is PostgreSQL's int maximum, so the consumer requests
the whole PgQ batch before acknowledging it. ack() finishes the
entire underlying PgQ batch, including rows beyond max_messages.
Lower this value only if it stays at least as large as the queue's
worst-case batch size; otherwise rows past the limit are never
delivered to a handler, yet the batch ack still marks them done.
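The interaction between max_messages and the batch-level ack can be illustrated with a small pure-Python model. This is only a sketch: FakeBatch, receive, and ack below are hypothetical stand-ins that mimic the behavior described above; they are not part of the pgque API and do not touch a database.

```python
from dataclasses import dataclass, field


@dataclass
class FakeBatch:
    """Hypothetical stand-in for one PgQ batch (not a pgque class)."""
    rows: list
    finished: bool = False
    delivered: list = field(default_factory=list)


def receive(batch: FakeBatch, max_messages: int) -> list:
    """Return at most max_messages rows from the batch."""
    batch.delivered = batch.rows[:max_messages]
    return batch.delivered


def ack(batch: FakeBatch) -> list:
    """Finish the whole batch and report the rows that were never delivered."""
    batch.finished = True
    return batch.rows[len(batch.delivered):]


batch = FakeBatch(rows=["e1", "e2", "e3", "e4", "e5"])
seen = receive(batch, max_messages=3)
skipped = ack(batch)
print(seen)     # ['e1', 'e2', 'e3']
print(skipped)  # ['e4', 'e5']
```

In this model a limit of 3 against a 5-row batch leaves two rows unprocessed but acknowledged, which is why the default requests the whole batch.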
Handling unknown event types
By default the consumer nacks any message whose type has no
registered handler and no "*" catch-all. The message is retried (or
dead-lettered once queue_max_retries is exhausted) so unknown types
are never silently dropped.
To ack unknown types instead, pass unknown_handler_policy="ack":
consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
    unknown_handler_policy="ack",  # log WARNING and ack; do not nack
)
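The resolution order described in this section (specific handler, then "*" catch-all, then the policy) can be sketched as a plain function. This is an illustrative model, not the library's internal code: resolve is a hypothetical name, and the string "nack" for the default policy value is an assumption; only "ack" appears in the text above.

```python
from typing import Callable, Dict, Optional, Tuple

Handler = Callable[[dict], None]


def resolve(
    handlers: Dict[str, Handler],
    msg_type: str,
    unknown_handler_policy: str = "nack",  # ASSUMPTION: name of the default value
) -> Tuple[str, Optional[Handler]]:
    """Decide what happens to a message of the given type."""
    if msg_type in handlers:
        return "handle", handlers[msg_type]      # specific handler wins
    if "*" in handlers:
        return "handle", handlers["*"]           # catch-all comes second
    if unknown_handler_policy == "ack":
        return "ack", None                       # acknowledged without handling
    return "nack", None                          # retried or dead-lettered


def on_created(msg: dict) -> None:
    pass


handlers = {"order.created": on_created}
print(resolve(handlers, "order.created")[0])         # handle
print(resolve(handlers, "order.deleted")[0])         # nack
print(resolve(handlers, "order.deleted", "ack")[0])  # ack
```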
Experimental: cooperative consumers
Experimental in PgQue 0.2. Function names, edge-case behavior, and client API shape may change before this feature is marked stable. Do not use this as the only processing path for critical workloads without idempotent handlers and stale-worker takeover tests.
Cooperative consumers let several worker processes share one logical
consumer. Each batch is handed to exactly one subconsumer; the main
row owns the group cursor, member rows own active batches. See
docs/reference.md — Cooperative consumers / subconsumers
for the SQL surface.
Two-worker example (each worker holds its own connection / process):
import pgque

# worker-1
c1 = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
    subconsumer="worker-1",
    dead_interval="5 minutes",  # optional: take over a stale sibling
)

@c1.on("order.created")
def handle(msg):
    process(msg)

c1.start()  # in a second process: subconsumer="worker-2"
Consumer(subconsumer=...) switches the poll loop to
receive_coop and auto-registers the coop_main + coop_member rows
on the first call. dead_interval is only valid in cooperative mode;
passing it without subconsumer raises ValueError.
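The option rule above can be modeled in a few lines. This is a hedged sketch of the documented behavior only; poll_mode is a hypothetical helper, not a pgque function, and the real constructor may validate more than this.

```python
from typing import Optional


def poll_mode(subconsumer: Optional[str], dead_interval: Optional[str]) -> str:
    """Return which receive call the poll loop would use for these options."""
    if dead_interval is not None and subconsumer is None:
        # Mirrors the documented rule: dead_interval needs cooperative mode.
        raise ValueError("dead_interval is only valid together with subconsumer")
    return "receive_coop" if subconsumer is not None else "receive"


print(poll_mode(None, None))                # receive
print(poll_mode("worker-1", "5 minutes"))   # receive_coop
```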
The low-level methods on PgqueClient are also available for direct
use:
client.subscribe_subconsumer("orders", "order_worker", "worker-1")
msgs = client.receive_coop(
    "orders", "order_worker", "worker-1",
    max_messages=100, dead_interval="5 minutes",
)
if msgs:  # guard against an empty receive before indexing
    client.ack(msgs[0].batch_id)
client.touch_subconsumer("orders", "order_worker", "worker-1")
client.unsubscribe_subconsumer(
    "orders", "order_worker", "worker-1", batch_handling=1,
)
unsubscribe_subconsumer(..., batch_handling=0) (the default) raises if
the subconsumer holds an active batch; pass batch_handling=1 to route
active messages through retry/DLQ before removal.
A runnable two-worker demo lives at
bench/coop_demo.py; run it against any pgque
database with PGQUE_TEST_DSN set.
Manual ticking
For tests, demos, or manual operation without pg_cron, use
client.force_next_tick(queue) to force the next pgque.ticker() call to
materialize a tick. It does not insert the tick itself:
client.force_next_tick("orders")
client.conn.execute("select pgque.ticker()")
client.conn.commit()
client.force_tick(queue) remains as a deprecated compatibility alias.
Transactions
send → ticker → receive must each run in its own committed transaction, because PgQue is snapshot-based. pgque.connect(dsn) returns a non-autocommit connection by default, so commit between producing and consuming. The Consumer uses an autocommit connection and wraps each receive + dispatch + ack cycle in an explicit conn.transaction().
Do not wrap send and receive in one explicit transaction; the same applies to maint_retry_events and ticker. See snapshot rule.
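A minimal sketch of the three-step sequence, with a commit after each step, might look like the following. The helper produce_tick_consume is hypothetical, and the call client.receive(queue, consumer) returning a list of messages is an assumption about the client's signature; send, force_next_tick, ack, conn.execute, and conn.commit are used as shown elsewhere in this README.

```python
from typing import Any


def produce_tick_consume(
    client: Any, queue: str, consumer: str, payload: dict, event_type: str
) -> list:
    """Run send, ticker, and receive + ack as three separately committed steps."""
    # 1. Produce, then commit so the event is visible to later snapshots.
    client.send(queue, payload, type=event_type)
    client.conn.commit()

    # 2. Force and materialize a tick in its own transaction.
    client.force_next_tick(queue)
    client.conn.execute("select pgque.ticker()")
    client.conn.commit()

    # 3. Receive and ack in a third transaction.
    # ASSUMPTION: receive(queue, consumer) returns a list of messages
    # that each carry a batch_id attribute.
    msgs = client.receive(queue, consumer)
    if msgs:
        client.ack(msgs[0].batch_id)
    client.conn.commit()
    return msgs
```

Passing the client in as a parameter keeps the helper easy to exercise with a stub in unit tests; against a real database it would be called with the object returned by pgque.connect(dsn).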
Tests
Integration tests require a running PostgreSQL with the PgQue schema
installed. Set PGQUE_TEST_DSN and run pytest:
PGQUE_TEST_DSN=postgresql://postgres:pgque_test@localhost/pgque_test \
pytest clients/python/tests
Without PGQUE_TEST_DSN, the tests skip.
Distribution
The PyPI distribution is pgque-py; the import package is pgque:
import pgque
See RELEASE.md for publishing steps.
More
- Schema install, full reference, tutorial: https://github.com/NikolayS/pgque
- Per-function SQL reference: https://github.com/NikolayS/pgque/blob/main/docs/reference.md
- Issues: https://github.com/NikolayS/pgque/issues
License
Apache-2.0. Copyright 2026 Nikolay Samokhvalov.