Poll AWS SQS using asyncio and execute callback.
Project description
sqs-polling
python sqs polling library
Example
worker/polling/task.py
import traceback
from pathlib import Path
from pprint import pprint
from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown
@polling(
queue_name="queue_name",
aws_profile={
"endpoint_url": "http://localstack:4566"
},
max_workers=5,
max_number_of_messages=10,
)
def task(*args, **kwargs):
# The contents of the body of sqs are output.
pprint({"kwargs": kwargs})
tmp_dir = get_env_value("TMPDIR")
HEARTBEAT_FILE = Path(f"{tmp_dir}/heartbeat")
READINESS_FILE = Path(f"{tmp_dir}/ready")
@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
update = False
file_exists = HEARTBEAT_FILE.exists()
try:
# Process health checks of DBs, etc.
if healthCheck():
HEARTBEAT_FILE.touch()
if file_exists:
logger.info(f"{HEARTBEAT_FILE} updated")
else:
logger.info(f"{HEARTBEAT_FILE} created")
update = True
except Exception as e:
logger.error("Health check failure")
raise e
finally:
if not update:
logger.error("health check failure")
@ready.connect
def worker_ready_receiver(**_) -> None:
READINESS_FILE.touch()
@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
for file in (HEARTBEAT_FILE, READINESS_FILE):
if file.is_file():
file.unlink()
management/commands/handler.py
from pathlib import Path
from typing import Any
from django.core.management.base import BaseCommand
from sqs_polling import main
class Command(BaseCommand):
_command_name = Path(__file__).stem
def handle(self, *args: Any, **options: Any) -> None:
main("worker.polling.task.task")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
sqs-apolling-0.0.5.tar.gz
(56.6 kB
view details)
Built Distribution
File details
Details for the file sqs-apolling-0.0.5.tar.gz
.
File metadata
- Download URL: sqs-apolling-0.0.5.tar.gz
- Upload date:
- Size: 56.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9b8fe499c30d0bd84001f635145bbd7cb4ac69b60f6138ce8d1f93599a05f401 |
|
MD5 | 37abc70feac97ce8dba482ecbb58ff06 |
|
BLAKE2b-256 | 801820949ab0526f9121ec3a01676b1eb10ee69a0d7e6c0850815b292efa0bd8 |
File details
Details for the file sqs_apolling-0.0.5-py3-none-any.whl
.
File metadata
- Download URL: sqs_apolling-0.0.5-py3-none-any.whl
- Upload date:
- Size: 13.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c870751114339ac4842dc92e0001c8a9763488c52188f36ba1e7659acf1fb369 |
|
MD5 | 075667c4ca4b84225bb7b174378c9eca |
|
BLAKE2b-256 | 39d8c97cc1ca68209c00ff6205ad75d55bea57bf3f46fd09b38d75fbc3dea13e |