Python async API for creating and managing queues in PostgreSQL
Install
> pip install async-pq
Quick start
To work with async-pq, you need the asyncpg library:
import asyncpg
conn = await asyncpg.connect('postgresql://postgres@localhost/test')
The QueueFabric.find_queue method creates the needed tables in the database if the queue is new.
QueueFabric also has an is_exists_queue method for situations where you need to know in advance whether the queue will be new.
from async_pq import Queue, QueueFabric
queue: Queue = await QueueFabric(conn).find_queue('items')
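The snippets above use await at the top level, which only works inside a coroutine or an async-aware REPL. As a minimal sketch, assuming the same connection string and queue name as above, a standalone script could wrap them like this (the main function is a hypothetical name, and the imports sit inside it only so the sketch loads without the packages installed):

```python
import asyncio


async def main() -> None:
    # Imported here so this sketch can be loaded without the packages installed
    import asyncpg
    from async_pq import Queue, QueueFabric

    conn = await asyncpg.connect('postgresql://postgres@localhost/test')
    try:
        # Creates the queue tables on first use, as described above
        queue: Queue = await QueueFabric(conn).find_queue('items')
        await queue.put('{"id":1,"data":[1,2,3]}')
    finally:
        # Close the connection even if a queue operation fails
        await conn.close()


# To run the sketch: asyncio.run(main())
```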
Operations with queue
Put new items (JSON strings, already serialized) into the queue:
await queue.put('{"id":1,"data":[1,2,3]}', '{"id":2,"data":[3,2,6]}')
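Because put takes already-serialized JSON strings, one way to build them is with the standard json module. The following is a minimal sketch; the dump_items helper and the sample records are hypothetical and not part of async-pq.

```python
import json
from typing import Any, Iterable, List


def dump_items(items: Iterable[Any]) -> List[str]:
    """Serialize each item to a compact JSON string."""
    # separators=(",", ":") drops the spaces json.dumps adds by default
    return [json.dumps(item, separators=(",", ":")) for item in items]


records = [{"id": 1, "data": [1, 2, 3]}, {"id": 2, "data": [3, 2, 6]}]
payloads = dump_items(records)

# Inside a coroutine, the strings can then be passed to the queue:
# await queue.put(*payloads)
```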
Pop items from the queue, up to a given limit (the acknowledge pattern is optional):
# If with_ack=False (the default in versions > 0.2.1), the message is acknowledged in place automatically
request_id, data = await queue.pop(limit=2, with_ack=True)
Acknowledge request:
# returns False if the request is not found or is already acked
is_acked: bool = await queue.ack(request_id)
Or vice versa:
# returns False if the request is not found or is acked already
is_unacked: bool = await queue.unack(request_id)
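The pop, ack, and unack calls can be combined into a small processing step. This is a sketch under assumptions, not the library's prescribed pattern: process_batch and handler are hypothetical names, it assumes unack is the right reaction to a failed handler, and it does not cover what pop returns for an empty queue, which is not documented here.

```python
from typing import Any, Awaitable, Callable


async def process_batch(
    queue: Any,
    handler: Callable[[Any], Awaitable[None]],
    limit: int = 10,
) -> bool:
    """Pop one batch, run handler on it, then ack on success or unack on failure."""
    request_id, data = await queue.pop(limit=limit, with_ack=True)
    try:
        await handler(data)
    except Exception:
        # Assumption: a failed batch should be unacked rather than acked
        await queue.unack(request_id)
        return False
    # ack returns False if the request is not found or is already acked
    return await queue.ack(request_id)
```

The function returns True only when the handler finished and the request was acknowledged, so a caller can log or retry on False.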
Return to the queue all unacknowledged messages older than timeout seconds (default limit=1000 entities):
await queue.return_unacked(timeout=300)
Clean the queue (delete acknowledged messages) so the database does not fill up with old data (default limit=1000 entities):
await queue.clean_acked_queue()
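Both maintenance calls are typically run on a schedule rather than once. The loop below is a minimal sketch of that idea; the maintain function and its interval and rounds parameters are hypothetical and not part of async-pq.

```python
import asyncio
from typing import Any, Optional


async def maintain(
    queue: Any,
    timeout: int = 300,
    interval: float = 60.0,
    rounds: Optional[int] = None,
) -> None:
    """Periodically return stale unacked messages and delete acked ones."""
    done = 0
    # rounds=None means "run until cancelled"
    while rounds is None or done < rounds:
        await queue.return_unacked(timeout=timeout)
        await queue.clean_acked_queue()
        done += 1
        await asyncio.sleep(interval)
```

Since each call handles at most the default limit of 1000 entities, a large backlog may take several passes to drain.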