No project description provided
Project description
Async SQS consumer
Python asynchronous (async / await) worker for consuming messages from AWS SQS.
This is a hobby project, if you find the project interesting any contribution is welcome.
Usage
You must create an instance of the worker with the url of the queue.
Aws credentials are taken from environment variables, you must set the
following environment variables. Or you can provide a Context object with the
aws credentials async_sqs_consumer.types.Context
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
Example:
You can get the queue url with the follow aws cli command
aws sqs get-queue-url --queue-name xxxxxx
# test_worker.py
from async_sqs_consumer.worker import (
Worker,
)
worker = Worker(
queue_url="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name"
)
@worker.task("report")
async def report(text: str) -> None:
print(text)
if __name__: "__main__":
worker.start()
Now you can initialize the worker python test_worker.py
Now you need to post a message for the worker to process
import json
import boto3
import uuid
client = boto3.client("sqs")
client.send_message(
QueueUrl="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name",
MessageBody=json.dumps(
{
"task": "report",
"id": uuid.uuid4().hex,
"args": ["hello world"],
}
),
)
Or you can use aioboto3
import asyncio
import json
import aioboto3
import uuid
async def main() -> None:
session = aioboto3.Session()
async with session.client("sqs") as client:
await client.send_message(
QueueUrl="https://sqs.us-east-1.amazonaws.com/xxxxxxx/queue_name",
MessageBody=json.dumps(
{
"task": "report",
"id": uuid.uuid4().hex,
"args": ["hello world"],
}
),
)
if __name__ == "__main__":
asyncio.run(main())
To publish the messages they must have the following structure
{
"type": "object",
"properties": {
"task": {"type": "string"},
"id": {"type": "string"},
"args": {"type": "array"},
"kwargs": {"type": "object"},
"retries": {"type": "number"},
"eta": {"type": "string"},
"expires": {"type": "string"},
},
"required": ["task", "id"],
}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file async_sqs_consumer-0.3.8.tar.gz
.
File metadata
- Download URL: async_sqs_consumer-0.3.8.tar.gz
- Upload date:
- Size: 9.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.10.10 Linux/6.2.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1afdcafc0a494de6fec8027f4c12fc6dd805535cbdaf6330da1b831a244a45ed |
|
MD5 | 5f32cd53363fcff04d268ae936624aee |
|
BLAKE2b-256 | e315c9825ec20f363c1520e2d8de8ccbaf5b80a055220f19b49bd9992039d608 |
File details
Details for the file async_sqs_consumer-0.3.8-py3-none-any.whl
.
File metadata
- Download URL: async_sqs_consumer-0.3.8-py3-none-any.whl
- Upload date:
- Size: 11.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.10.10 Linux/6.2.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7b96c90e27bdc2af34889053f5285b40733a59cea27d419171fede797990041b |
|
MD5 | 3c4b573a4af5655135a25d06f0828a06 |
|
BLAKE2b-256 | cf58dea2ac04251349f2e044ec90646a985b772d77731e77b102bf13998919be |