Django application to schedule and run functions on an AWS SQS queue.

beanstalk-dispatch

beanstalk-dispatch is a Django application that takes functions scheduled on an AWS SQS queue and executes them on Elastic Beanstalk Worker machines that are listening to that queue.

This application was originally written by @marcua for @b12io's open source application orchestra.

The library supports Django 4 to Django 5.2 across Python versions 3.10 to 3.12. If you would like to see a feature or find a bug, please let me know by opening an issue or pull request.

Getting started in 5 minutes

To install:

pip install beanstalk-dispatch
  • Create an Elastic Beanstalk environment for an application whose settings.py defines the following two parameters:
     BEANSTALK_DISPATCH_SQS_KEY = 'your AWS key for accessing SQS'
     BEANSTALK_DISPATCH_SQS_SECRET = 'your AWS secret for accessing SQS'
  • Add beanstalk_dispatch to settings.py's INSTALLED_APPS
INSTALLED_APPS = (
    # ...other installed applications...
    'beanstalk_dispatch',
)
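  • Add re_path(r'^beanstalk_dispatch/', include('beanstalk_dispatch.urls')), to your main urls.py (re_path and include are imported from django.urls; the older url() helper was removed in Django 4.0)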

  • Add /beanstalk_dispatch/dispatcher as the HTTP endpoint of your Beanstalk worker configuration in the AWS console.

  • Add a dispatch table. The dispatcher works by creating an HTTP endpoint that a local SQS/Beanstalk daemon POSTs requests to. That endpoint consults a BEANSTALK_DISPATCH_TABLE, which maps function names onto functions to run. Here's an example:

      if os.environ.get('BEANSTALK_WORKER') == 'True':
          BEANSTALK_DISPATCH_TABLE = {
              'a_function_to_dispatch': ('some_package.beanstalk_tasks.'
                                         'the_name_of_the_function_in_the_module')
          }

The first line checks that this machine is meant to be a Beanstalk worker. We set a BEANSTALK_WORKER environment variable to 'True' in the environment's configuration only on our worker machines. This prevents other environments (e.g., our web servers) from serving as open proxies for running arbitrary code.

The second line is the dispatch table. It maps each function name to the dotted path of the function to be executed.
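To make the lookup concrete, here is a minimal sketch of how a name-to-dotted-path table could be resolved into a callable. It is not the library's implementation: build_dispatch_table and resolve are hypothetical helpers, and os.path.basename merely stands in for a real task function so the sketch runs without a Django project.

```python
import importlib


def build_dispatch_table(environ):
    """Return a dispatch table only on machines flagged as workers."""
    # The comparison is against the string 'True', not the boolean True.
    if environ.get('BEANSTALK_WORKER') == 'True':
        # 'os.path.basename' is a stand-in for a real task's dotted path.
        return {'a_function_to_dispatch': 'os.path.basename'}
    return {}


def resolve(table, function_name):
    """Look up a name in the table and import the function it points to."""
    if function_name not in table:
        raise KeyError('{} is not in the dispatch table'.format(function_name))
    # Split 'package.module.function' into module path and attribute name.
    module_path, _, attribute = table[function_name].rpartition('.')
    module = importlib.import_module(module_path)
    return getattr(module, attribute)


worker_table = build_dispatch_table({'BEANSTALK_WORKER': 'True'})
web_table = build_dispatch_table({})  # no flag set: nothing is dispatchable

task = resolve(worker_table, 'a_function_to_dispatch')
print(task('/tmp/report.csv'))
```

In this sketch, a machine without the BEANSTALK_WORKER flag ends up with an empty table, so any requested name is rejected before anything is imported.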

Scheduling a function to run

The beanstalk_dispatch.client.schedule_function function schedules a function to run on a given SQS queue. The function name you pass it must be a key in the BEANSTALK_DISPATCH_TABLE, and the queue_name you pass it must be a queue for which a Beanstalk worker is configured.

from beanstalk_dispatch.client import schedule_function

schedule_function('a-queue', 'a_function_to_dispatch',
    '1', '2', kwarg1=1, kwarg2=2)
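Since the call travels to the worker as a queue message, it can help to picture the round trip. The sketch below assumes a JSON message with 'function', 'args', and 'kwargs' fields; that layout and the helpers build_request_body and execute_request are illustrative assumptions, not the library's documented wire format.

```python
import json


def build_request_body(function_name, *args, **kwargs):
    """Serialize a call into a JSON string (hypothetical message layout)."""
    return json.dumps({
        'function': function_name,
        'args': args,
        'kwargs': kwargs,
    })


def execute_request(body, table):
    """Decode a message and call the matching function from the table."""
    request = json.loads(body)
    function = table[request['function']]
    return function(*request['args'], **request['kwargs'])


def a_function_to_dispatch(first, second, kwarg1=None, kwarg2=None):
    """Example task that just echoes what it received."""
    return '{}-{}-{}-{}'.format(first, second, kwarg1, kwarg2)


body = build_request_body('a_function_to_dispatch', '1', '2', kwarg1=1, kwarg2=2)
result = execute_request(body, {'a_function_to_dispatch': a_function_to_dispatch})
print(result)
```

Under this assumption, positional and keyword arguments should be limited to values that survive JSON serialization, such as strings, numbers, lists, and dictionaries.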

SafeTasks

By default, every function run by beanstalk_dispatch is wrapped in a SafeTask class that applies a @timeout decorator to the function and catches any exceptions for logging. To customize this behavior, create a subclass of SafeTask and reference the subclass in BEANSTALK_DISPATCH_TABLE.

The following parameters/functions are configurable on a SafeTask:

  • timeout_timedelta: the maximum time the task can run, given as a timedelta; defaults to 2 minutes.
  • verbose: boolean specifying whether failures are logged; defaults to False.
  • run: abstract method to fill in with the task's work.
  • on_error: a function that runs if the task fails for any reason.
  • on_success: a function that runs after the task completes successfully.
  • on_completion: a function that runs after each task (after on_error or on_success).

For example:

# beanstalk_tasks.py
from datetime import timedelta

from beanstalk_dispatch.client import schedule_function
from beanstalk_dispatch.safe_task import SafeTask

class MySafeTask(SafeTask):

    timeout_timedelta = timedelta(seconds=1000)
    verbose = True

    def run(self, *args, **kwargs):
        # Run the task
        print('Running task')

    def on_error(self, e, *args, **kwargs):
        print('There was an error {}'.format(e))

    def on_success(self, *args, **kwargs):
        print('Success!')

    def on_completion(self, *args, **kwargs):
        print('Task completed')

# Schedule the task by its key in BEANSTALK_DISPATCH_TABLE
schedule_function('a-queue', 'mysafetask',
    '1', '2', kwarg1=1, kwarg2=2)

# settings.py
import os

if os.environ.get('BEANSTALK_WORKER') == 'True':
    BEANSTALK_DISPATCH_TABLE = {
        'mysafetask': 'beanstalk_tasks.MySafeTask'
    }
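The order in which the hooks fire (run, then on_error or on_success, then on_completion) can be sketched with a small stand-in class. This is only an illustration of the described behavior: SketchTask and its process method are hypothetical names, the timeout is left out for brevity, and none of this is the library's actual SafeTask code.

```python
class SketchTask:
    """Stand-in that mimics the hook order described above."""

    def __init__(self):
        self.events = []  # records which hooks fired, in order

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def on_error(self, e, *args, **kwargs):
        self.events.append('on_error')

    def on_success(self, *args, **kwargs):
        self.events.append('on_success')

    def on_completion(self, *args, **kwargs):
        self.events.append('on_completion')

    def process(self, *args, **kwargs):
        """Run the task, then call the matching hooks in order."""
        try:
            self.run(*args, **kwargs)
        except Exception as e:
            self.on_error(e, *args, **kwargs)
        else:
            self.on_success(*args, **kwargs)
        finally:
            # Runs after either on_error or on_success.
            self.on_completion(*args, **kwargs)
        return self.events


class Succeeds(SketchTask):
    def run(self, *args, **kwargs):
        self.events.append('run')


class Fails(SketchTask):
    def run(self, *args, **kwargs):
        self.events.append('run')
        raise ValueError('boom')


ok_events = Succeeds().process('1', '2', kwarg1=1)
failed_events = Fails().process('1', '2', kwarg1=1)
print(ok_events)
print(failed_events)
```

In the sketch, the exception raised by the failing task is passed to on_error and does not propagate to the caller, which mirrors the "catches any exceptions for logging" behavior described for SafeTask.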
