

Project description


pgmq-sqlalchemy

A more flexible Python client for the PGMQ Postgres extension, built on the SQLAlchemy ORM, supporting both async and sync engines and sessionmakers, or construction directly from a DSN.


Features

  • Supports both async and sync engines and sessionmakers, or construction directly from a DSN (see the sketch after this list).
  • Automatically creates the pgmq (or pg_partman) extension on the database if it does not exist.
  • Supports all Postgres DBAPIs supported by SQLAlchemy,

    e.g. psycopg, psycopg2, asyncpg.
    See SQLAlchemy PostgreSQL Dialects.
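
Below is a rough sketch of the engine/sessionmaker construction mentioned in the first feature above. The engine and session_maker keyword names are assumptions here, not confirmed by this page; check the pgmq-sqlalchemy documentation for the exact constructor signature.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from pgmq_sqlalchemy import PGMQueue

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

# build from an existing SQLAlchemy engine (keyword name `engine` is an assumption)
engine = create_engine(postgres_dsn)
pgmq = PGMQueue(engine=engine)

# or build from an existing sessionmaker (keyword name `session_maker` is an assumption)
session_maker = sessionmaker(bind=engine)
pgmq = PGMQueue(session_maker=session_maker)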

Installation

Install with pip:

pip install pgmq-sqlalchemy

Install with additional DBAPI packages:

pip install "pgmq-sqlalchemy[asyncpg]"
pip install "pgmq-sqlalchemy[psycopg2-binary]"
# pip install "pgmq-sqlalchemy[postgres-python-driver]"

Getting Started

Postgres Setup

Prerequisites: Postgres with PGMQ extension installed.
For quick setup:

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest

For more information, see PGMQ.

Usage

[!NOTE]
Check the pgmq-sqlalchemy documentation for more examples and detailed usage.

For dispatcher.py:

from typing import List
from pgmq_sqlalchemy import PGMQueue

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)
pgmq.create_queue('my_queue')

msg = {'key': 'value', 'key2': 'value2'}
msg_id:int = pgmq.send('my_queue', msg)

# could also send a list of messages
msg_ids:List[int] = pgmq.send_batch('my_queue', [msg, msg])
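
PGMQ also supports delaying a message's visibility at send time. A minimal sketch, assuming send accepts a delay argument in seconds (the parameter name is an assumption; verify it against the pgmq-sqlalchemy documentation):

# send a message that only becomes visible to consumers after ~10 seconds
# (the `delay` argument is an assumption, not confirmed by this page)
delayed_msg_id:int = pgmq.send('my_queue', msg, delay=10)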

For consumer.py:

from typing import List

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)

# read a single message
msg:Message = pgmq.read('my_queue')

# read a batch of messages
msgs:List[Message] = pgmq.read_batch('my_queue', 10)
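
A message returned by read stays in the queue and becomes visible again once its visibility timeout expires, so a consumer normally deletes or archives it after successful processing. A minimal sketch, assuming delete and archive methods that mirror the PGMQ SQL functions of the same names (method names, signatures, and the msg_id attribute are assumptions; check the documentation):

# acknowledge the message once it has been processed
if msg is not None:
    pgmq.delete('my_queue', msg.msg_id)      # assumed: permanently remove the message
    # or, to keep a copy in the archive table instead:
    # pgmq.archive('my_queue', msg.msg_id)   # assumed: move the message to the archive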

For monitor.py:

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import QueueMetrics

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)

# get queue metrics
metrics:QueueMetrics = pgmq.metrics('my_queue')
print(metrics.queue_length)
print(metrics.total_messages)
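
To watch every queue at once, PGMQ exposes a metrics_all SQL function; a minimal sketch, assuming pgmq-sqlalchemy wraps it with a metrics_all method returning a list of QueueMetrics (the method and attribute names are assumptions; verify them in the documentation):

# print basic metrics for all queues
# (metrics_all is assumed to mirror PGMQ's pgmq.metrics_all() function)
for m in pgmq.metrics_all():
    print(m.queue_name, m.queue_length, m.total_messages)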

Issues / Contributing / Development

You are welcome to open an issue or pull request!
See the Development section of the online documentation or CONTRIBUTING.md for more information.

TODO

  • Add a time-based partition option and validation to the create_partitioned_queue method.
  • Read (single/batch) from the archive table (read_archive method).
  • Detach the archive table (detach_archive method).
  • Add a set_vt utility method.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgmq_sqlalchemy-0.1.2.tar.gz (12.5 kB, Source)

Built Distribution

pgmq_sqlalchemy-0.1.2-py3-none-any.whl (12.3 kB, Python 3)

File details

Details for the file pgmq_sqlalchemy-0.1.2.tar.gz.

File metadata

  • Download URL: pgmq_sqlalchemy-0.1.2.tar.gz
  • Upload date:
  • Size: 12.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.0 CPython/3.12.4

File hashes

Hashes for pgmq_sqlalchemy-0.1.2.tar.gz:
  • SHA256: 8ab499aad73f488897e1acc1f39aacbe66554a005e3d86929f9c45589e42105c
  • MD5: 2c5e98097a78612fb8165278e477d846
  • BLAKE2b-256: efb37dcc4e06d12105082c7f3ec0390ce70c3c1121aa0d7fa33798b14ed46826


File details

Details for the file pgmq_sqlalchemy-0.1.2-py3-none-any.whl.

File hashes

Hashes for pgmq_sqlalchemy-0.1.2-py3-none-any.whl:
  • SHA256: 1f3ddf4c1b5f77874bb2eca1d63ae7a901d2f58a920c42862efc0f2cb3b78af7
  • MD5: 91142ec6f71ab379780bc743fbee8e63
  • BLAKE2b-256: b60a6aedbbc22ea76c38cf6a426d8b74b35070dfcc55ece430bf50b7b88177e2

