Async SQS consumer

Python asynchronous (async / await) worker for consuming messages from AWS SQS.

This is a hobby project. If you find it interesting, contributions are welcome.

Usage

Create an instance of the worker with the URL of the queue.

AWS credentials are taken from environment variables unless you provide a Context object (async_sqs_consumer.types.Context) with the credentials. Set the following environment variables:

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
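Because the credentials come from the environment, a small pre-flight check can fail fast when a variable is missing. The following is a minimal sketch using only the standard library; the helper name missing_aws_credentials is hypothetical and is not part of async_sqs_consumer.

```python
import os
from collections.abc import Mapping

# Variables the worker reads AWS credentials from, per the list above.
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_credentials(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required variables that are unset or empty.

    Hypothetical helper for illustration; not provided by the library.
    """
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_aws_credentials() before worker.start() and aborting when the returned list is non-empty gives a clearer error than a failed AWS request later on.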

Example:

You can get the queue URL with the following AWS CLI command: aws sqs get-queue-url --queue-name xxxxxx
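The same lookup can be done from Python with the SQS client's get_queue_url call. This is a sketch under the assumption that you pass in a boto3 SQS client; the wrapper name get_queue_url_by_name is hypothetical.

```python
from typing import Any


def get_queue_url_by_name(client: Any, queue_name: str) -> str:
    """Look up a queue URL by name.

    `client` is assumed to be an SQS client such as boto3.client("sqs").
    Hypothetical wrapper for illustration; not part of async_sqs_consumer.
    """
    response = client.get_queue_url(QueueName=queue_name)
    return response["QueueUrl"]


# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   url = get_queue_url_by_name(boto3.client("sqs"), "queue_name")
```

Taking the client as a parameter keeps the helper easy to test with a stand-in object.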

# test_worker.py

from async_sqs_consumer.worker import (
    Worker,
)

worker = Worker(
    queue_url="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name"
)


@worker.task("report")
async def report(text: str) -> None:
    print(text)

if __name__ == "__main__":
    worker.start()

Now you can start the worker: python test_worker.py

Next, post a message for the worker to process:

import json
import boto3
import uuid

client = boto3.client("sqs")

client.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name",
    MessageBody=json.dumps(
        {
            "task": "report",
            "id": uuid.uuid4().hex,
            "args": ["hello world"],
        }
    ),
)

Alternatively, you can use aioboto3:

import asyncio
import json
import aioboto3
import uuid


async def main() -> None:
    session = aioboto3.Session()
    async with session.client("sqs") as client:
        await client.send_message(
            QueueUrl="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name",
            MessageBody=json.dumps(
                {
                    "task": "report",
                    "id": uuid.uuid4().hex,
                    "args": ["hello world"],
                }
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())
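To make the link between a message and a registered task concrete, here is a simplified sketch of name-based dispatch. It is not the library's implementation: the TASKS registry, the task decorator, and the dispatch function are hypothetical names, and the handler returns its argument (instead of printing it) only so the result is easy to observe. It assumes "args" are passed positionally and "kwargs" as keyword arguments.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

# Hypothetical registry mapping a task name to a coroutine function.
TASKS: dict[str, Callable[..., Awaitable[Any]]] = {}


def task(name: str):
    """Register a coroutine function under a task name."""

    def decorator(func):
        TASKS[name] = func
        return func

    return decorator


@task("report")
async def report(text: str) -> str:
    return text


async def dispatch(body: str) -> Any:
    """Decode a message body and await the handler named by its "task" field."""
    message = json.loads(body)
    handler = TASKS[message["task"]]  # raises KeyError for an unknown task
    return await handler(*message.get("args", []), **message.get("kwargs", {}))
```

For example, asyncio.run(dispatch(json.dumps({"task": "report", "id": "1", "args": ["hello world"]}))) awaits report("hello world").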

Published messages must have the following structure:

{
    "type": "object",
    "properties": {
        "task": {"type": "string"},
        "id": {"type": "string"},
        "args": {"type": "array"},
        "kwargs": {"type": "object"},
        "retries": {"type": "number"},
        "eta": {"type": "string"},
        "expires": {"type": "string"},
    },
    "required": ["task", "id"],
}
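The structure above can be checked before sending with a few lines of plain Python. This is a minimal sketch, not a full JSON Schema validator; validate_message and build_message are hypothetical helpers, and the sketch assumes extra fields are allowed because the structure does not forbid them.

```python
import json
import uuid
from typing import Any

# Expected Python type for each field in the structure above.
FIELD_TYPES = {
    "task": str,
    "id": str,
    "args": list,
    "kwargs": dict,
    "retries": (int, float),
    "eta": str,
    "expires": str,
}
REQUIRED = ("task", "id")


def validate_message(message: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the message conforms."""
    errors = [f"missing required field: {name}" for name in REQUIRED if name not in message]
    for name, value in message.items():
        expected = FIELD_TYPES.get(name)
        if expected is None:
            continue  # unknown fields are ignored in this sketch
        # bool is a subclass of int in Python, but JSON true/false is not a number.
        if isinstance(value, bool) or not isinstance(value, expected):
            errors.append(f"wrong type for field: {name}")
    return errors


def build_message(task: str, *args: Any, **kwargs: Any) -> str:
    """Build a JSON message body with a fresh id and validate it."""
    message = {"task": task, "id": uuid.uuid4().hex, "args": list(args), "kwargs": kwargs}
    problems = validate_message(message)
    if problems:
        raise ValueError("; ".join(problems))
    return json.dumps(message)
```

The string returned by build_message("report", "hello world") can be passed as MessageBody to send_message.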
