Simple async SQS client

simple-async-sqs

simple-async-sqs is an opinionated, minimalistic async Python client for interacting with SQS.

simple-async-sqs is for developers who are tired of repetitive configuration in task frameworks and prefer a simple message-processing library.

Installation

uv add simple-async-sqs

Optionally, add the type stubs for development:

uv add --dev "simple-async-sqs[stubs]"

Usage

A consumer is created with QueueClient.consume, which returns an AsyncIterator over received messages.

Messages must be either ack'd or nack'd after processing.

import asyncio

from simple_async_sqs.queue_client import QueueClient


async def process(client: QueueClient):
    async for message in client.consume():
        try:
            print(message.get("Body"))
            ...
        except Exception:
            await client.nack(message, retry_timeout=20)
        else:
            await client.ack(message)


async def process_single():
    async with QueueClient.create("my-queue") as client:
        await process(client)

We can also easily parallelise the work using asyncio.TaskGroup:

async def process_parallel(workers: int):
    async with QueueClient.create("my-queue") as client:
        async with asyncio.TaskGroup() as tg:
            for _ in range(workers):
                tg.create_task(process(client))

Lifecycles

Lifecycles define what happens around message processing: on success, on error, and while work is in progress. They provide a way to handle retries, heartbeats, and other message lifecycle concerns automatically.
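Each lifecycle in the examples below follows the same shape: a callable that takes a message and yields it inside an async context manager. Assuming that protocol (the library's exact interface may differ), a custom lifecycle can be sketched with contextlib.asynccontextmanager:

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def logging_lifecycle(message):
    # Hypothetical lifecycle: log around processing and re-raise failures
    # so an outer handler (or an inner lifecycle) can decide what to do.
    print("processing", message.get("MessageId"))
    try:
        yield message
    except Exception:
        print("failed", message.get("MessageId"))
        raise
    else:
        print("done", message.get("MessageId"))
```

It would be used exactly like the built-in lifecycles below: `async with logging_lifecycle(message) as msg: ...`.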

RetryLifeCycle

Retries failed messages after a fixed interval, indefinitely. The maximum number of delivery attempts can be capped via the queue's SQS DLQ (redrive policy) settings.
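The library itself does not cap retries; that limit lives in the queue's redrive policy. For reference, SQS takes the RedrivePolicy queue attribute as a JSON-encoded string; a sketch of building it in Python (the ARN is a placeholder):

```python
import json

# Placeholder ARN for illustration; substitute your own dead-letter queue.
redrive_policy = {
    "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:my-queue-dlq",
    "maxReceiveCount": "5",  # receives before SQS moves the message to the DLQ
}

# SQS expects the policy as a JSON string, e.g. passed to SetQueueAttributes.
attributes = {"RedrivePolicy": json.dumps(redrive_policy)}
```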

from simple_async_sqs.lifecycle import RetryLifeCycle

async def process_with_retry():
    async with QueueClient.create("my-queue") as client:
        lifecycle = RetryLifeCycle(client, retry_interval=30)
        async for message in client.consume():
            async with lifecycle(message) as msg:
                # Process message
                print(msg.get("Body"))
                # On exception: automatically retries after 30 seconds
                # On success: automatically acks the message

ExponentialRetryLifeCycle

Retries failed messages with exponential backoff. Each retry doubles the wait time based on the message's receive count.

from simple_async_sqs.lifecycle import ExponentialRetryLifeCycle

async def process_with_exponential_retry():
    async with QueueClient.create("my-queue") as client:
        lifecycle = ExponentialRetryLifeCycle(client, retry_interval=10)
        async for message in client.consume():
            async with lifecycle(message) as msg:
                # Process message
                print(msg.get("Body"))
                # On exception: retries with exponential backoff (10s, 20s, 40s, etc.)
                # On success: automatically acks the message
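The schedule above (10s, 20s, 40s, ...) corresponds to retry_interval * 2 ** (receive_count - 1), assuming SQS's ApproximateReceiveCount starts at 1 on the first delivery; a small sketch of that arithmetic:

```python
def backoff_delay(receive_count: int, retry_interval: int = 10) -> int:
    # Doubles the wait for each prior delivery: 10, 20, 40, 80, ...
    return retry_interval * 2 ** (receive_count - 1)


# Delays for the first four delivery attempts:
print([backoff_delay(n) for n in range(1, 5)])  # [10, 20, 40, 80]
```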

HeartbeatLifeCycle

Keeps messages alive by extending their visibility timeout with periodic heartbeats. Useful for long-running message processing.

from simple_async_sqs.lifecycle import HeartbeatLifeCycle, RetryLifeCycle

async def process_with_heartbeat():
    async with QueueClient.create("my-queue") as client:
        # Heartbeat every 60 seconds, with retry on failure
        lifecycle = HeartbeatLifeCycle(
            client, 
            interval=60, 
            inner_life_cycle=RetryLifeCycle(client, retry_interval=30)
        )
        async for message in client.consume():
            async with lifecycle(message) as msg:
                # Message visibility extended automatically every 60 seconds
                await asyncio.sleep(300)  # Long processing
                print(msg.get("Body"))
                # On exception: retries after 30 seconds
                # On success: automatically acks the message

Producer

A message can be produced with:

await client.producer("my_message_payload", delay=10)

