A simple cloud agnostic Python SQS utility package
Project description
Publish new version
(if not yet done) Configure your ~/.pypirc : https://packaging.python.org/en/latest/specifications/pypirc/
Install build tool : python3 -m pip install --upgrade build && python3 -m pip install --upgrade twine
Update version number in setup.py
python3 -m build && python3 -m twine upload --repository pypi dist/*
Installation
pip install cloudAgnosticPySqsListener
Listening to a queue
Here is a basic code sample:
Standard Listener
from sqs_listener import SqsListener class MyListener(SqsListener): def handle_message(self, body, attributes, messages_attributes): run_my_function(body['param1'], body['param2']) listener = MyListener('my-message-queue', error_queue='my-error-queue', region_name='us-east-1') listener.listen()
Error Listener
from sqs_listener import SqsListener class MyErrorListener(SqsListener): def handle_message(self, body, attributes, messages_attributes): save_to_log(body['exception_type'], body['error_message'] error_listener = MyErrorListener('my-error-queue') error_listener.listen()
error_queue (str) - name of queue to push errors.
force_delete (boolean) - delete the message received from the queue, whether or not the handler function is successful. By default the message is deleted only if the handler function returns with no exceptions
interval (int) - number of seconds in between polls. Set to 60 by default
visibility_timeout (str) - Number of seconds the message will be invisible (‘in flight’) after being read. After this time interval it reappear in the queue if it wasn’t deleted in the meantime. Set to ‘600’ (10 minutes) by default
error_visibility_timeout (str) - Same as previous argument, for the error queue. Applicable only if the error_queue argument is set, and the queue doesn’t already exist.
wait_time (int) - number of seconds to wait for a message to arrive (for long polling). Set to 0 by default to provide short polling.
max_number_of_messages (int) - Max number of messages to receive from the queue. Set to 1 by default, max is 10
message_attribute_names (list) - message attributes by which to filter messages
attribute_names (list) - attributes by which to filter messages (see boto docs for difference between these two)
region_name (str) - AWS region name (defaults to us-east-1)
queue_url (str) - overrides queue parameter. Mostly useful for getting around this bug in the boto library
deserializer (function str -> dict) - Deserialization function that will be used to parse the message body. Set to python’s json.loads by default.
aws_access_key, aws_secret_key (str) - for manually providing AWS credentials
region_name (str), endpoint_name (str) - to manually providing SQS endpoint region & url
run_while_idle (Callable) - a function that will be called on every “empty” run, as a heartbeat, to make sure it’s still runing
Running as a Daemon
Logging
logger = logging.getLogger('sqs_listener') logger.setLevel(logging.INFO) sh = logging.StreamHandler() formatstr = '[%(asctime)s - %(name)s - %(levelname)s] %(message)s' formatter = logging.Formatter(formatstr) sh.setFormatter(formatter) logger.addHandler(sh)
logger = logging.getLogger('sqs_listener') logger.setLevel(logging.INFO) sh = logging.FileHandler('mylog.log') sh.setLevel(logging.INFO) formatstr = '[%(asctime)s - %(name)s - %(levelname)s] %(message)s' formatter = logging.Formatter(formatstr) sh.setFormatter(formatter) logger.addHandler(sh)
Sending messages
Launcher Example
from sqs_launcher import SqsLauncher launcher = SqsLauncher('my-queue') response = launcher.launch_message({'param1': 'hello', 'param2': 'world'})
Important Notes
For both the main queue and the error queue, if the queue doesn’t exist (in the specified region), it will be created at runtime.
The error queue receives only two values in the message body: exception_type and error_message. Both are of type str
If the function that the listener executes involves connecting to a database, you should explicitly close the connection at the end of the function. Otherwise, you’re likely to get an error like this: OperationalError(2006, 'MySQL server has gone away')
Either the queue name or the queue url should be provided. When both are provided the queue url is used and the queue name is ignored.
Contributing
Fork the repo and make a pull request.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file cloudAgnosticPySqsListener-0.9.4.tar.gz
.
File metadata
- Download URL: cloudAgnosticPySqsListener-0.9.4.tar.gz
- Upload date:
- Size: 10.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | b170e4cac08487289f1c567a96ecf33e6148a7c56896fa5c936fb7aa0e7ba59d |
|
MD5 | ae087dde2484519c3d10eb1d4630e4f9 |
|
BLAKE2b-256 | dfe77fd1a89c58b2946852d56b937548cd009c4bbfd40b93420c3db82532980d |
File details
Details for the file cloudAgnosticPySqsListener-0.9.4-py3-none-any.whl
.
File metadata
- Download URL: cloudAgnosticPySqsListener-0.9.4-py3-none-any.whl
- Upload date:
- Size: 11.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 02d2616e92b8f2b53ae742360bebae2349ccb6f4cec507356523942827d371fa |
|
MD5 | 89c4aa1f0f5fb681e551967a623042f4 |
|
BLAKE2b-256 | cc29d1b6e31214b3bf4175fcdb5c7336cb3ab8210f57bb9424e2f691ed4a3c2f |