A more flexible Python client for the PGMQ Postgres extension, built on the SQLAlchemy ORM. It supports both async and sync engines and sessionmakers, or can be built from a DSN.

pgmq-sqlalchemy

Features

Installation

Install with pip:

pip install pgmq-sqlalchemy

Install with an additional DBAPI package:

pip install "pgmq-sqlalchemy[asyncpg]"
pip install "pgmq-sqlalchemy[psycopg2-binary]"
# pip install "pgmq-sqlalchemy[postgres-python-driver]"
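The extra you install determines which driver name goes into a SQLAlchemy-style URL, which has the form `dialect+driver://...`. As a minimal sketch, the hypothetical helper `with_driver` below rewrites a plain DSN to name a driver explicitly. The helper is not part of pgmq-sqlalchemy, and whether `PGMQueue` selects its engine type from the DSN alone is an assumption to verify against the documentation.

```python
from urllib.parse import urlsplit, urlunsplit


def with_driver(dsn: str, driver: str) -> str:
    """Return `dsn` with its scheme rewritten to 'postgresql+<driver>'.

    Hypothetical helper for illustration; not part of pgmq-sqlalchemy.
    """
    parts = urlsplit(dsn)
    # Drop any driver already present, e.g. 'postgresql+psycopg2' -> 'postgresql'.
    dialect = parts.scheme.split("+", 1)[0]
    if dialect not in ("postgresql", "postgres"):
        raise ValueError(f"not a Postgres DSN: {dsn!r}")
    return urlunsplit(parts._replace(scheme=f"postgresql+{driver}"))


base_dsn = "postgresql://postgres:postgres@localhost:5432/postgres"
async_dsn = with_driver(base_dsn, "asyncpg")
sync_dsn = with_driver(base_dsn, "psycopg2")
```

The driver names `asyncpg` and `psycopg2` are the ones SQLAlchemy uses for those packages.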

Getting Started

Postgres Setup

Prerequisite: Postgres with the PGMQ extension installed.
For a quick setup with Docker:

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest

For more information, see the PGMQ project.
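The container can take a few seconds to start accepting connections after `docker run` returns. The sketch below uses only the standard library to wait until the published port is reachable. `wait_for_port` is a hypothetical helper, and an open port only shows that something is listening; it does not guarantee that Postgres has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout.

    Hypothetical helper for illustration; not part of pgmq-sqlalchemy.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means the port is accepting connections.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example (commented out so that importing this module has no side effects):
# if not wait_for_port("localhost", 5432):
#     raise SystemExit("Postgres did not come up in time")
```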

Usage

Note: See the pgmq-sqlalchemy documentation for more examples and detailed usage.

For dispatcher.py:

from typing import List
from pgmq_sqlalchemy import PGMQueue

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)
pgmq.create_queue('my_queue')

msg = {'key': 'value', 'key2': 'value2'}
msg_id: int = pgmq.send('my_queue', msg)

# a list of messages can also be sent in one call
msg_ids: List[int] = pgmq.send_batch('my_queue', [msg, msg])

For consumer.py:

from typing import List  # needed for the List[Message] annotation below

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)

# read a single message
msg: Message = pgmq.read('my_queue')

# read a batch of up to 10 messages
msgs: List[Message] = pgmq.read_batch('my_queue', 10)
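A consumer usually reads in a loop rather than once. The sketch below shows one possible polling pattern with the queue operations passed in as plain callables, so it runs without a database. The `consume` function and its parameters are hypothetical, and it assumes the read callable returns `None` when no message is available.

```python
import time
from typing import Any, Callable, Optional


def consume(
    read: Callable[[], Optional[Any]],
    handle: Callable[[Any], None],
    ack: Callable[[Any], None],
    max_idle_polls: int = 3,
    idle_sleep: float = 1.0,
) -> int:
    """Process messages until `max_idle_polls` consecutive reads return None.

    Hypothetical helper for illustration; returns the number of messages handled.
    """
    handled = 0
    idle = 0
    while idle < max_idle_polls:
        msg = read()
        if msg is None:
            # Nothing to do: back off briefly before polling again.
            idle += 1
            time.sleep(idle_sleep)
            continue
        idle = 0
        handle(msg)
        # Acknowledge only after the handler returned without raising.
        ack(msg)
        handled += 1
    return handled
```

With the client from consumer.py, `read` could be `lambda: pgmq.read('my_queue')`. The acknowledgement step (for example a delete call keyed on the message id) is an assumption here; check the pgmq-sqlalchemy documentation for the exact method and attribute names. If `handle` raises, the message is not acknowledged, and PGMQ makes it visible again once its visibility timeout expires.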

For monitor.py:

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import QueueMetrics

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)

# get queue metrics
metrics: QueueMetrics = pgmq.metrics('my_queue')
print(metrics.queue_length)
print(metrics.total_messages)

Transaction Usage

Use the op module to combine PGMQ operations with your business logic in a single transaction:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from pgmq_sqlalchemy import op

engine = create_engine('postgresql://postgres:postgres@localhost:5432/postgres')
SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    try:
        # Create queue
        op.create_queue('orders_queue', session=session, commit=False)
        
        # Insert order into your database
        session.execute(
            text("INSERT INTO orders (user_id, total) VALUES (:user_id, :total)"),
            {"user_id": 123, "total": 99.99}
        )
        
        # Send message to queue
        op.send(
            'orders_queue',
            {'user_id': 123, 'action': 'process_order'},
            session=session,
            commit=False
        )
        
        # Commit everything together
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"Transaction failed: {e}")

See Transaction Usage Documentation for more examples.

FastAPI Pub/Sub Example with tests

See the FastAPI Pub/Sub Example for a complete example of using pgmq-sqlalchemy in a FastAPI application with asynchronous message consumption and tests.

Issues / Contributing / Development

Issues and pull requests are welcome!
See the Development section of the online documentation or CONTRIBUTING.md for more information.

TODO

  • Alembic-compatible migration scripts for PGMQ extension and schema setup, upgrade, and downgrade.
  • Compatibility tests across different PGMQ versions.
  • More examples.
  • Smooth the contributing process with a custom script for one-step setup.
  • Mypy strict type checking.
  • Enable more Ruff rules.
  • Drop Python 3.9 support in the next minor release.
