Python client for PgQue -- PgQ Universal Edition

pgque-py

Python client for PgQue — the PgQ-based universal PostgreSQL queue. Thin wrapper over pgque-api SQL functions: send, receive, ack, nack, force_next_tick, plus a polling Consumer with LISTEN/NOTIFY wakeup.

Install

After the first Python client release:

pip install pgque-py

Requires Python 3.10+ and PostgreSQL 14+ with the PgQue schema installed (\i pgque.sql — no extension required).

Database permissions

The connecting database role needs pgque_reader to consume (receive, ack, nack, subscribe, unsubscribe) and pgque_writer to produce (send, send_batch). The two are siblings — neither inherits the other. An app that both produces and consumes (the typical case for code using this client) must be granted both roles:

grant pgque_reader to your_app_user;
grant pgque_writer to your_app_user;

See docs/reference.md — Roles and grants for the full role table.

Quickstart

import pgque

with pgque.connect("postgresql://localhost/mydb") as client:
    # one-time setup (typically in a migration)
    client.conn.execute("select pgque.subscribe('orders', 'order_worker')")
    client.conn.commit()

    # producer: commit once to publish both calls atomically
    event_id = client.send("orders", {"order_id": 42}, type="order.created")
    batch_ids = client.send_batch("orders", "order.created", [
        {"order_id": 43},
        {"order_id": 44},
    ])
    client.conn.commit()
    print(event_id, batch_ids)

# consumer (separate process / thread)
consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
)

@consumer.on("order.created")
def handle_order(msg: pgque.Message) -> None:
    print(f"got {msg.type}: {msg.payload}")

# Optional: catch-all handler for types with no specific handler.
# Without it, messages with unhandled types are nacked by default
# (sent to retry_queue, or to the dead-letter queue once
# queue_max_retries is exhausted). Register a "*" handler to take
# explicit control.
@consumer.on("*")
def handle_unknown(msg: pgque.Message) -> None:
    print(f"unhandled type {msg.type!r}: {msg.payload}")

consumer.start()  # blocks until SIGTERM / SIGINT

Consumer options

Consumer(..., max_messages=...) controls the per-receive limit. The default is PostgreSQL's int maximum (2147483647), so the consumer requests the whole PgQ batch before acknowledging it. ack() finishes the entire underlying PgQ batch, including rows beyond max_messages, so any rows past the limit are acknowledged without ever being delivered. Lower this value only if it stays at least as large as the queue's worst-case batch size.
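
The interaction between max_messages and the batch-wide ack can be illustrated with a toy model. This is a sketch only, not PgQue's implementation: ToyBatch, receive, ack, and rows_lost are hypothetical names invented for the illustration, and no database is involved.

```python
from dataclasses import dataclass


@dataclass
class ToyBatch:
    """Stand-in for one PgQ batch: a fixed set of rows acked as a unit."""
    rows: list
    finished: bool = False


def receive(batch: ToyBatch, max_messages: int) -> list:
    # Return at most max_messages rows; the batch itself still holds all rows.
    return batch.rows[:max_messages]


def ack(batch: ToyBatch) -> None:
    # Finish the whole batch, not only the rows that receive() returned.
    batch.finished = True


def rows_lost(batch: ToyBatch, max_messages: int) -> list:
    """Rows that are acked without ever being handed to the caller."""
    delivered = receive(batch, max_messages)
    ack(batch)
    return batch.rows[len(delivered):]


batch_rows = ["e1", "e2", "e3"]
print(rows_lost(ToyBatch(batch_rows), max_messages=2))          # ['e3']
print(rows_lost(ToyBatch(batch_rows), max_messages=2**31 - 1))  # []
```

With the limit below the batch size, the third row is finished along with the batch even though no handler ever saw it; with the default-sized limit nothing is lost.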

Handling unknown event types

By default the consumer nacks any message whose type has no registered handler and no "*" catch-all. The message is retried (or dead-lettered once queue_max_retries is exhausted) so unknown types are never silently dropped.

To ack unknown types instead, pass unknown_handler_policy="ack":

consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
    unknown_handler_policy="ack",  # log WARNING and ack; do not nack
)
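
The handler lookup described in this section can be modelled as a small pure function. This is a sketch under stated assumptions, not the client's actual code: resolve is a hypothetical helper, and the string "nack" as the name of the default policy is an assumption (the text above only confirms "ack" as an accepted value).

```python
from typing import Callable, Dict, Optional, Tuple

Handler = Callable[[object], None]


def resolve(
    handlers: Dict[str, Handler],
    msg_type: str,
    unknown_handler_policy: str = "nack",  # assumed name for the default
) -> Tuple[str, Optional[Handler]]:
    """Decide what happens to a message of msg_type.

    Returns ("handle", fn), ("ack", None) or ("nack", None).
    """
    # A type-specific handler wins; "*" is the catch-all fallback.
    handler = handlers.get(msg_type) or handlers.get("*")
    if handler is not None:
        return ("handle", handler)
    if unknown_handler_policy == "ack":
        return ("ack", None)   # acknowledged without running a handler
    return ("nack", None)      # retried, or dead-lettered once retries run out


def on_order(msg: object) -> None:
    pass


def catch_all(msg: object) -> None:
    pass


print(resolve({"order.created": on_order}, "order.cancelled")[0])         # nack
print(resolve({"order.created": on_order}, "order.cancelled", "ack")[0])  # ack
print(resolve({"*": catch_all}, "order.cancelled")[0])                    # handle
```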

Experimental: cooperative consumers

Experimental in PgQue 0.2. Function names, edge-case behavior, and client API shape may change before this feature is marked stable. Do not use this as the only processing path for critical workloads without idempotent handlers and stale-worker takeover tests.

Cooperative consumers let several worker processes share one logical consumer. Each batch is handed to exactly one subconsumer; the main row owns the group cursor, member rows own active batches. See docs/reference.md — Cooperative consumers / subconsumers for the SQL surface.

Two-worker example (each worker holds its own connection / process):

import pgque

# worker-1
c1 = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
    subconsumer="worker-1",
    dead_interval="5 minutes",  # optional: take over a stale sibling
)

@c1.on("order.created")
def handle(msg: pgque.Message) -> None:
    process(msg)  # process() is a placeholder for your own handler logic

c1.start()  # blocks; run a second process with subconsumer="worker-2"

Consumer(subconsumer=...) switches the poll loop to receive_coop and auto-registers the coop_main + coop_member rows on the first call. dead_interval is only valid in cooperative mode; passing it without subconsumer raises ValueError.
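
Since each worker needs its own subconsumer name under a shared consumer name, one way to keep worker processes consistent is to build the constructor arguments in one place. The helper below is a hypothetical sketch: coop_consumer_kwargs is not part of pgque-py, and the DSN, queue, and consumer names are simply the ones used in the example above.

```python
from typing import Optional


def coop_consumer_kwargs(worker_index: int, dead_interval: Optional[str] = None) -> dict:
    """Build keyword arguments for one cooperative worker (hypothetical helper)."""
    if worker_index < 1:
        raise ValueError("worker_index must be >= 1")
    kwargs = {
        "dsn": "postgresql://localhost/mydb",
        "queue": "orders",
        "name": "order_worker",                   # shared logical consumer
        "subconsumer": f"worker-{worker_index}",  # unique per worker process
    }
    # dead_interval is only passed alongside subconsumer, which is always set here.
    if dead_interval is not None:
        kwargs["dead_interval"] = dead_interval
    return kwargs


print(coop_consumer_kwargs(2, dead_interval="5 minutes")["subconsumer"])  # worker-2
```

A worker process would then call pgque.Consumer(**coop_consumer_kwargs(2, dead_interval="5 minutes")).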

The low-level methods on PgqueClient are also available for direct use:

client.subscribe_subconsumer("orders", "order_worker", "worker-1")
msgs = client.receive_coop(
    "orders", "order_worker", "worker-1",
    max_messages=100, dead_interval="5 minutes",
)
if msgs:  # guard: indexing msgs[0] fails when nothing was received
    client.ack(msgs[0].batch_id)
client.touch_subconsumer("orders", "order_worker", "worker-1")
client.unsubscribe_subconsumer(
    "orders", "order_worker", "worker-1", batch_handling=1,
)

unsubscribe_subconsumer(..., batch_handling=0) (the default) raises if the subconsumer holds an active batch; pass batch_handling=1 to route active messages through retry/DLQ before removal.

A runnable two-worker demo lives at bench/coop_demo.py; run it against any pgque database with PGQUE_TEST_DSN set.

Manual ticking

For tests, demos, or manual operation without pg_cron, use client.force_next_tick(queue) to force the next pgque.ticker() call to materialize a tick. It does not insert the tick itself:

client.force_next_tick("orders")
client.conn.execute("select pgque.ticker()")
client.conn.commit()

client.force_tick(queue) remains as a deprecated compatibility alias.

Transactions

send, ticker, and receive must each run in their own committed transaction. PgQue is snapshot-based: a tick only picks up events whose transactions have already committed. pgque.connect(dsn) is non-autocommit by default, so commit between producing and consuming. The Consumer runs its connection in autocommit mode and wraps each receive + dispatch + ack cycle in an explicit conn.transaction().

Don't wrap send and receive in one explicit transaction; the same applies to maint_retry_events and ticker. See snapshot rule.
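
The commit boundaries on the producing side can be sketched as a small helper. publish_and_tick is a hypothetical function, not part of pgque-py; it uses only the client calls shown earlier in this README (send, force_next_tick, conn.execute, conn.commit). RecordingClient and RecordingConn are stand-ins that log calls instead of talking to PostgreSQL, so the ordering can be checked without a database.

```python
class RecordingConn:
    """Stand-in connection that records calls (no database involved)."""

    def __init__(self, log):
        self.log = log

    def execute(self, sql):
        self.log.append(("execute", sql))

    def commit(self):
        self.log.append(("commit",))


class RecordingClient:
    """Stand-in client exposing the same call shapes used in this README."""

    def __init__(self):
        self.log = []
        self.conn = RecordingConn(self.log)

    def send(self, queue, payload, type=None):
        self.log.append(("send", queue, type))
        return 1  # fake event id

    def force_next_tick(self, queue):
        self.log.append(("force_next_tick", queue))


def publish_and_tick(client, queue, payload, msg_type):
    """Send one event, then tick, each in its own committed transaction."""
    event_id = client.send(queue, payload, type=msg_type)
    client.conn.commit()                          # transaction 1: the event
    client.force_next_tick(queue)
    client.conn.execute("select pgque.ticker()")
    client.conn.commit()                          # transaction 2: the tick
    return event_id


fake = RecordingClient()
publish_and_tick(fake, "orders", {"order_id": 42}, "order.created")
print([call[0] for call in fake.log])
# ['send', 'commit', 'force_next_tick', 'execute', 'commit']
```

Receiving then happens in a third transaction, for example inside the Consumer's own poll loop.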

Tests

Integration tests require a running PostgreSQL with the PgQue schema installed. Set PGQUE_TEST_DSN and run pytest:

PGQUE_TEST_DSN=postgresql://postgres:pgque_test@localhost/pgque_test \
    pytest clients/python/tests

Without PGQUE_TEST_DSN, the tests skip.

Distribution

The PyPI distribution is pgque-py; the import package is pgque:

import pgque

See RELEASE.md for publishing steps.

License

Apache-2.0. Copyright 2026 Nikolay Samokhvalov.
