Async Iterators for SQS Queues

This project (sqsi, short for SQS iterable, pronounced "squeezy") is a way to access SQS queues as async iterators in Python, because I am tired of writing custom SQS logic every time I just want to read stuff out of a queue.

The basic operation of the library is like this:

import sqsi

async with sqsi.QueueIterator(uri=...) as queue:
    async for item in queue:
        ... # process the item

        # If no exception is thrown, the item will be automatically deleted
        # from the queue when the next item is requested.
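
To make the "deleted when the next item is requested" behavior concrete, here is a minimal in-memory sketch. It is not sqsi's implementation: the class InMemoryAutoDeleteIterator and the helper consume are hypothetical names, and a plain list stands in for the SQS queue.

```python
import asyncio


class InMemoryAutoDeleteIterator:
    """Hypothetical stand-in for a queue iterator; not sqsi's own code."""

    def __init__(self, messages):
        self.pending = list(messages)  # messages still in the "queue"
        self.deleted = []              # messages that have been removed
        self._in_flight = None         # item handed out but not yet deleted

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Reaching this point means the loop body finished without raising,
        # so the previously yielded item can now be deleted.
        if self._in_flight is not None:
            self.pending.remove(self._in_flight)
            self.deleted.append(self._in_flight)
            self._in_flight = None
        if not self.pending:
            raise StopAsyncIteration
        self._in_flight = self.pending[0]
        return self._in_flight


async def consume(messages, fail_on=None):
    """Iterate over the messages, optionally failing on one of them."""
    queue = InMemoryAutoDeleteIterator(messages)
    try:
        async for item in queue:
            if item == fail_on:
                raise ValueError(f"could not process {item!r}")
    except ValueError:
        pass  # the failing item was never deleted
    return queue.deleted, queue.pending
```

In this sketch, an exception raised in the loop body means the iterator is never asked for another item, so the item being processed stays in the queue.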

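If you want more fine-grained control over when items are deleted from the queue, you can delete them manually: with deletion_mode='handle', the iterator yields each item together with its receipt handle, which you pass to mark_complete once you are done with the item. For convenience, mark_complete accepts any number of handles as positional arguments.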

async with sqsi.QueueIterator(uri=..., deletion_mode='handle') as queue:
    async for item, receipt_handle in queue:
        ... # process the item

        await queue.mark_complete(receipt_handle)
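
Because mark_complete accepts several handles at once, one possible pattern is to collect handles and delete them in groups. The sketch below assumes only the interface described above (an async iterator of (item, receipt_handle) pairs plus an awaitable mark_complete(*handles)). The helper process_and_delete_in_batches and the FakeQueue stand-in are hypothetical and exist so the example runs without AWS.

```python
import asyncio


async def process_and_delete_in_batches(queue, handler, batch_size=10):
    """Process items, deleting them in groups of up to batch_size handles.

    Assumes `queue` yields (item, receipt_handle) pairs and exposes an
    awaitable mark_complete(*handles).
    """
    handles = []
    async for item, receipt_handle in queue:
        handler(item)
        handles.append(receipt_handle)
        if len(handles) >= batch_size:
            await queue.mark_complete(*handles)
            handles.clear()
    if handles:
        # Flush whatever is left if the iteration ever ends.
        await queue.mark_complete(*handles)


class FakeQueue:
    """Hypothetical in-memory stand-in for a queue in 'handle' mode."""

    def __init__(self, items):
        self._items = items
        self.calls = []  # one tuple of handles per mark_complete call

    async def __aiter__(self):
        for index, item in enumerate(self._items):
            yield item, f"handle-{index}"

    async def mark_complete(self, *handles):
        self.calls.append(handles)
```

Keep in mind that SQS makes a message visible to consumers again once its visibility timeout expires, so handles should not be held for long before they are marked complete.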

Or, to avoid passing the queue object around to other classes, you can ask for a callback instead. The callback must still be awaited while the async with block that created queue is open:

async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
    async for item, complete in queue:
        ... # process the item

        await complete()

You can also specify a way to transform the input from SQS, if you wish to process items other than strings:

import json

async with sqsi.QueueIterator(uri=..., transformer=json.loads, transformer_exceptions='skip-delete') as queue:
    async for item in queue:
        ... # item is the parsed JSON value (a dict, list, etc.) here

In this case, we use transformer_exceptions='skip-delete' to silence JSON exceptions and skip the item (and delete it from the queue) when it does not parse.
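
The skip-and-delete idea can be sketched without a queue at all. The function transform_or_skip below is a hypothetical illustration, not part of sqsi: it applies a transformer to raw message bodies and sets aside the ones that fail instead of raising.

```python
import json


def transform_or_skip(raw_bodies, transformer, exceptions=(ValueError,)):
    """Return (items, skipped): transformed items and bodies that failed."""
    items, skipped = [], []
    for body in raw_bodies:
        try:
            items.append(transformer(body))
        except exceptions:
            # In 'skip-delete' terms: the item is not yielded to the caller,
            # but it is still treated as finished and would be deleted.
            skipped.append(body)
    return items, skipped


items, skipped = transform_or_skip(['{"a": 1}', "not json", "[1, 2]"], json.loads)
```

Catching ValueError is enough for json.loads, because json.JSONDecodeError is a subclass of ValueError. A different transformer may need a different exception tuple.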

If you need to process messages in chunks of a certain size, a built-in method is provided to generate those chunks:

async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
    async for chunk in queue.chunks(size=10):
        for item, complete in chunk:
            ... # process each item in the chunk
            await complete()

Note that a deletion mode of callback or handle is recommended when processing chunks. Otherwise every item in a chunk is marked complete at the same time, once the whole chunk is finished, which can lead to counter-intuitive behavior.

If a timeout is passed to chunks, each chunk is yielded when the timeout expires or when the chunk reaches the requested size, whichever comes first:

async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
    async for chunk in queue.chunks(size=10, timeout=30):
        for item, complete in chunk:
            ... # process each item in the chunk
            await complete()
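
The "size or timeout, whichever comes first" rule can be illustrated with the standard library alone. This is a sketch under assumptions, not sqsi's code: collect_chunk and demo are hypothetical names, an asyncio.Queue stands in for SQS, and the timeout is taken to be in seconds.

```python
import asyncio


async def collect_chunk(source, size, timeout):
    """Gather up to `size` items from an asyncio.Queue within `timeout` seconds."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    chunk = []
    while len(chunk) < size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break  # the time budget for this chunk is used up
        try:
            chunk.append(await asyncio.wait_for(source.get(), remaining))
        except asyncio.TimeoutError:
            break  # no further item arrived before the deadline
    return chunk


async def demo():
    source = asyncio.Queue()
    for n in range(5):
        source.put_nowait(n)
    full = await collect_chunk(source, size=3, timeout=1.0)      # size reached first
    partial = await collect_chunk(source, size=3, timeout=0.05)  # timeout reached first
    return full, partial
```

The deadline is computed once per chunk, so the total wait stays bounded by the timeout no matter how many items trickle in.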
