A simple cloud agnostic Python SQS utility package

[WIP] This documentation has not yet been updated to reflect the latest changes.

This package takes care of the boilerplate involved in listening to an SQS queue, as well as sending messages to a queue. It works with Python 2.7 and 3.6+.

Publish new version

  • If not yet done, configure your ~/.pypirc (see https://packaging.python.org/en/latest/specifications/pypirc/)

  • Install the build tools: python3 -m pip install --upgrade build && python3 -m pip install --upgrade twine

  • Update the version number in setup.py

  • Build and upload the package: python3 -m build && python3 -m twine upload --repository cloudAgnosticPySqsListener dist/*

Installation

pip install cloudAgnosticPySqsListener

Listening to a queue

Using the listener is straightforward: inherit from the SqsListener class and implement the handle_message() method. The queue will be created at runtime if it doesn’t already exist. You can also specify an error queue to which any errors are pushed automatically.

Here is a basic code sample:

Standard Listener

from sqs_listener import SqsListener

class MyListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        run_my_function(body['param1'], body['param2'])

listener = MyListener('my-message-queue', error_queue='my-error-queue', region_name='us-east-1')
listener.listen()

Error Listener

from sqs_listener import SqsListener

class MyErrorListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        # The error queue body carries exception_type and error_message
        save_to_log(body['exception_type'], body['error_message'])

error_listener = MyErrorListener('my-error-queue')
error_listener.listen()

The options available as kwargs are as follows:

  • error_queue (str) - name of the queue to which errors are pushed.

  • force_delete (boolean) - delete the message received from the queue, whether or not the handler function is successful. By default the message is deleted only if the handler function returns with no exceptions

  • interval (int) - number of seconds in between polls. Set to 60 by default

  • visibility_timeout (str) - number of seconds the message will be invisible (‘in flight’) after being read. After this interval it reappears in the queue if it wasn’t deleted in the meantime. Set to ‘600’ (10 minutes) by default

  • error_visibility_timeout (str) - Same as previous argument, for the error queue. Applicable only if the error_queue argument is set, and the queue doesn’t already exist.

  • wait_time (int) - number of seconds to wait for a message to arrive (for long polling). Set to 0 by default to provide short polling.

  • max_number_of_messages (int) - Max number of messages to receive from the queue. Set to 1 by default, max is 10

  • message_attribute_names (list) - message attributes by which to filter messages

  • attribute_names (list) - attributes by which to filter messages (see boto docs for difference between these two)

  • region_name (str) - AWS region name (defaults to us-east-1)

  • queue_url (str) - overrides queue parameter. Mostly useful for getting around this bug in the boto library

  • deserializer (function str -> dict) - Deserialization function that will be used to parse the message body. Set to python’s json.loads by default.

  • aws_access_key, aws_secret_key (str) - for manually providing AWS credentials

  • region_name (str), endpoint_name (str) - for manually providing the SQS endpoint region and URL

  • run_while_idle (Callable) - a function that will be called on every “empty” run, as a heartbeat, to make sure the listener is still running
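
As an illustration of the deserializer and run_while_idle options listed above, here is a minimal sketch. The helper names (tolerant_deserializer, heartbeat) are hypothetical, the idle callback is assumed to be called with no arguments, and the commented-out call at the end assumes the constructor accepts these kwargs exactly as listed.

```python
import json
import time


def tolerant_deserializer(raw_body):
    """Parse a message body into a dict (str -> dict).

    Hypothetical helper: wraps the content under a "raw" key when the
    body is not valid JSON or is not a JSON object.
    """
    try:
        parsed = json.loads(raw_body)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return {"raw": raw_body}
    if isinstance(parsed, dict):
        return parsed
    return {"raw": parsed}


# Timestamp of the most recent idle poll.
last_idle = {"at": None}


def heartbeat():
    """Hypothetical idle callback: record when the last empty poll happened.

    Assumption: the listener calls this with no arguments.
    """
    last_idle["at"] = time.time()


# Keyword arguments as described in the option list above.
listener_options = {
    "error_queue": "my-error-queue",
    "interval": 10,
    "wait_time": 20,  # long polling
    "max_number_of_messages": 10,
    "deserializer": tolerant_deserializer,
    "run_while_idle": heartbeat,
}

# Assumed usage, with MyListener defined as in the standard listener sample:
# listener = MyListener('my-message-queue', **listener_options)
# listener.listen()
```

Keeping the options in a dict makes it easy to share the same settings between several listeners.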

Running as a Daemon

Typically, in a production environment, you’ll want to listen to an SQS queue with a daemonized process. The simplest way to do this is by running the listener in a detached process. On a typical Linux distribution it might look like this:
nohup python my_listener.py > listener.log &
Save the resulting process ID for later, so that the listener can be stopped with the kill command.
A more complete implementation can be achieved easily by inheriting from the package’s Daemon class and overriding the run() method.

The sample_daemon.py file in the source root folder provides a clear example for achieving this. Using this example, you can run the listener as a daemon with the command python sample_daemon.py start. Similarly, the command python sample_daemon.py stop will stop the process. You’ll most likely need to run the start script using sudo.
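
If you take the simpler detached-process route, the process ID has to be recorded somewhere. One possible approach, sketched below, is to have the listener script write its own PID to a file at startup. The write_pid_file helper and the file location are hypothetical and not part of the package.

```python
import atexit
import os
import tempfile


def write_pid_file(path):
    """Write the current process ID to `path` and remove the file at exit."""
    with open(path, "w") as pid_file:
        pid_file.write(str(os.getpid()))

    def _cleanup():
        # Ignore the error if the file has already been removed.
        try:
            os.remove(path)
        except OSError:
            pass

    # Runs on a normal interpreter exit only, not when the process is killed.
    atexit.register(_cleanup)
    return path


# Hypothetical location; choose a path that suits your deployment.
pid_path = write_pid_file(
    os.path.join(tempfile.gettempdir(), "my_listener.pid")
)

# The listener would then be started as usual, for example:
# listener = MyListener('my-message-queue')
# listener.listen()
```

The listener can later be stopped by passing the PID stored in that file to the kill command.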

Logging

The listener and launcher instances push all their messages to a logger instance, called ‘sqs_listener’. In order to view the messages, the logger needs to be redirected to stdout or to a log file.

For instance:
import logging

logger = logging.getLogger('sqs_listener')
logger.setLevel(logging.INFO)

sh = logging.StreamHandler()

formatstr = '[%(asctime)s - %(name)s - %(levelname)s]  %(message)s'
formatter = logging.Formatter(formatstr)

sh.setFormatter(formatter)
logger.addHandler(sh)

Or to a log file:
import logging

logger = logging.getLogger('sqs_listener')
logger.setLevel(logging.INFO)

# Write log records to a file instead of stdout
fh = logging.FileHandler('mylog.log')
fh.setLevel(logging.INFO)

formatstr = '[%(asctime)s - %(name)s - %(levelname)s]  %(message)s'
formatter = logging.Formatter(formatstr)

fh.setFormatter(formatter)
logger.addHandler(fh)

Sending messages

To send a message, instantiate an SqsLauncher with the name of the queue. By default an exception is raised if the queue doesn’t exist, but the queue can be created automatically if the create_queue parameter is set to True. In that case, there is also an option to set the newly created queue’s VisibilityTimeout via the third parameter. A serializer function can be provided if custom types need to be sent. This function receives a dict object and should return a string. If it is not provided, Python’s json.dumps is used by default.

After instantiation, use the launch_message() method to send the message. The message body should be a dict, and additional kwargs can be specified as stated in the SQS docs. The method returns the response from SQS.

Launcher Example

from sqs_launcher import SqsLauncher

launcher = SqsLauncher('my-queue')
response = launcher.launch_message({'param1': 'hello', 'param2': 'world'})
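
For the custom serializer mentioned above, a minimal sketch might look like the following. It assumes the serializer is passed as a keyword argument named serializer, which the text above does not confirm. The function names and the choice of types are illustrative only.

```python
import datetime
import decimal
import json


def _encode_custom(value):
    """Convert types that json.dumps cannot handle natively."""
    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    if isinstance(value, decimal.Decimal):
        return str(value)
    raise TypeError("Cannot serialize %r" % (value,))


def custom_serializer(body):
    """Serializer with the documented shape: takes a dict, returns a str."""
    return json.dumps(body, default=_encode_custom, sort_keys=True)


message = {
    "order_id": 42,
    "amount": decimal.Decimal("19.90"),
    "created": datetime.date(2024, 1, 31),
}
serialized = custom_serializer(message)

# Assumed usage (the keyword name `serializer` is an assumption):
# launcher = SqsLauncher('my-queue', serializer=custom_serializer)
# response = launcher.launch_message(message)
```

A listener reading these messages would receive the date and the decimal as strings, so a matching deserializer is needed if the original types must be restored.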

Important Notes

  • For both the main queue and the error queue, if the queue doesn’t exist (in the specified region), it will be created at runtime.

  • The error queue receives only two values in the message body: exception_type and error_message. Both are of type str.

  • If the function that the listener executes involves connecting to a database, you should explicitly close the connection at the end of the function. Otherwise, you’re likely to get an error like this: OperationalError(2006, 'MySQL server has gone away')

  • Either the queue name or the queue url should be provided. When both are provided the queue url is used and the queue name is ignored.
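
To illustrate the note on database connections, here is a sketch of a handler body that opens a connection per message and closes it explicitly. It uses the standard library's sqlite3 module purely so that the example is self-contained. The table, the process_message function, and the database path are hypothetical. With MySQL, the same pattern would apply to whichever client library you use.

```python
import contextlib
import os
import sqlite3
import tempfile

# Hypothetical database file, created in a fresh temporary directory.
DB_PATH = os.path.join(tempfile.mkdtemp(), "sqs_listener_example.db")


def process_message(body):
    """Store one message, closing the connection before returning."""
    # contextlib.closing guarantees conn.close() runs, even on error.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS events (param1 TEXT, param2 TEXT)"
        )
        conn.execute(
            "INSERT INTO events (param1, param2) VALUES (?, ?)",
            (body["param1"], body["param2"]),
        )
        conn.commit()


# Inside a listener this would be called from handle_message, for example:
# class MyListener(SqsListener):
#     def handle_message(self, body, attributes, messages_attributes):
#         process_message(body)

process_message({"param1": "hello", "param2": "world"})
```

Opening the connection inside the handler, rather than once at startup, avoids holding an idle connection while the listener waits between polls.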

Contributing

Fork the repo and make a pull request.
