Skip to main content

Poll AWS SQS using asyncio and execute a callback.

Project description

sqs-polling

Python SQS polling library

Example

worker/polling/task.py

import traceback
from pathlib import Path
from pprint import pprint

from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown


@polling(
    queue_name="queue_name",
    aws_profile={"endpoint_url": "http://localstack:4566"},
    max_workers=5,
    max_number_of_messages=10,
)
def task(*args, **kwargs):
    """Pretty-print the keyword arguments this callback was invoked with.

    Registered through the ``polling`` decorator for the queue named
    ``queue_name``. Positional arguments are accepted but not used.
    """
    # Per the example, kwargs carry the contents of the SQS message body.
    received = {"kwargs": kwargs}
    pprint(received)


# Base directory for the marker files, looked up under the name "TMPDIR".
# NOTE(review): get_env_value is neither imported nor defined in this
# example — confirm where it comes from before copying the snippet.
tmp_dir = get_env_value("TMPDIR")
# Marker file touched by heartbeat_sent_receiver when the health check passes.
HEARTBEAT_FILE = Path(f"{tmp_dir}/heartbeat")
# Marker file touched by worker_ready_receiver.
READINESS_FILE = Path(f"{tmp_dir}/ready")


@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
    """Touch the heartbeat file when the health check succeeds.

    Connected to the ``heartbeat`` signal; any keyword arguments sent with
    the signal are ignored.

    A failure is logged exactly once: either because the health check (or
    touching the file) raised, or because the health check returned a falsy
    result. The file is left untouched in both cases.

    Raises:
        Exception: whatever ``healthCheck()`` or ``HEARTBEAT_FILE.touch()``
            raised, re-raised unchanged after being logged.
    """
    # Captured before touching so the log can say "created" vs "updated".
    file_exists = HEARTBEAT_FILE.exists()
    try:
        # Process health checks of DBs, etc.
        healthy = healthCheck()
        if healthy:
            HEARTBEAT_FILE.touch()
    except Exception:
        logger.error("Health check failure")
        # Bare raise keeps the original traceback intact.
        raise

    if not healthy:
        logger.error("health check failure")
        return

    if file_exists:
        logger.info(f"{HEARTBEAT_FILE} updated")
    else:
        logger.info(f"{HEARTBEAT_FILE} created")


@ready.connect
def worker_ready_receiver(**_) -> None:
    """Touch the readiness marker file when the ``ready`` signal fires.

    Keyword arguments sent with the signal are ignored.
    """
    readiness_marker = READINESS_FILE
    readiness_marker.touch()


@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
    """Remove the heartbeat and readiness marker files on shutdown.

    Connected to the ``shutdown`` signal; keyword arguments sent with the
    signal are ignored. Paths that are not regular files are skipped.
    """
    for file in (HEARTBEAT_FILE, READINESS_FILE):
        if file.is_file():
            # The file may disappear between the check and the unlink;
            # missing_ok stops that race from raising during shutdown.
            file.unlink(missing_ok=True)

management/commands/handler.py

from pathlib import Path
from typing import Any

from django.core.management.base import BaseCommand
from sqs_polling import main


class Command(BaseCommand):
    """Django management command that hands the polling task to ``main``."""

    # File name of this module without its extension.
    _command_name = Path(__file__).stem

    def handle(self, *args: Any, **options: Any) -> None:
        """Call ``sqs_polling.main`` with the dotted path of the task.

        The command's own arguments and options are not used.
        """
        task_path = "worker.polling.task.task"
        main(task_path)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqs-apolling-0.0.5.tar.gz (56.6 kB view details)

Uploaded Source

Built Distribution

sqs_apolling-0.0.5-py3-none-any.whl (13.3 kB view details)

Uploaded Python 3

File details

Details for the file sqs-apolling-0.0.5.tar.gz.

File metadata

  • Download URL: sqs-apolling-0.0.5.tar.gz
  • Upload date:
  • Size: 56.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for sqs-apolling-0.0.5.tar.gz
Algorithm Hash digest
SHA256 9b8fe499c30d0bd84001f635145bbd7cb4ac69b60f6138ce8d1f93599a05f401
MD5 37abc70feac97ce8dba482ecbb58ff06
BLAKE2b-256 801820949ab0526f9121ec3a01676b1eb10ee69a0d7e6c0850815b292efa0bd8

See more details on using hashes here.

File details

Details for the file sqs_apolling-0.0.5-py3-none-any.whl.

File metadata

  • Download URL: sqs_apolling-0.0.5-py3-none-any.whl
  • Upload date:
  • Size: 13.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for sqs_apolling-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 c870751114339ac4842dc92e0001c8a9763488c52188f36ba1e7659acf1fb369
MD5 075667c4ca4b84225bb7b174378c9eca
BLAKE2b-256 39d8c97cc1ca68209c00ff6205ad75d55bea57bf3f46fd09b38d75fbc3dea13e

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page