Poll AWS SQS using asyncio and execute a callback.
Project description
sqs-polling
Python SQS polling library
Example
worker/polling/task.py
import traceback
from pathlib import Path
from pprint import pprint
from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown
@polling(
    queue_name="queue_name",
    aws_profile={"endpoint_url": "http://localstack:4566"},
    max_workers=5,
    max_number_of_messages=10,
)
def task(*args, **kwargs):
    """Pretty-print the keyword arguments passed to the callback.

    Per the example, the keyword arguments carry the contents of the SQS
    message body.
    """
    received = {"kwargs": kwargs}
    pprint(received)
# Directory that holds the marker files below.
# NOTE(review): get_env_value is neither imported nor defined in this example;
# presumably a project helper that reads the TMPDIR environment variable.
# Confirm (or substitute your own lookup) before copying this snippet.
tmp_dir = get_env_value("TMPDIR")
# Marker file touched by the heartbeat receiver when the health check passes.
HEARTBEAT_FILE = Path(f"{tmp_dir}/heartbeat")
# Marker file touched by the ready receiver.
READINESS_FILE = Path(f"{tmp_dir}/ready")
@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
    """Touch the heartbeat file when the health check passes.

    Connected to the ``heartbeat`` signal. If ``healthCheck()`` returns a
    truthy value, ``HEARTBEAT_FILE`` is touched and an info message reports
    whether the file was created or updated. If it returns a falsy value the
    file is left alone and an error is logged.

    Raises:
        Exception: whatever ``healthCheck()`` or the file touch raises is
            logged once and re-raised unchanged.
    """
    # Capture existence before touching so the log can tell create from update.
    file_exists = HEARTBEAT_FILE.exists()
    try:
        # Process health checks of DBs, etc.
        healthy = healthCheck()
        if healthy:
            HEARTBEAT_FILE.touch()
    except Exception:
        # Log the failure exactly once; the previous except + finally pair
        # emitted two error records for the same exception.
        logger.error("Health check failure")
        # Bare raise keeps the original traceback intact.
        raise

    if not healthy:
        logger.error("health check failure")
        return

    if file_exists:
        logger.info(f"{HEARTBEAT_FILE} updated")
    else:
        logger.info(f"{HEARTBEAT_FILE} created")
@ready.connect
def worker_ready_receiver(**_) -> None:
    """Create (or refresh) the readiness marker file on the ``ready`` signal."""
    READINESS_FILE.touch(exist_ok=True)
@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
    """Delete the heartbeat and readiness marker files on the ``shutdown`` signal.

    Paths that are not regular files (including missing ones) are skipped.
    """
    for marker in (HEARTBEAT_FILE, READINESS_FILE):
        if not marker.is_file():
            continue
        marker.unlink()
management/commands/handler.py
from pathlib import Path
from typing import Any
from django.core.management.base import BaseCommand
from sqs_polling import main
class Command(BaseCommand):
    """Django management command that hands the example task to ``sqs_polling.main``."""

    # Command name, taken from this module's file name without its extension.
    _command_name = Path(__file__).stem

    def handle(self, *args: Any, **options: Any) -> None:
        """Call ``main`` with the dotted path of the polling task."""
        task_path = "worker.polling.task.task"
        main(task_path)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
sqs-apolling-0.1.dev1.tar.gz
(41.5 kB
view details)
Built Distribution
File details
Details for the file sqs-apolling-0.1.dev1.tar.gz.
File metadata
- Download URL: sqs-apolling-0.1.dev1.tar.gz
- Upload date:
- Size: 41.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 067e91a6b45c7bbbd55b66e363c116fa0d89149ecf0a7809ed482603702ae5a2 |
MD5 | 31e99c07d2d8d05d200452ecb62ea30d |
BLAKE2b-256 | 1c090e219633cdaf56967d293827b5a85b6cd271cbe4dd1217b27be619ae7a6f |
File details
Details for the file sqs_apolling-0.1.dev1-py3-none-any.whl.
File metadata
- Download URL: sqs_apolling-0.1.dev1-py3-none-any.whl
- Upload date:
- Size: 12.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 10435fc488122df5c59b9c2b779fee1dfb4c744cd5392c4a90b325d331192d07 |
MD5 | cdf6a6d2ac6818e29ad38702f9a79dd4 |
BLAKE2b-256 | 3266f8787bea23370ad468f49a22c1f8456d7105013bf2fe07451a1e8fb28bc1 |