Skip to main content

Poll AWS SQS using asyncio and execute a callback.

Project description

sqs-polling

A Python library for polling AWS SQS.

Example

worker/polling/task.py

import traceback
from pathlib import Path
from pprint import pprint

from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown


@polling(
    queue_name="queue_name",
    aws_profile={"endpoint_url": "http://localstack:4566"},
    max_workers=5,
    max_number_of_messages=10,
)
def task(*args, **kwargs):
    """Pretty-print the keyword arguments the callback was invoked with.

    Per the example, the contents of the SQS message body arrive as keyword
    arguments; positional arguments are accepted but not used.
    """
    payload = {"kwargs": kwargs}
    pprint(payload)


# Directory that holds the probe files.
# NOTE(review): `get_env_value` is neither defined nor imported in this
# example — presumably a project helper that reads an environment variable;
# confirm and import it before running.
tmp_dir = get_env_value("TMPDIR")
# Touched by the heartbeat receiver when the health check passes.
HEARTBEAT_FILE = Path(f"{tmp_dir}/heartbeat")
# Touched by the ready receiver.
READINESS_FILE = Path(f"{tmp_dir}/ready")


@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
    """Refresh the heartbeat file when the health check passes.

    Connected to the ``heartbeat`` signal. Calls ``healthCheck()`` and, when
    it returns a truthy value, touches ``HEARTBEAT_FILE`` and logs whether
    the file was created or updated. A falsy result is logged as an error and
    the file is left untouched.

    Raises:
        Exception: Whatever ``healthCheck()`` or the file update raised. The
            failure is logged once and the exception is re-raised unchanged.
    """
    # Checked before touching so the log can tell "created" from "updated".
    file_exists = HEARTBEAT_FILE.exists()
    try:
        # Process health checks of DBs, etc.
        healthy = bool(healthCheck())
        if healthy:
            HEARTBEAT_FILE.touch()
    except Exception:
        # Logged exactly once on this path; the exception itself is
        # propagated to the caller with its original traceback.
        logger.error("Health check failure")
        raise

    if not healthy:
        logger.error("health check failure")
        return

    if file_exists:
        logger.info(f"{HEARTBEAT_FILE} updated")
    else:
        logger.info(f"{HEARTBEAT_FILE} created")


@ready.connect
def worker_ready_receiver(**_) -> None:
    """Touch ``READINESS_FILE`` when the ``ready`` signal fires."""
    # exist_ok=True is the default; spelled out so a repeated signal is
    # visibly harmless.
    READINESS_FILE.touch(exist_ok=True)


@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
    """Delete the heartbeat and readiness files on the ``shutdown`` signal.

    Only paths that currently exist as regular files are removed; anything
    else is skipped.
    """
    for probe_path in (HEARTBEAT_FILE, READINESS_FILE):
        if not probe_path.is_file():
            continue
        probe_path.unlink()

management/commands/handler.py

from pathlib import Path
from typing import Any

from django.core.management.base import BaseCommand
from sqs_polling import main


class Command(BaseCommand):
    """Management command that hands the polling task to ``sqs_polling.main``."""

    # Name of this command module: the file name without its extension.
    _command_name = Path(__file__).stem

    def handle(self, *args: Any, **options: Any) -> None:
        """Call ``main`` with the dotted path of the polling task.

        The positional arguments and options are accepted but not used.
        """
        task_path = "worker.polling.task.task"
        main(task_path)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqs-apolling-0.0.3.tar.gz (41.8 kB view details)

Uploaded Source

Built Distribution

sqs_apolling-0.0.3-py3-none-any.whl (13.0 kB view details)

Uploaded Python 3

File details

Details for the file sqs-apolling-0.0.3.tar.gz.

File metadata

  • Download URL: sqs-apolling-0.0.3.tar.gz
  • Upload date:
  • Size: 41.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for sqs-apolling-0.0.3.tar.gz
Algorithm Hash digest
SHA256 4c9f28f2978f5eae917d309b5251039b3f43a8b0ec83ff82be05afb91231a7e9
MD5 64c56fb25f4834895cff1c86100d83e0
BLAKE2b-256 fc945b3480f298175196f6b747d5e2a55e5fa2734a64483cda5b92e0fe21eedb

See more details on using hashes here.

File details

Details for the file sqs_apolling-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: sqs_apolling-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 13.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for sqs_apolling-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 28b1e18fa8625f07eeee7d0ca1402da0421794eac4bb71c3386cd0d11efbd3af
MD5 c7826d27fa9470b93dfea9e9002789b5
BLAKE2b-256 337eea5b4b59b99cff3e47083c7b60e8ea1c699414ad08f2877c1e831aa68d30

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page