
Python client for the PGMQ Postgres extension.


Tembo's Python Client for PGMQ

Installation

Install with pip from pypi.org:

pip install tembo-pgmq-python

Dependencies:

Postgres running the Tembo PGMQ extension.

Usage

Start a Postgres instance with the PGMQ extension installed

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest

Using Environment Variables

Set environment variables:

export PG_HOST=127.0.0.1
export PG_PORT=5432
export PG_USERNAME=postgres
export PG_PASSWORD=postgres
export PG_DATABASE=postgres

Initialize a connection to Postgres using environment variables:

from tembo_pgmq_python import PGMQueue, Message

queue = PGMQueue()
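This README does not say what happens when one of these variables is unset, so it can help to check them before constructing the client. A minimal sketch of such a pre-flight check follows; `missing_pg_env` and `REQUIRED_PG_VARS` are hypothetical names for illustration and are not part of the client.

```python
import os

# Variable names taken from the export lines above.
REQUIRED_PG_VARS = ("PG_HOST", "PG_PORT", "PG_USERNAME", "PG_PASSWORD", "PG_DATABASE")


def missing_pg_env(environ=None):
    """Return the names of connection variables that are unset or empty.

    Hypothetical helper for illustration; not part of tembo_pgmq_python.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_PG_VARS if not env.get(name)]


missing = missing_pg_env()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Passing a plain dict instead of relying on `os.environ` makes the check easy to exercise in tests.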

Initialize a connection to Postgres without environment variables

from tembo_pgmq_python import PGMQueue, Message

queue = PGMQueue(
    host="127.0.0.1",
    port="5432",
    username="postgres",
    password="postgres",
    database="postgres"
)
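Connection settings often arrive as a single URL rather than as separate fields. The sketch below, assuming a standard `postgresql://` URL, splits one into the keyword arguments used above. `pgmq_kwargs_from_url` is a hypothetical helper, not part of the client, and the fallback values for missing URL parts are assumptions.

```python
from urllib.parse import unquote, urlparse


def pgmq_kwargs_from_url(url):
    """Split a postgresql:// URL into host, port, username, password, database.

    Hypothetical helper for illustration; not part of tembo_pgmq_python.
    """
    parts = urlparse(url)
    if parts.scheme not in ("postgres", "postgresql"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        "host": parts.hostname or "127.0.0.1",  # assumed fallback
        # Kept as a string to match the example above.
        "port": str(parts.port or 5432),
        # Credentials may be percent-encoded inside a URL.
        "username": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        "database": parts.path.lstrip("/") or "postgres",  # assumed fallback
    }


kwargs = pgmq_kwargs_from_url("postgresql://postgres:postgres@127.0.0.1:5432/postgres")
# queue = PGMQueue(**kwargs)  # needs a running Postgres with PGMQ installed
```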

Create a queue

queue.create_queue("my_queue")

or a partitioned queue

queue.create_partitioned_queue("my_partitioned_queue", partition_size=10000)

Send a message

msg_id: int = queue.send("my_queue", {"hello": "world"})

Send a batch of messages

msg_ids: list[int] = queue.send_batch("my_queue", [{"hello": "world"}, {"foo": "bar"}])
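For large inputs it may be preferable to send several moderate batches rather than one very large one. A minimal sketch, assuming only that `queue` exposes `send_batch` as in the example above; `send_in_chunks` is a hypothetical helper and the default `chunk_size` of 100 is an arbitrary choice, not a documented limit.

```python
def send_in_chunks(queue, queue_name, messages, chunk_size=100):
    """Send messages in fixed-size batches and return all message ids in order.

    Hypothetical helper for illustration; not part of tembo_pgmq_python.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    msg_ids = []
    for start in range(0, len(messages), chunk_size):
        chunk = messages[start:start + chunk_size]
        # One send_batch call per chunk.
        msg_ids.extend(queue.send_batch(queue_name, chunk))
    return msg_ids
```

Usage would look like `send_in_chunks(queue, "my_queue", payloads, chunk_size=500)`, where `payloads` is a list of dicts.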

Read a message and make it invisible to other readers for 30 seconds

read_message: Message = queue.read("my_queue", vt=30)
print(read_message)

Read a batch of messages

read_messages: list[Message] = queue.read_batch("my_queue", vt=30, batch_size=5)
for message in read_messages:
    print(message)

Archive the message after we're done with it. Archived messages are moved from the queue to an archive table.

archived: bool = queue.archive("my_queue", read_message.msg_id)
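Reading with a visibility timeout and archiving afterwards combine into a simple consumer step. The sketch below assumes `queue.read` returns `None` when no message is visible, which this README does not state. `process_one` and `handler` are hypothetical names, not part of the client.

```python
def process_one(queue, queue_name, handler, vt=30):
    """Read one message, run handler on it, and archive it on success.

    Returns True if a message was handled and archived, False if nothing
    was available. If handler raises, the message is not archived and
    becomes visible to readers again once the vt seconds have elapsed.

    Hypothetical helper for illustration; not part of tembo_pgmq_python.
    """
    message = queue.read(queue_name, vt=vt)
    if message is None:  # assumption: None means no visible message
        return False
    handler(message)
    queue.archive(queue_name, message.msg_id)
    return True
```

Choosing `vt` larger than the longest expected handler run time keeps another reader from picking up the same message while it is still being processed.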

Delete a message completely

msg_id: int = queue.send("my_queue", {"hello": "world"})
read_message: Message = queue.read("my_queue")
deleted: bool = queue.delete("my_queue", read_message.msg_id)
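When messages do not need to be kept, batch reads and deletes can be combined to drain a queue. A minimal sketch, assuming `read_batch` returns an empty list (or `None`) once no messages are visible; `drain` and `handler` are hypothetical names, not part of the client.

```python
def drain(queue, queue_name, handler, vt=30, batch_size=5):
    """Process and delete messages until no visible messages remain.

    Returns the number of messages handled.

    Hypothetical helper for illustration; not part of tembo_pgmq_python.
    """
    handled = 0
    while True:
        batch = queue.read_batch(queue_name, vt=vt, batch_size=batch_size)
        if not batch:  # covers both an empty list and None
            return handled
        for message in batch:
            handler(message)
            # Delete only after the handler has finished without raising.
            queue.delete(queue_name, message.msg_id)
            handled += 1
```

Messages that are invisible because another reader holds them are not seen by this loop, so a return value of 0 does not by itself mean the queue is empty.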

Pop a message, reading and deleting it in one transaction

popped_message: Message = queue.pop("my_queue")
print(popped_message)

Purge all messages from a queue

purged_count: int = queue.purge("my_queue")
print(f"Purged {purged_count} messages from the queue.")

Get queue metrics

metrics = queue.metrics("my_queue")
print(f"Queue name: {metrics.queue_name}")
print(f"Queue length: {metrics.queue_length}")
print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
print(f"Total messages: {metrics.total_messages}")
print(f"Scrape time: {metrics.scrape_time}")

Access individual metrics

print(f"Queue length: {queue.metrics('my_queue').queue_length}")
print(f"Total messages: {queue.metrics('my_queue').total_messages}")

Get metrics for all queues

all_metrics = queue.metrics_all()
for metrics in all_metrics:
    print(f"Queue name: {metrics.queue_name}")
    print(f"Queue length: {metrics.queue_length}")
    print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
    print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
    print(f"Total messages: {metrics.total_messages}")
    print(f"Scrape time: {metrics.scrape_time}")
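The per-queue metrics can feed a simple backlog check. The sketch below relies only on the `queue_name` and `queue_length` attributes printed above; `backlogged_queues` and the threshold value are hypothetical and chosen for illustration.

```python
def backlogged_queues(all_metrics, max_length):
    """Return the names of queues whose queue_length exceeds max_length.

    Hypothetical helper for illustration; not part of tembo_pgmq_python.
    """
    return [m.queue_name for m in all_metrics if m.queue_length > max_length]
```

With a live connection, usage would look like `backlogged_queues(queue.metrics_all(), max_length=1000)`.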


