Async Iterators for SQS Queues
Project description
This project (sqsi, short for SQS iterable, pronounced "squeezy") is a way to access SQS queues as async iterators in Python, because I am tired of writing custom SQS logic every time I just want to read stuff out of a queue.
The basic operation of the library is like this:
async with sqsi.QueueIterator(uri=...) as queue:
async for item in queue:
... # process the item
# If no exception is thrown, the item will be automatically deleted
# from the queue when the next item is requested.
If you want more fine-grained control over when items are deleted from the queue, you can manually delete the items. For
convenience, the mark_complete
method can accept any number of handles as positional arguments.
async with sqsi.QueueIterator(uri=..., deletion_mode='handle') as queue:
async for item, receipt_handle in queue:
... # process the item
await queue.mark_complete(receipt_handle)
Or, to avoid having to pass around the queue
object to other classes, you can ask for a callback method instead
(though make sure you are still somehow within the async context where queue
is valid):
async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
async for item, complete in queue:
... # process the item
await complete()
You can also specify a way to transform the input from SQS, if you wish to process items other than strings:
async with sqsi.QueueIterator(uri=..., transformer=json.loads, transformer_exceptions='skip-delete') as queue:
async for item in queue:
... # item is a JSON object here
In this case, we use transformer_exceptions='skip-delete'
to silence JSON exceptions and skip the item (and delete it
from the queue) when it does not parse.
If you need to process messages in chunks of a certain size, a builtin method is provided to generate those chunks.
async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
async for chunk in queue.chunks(size=10):
for item, complete in chunk:
... # process each item in the chunk
await complete()
Note that a deletion mode of callback
or handle
is recommended when processing chunks, as otherwise an entire chunk
is marked completed at the same time when the chunk is finished, which can lead to counter-intuitive behavior.
If a timeout is passed to chunks
, it will wait until the timeout or until it has a chunk of sufficient size, whichever
comes first.
async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
async for chunk in queue.chunks(size=10, timeout=30):
for item, complete in chunk:
... # process each item in the chunk
await complete()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file sqsi-0.1.9.tar.gz
.
File metadata
- Download URL: sqsi-0.1.9.tar.gz
- Upload date:
- Size: 5.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 |
3015afac496b9480c1f522722bda223706d39236b5a299853d2ff3c621031e5c
|
|
MD5 |
ffcb8ec7bc63ec2fd483dadfb3592ff5
|
|
BLAKE2b-256 |
e1e063fc6000ec10d0cfd819a7bcce000041730a678e123f30f1d4a823b326ce
|
Provenance
The following attestation bundles were made for sqsi-0.1.9.tar.gz
:
Publisher:
python-publish.yml
on rchowe/sqsi
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1
-
Predicate type:
https://docs.pypi.org/attestations/publish/v1
-
Subject name:
sqsi-0.1.9.tar.gz
-
Subject digest:
3015afac496b9480c1f522722bda223706d39236b5a299853d2ff3c621031e5c
- Sigstore transparency entry: 373596072
- Sigstore integration time:
-
Permalink:
rchowe/sqsi@aae22a36578204a31c542bde5282c1f0e26e30de
-
Branch / Tag:
refs/tags/0.1.9
- Owner: https://github.com/rchowe
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com
-
Runner Environment:
github-hosted
-
Publication workflow:
python-publish.yml@aae22a36578204a31c542bde5282c1f0e26e30de
-
Trigger Event:
release
-
Statement type:
File details
Details for the file sqsi-0.1.9-py3-none-any.whl
.
File metadata
- Download URL: sqsi-0.1.9-py3-none-any.whl
- Upload date:
- Size: 5.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 |
88b783dffa8bd7daab6fbeef504b7eb141656af443145d272d8452147e890567
|
|
MD5 |
d1577fd394859575579917149fcbc4d8
|
|
BLAKE2b-256 |
82ee16a6ff8904e005c844bfa8d52ed59a8740bfd82b32df57487e425a2f81f0
|
Provenance
The following attestation bundles were made for sqsi-0.1.9-py3-none-any.whl
:
Publisher:
python-publish.yml
on rchowe/sqsi
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1
-
Predicate type:
https://docs.pypi.org/attestations/publish/v1
-
Subject name:
sqsi-0.1.9-py3-none-any.whl
-
Subject digest:
88b783dffa8bd7daab6fbeef504b7eb141656af443145d272d8452147e890567
- Sigstore transparency entry: 373596095
- Sigstore integration time:
-
Permalink:
rchowe/sqsi@aae22a36578204a31c542bde5282c1f0e26e30de
-
Branch / Tag:
refs/tags/0.1.9
- Owner: https://github.com/rchowe
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com
-
Runner Environment:
github-hosted
-
Publication workflow:
python-publish.yml@aae22a36578204a31c542bde5282c1f0e26e30de
-
Trigger Event:
release
-
Statement type: