Python client for the PGMQ Postgres extension.
Project description
Tembo's Python Client for PGMQ
Installation
Install with pip
from pypi.org:
pip install tembo-pgmq-python
Dependencies:
Postgres running the Tembo PGMQ extension.
Usage
Start a Postgres Instance with the Tembo extension installed
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
Using Environment Variables
Set environment variables:
export PG_HOST=127.0.0.1
export PG_PORT=5432
export PG_USERNAME=postgres
export PG_PASSWORD=postgres
export PG_DATABASE=test_db
Initialize a connection to Postgres using environment variables:
from tembo_pgmq_python import PGMQueue, Message
queue = PGMQueue()
Initialize a connection to Postgres without environment variables
from tembo_pgmq_python import PGMQueue, Message
queue = PGMQueue(
host="0.0.0.0",
port="5432",
username="postgres",
password="postgres",
database="postgres"
)
Create a queue
queue.create_queue("my_queue")
or a partitioned queue
queue.create_partitioned_queue("my_partitioned_queue", partition_size=10000)
Send a message
msg_id: int = queue.send("my_queue", {"hello": "world"})
Send a batch of messages
msg_ids: list[int] = queue.send_batch("my_queue", [{"hello": "world"}, {"foo": "bar"}])
Read a message, set it invisible for 30 seconds
read_message: Message = queue.read("my_queue", vt=30)
print(read_message)
Read a batch of messages
read_messages: list[Message] = queue.read_batch("my_queue", vt=30, batch_size=5)
for message in read_messages:
print(message)
Archive the message after we're done with it. Archived messages are moved to an archive table
archived: bool = queue.archive("my_queue", read_message.msg_id)
Delete a message completely
msg_id: int = queue.send("my_queue", {"hello": "world"})
read_message: Message = queue.read("my_queue")
deleted: bool = queue.delete("my_queue", read_message.msg_id)
Pop a message, deleting it and reading it in one transaction
popped_message: Message = queue.pop("my_queue")
print(popped_message)
Purge all messages from a queue
purged_count: int = queue.purge("my_queue")
print(f"Purged {purged_count} messages from the queue.")
Get queue metrics
metrics = queue.metrics("my_queue")
print(f"Queue name: {metrics.queue_name}")
print(f"Queue length: {metrics.queue_length}")
print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
print(f"Total messages: {metrics.total_messages}")
print(f"Scrape time: {metrics.scrape_time}")
Access individual metrics
print(f"Queue length: {queue.metrics('my_queue').queue_length}")
print(f"Total messages: {queue.metrics('my_queue').total_messages}")
Get metrics for all queues
all_metrics = queue.metrics_all()
for metrics in all_metrics:
print(f"Queue name: {metrics.queue_name}")
print(f"Queue length: {metrics.queue_length}")
print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
print(f"Total messages: {metrics.total_messages}")
print(f"Scrape time: {metrics.scrape_time}")
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for tembo_pgmq_python-0.7.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 013b1489b68249c1f6215cb99dc596cd796d42103265b17c1503223dfe0c963a |
|
MD5 | cae06126b8315cbceabe0d13a0488bf9 |
|
BLAKE2b-256 | 3afeaf3326e2d16926f4835a0b381f8340e4745eaa385d4903e2188162500afd |