Async Iterators for SQS Queues

This project (sqsi, short for SQS iterable, pronounced "squeezy") is a way to access SQS queues as async iterators in Python, because I am tired of writing custom SQS logic every time I just want to read stuff out of a queue.

Basic usage looks like this:

async with sqsi.QueueIterator(uri=...) as queue:
    async for item in queue:
        ... # process the item

        # If no exception is raised, the item is automatically deleted
        # from the queue when the next item is requested.
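
To make the "deleted when the next item is requested" behavior concrete, here is a minimal sketch of the same idea using an async generator. It is not sqsi's implementation: `InMemoryQueue` and `iterate_with_auto_delete` are hypothetical names invented for this illustration, and the in-memory list stands in for a real SQS queue.

```python
import asyncio


class InMemoryQueue:
    """Hypothetical stand-in for an SQS queue; not part of sqsi."""

    def __init__(self, messages):
        self.messages = list(messages)

    async def receive(self):
        # Return the oldest undeleted message, or None when the queue is empty.
        return self.messages[0] if self.messages else None

    async def delete(self, message):
        self.messages.remove(message)


async def iterate_with_auto_delete(queue):
    """Yield messages, deleting each one only when the next is requested."""
    while (message := await queue.receive()) is not None:
        yield message
        # Execution resumes here only when the consumer requests another
        # item, which means its loop body finished without raising.
        await queue.delete(message)


async def demo():
    queue = InMemoryQueue(["a", "b", "c"])
    processed = []
    try:
        async for item in iterate_with_auto_delete(queue):
            if item == "b":
                raise ValueError("processing failed")
            processed.append(item)
    except ValueError:
        pass
    return processed, queue.messages


processed, remaining = asyncio.run(demo())
print(processed, remaining)
```

In this sketch "a" is deleted because the loop moved on to the next item, while "b" stays in the queue because the loop body raised before another item was requested.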

For finer-grained control over when items are deleted from the queue, set deletion_mode='handle' and delete them yourself: the iterator then yields each item together with its receipt handle, which you pass to mark_complete. For convenience, mark_complete accepts any number of handles as positional arguments.

async with sqsi.QueueIterator(uri=..., deletion_mode='handle') as queue:
    async for item, receipt_handle in queue:
        ... # process the item

        await queue.mark_complete(receipt_handle)

Alternatively, to avoid passing the queue object around to other classes, you can ask for a completion callback instead. Make sure the callback is still awaited inside the async context where queue is valid:

async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
    async for item, complete in queue:
        ... # process the item

        await complete()
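
The relationship between the two modes can be sketched without AWS access. The example below assumes, purely for illustration, that a completion callback behaves like `mark_complete` with the receipt handle already bound; the source does not say how sqsi builds it. `StubQueue` and `Worker` are hypothetical names, and only the `mark_complete(*handles)` shape is taken from the description above.

```python
import asyncio
from functools import partial


class StubQueue:
    """Hypothetical stand-in that records which handles were completed."""

    def __init__(self):
        self.completed = []

    async def mark_complete(self, *receipt_handles):
        # Accepts any number of handles as positional arguments.
        self.completed.extend(receipt_handles)


class Worker:
    """Processes an item without ever seeing the queue object."""

    async def handle(self, item, complete):
        result = item.upper()
        await complete()  # acknowledge only after processing succeeded
        return result


async def demo():
    queue = StubQueue()
    worker = Worker()
    results = []
    for item, receipt_handle in [("first", "h-1"), ("second", "h-2")]:
        # Bind the handle so the worker needs no reference to the queue.
        complete = partial(queue.mark_complete, receipt_handle)
        results.append(await worker.handle(item, complete))
    return results, queue.completed


results, completed = asyncio.run(demo())
print(results, completed)
```

The worker depends only on a zero-argument awaitable callback, which is what makes the callback mode convenient when processing is spread across several classes.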

You can also supply a transformer that converts the raw input from SQS, if you want to work with items other than strings:

import json

async with sqsi.QueueIterator(uri=..., transformer=json.loads, transformer_exceptions='skip-delete') as queue:
    async for item in queue:
        ... # item is whatever json.loads returned (a dict, a list, etc.)

Here, transformer_exceptions='skip-delete' silences the exceptions raised by json.loads: an item that does not parse is skipped and deleted from the queue.
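
As a rough illustration of what a skip-and-delete policy means for a batch of messages, the following sketch applies a transformer and separates the values that would be delivered from the bodies that would be dropped. The function name `transform_skipping_failures` is hypothetical and this is not sqsi's code; it only mirrors the behavior described above.

```python
import json


def transform_skipping_failures(bodies, transformer):
    """Apply transformer to each body, setting aside the ones that raise."""
    delivered, dropped = [], []
    for body in bodies:
        try:
            value = transformer(body)
        except Exception:
            # Under a skip-and-delete policy the consumer never sees this
            # message, and it would also be removed from the queue.
            dropped.append(body)
            continue
        delivered.append(value)
    return delivered, dropped


delivered, dropped = transform_skipping_failures(
    ['{"id": 1}', "not json", "[1, 2]"], json.loads
)
print(delivered, dropped)
```

Note that json.loads can return a list, number, or string as well as a dict, so consumer code should not assume every item is a mapping.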

If you need to process messages in chunks of a certain size, the built-in chunks method generates them:

async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
    async for chunk in queue.chunks(size=10):
        for item, complete in chunk:
            ... # process each item in the chunk
            await complete()

Note that a deletion mode of callback or handle is recommended when processing chunks. Otherwise, every item in a chunk is marked completed at the same time, when the chunk is finished, which can lead to counter-intuitive behavior.

If a timeout is passed to chunks, each chunk is yielded as soon as it reaches the requested size or the timeout expires, whichever comes first:

async with sqsi.QueueIterator(uri=..., deletion_mode='callback') as queue:
    async for chunk in queue.chunks(size=10, timeout=30):
        for item, complete in chunk:
            ... # process each item in the chunk
            await complete()
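
The size-or-timeout rule can be sketched with an asyncio.Queue as the message source. This is an illustration under stated assumptions, not sqsi's implementation: the standalone `chunks` function, the `None` end-of-stream sentinel, the choice to drop empty chunks, and the timeout being in seconds are all assumptions of the sketch.

```python
import asyncio


async def chunks(source, size, timeout):
    """Yield lists of up to `size` items taken from an asyncio.Queue.

    A chunk is emitted when it is full or when `timeout` seconds have
    passed since the chunk was started, whichever comes first. A None
    item ends the stream (an assumption made for this sketch).
    """
    loop = asyncio.get_running_loop()
    done = False
    while not done:
        chunk = []
        deadline = loop.time() + timeout
        while len(chunk) < size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                item = await asyncio.wait_for(source.get(), remaining)
            except asyncio.TimeoutError:
                break
            if item is None:
                done = True
                break
            chunk.append(item)
        if chunk:  # empty chunks are not yielded in this sketch
            yield chunk


async def demo():
    source = asyncio.Queue()

    async def producer():
        for n in range(3):
            source.put_nowait(n)
        await asyncio.sleep(0.3)  # pause much longer than the timeout
        source.put_nowait(3)
        source.put_nowait(None)

    task = asyncio.create_task(producer())
    result = [c async for c in chunks(source, size=2, timeout=0.05)]
    await task
    return result


result = asyncio.run(demo())
print(result)
```

With these timings the first chunk fills up immediately, the second is cut short by the timeout while the producer is paused, and the last one is emitted when the stream ends.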
