Django EB SQS - Background Tasks for Amazon SQS

django-eb-sqs is a simple task manager for AWS SQS. It uses SQS and the boto3 library.

Installation

Install the module with pip install git+git://github.com/cuda-networks/django-eb-sqs.git or add it to your requirements.txt.

Don't forget to add the eb_sqs app to your Django INSTALLED_APPS setting:

INSTALLED_APPS = (
    ...,
    'eb_sqs',
)

Usage

Creating Tasks

Adding a task to a queue is simple.

from eb_sqs.decorators import task

@task(queue_name='test')
def echo(message):
    print(message)

echo.delay(message='Hello World!')

NOTE: This assumes that you have your AWS keys in the appropriate environment variables, or are using IAM roles. Consult the boto3 documentation for further info.

If you don't pass a queue name, the EB_SQS_DEFAULT_QUEUE setting is used. If not set, the queue name is eb-sqs-default.

Additionally, the task decorator supports max_retries (default: 0) and use_pickle (default: False) attributes for advanced control of task execution.

You can also delay the execution of a task by specifying the delay time in seconds.

echo.delay(message='Hello World!', delay=60)

During development it is sometimes useful to execute a task immediately without using SQS. This is possible with the execute_inline argument.

echo.delay(message='Hello World!', execute_inline=True)

NOTE: delay is not applied when execute_inline is set to True.
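To make these dispatch semantics concrete, here is a minimal toy stand-in for the task decorator (an illustration only, not the library's implementation): execute_inline runs the function immediately and ignores delay, while the real library would serialize the call and send it to SQS with the delay as the message's DelaySeconds.

```python
def task(queue_name='eb-sqs-default'):
    # Toy stand-in for eb_sqs.decorators.task, for illustration only.
    def decorator(func):
        def delay(*args, execute_inline=False, delay=0, **kwargs):
            if execute_inline:
                return func(*args, **kwargs)  # 'delay' is ignored inline
            # The real library serializes the call and enqueues it on SQS,
            # using 'delay' as the message's DelaySeconds.
            return 'queued on {} (delay={}s)'.format(queue_name, delay)
        func.delay = delay
        return func
    return decorator

@task(queue_name='test')
def echo(message):
    return message

print(echo.delay(message='Hello World!', execute_inline=True))  # Hello World!
print(echo.delay(message='Hello World!', delay=60))  # queued on test (delay=60s)
```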

Failed tasks can be retried by using the retry method. See the following example:

from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5)
def upload_file(message):
    print('# of retries: {}'.format(upload_file.retry_num))
    try:
        upload(message)  # hypothetical upload helper
    except ConnectionError:
        upload_file.retry()

The retry call supports the delay and execute_inline arguments to delay the retry or execute it inline. If a retry should not count toward the max retry limit, set count_retries to False. Use retry_num to get the number of retries of the current task.

NOTE: retry() throws a MaxRetriesReachedException exception if the maximum number of retries is reached.

Executing Tasks

In order to execute tasks, use the Django command process_queue. This command can work with one or more queues, reading from the queues continuously and executing tasks as they come in.

python manage.py process_queue --queues <comma-delimited list of queue names>

You can either use full queue names, or match queues by prefix using the prefix:my_example_prefix notation.

Examples:

python manage.py process_queue --queues queue1,queue2 # process queue1 and queue2
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'

Use the MESSAGES_RECEIVED, MESSAGES_PROCESSED, and MESSAGES_DELETED signals of the WorkerService to be notified about the current SQS batch being processed by the management command.

Auto Tasks

This is a helper tool for when you wish to define one of your class methods as a task and make that seamless to all callers. It keeps the code simple and lets callers invoke the method directly, without considering whether it runs synchronously or asynchronously.

This is how you would define your class:

from eb_sqs.auto_tasks.service import AutoTaskService

class MyService:
    def __init__(self, p1=default1, ..., pN=defaultN, auto_task_service=None):
        self._auto_task_service = auto_task_service or AutoTaskService()

        self._auto_task_service.register_task(self.my_task_method)

    def my_task_method(self, *args, **kwargs):
        ...

Notice the following:

  1. Your class needs to have defaults for all constructor parameters
  2. The constructor must have a parameter named auto_task_service
  3. The method shouldn't return a value (as it's invoked asynchronously)
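Conceptually, register_task swaps the bound method for a dispatcher that enqueues the call instead of running it. The sketch below is a toy stand-in (ToyAutoTaskService is invented for illustration and is not the library's implementation):

```python
class ToyAutoTaskService:
    """Illustrative stand-in for eb_sqs.auto_tasks.service.AutoTaskService."""
    def __init__(self):
        self.queued = []

    def register_task(self, bound_method):
        owner, name = bound_method.__self__, bound_method.__name__

        def dispatch(*args, **kwargs):
            # The real service serializes the class, method name and arguments
            # and enqueues them on SQS; a worker re-creates the instance
            # (hence the constructor defaults) and runs the method there.
            self.queued.append((type(owner).__name__, name, args, kwargs))

        setattr(owner, name, dispatch)  # shadow the method on the instance

class MyService:
    def __init__(self, auto_task_service=None):
        self._auto_task_service = auto_task_service or ToyAutoTaskService()
        self._auto_task_service.register_task(self.my_task_method)

    def my_task_method(self, message):
        print('processing', message)

service = MyService()
service.my_task_method('hello')           # enqueued, not executed
print(service._auto_task_service.queued)  # [('MyService', 'my_task_method', ('hello',), {})]
```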

In case you want your method to retry in certain cases, raise a RetryableTaskException. You can provide an optional delay time for the retry, set count_retries=False if the retries should not count toward the retry limit, or use max_retries_func to specify a function that is invoked once the defined maximum number of retries is exhausted.
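As a sketch of that pattern (the exception class below is a toy stand-in whose constructor arguments are assumptions inferred from the description; check the package source for the real import path and signature):

```python
class RetryableTaskException(Exception):
    """Toy stand-in; in django-eb-sqs this lives in the auto_tasks package."""
    def __init__(self, inner, delay=None, count_retries=None, max_retries_func=None):
        super().__init__(str(inner))
        self.delay = delay
        self.count_retries = count_retries
        self.max_retries_func = max_retries_func

def my_task_method(url):
    try:
        raise ConnectionError('host unreachable')  # stand-in for real work
    except ConnectionError as exc:
        # Ask the framework to retry in 30s; run a callback when retries run out.
        raise RetryableTaskException(exc, delay=30,
                                     max_retries_func=lambda: print('giving up'))
```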

Settings

The following settings can be used to fine tune django-eb-sqs. Copy them into your Django settings.py file.

  • EB_AWS_REGION (us-east-1): The AWS region to use when working with SQS.
  • EB_SQS_MAX_NUMBER_OF_MESSAGES (10): The maximum number of messages to read in a single call from SQS (<= 10).
  • EB_SQS_WAIT_TIME_S (2): The time to wait (seconds) when receiving messages from SQS.
  • NO_QUEUES_WAIT_TIME_S (5): The time (seconds) a worker waits if there are no SQS queues available to process.
  • EB_SQS_AUTO_ADD_QUEUE (False): If queues should be added automatically to AWS if they don't exist.
  • EB_SQS_QUEUE_MESSAGE_RETENTION (1209600): The value (in seconds) to be passed to MessageRetentionPeriod parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True).
  • EB_SQS_QUEUE_VISIBILITY_TIMEOUT (300): The value (in seconds) to be passed to VisibilityTimeout parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True).
  • EB_SQS_DEAD_LETTER_MODE (False): Enable if this worker handles the SQS dead-letter queue. Tasks won't be executed, but the group callback is.
  • EB_SQS_DEFAULT_DELAY (0): Default task delay time in seconds.
  • EB_SQS_DEFAULT_MAX_RETRIES (0): Default retry limit for all tasks.
  • EB_SQS_DEFAULT_COUNT_RETRIES (True): Count retry calls. Needed if max retries check shall be executed.
  • EB_SQS_DEFAULT_QUEUE (eb-sqs-default): Default queue name if none is specified when creating a task.
  • EB_SQS_EXECUTE_INLINE (False): Execute tasks immediately without using SQS. Useful during development. Setting this to True globally overrides per-task values.
  • EB_SQS_FORCE_SERIALIZATION (False): Forces serialization of tasks when executed inline. This setting is helpful during development to see if all arguments are serialized and deserialized properly.
  • EB_SQS_QUEUE_PREFIX (``): Prefix to use for the queues. The prefix is added to the queue name.
  • EB_SQS_USE_PICKLE (False): Enable to use pickle to serialize task parameters. JSON is used by default.
  • EB_SQS_AWS_MAX_RETRIES (30): Default retry limit on a boto3 call to AWS SQS.
  • EB_SQS_REFRESH_PREFIX_QUEUES_S (10): Minimal number of seconds to wait between refreshes of the queue list when prefixes are used.
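Copied into settings.py, a minimal override might look like this (all values below are illustrative, not recommendations):

```python
# settings.py -- illustrative overrides; the setting names are documented above
EB_AWS_REGION = 'eu-west-1'
EB_SQS_DEFAULT_QUEUE = 'my-app-default'
EB_SQS_QUEUE_PREFIX = 'prod-'
EB_SQS_DEFAULT_MAX_RETRIES = 3
EB_SQS_AUTO_ADD_QUEUE = True
```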

Development

Make sure to install the development dependencies from development.txt.

Tests

The built-in tests can be executed with the Django test runner.

python -m django test --settings=eb_sqs.test_settings

