Skip to main content

Poll AWS SQS using asyncio and execute callback.

Project description

sqs-polling

python sqs polling library

Example

worker/polling/task.py

import traceback
from pathlib import Path
from pprint import pprint

from sqs_polling import heartbeat, polling, ready
from sqs_polling.signal import shutdown


@polling(
    queue_name="queue_name",
    aws_profile={
        "endpoint_url": "http://localstack:4566"
    },
    max_workers=5,
    max_number_of_messages=10,
)
def task(*args, **kwargs):
    # The contents of the body of sqs are output.
    pprint({"kwargs": kwargs})


tmp_dir = get_env_value("TMPDIR")
HEARTBEAT_FILE = Path(f"{tmp_dir}/heartbeat")
READINESS_FILE = Path(f"{tmp_dir}/ready")


@heartbeat.connect
def heartbeat_sent_receiver(**_) -> None:
    update = False
    file_exists = HEARTBEAT_FILE.exists()
    try:
        # Process health checks of DBs, etc.
        if healthCheck():
            HEARTBEAT_FILE.touch()
            if file_exists:
                logger.info(f"{HEARTBEAT_FILE} updated")
            else:
                logger.info(f"{HEARTBEAT_FILE} created")
            update = True
    except Exception as e:
        logger.error("Health check failure")
        raise e
    finally:
        if not update:
            logger.error("health check failure")


@ready.connect
def worker_ready_receiver(**_) -> None:
    READINESS_FILE.touch()


@shutdown.connect
def worker_shutdown_receiver(**_) -> None:
    for file in (HEARTBEAT_FILE, READINESS_FILE):
        if file.is_file():
            file.unlink()

management/commands/handler.py

from pathlib import Path
from typing import Any

from django.core.management.base import BaseCommand
from sqs_polling import main


class Command(BaseCommand):
    _command_name = Path(__file__).stem

    def handle(self, *args: Any, **options: Any) -> None:
        main("worker.polling.task.task")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqs-apolling-0.1.dev1.tar.gz (41.5 kB view details)

Uploaded Source

Built Distribution

sqs_apolling-0.1.dev1-py3-none-any.whl (12.8 kB view details)

Uploaded Python 3

File details

Details for the file sqs-apolling-0.1.dev1.tar.gz.

File metadata

  • Download URL: sqs-apolling-0.1.dev1.tar.gz
  • Upload date:
  • Size: 41.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for sqs-apolling-0.1.dev1.tar.gz
Algorithm Hash digest
SHA256 067e91a6b45c7bbbd55b66e363c116fa0d89149ecf0a7809ed482603702ae5a2
MD5 31e99c07d2d8d05d200452ecb62ea30d
BLAKE2b-256 1c090e219633cdaf56967d293827b5a85b6cd271cbe4dd1217b27be619ae7a6f

See more details on using hashes here.

File details

Details for the file sqs_apolling-0.1.dev1-py3-none-any.whl.

File metadata

File hashes

Hashes for sqs_apolling-0.1.dev1-py3-none-any.whl
Algorithm Hash digest
SHA256 10435fc488122df5c59b9c2b779fee1dfb4c744cd5392c4a90b325d331192d07
MD5 cdf6a6d2ac6818e29ad38702f9a79dd4
BLAKE2b-256 3266f8787bea23370ad468f49a22c1f8456d7105013bf2fe07451a1e8fb28bc1

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page