Platonic wrapper for Amazon Simple Queue Service
Project description
platonic-sqs
A platonic wrapper for Amazon SQS queue service. Abstracts away the implementation details of SQS, providing you with a clean and typed queue interface.
Installation
pip install platonic-sqs
Send
from platonic.sqs.queue import SQSSender
numbers_out = SQSSender[int](
url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
)
numbers_out.send(15)
numbers_out.send_many([1, 1, 2, 3, 5, 8, 13])
Receive & acknowledge
from platonic.sqs.queue import SQSReceiver
from platonic.timeout import ConstantTimeout
from datetime import timedelta
incoming_numbers = SQSReceiver[int](
url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
# Thus we prevent the receiver from blocking forever if queue is empty
timeout=ConstantTimeout(period=timedelta(minutes=3)),
)
# If the queue is empty, this call with block until there is a message.
cmd = incoming_numbers.receive()
assert cmd.value == 15
# Do complicated stuff with the value
print(cmd.value * 1234 + 5767)
incoming_numbers.acknowledge(cmd)
License
Credits
- This project was generated with
wemake-python-package
. Current template version is: c9e9ea8b9be2464cacd00b9c2a438e821da9121b. See what is updated since then. - This project is partially sponsored by Recall Masters - look out for issues with
sponsored
tag.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
platonic-sqs-1.2.8.tar.gz
(8.7 kB
view hashes)
Built Distribution
Close
Hashes for platonic_sqs-1.2.8-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5060c24e9abc212af49d73b19e3e18eb1f6c8e470f078ff0e54c7026ebb6be58 |
|
MD5 | e63f68a90a0d3c3639ab60c73c752bb6 |
|
BLAKE2b-256 | 1c8f44e298a5502d0c53b91df8f785b6547812486157c387f83a10479881187d |