Poll AWS SQS using asyncio and execute callback.
Project description
sqs-polling
python sqs polling library
Example
worker/polling/task.py
import traceback
from pathlib import Path
from pprint import pprint
from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown
@polling(
queue_name="queue_name",
aws_profile={
"endpoint_url": "http://localstack:4566"
},
max_workers=5,
max_number_of_messages=10,
)
def task(*args, **kwargs):
# The contents of the body of sqs are output.
pprint({"kwargs": kwargs})
tmp_dir = get_env_value("TMPDIR")
HEARTBEAT_FILE = Path(f"{tmp_dir}/heartbeat")
READINESS_FILE = Path(f"{tmp_dir}/ready")
@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
update = False
file_exists = HEARTBEAT_FILE.exists()
try:
# Process health checks of DBs, etc.
if healthCheck():
HEARTBEAT_FILE.touch()
if file_exists:
logger.info(f"{HEARTBEAT_FILE} updated")
else:
logger.info(f"{HEARTBEAT_FILE} created")
update = True
except Exception as e:
logger.error("Health check failure")
raise e
finally:
if not update:
logger.error("health check failure")
@ready.connect
def worker_ready_receiver(**_) -> None:
READINESS_FILE.touch()
@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
for file in (HEARTBEAT_FILE, READINESS_FILE):
if file.is_file():
file.unlink()
management/commands/handler.py
from pathlib import Path
from typing import Any
from django.core.management.base import BaseCommand
from sqs_polling import main
class Command(BaseCommand):
_command_name = Path(__file__).stem
def handle(self, *args: Any, **options: Any) -> None:
main("worker.polling.task.task")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
sqs-apolling-0.0.3.tar.gz
(41.8 kB
view details)
Built Distribution
File details
Details for the file sqs-apolling-0.0.3.tar.gz
.
File metadata
- Download URL: sqs-apolling-0.0.3.tar.gz
- Upload date:
- Size: 41.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4c9f28f2978f5eae917d309b5251039b3f43a8b0ec83ff82be05afb91231a7e9 |
|
MD5 | 64c56fb25f4834895cff1c86100d83e0 |
|
BLAKE2b-256 | fc945b3480f298175196f6b747d5e2a55e5fa2734a64483cda5b92e0fe21eedb |
File details
Details for the file sqs_apolling-0.0.3-py3-none-any.whl
.
File metadata
- Download URL: sqs_apolling-0.0.3-py3-none-any.whl
- Upload date:
- Size: 13.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 28b1e18fa8625f07eeee7d0ca1402da0421794eac4bb71c3386cd0d11efbd3af |
|
MD5 | c7826d27fa9470b93dfea9e9002789b5 |
|
BLAKE2b-256 | 337eea5b4b59b99cff3e47083c7b60e8ea1c699414ad08f2877c1e831aa68d30 |