Poll AWS SQS using asyncio and execute a callback for each message.

Project description

sqs-polling

A Python library for polling AWS SQS.
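
To make the idea concrete, here is a minimal sketch of the polling pattern: receive messages in batches, then run a callback for each one on a bounded worker pool. It is not the library's implementation. `InMemoryQueue` and `poll` are hypothetical names that stand in for SQS, and decoding each JSON body into keyword arguments is an assumption made for illustration.

```python
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


class InMemoryQueue:
    """Hypothetical stand-in for an SQS queue; holds JSON message bodies."""

    def __init__(self, bodies: list[str]) -> None:
        self._bodies = list(bodies)

    def receive(self, max_number_of_messages: int) -> list[str]:
        # Hand out at most `max_number_of_messages` bodies per call.
        batch = self._bodies[:max_number_of_messages]
        self._bodies = self._bodies[max_number_of_messages:]
        return batch


async def poll(
    queue: InMemoryQueue,
    callback: Callable[..., Any],
    max_workers: int = 5,
    max_number_of_messages: int = 10,
) -> int:
    """Drain the queue, calling `callback(**body)` for each message."""
    loop = asyncio.get_running_loop()
    handled = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while True:
            # Run the blocking receive off the event loop.
            batch = await loop.run_in_executor(
                pool, queue.receive, max_number_of_messages
            )
            if not batch:
                break  # A long-running worker would keep polling instead.
            jobs = [
                loop.run_in_executor(pool, lambda b=body: callback(**json.loads(b)))
                for body in batch
            ]
            await asyncio.gather(*jobs)
            handled += len(batch)
    return handled


received: list[dict] = []


def task(**kwargs: Any) -> None:
    # Record each decoded message body.
    received.append(kwargs)


queue = InMemoryQueue([json.dumps({"id": i}) for i in range(3)])
handled = asyncio.run(poll(queue, task, max_workers=2, max_number_of_messages=2))
```

The `max_workers` and `max_number_of_messages` parameters mirror the names used by the `@polling` decorator in the example below; their exact semantics in the library may differ from this sketch.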

Example

worker/polling/task.py

import logging
import os
from pathlib import Path
from pprint import pprint

from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown

logger = logging.getLogger(__name__)


@polling(
    queue_name="queue_name",
    aws_profile={
        "endpoint_url": "http://localstack:4566"
    },
    max_workers=5,
    max_number_of_messages=10,
)
def task(*args, **kwargs):
    # Print the SQS message body, which arrives as keyword arguments.
    pprint({"kwargs": kwargs})


# Directory for the health-check files, read from the TMPDIR environment variable.
tmp_dir = os.environ["TMPDIR"]
HEARTBEAT_FILE = Path(tmp_dir) / "heartbeat"
READINESS_FILE = Path(tmp_dir) / "ready"


@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
    updated = False
    file_exists = HEARTBEAT_FILE.exists()
    try:
        # healthCheck() is an application-defined function (not part of
        # sqs_polling) that checks dependencies such as databases.
        if healthCheck():
            HEARTBEAT_FILE.touch()
            if file_exists:
                logger.info(f"{HEARTBEAT_FILE} updated")
            else:
                logger.info(f"{HEARTBEAT_FILE} created")
            updated = True
    except Exception:
        # Log with the traceback, then re-raise without altering it.
        logger.exception("Health check raised an exception")
        raise
    if not updated:
        logger.error("Health check failure")


@ready.connect
def worker_ready_receiver(**_) -> None:
    READINESS_FILE.touch()


@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
    for file in (HEARTBEAT_FILE, READINESS_FILE):
        if file.is_file():
            file.unlink()
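
The heartbeat file written above is presumably meant to be read by an external liveness check (for example, a container orchestrator probe). A minimal sketch of such a check follows, assuming the worker counts as alive when the file was modified recently. `is_heartbeat_fresh` and `max_age_seconds` are hypothetical names and are not part of sqs_polling.

```python
import os
import tempfile
import time
from pathlib import Path


def is_heartbeat_fresh(path: Path, max_age_seconds: float = 60.0) -> bool:
    """Return True if `path` exists and was modified within `max_age_seconds`."""
    try:
        modified_at = path.stat().st_mtime
    except FileNotFoundError:
        return False
    return (time.time() - modified_at) <= max_age_seconds


# Demonstration against a temporary directory rather than a real worker.
with tempfile.TemporaryDirectory() as tmp_dir:
    heartbeat_file = Path(tmp_dir) / "heartbeat"
    missing = is_heartbeat_fresh(heartbeat_file)

    heartbeat_file.touch()
    fresh = is_heartbeat_fresh(heartbeat_file)

    # Backdate the file by two minutes to simulate a stalled worker.
    stale_time = time.time() - 120
    os.utime(heartbeat_file, (stale_time, stale_time))
    stale = is_heartbeat_fresh(heartbeat_file)
```

The threshold should be comfortably larger than the interval at which the worker emits the heartbeat signal; that interval is not stated here, so the 60-second default is only a placeholder.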

management/commands/handler.py

from pathlib import Path
from typing import Any

from django.core.management.base import BaseCommand
from sqs_polling import main


class Command(BaseCommand):
    _command_name = Path(__file__).stem

    def handle(self, *args: Any, **options: Any) -> None:
        main("worker.polling.task.task")
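
`main` is given the dotted path to the decorated function as a string. How the library resolves that string is not shown here; the sketch below illustrates one common way to turn a dotted path into a callable with `importlib`. `resolve_dotted_path` is a hypothetical helper, not the library's API.

```python
from importlib import import_module
from typing import Any


def resolve_dotted_path(dotted_path: str) -> Any:
    """Import `package.module` and return the attribute named after the last dot."""
    module_path, _, attribute = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"{dotted_path!r} is not a dotted path")
    module = import_module(module_path)
    return getattr(module, attribute)


# Resolve a standard-library function the same way "worker.polling.task.task"
# would be resolved inside a project.
join = resolve_dotted_path("os.path.join")
```

Passing a string rather than the function object lets the command defer the import until the worker starts, which is a frequent reason for this style of entry point.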

Download files

Source Distribution

sqs-apolling-0.0.4.tar.gz (42.4 kB)

Uploaded Source

Built Distribution

sqs_apolling-0.0.4-py3-none-any.whl (13.2 kB)

Uploaded Python 3

File details

Details for the file sqs-apolling-0.0.4.tar.gz.

File metadata

  • Download URL: sqs-apolling-0.0.4.tar.gz
  • Upload date:
  • Size: 42.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for sqs-apolling-0.0.4.tar.gz
Algorithm     Hash digest
SHA256        16e4be4eb74ecd3d892ab2aabf70d700dde3c1cbd40f3e06ee0cbeeb985c7e84
MD5           b388dbba16e30742e553332e91d04908
BLAKE2b-256   ccea8472b26036fff913b2d3a3610e1287d875c21998cf1f34f77d0a4e1f2709

File details

Details for the file sqs_apolling-0.0.4-py3-none-any.whl.

File metadata

  • Download URL: sqs_apolling-0.0.4-py3-none-any.whl
  • Upload date:
  • Size: 13.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for sqs_apolling-0.0.4-py3-none-any.whl
Algorithm     Hash digest
SHA256        14f47d4cf11c1b3e0bc9b2802a507db0b66bdd7d285c4b14b452a9761c11365f
MD5           02f6635feeac9e2d8c04f07ac39b60fe
BLAKE2b-256   feb35d8b1424119c24fcb8fed7c56bcc80b0cde530e2c4b03d86859457aea742