Poll AWS SQS using asyncio and execute callback.
Project description
sqs-polling
python sqs polling library
Example
worker/polling/task.py
import traceback
from pathlib import Path
from pprint import pprint
from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown
@polling(
queue_name="queue_name",
aws_profile={
"endpoint_url": "http://localstack:4566"
},
max_workers=5,
max_number_of_messages=10,
)
def task(*args, **kwargs):
# The contents of the body of sqs are output.
pprint({"kwargs": kwargs})
tmp_dir = get_env_value("TMPDIR")
HEARTBEAT_FILE = Path(f"{tmp_dir}/heartbeat")
READINESS_FILE = Path(f"{tmp_dir}/ready")
@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
update = False
file_exists = HEARTBEAT_FILE.exists()
try:
# Process health checks of DBs, etc.
if healthCheck():
HEARTBEAT_FILE.touch()
if file_exists:
logger.info(f"{HEARTBEAT_FILE} updated")
else:
logger.info(f"{HEARTBEAT_FILE} created")
update = True
except Exception as e:
logger.error("Health check failure")
raise e
finally:
if not update:
logger.error("health check failure")
@ready.connect
def worker_ready_receiver(**_) -> None:
READINESS_FILE.touch()
@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
for file in (HEARTBEAT_FILE, READINESS_FILE):
if file.is_file():
file.unlink()
management/commands/handler.py
from pathlib import Path
from typing import Any
from django.core.management.base import BaseCommand
from sqs_polling import main
class Command(BaseCommand):
_command_name = Path(__file__).stem
def handle(self, *args: Any, **options: Any) -> None:
main("worker.polling.task.task")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
File details
Details for the file sqs_apolling-0.0.1a2.dev0-py3-none-any.whl
.
File metadata
- Download URL: sqs_apolling-0.0.1a2.dev0-py3-none-any.whl
- Upload date:
- Size: 12.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c4a8e26ef1b27c1fbf4070bea81256128f6753d7059786ef5e5b5ed879402ce3 |
|
MD5 | 05a393708d83ba848fc595a1414ebe03 |
|
BLAKE2b-256 | a369902719199278d29f0a029683d9ef5921b4f63d215f0c4a00d655169456c3 |