pgmq-sqlalchemy
A more flexible Python client for the PGMQ Postgres extension, built on the SQLAlchemy ORM. It supports both async and sync engines and sessionmakers, or can be built from a dsn.
Features
- Supports async and sync engines and sessionmakers, or can be built from a dsn.
- Automatically creates the pgmq (or pg_partman) extension on the database if it does not exist.
- Supports all Postgres DBAPIs supported by SQLAlchemy, e.g. psycopg, psycopg2, asyncpg. See SQLAlchemy PostgreSQL Dialects.
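In SQLAlchemy, the DBAPI is usually selected through the URL scheme (for example postgresql+asyncpg://...). The sketch below builds such a dsn from its parts. Note that build_dsn and its parameters are hypothetical names used for illustration only; they are not part of pgmq-sqlalchemy, and the defaults are local-development placeholders.

```python
from typing import Optional
from urllib.parse import quote


def build_dsn(
    user: str = "postgres",
    password: str = "postgres",
    host: str = "localhost",
    port: int = 5432,
    database: str = "postgres",
    driver: Optional[str] = None,
) -> str:
    """Build a SQLAlchemy-style Postgres URL (hypothetical helper)."""
    # SQLAlchemy picks the DBAPI from the "+driver" suffix of the scheme.
    scheme = f"postgresql+{driver}" if driver else "postgresql"
    # Percent-encode credentials so characters such as "@" or ":" stay safe.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"{scheme}://{credentials}@{host}:{port}/{database}"


print(build_dsn())
print(build_dsn(driver="asyncpg"))
```

The resulting string can be passed wherever a dsn is expected; which drivers are actually usable depends on the DBAPI packages installed.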
Installation
Install with pip:
pip install pgmq-sqlalchemy
Install with additional DBAPI packages:
pip install "pgmq-sqlalchemy[asyncpg]"
pip install "pgmq-sqlalchemy[psycopg2-binary]"
# pip install "pgmq-sqlalchemy[postgres-python-driver]"
Getting Started
Postgres Setup
Prerequisites: Postgres with PGMQ extension installed.
For quick setup:
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
For more information, see PGMQ
Usage
Note: see the pgmq-sqlalchemy documentation for more examples and detailed usage.
For dispatcher.py:
from typing import List
from pgmq_sqlalchemy import PGMQueue
postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'
pgmq = PGMQueue(dsn=postgres_dsn)
pgmq.create_queue('my_queue')
msg = {'key': 'value', 'key2': 'value2'}
msg_id: int = pgmq.send('my_queue', msg)
# a list of messages can also be sent in one call
msg_ids: List[int] = pgmq.send_batch('my_queue', [msg, msg])
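When the list of messages is large, it may be preferable to send it in smaller chunks. The following is a minimal sketch under that assumption: chunked, send_in_chunks, and chunk_size are hypothetical names that are not part of pgmq-sqlalchemy, and the only thing assumed about the client is the send_batch(queue_name, messages) call shown above.

```python
from typing import Any, Callable, Dict, Iterator, List

Msg = Dict[str, Any]


def chunked(items: List[Msg], size: int) -> Iterator[List[Msg]]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_in_chunks(
    send_batch: Callable[[str, List[Msg]], List[int]],
    queue_name: str,
    messages: List[Msg],
    chunk_size: int = 100,
) -> List[int]:
    """Call send_batch once per chunk and collect the returned message ids."""
    msg_ids: List[int] = []
    for chunk in chunked(messages, chunk_size):
        msg_ids.extend(send_batch(queue_name, chunk))
    return msg_ids
```

With the dispatcher above, this would be called as send_in_chunks(pgmq.send_batch, 'my_queue', messages). Passing the bound method keeps the helper independent of how the client was constructed.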
For consumer.py:
from typing import List
from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message
postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'
pgmq = PGMQueue(dsn=postgres_dsn)
# read a single message
msg: Message = pgmq.read('my_queue')
# read a batch of messages
msgs: List[Message] = pgmq.read_batch('my_queue', 10)
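A consumer usually reads in a loop rather than once. The sketch below drains a queue batch by batch until nothing is returned. drain, handle, and max_batches are hypothetical names, not part of pgmq-sqlalchemy; the only client call assumed is read_batch(queue_name, batch_size) as shown above, and the loop treats both None and an empty list as "no more messages".

```python
from typing import Any, Callable, List, Optional


def drain(
    read_batch: Callable[[str, int], Optional[List[Any]]],
    queue_name: str,
    handle: Callable[[Any], None],
    batch_size: int = 10,
    max_batches: int = 100,
) -> int:
    """Read batches until the queue looks empty; return the number handled."""
    handled = 0
    # max_batches bounds the loop so a busy queue cannot keep it running forever.
    for _ in range(max_batches):
        msgs = read_batch(queue_name, batch_size)
        if not msgs:  # covers both None and an empty list
            break
        for msg in msgs:
            handle(msg)
            handled += 1
    return handled
```

With the consumer above, this would be called as drain(pgmq.read_batch, 'my_queue', print). In PGMQ, a message that has been read becomes visible again once its visibility timeout expires, so a real handler would normally delete or archive each message after processing it; check the documentation for the exact method names.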
For monitor.py:
from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import QueueMetrics
postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'
pgmq = PGMQueue(dsn=postgres_dsn)
# get queue metrics
metrics: QueueMetrics = pgmq.metrics('my_queue')
print(metrics.queue_length)
print(metrics.total_messages)
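The metrics can drive a simple backlog check. This is a minimal sketch: is_backlogged and max_queue_length are hypothetical names, and the only thing assumed about the metrics object is the queue_length attribute used above. A SimpleNamespace stands in for a real QueueMetrics value so the example runs without a database.

```python
from types import SimpleNamespace
from typing import Any


def is_backlogged(metrics: Any, max_queue_length: int) -> bool:
    """Return True when more messages are queued than the allowed threshold."""
    return metrics.queue_length > max_queue_length


# Stand-in for the object returned by pgmq.metrics('my_queue').
sample = SimpleNamespace(queue_length=12, total_messages=40)
print(is_backlogged(sample, max_queue_length=10))
```

In monitor.py, the same function could be applied directly to the metrics value and used to trigger an alert or scale up consumers.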
Issues / Contributing / Development
Issues and pull requests are welcome!
See the Development section of the online documentation or CONTRIBUTING.md for more information.
TODO
- Add a time-based partition option and validation to the create_partitioned_queue method.
- Read (single/batch) from the archive table (read_archive method).
- Detach the archive table (detach_archive method).
- Add a set_vt utility method.