Platonic wrapper for Amazon Simple Queue Service
A platonic wrapper for Amazon Simple Queue Service (SQS). It abstracts away the implementation details of SQS and provides a clean, typed queue interface.
    pip install platonic-sqs

Send

    from platonic.sqs.queue import SQSSender

    numbers_out = SQSSender[int](
        url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
    )

    numbers_out.send(15)
    numbers_out.send_many([1, 1, 2, 3, 5, 8, 13])
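SQS itself accepts at most 10 entries per `SendMessageBatch` request, so any bulk send has to be split into groups no larger than that. Whether and how `send_many` does this internally is not stated here. The following is only a sketch of the chunking step; `chunked` and `SQS_BATCH_LIMIT` are hypothetical names, not part of platonic-sqs.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')

# SQS SendMessageBatch accepts at most 10 entries per request.
SQS_BATCH_LIMIT = 10


def chunked(items: Iterable[T], size: int = SQS_BATCH_LIMIT) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError('size must be positive')
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# 23 numbers are split into batches of 10, 10 and 3.
batches = list(chunked(range(1, 24)))
```

Each batch could then be handed to a single batch request, which keeps every call within the service limit regardless of how many values the caller passes in.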
Receive & acknowledge
    from datetime import timedelta

    from platonic.sqs.queue import SQSReceiver
    from platonic.timeout import ConstantTimeout

    incoming_numbers = SQSReceiver[int](
        url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
        # The timeout prevents the receiver from blocking forever
        # if the queue is empty.
        timeout=ConstantTimeout(period=timedelta(minutes=3)),
    )

    # If the queue is empty, this call will block until a message
    # arrives or the timeout elapses.
    cmd = incoming_numbers.receive()

    assert cmd.value == 15

    # Do complicated stuff with the value
    print(cmd.value * 1234 + 5767)

    incoming_numbers.acknowledge(cmd)
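The receive, process, acknowledge order above matters: SQS makes a message visible again after its visibility timeout unless it has been deleted, so acknowledging only after successful processing lets failed messages be retried. The sketch below illustrates that pattern without touching AWS. `InMemoryReceiver`, `FakeMessage`, `FakeTimeout`, `drain` and `handle` are all hypothetical stand-ins, and it assumes that `acknowledge` is what removes a message from the queue. How the real receiver signals an expired timeout is not shown here, so the sketch uses its own exception.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, Generic, List, TypeVar

T = TypeVar('T')


@dataclass
class FakeMessage(Generic[T]):
    """Hypothetical stand-in for the object returned by receive()."""

    value: T


class FakeTimeout(Exception):
    """Hypothetical stand-in for whatever signals an expired timeout."""


class InMemoryReceiver(Generic[T]):
    """Hypothetical in-memory receiver with receive() and acknowledge()."""

    def __init__(self, values: List[T]) -> None:
        self._pending: Deque[FakeMessage[T]] = deque(
            FakeMessage(value) for value in values
        )
        self.in_flight: List[FakeMessage[T]] = []
        self.acknowledged: List[T] = []

    def receive(self) -> FakeMessage[T]:
        # An empty queue is treated as an immediately expired timeout.
        if not self._pending:
            raise FakeTimeout()
        message = self._pending.popleft()
        self.in_flight.append(message)
        return message

    def acknowledge(self, message: FakeMessage[T]) -> None:
        # Acknowledged messages are removed and never redelivered.
        self.in_flight.remove(message)
        self.acknowledged.append(message.value)


def drain(receiver: InMemoryReceiver[T], handle: Callable[[T], None]) -> int:
    """Process messages until the receiver times out; return the success count."""
    processed = 0
    while True:
        try:
            message = receiver.receive()
        except FakeTimeout:
            return processed
        try:
            handle(message.value)
        except ValueError:
            # Leave the message unacknowledged so it can be retried later.
            continue
        receiver.acknowledge(message)
        processed += 1


def handle(number: int) -> None:
    if number < 0:
        raise ValueError('negative numbers are not supported')


receiver = InMemoryReceiver([15, -1, 8])
processed = drain(receiver, handle)
```

After the run, the two valid numbers are acknowledged and `-1` stays in flight, which is the state in which a real queue would eventually redeliver it.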
- This project was generated with wemake-python-package. Current template version is: c9e9ea8b9be2464cacd00b9c2a438e821da9121b. See what is updated since then.
- This project is partially sponsored by Recall Masters - look out for issues with