Platonic wrapper for Amazon Simple Queue Service
Project description
platonic-sqs
A platonic wrapper for Amazon SQS queue service. Abstracts away the implementation details of SQS, providing you with a clean and typed queue interface.
Installation
pip install platonic-sqs
Send
from platonic.sqs.queue import SQSSender
numbers_out = SQSSender[int](
url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
)
numbers_out.send(15)
numbers_out.send_many([1, 1, 2, 3, 5, 8, 13])
Receive & acknowledge
from platonic.sqs.queue import SQSReceiver
from platonic.timeout import ConstantTimeout
from datetime import timedelta
incoming_numbers = SQSReceiver[int](
url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
# Thus we prevent the receiver from blocking forever if queue is empty
timeout=ConstantTimeout(period=timedelta(minutes=3)),
)
# If the queue is empty, this call with block until there is a message.
cmd = incoming_numbers.receive()
assert cmd.value == 15
# Do complicated stuff with the value
print(cmd.value * 1234 + 5767)
incoming_numbers.acknowledge(cmd)
License
Credits
- This project was generated with
wemake-python-package
. Current template version is: c9e9ea8b9be2464cacd00b9c2a438e821da9121b. See what is updated since then. - This project is partially sponsored by Recall Masters - look out for issues with
sponsored
tag.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
platonic-sqs-1.2.7.tar.gz
(8.7 kB
view hashes)
Built Distribution
Close
Hashes for platonic_sqs-1.2.7-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 070da0c4e1da689a80c62390ac1a1b3edf1792d496f954fe76c709e21d141009 |
|
MD5 | 8321142097af77b1bc129e9136ae2c8d |
|
BLAKE2b-256 | 575a62918e6441cffc6cccd691f2c385b108a58ae49eb9a0aab273a67de211e7 |