Async SQS consumer

Python asynchronous (async / await) worker for consuming messages from AWS SQS.

This is a hobby project. If you find it interesting, contributions are welcome.

Usage

Create an instance of the worker with the URL of the queue.

AWS credentials are taken from the environment variables listed below. Alternatively, you can provide a Context object (async_sqs_consumer.types.Context) that carries the AWS credentials.

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
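Since the worker relies on these variables when no Context object is provided, it can help to check that they are set before starting it. The following is a minimal sketch; `missing_aws_credentials` and `REQUIRED_AWS_VARS` are hypothetical names used for illustration and are not part of async_sqs_consumer.

```python
import os
from typing import List, Mapping

# The two variables named in this README.
REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_credentials(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required AWS variables that are unset or empty in env."""
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


# Report the result for the current process environment.
missing = missing_aws_credentials()
if missing:
    print("Missing AWS credentials: " + ", ".join(missing))
```

Passing a plain dict instead of relying on `os.environ` keeps the check easy to exercise in tests.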

Example:

You can get the queue URL with the following AWS CLI command: aws sqs get-queue-url --queue-name xxxxxx

# test_worker.py

from async_sqs_consumer.worker import (
    Worker,
)

worker = Worker(
    queue_url="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name"
)


@worker.task("report")
async def report(text: str) -> None:
    print(text)

if __name__ == "__main__":
    worker.start()

Now you can start the worker with python test_worker.py

Next, post a message for the worker to process:

import json
import boto3
import uuid

client = boto3.client("sqs")

client.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name",
    MessageBody=json.dumps(
        {
            "task": "report",
            "id": uuid.uuid4().hex,
            "args": ["hello world"],
        }
    ),
)

Alternatively, you can use aioboto3:

import asyncio
import json
import aioboto3
import uuid


async def main() -> None:
    session = aioboto3.Session()
    async with session.client("sqs") as client:
        await client.send_message(
            QueueUrl="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name",
            MessageBody=json.dumps(
                {
                    "task": "report",
                    "id": uuid.uuid4().hex,
                    "args": ["hello world"],
                }
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())

Published messages must have the following structure:

{
    "type": "object",
    "properties": {
        "task": {"type": "string"},
        "id": {"type": "string"},
        "args": {"type": "array"},
        "kwargs": {"type": "object"},
        "retries": {"type": "number"},
        "eta": {"type": "string"},
        "expires": {"type": "string"},
    },
    "required": ["task", "id"],
}
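To illustrate the structure above, here is a minimal sketch that builds a message body and checks it before publishing. `validate_message` and `build_message` are hypothetical helpers, not part of async_sqs_consumer. The check covers only the required keys and top-level types from the schema. The string formats of "eta" and "expires" are not specified here, so they are not validated beyond being strings.

```python
import json
import uuid
from typing import Any, Dict, List, Optional

# Python type expected for each field in the schema above.
# JSON Schema "number" covers both int and float.
FIELD_TYPES = {
    "task": str,
    "id": str,
    "args": list,
    "kwargs": dict,
    "retries": (int, float),
    "eta": str,
    "expires": str,
}
REQUIRED_FIELDS = ("task", "id")


def validate_message(message: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the message fits."""
    problems = [
        f"missing required field: {name}"
        for name in REQUIRED_FIELDS
        if name not in message
    ]
    for name, expected in FIELD_TYPES.items():
        if name not in message:
            continue
        value = message[name]
        # bool is a subclass of int in Python, but no field here is boolean.
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"wrong type for field: {name}")
    return problems


def build_message(
    task: str,
    args: Optional[List[Any]] = None,
    kwargs: Optional[Dict[str, Any]] = None,
) -> str:
    """Build a JSON message body with a fresh id, raising if it is invalid."""
    message: Dict[str, Any] = {"task": task, "id": uuid.uuid4().hex}
    if args is not None:
        message["args"] = list(args)
    if kwargs is not None:
        message["kwargs"] = dict(kwargs)
    problems = validate_message(message)
    if problems:
        raise ValueError("; ".join(problems))
    return json.dumps(message)
```

The returned string can be used as the MessageBody argument in the send_message calls shown earlier, for example `MessageBody=build_message("report", args=["hello world"])`.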

