
1   django-task

https://badge.fury.io/py/django-task.svg https://travis-ci.org/morlandi/django-task.svg?branch=master https://codecov.io/gh/morlandi/django-task/branch/master/graph/badge.svg

A Django app to run new background tasks from either admin or cron, and inspect task history from admin; based on django-rq

1.1   Quickstart

  1. Install Django Task:
pip install django-task
  2. Add it to your `INSTALLED_APPS`:
INSTALLED_APPS = (
    ...
    'django_rq',
    'django_task',
    ...
)
  3. Add Django Task's URL patterns:
urlpatterns = [
    ...
    path('django_task/', include('django_task.urls', namespace='django_task')),
    ...
]
  4. Configure Redis and RQ in settings.py; example:
#REDIS_URL = 'redis://localhost:6379/0'
redis_host = os.environ.get('REDIS_HOST', 'localhost')
redis_port = 6379
REDIS_URL = 'redis://%s:%d/0' % (redis_host, redis_port)

CACHES = {
    'default': {
        'BACKEND': 'redis_cache.RedisCache',
        'LOCATION': REDIS_URL
    },
}

#
# RQ config
#

RQ_PREFIX = "myproject_"
QUEUE_DEFAULT = RQ_PREFIX + 'default'
QUEUE_HIGH = RQ_PREFIX + 'high'
QUEUE_LOW = RQ_PREFIX + 'low'

RQ_QUEUES = {
    QUEUE_DEFAULT: {
        'URL': REDIS_URL,
        #'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
    },
    QUEUE_HIGH: {
        'URL': REDIS_URL,
        'DEFAULT_TIMEOUT': 500,
    },
    QUEUE_LOW: {
        'URL': REDIS_URL,
        #'ASYNC': False,
    },
}
  5. Or, if you plan to install many instances of the project on the same server:
#
# RQ config
#

QUEUE_DEFAULT = 'default'
QUEUE_LOW = 'low'
QUEUE_HIGH = 'high'

def rq_queue_name(prefix, name):
    return prefix + '_' + name

def setup_rq_queues(prefix):
    """
    Purposes:
        - setup RQ_PREFIX setting for later inspection
        - setup RQ_QUEUES dictionary with instance-specific queues

    Invoke once from local.py providing an instance specific prefix;
    example:

        RQ_PREFIX = "myproject"
        RQ_QUEUES = setup_rq_queues(RQ_PREFIX)

    Alternatively, provide a fully customized RQ_QUEUES dictionary in local.py
    """
    data = {
        QUEUE_DEFAULT: {
            'URL': REDIS_URL,
            #'PASSWORD': 'some-password',
            #'DEFAULT_TIMEOUT': 5 * 60,
            'DEFAULT_TIMEOUT': -1,  # -1 means infinite
        },
        QUEUE_LOW: {
            'URL': REDIS_URL,
            #'ASYNC': False,
        },
        QUEUE_HIGH: {
            'URL': REDIS_URL,
            'DEFAULT_TIMEOUT': 500,
        },
    }

    queues = {rq_queue_name(prefix, key): value for key, value in data.items()}
    return queues

then, in your "local.py":

#
# RQ configuration
#

RQ_PREFIX = "project_instance_XYZ"
RQ_QUEUES = setup_rq_queues(RQ_PREFIX)

print('RQ_QUEUES: ')
print(RQ_QUEUES)
  6. Customize django-task specific settings (optional):
RQ_SHOW_ADMIN_LINK = False
DJANGOTASK_LOG_ROOT = os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))
DJANGOTASK_ALWAYS_EAGER = False
DJANGOTASK_JOB_TRACE_ENABLED = False
DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE = True
  7. Optionally, revoke pending tasks at startup;

file main/apps.py:

class MainConfig(AppConfig):

    ...

    def ready(self):

        ...
        try:
            from django_task.utils import revoke_pending_tasks
            revoke_pending_tasks()
        except Exception as e:
            print(e)

1.2   Features

Purposes

  • create async tasks either programmatically or from admin
  • monitor async tasks from admin
  • log all tasks in the database for later inspection
  • optionally save task-specific logs in a TextField and/or in a FileField

Details

  1. each specific job is described by a Model derived from models.Task, which is responsible for:
    • selecting the name for the consumer queue among available queues
    • collecting and saving all parameters required by the associated job
    • running the specific job asynchronously
  2. a new job can be run either:
    • creating a Task from the Django admin
    • creating a Task from code, then calling Task.run()
  3. job execution workflow:
    • job execution is triggered by task.run(is_async)
    • job will receive the task.id, and retrieve parameters from it (task.retrieve_params_as_dict())
    • on start, job will update task status to ‘STARTED’ and save job.id for reference
    • during execution, the job can update the progress indicator
    • on completion, task status is finally updated to either ‘SUCCESS’ or ‘FAILURE’
    • See example.jobs.count_beans for an example
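
The workflow above can be sketched in plain Python; MockTask and run_job below are illustrative stand-ins for the described behavior, not django-task APIs:

```python
# Illustrative mock of the job execution workflow; MockTask and
# run_job are simplified stand-ins, NOT django-task APIs.
class MockTask:
    def __init__(self, task_id, params):
        self.id = task_id
        self._params = params
        self.status = 'PENDING'
        self.job_id = None
        self.progress = None

    def retrieve_params_as_dict(self):
        return dict(self._params)

    def set_status(self, status, job_id=None):
        self.status = status
        if job_id is not None:
            self.job_id = job_id

    def set_progress(self, value):
        self.progress = value


def run_job(task, job_id):
    # on start: mark the task as STARTED and save the job id for reference
    task.set_status('STARTED', job_id=job_id)
    try:
        params = task.retrieve_params_as_dict()
        for i in range(params['steps']):
            # during execution: update the progress indicator
            task.set_progress((i + 1) * 100 // params['steps'])
        task.set_status('SUCCESS')
    except Exception:
        task.set_status('FAILURE')


task = MockTask('c50bf040', {'steps': 4})
run_job(task, job_id='job-1')
```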

1.4   App settings

DJANGOTASK_LOG_ROOT

Path for log files.

Default: None

Example: os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))

DJANGOTASK_ALWAYS_EAGER

When True, all tasks are executed synchronously (useful for debugging and unit testing).

Default: False

DJANGOTASK_JOB_TRACE_ENABLED

Enables low-level tracing in Job.run(), for debugging challenging race conditions.

Default: False

DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE

Rejects the task if no active worker is available for the specific task queue when task.run() is called.

Default: False

REDIS_URL

Redis server to connect to

Default: 'redis://localhost:6379/0'

1.5   Running Tests

Does the code actually work?

source <YOURVIRTUALENV>/bin/activate
(myenv) $ pip install tox
(myenv) $ tox

1.6   Support Job class

Starting from version 0.3.0, some conveniences have been added:

  • The @job decorator for job functions is no longer required, since Task.run() now uses queue.enqueue() instead of jobfunc.delay(), and retrieves the queue name directly from the Task itself
  • each Task can set its own TASK_TIMEOUT value (expressed in seconds), which, when provided, overrides the default queue timeout
  • a new Job class has been provided to share suggested common logic before and after jobfunc execution
class Job(object):

    @classmethod
    def run(job_class, task_class, task_id):
        job_trace('job.run() enter')
        job = None
        task = None
        result = 'SUCCESS'
        failure_reason = ''

        try:

            # this raises a "Could not resolve a Redis connection" exception in sync mode
            #job = get_current_job()
            job = get_current_job(connection=redis.Redis.from_url(REDIS_URL))

            # Retrieve task obj and set as Started
            task = task_class.get_task_from_id(task_id)
            task.set_status(status='STARTED', job_id=job.get_id())

            # Execute job passing by task
            job_class.execute(job, task)

        except Exception as e:
            job_trace('ERROR: %s' % str(e))
            job_trace(traceback.format_exc())

            if task:
                task.log(logging.ERROR, str(e))
                task.log(logging.ERROR, traceback.format_exc())
            result = 'FAILURE'
            failure_reason = str(e)

        finally:
            if task:
                task.set_status(status=result, failure_reason=failure_reason)
            try:
                job_class.on_complete(job, task)
            except Exception as e:
                job_trace('NESTED ERROR: Job.on_complete() raises error "%s"' % str(e))
                job_trace(traceback.format_exc())
        job_trace('job.run() leave')

    @staticmethod
    def on_complete(job, task):
        pass

    @staticmethod
    def execute(job, task):
        pass

So you can either override run() to implement different logic or, in most cases, just supply your own execute() method, optionally overriding on_complete() to perform cleanup actions after job completion;

example:

class CountBeansJob(Job):

    @staticmethod
    def execute(job, task):
        params = task.retrieve_params_as_dict()
        num_beans = params['num_beans']
        for i in range(0, num_beans):
            time.sleep(0.01)
            task.set_progress((i + 1) * 100 / num_beans, step=10)

    @staticmethod
    def on_complete(job, task):
        print('task "%s" completed with: %s' % (str(task.id), task.status))
        # A more realistic example from a real project ...
        # if task.status != 'SUCCESS' or task.error_counter > 0:
        #    task.alarm = BaseTask.ALARM_STATUS_ALARMED
        #    task.save(update_fields=['alarm', ])

Execute

Run the web application:

python manage.py runserver

Run worker(s):

python manage.py rqworker low high default
python manage.py rqworker low high default
...

Sample Task

from django.db import models
from django.conf import settings
from django_task.models import Task


class SendEmailTask(Task):

    sender = models.CharField(max_length=256, null=False, blank=False)
    recipients = models.TextField(null=False, blank=False,
        help_text='put addresses in separate rows')
    subject = models.CharField(max_length=256, null=False, blank=False)
    message = models.TextField(null=False, blank=True)

    TASK_QUEUE = settings.QUEUE_LOW
    TASK_TIMEOUT = 60
    LOG_TO_FIELD = True
    LOG_TO_FILE = False
    DEFAULT_VERBOSITY = 2

    @staticmethod
    def get_jobfunc():
        from .jobs import SendEmailJob
        return SendEmailJob

You can change the verbosity dynamically by overriding the verbosity property:

class SendEmailTask(Task):

    @property
    def verbosity(self):
        #return self.DEFAULT_VERBOSITY
        return 1  # either 0, 1 or 2

When using LOG_TO_FILE = True, you might want to add a cleanup handler to remove the log file when the corresponding record is deleted:

import os
from django.dispatch import receiver

@receiver(models.signals.post_delete, sender=SendEmailTask)
def on_sendemailtask_delete_cleanup(sender, instance, **kwargs):
    """
    Autodelete logfile on Task delete
    """
    logfile = instance._logfile()
    if os.path.isfile(logfile):
        os.remove(logfile)

Sample Job

from __future__ import print_function
import redis
import logging
import traceback
from django.conf import settings
from .models import SendEmailTask
from django_task.job import Job


class SendEmailJob(Job):

    @staticmethod
    def execute(job, task):
        params = task.retrieve_params_as_dict()
        recipient_list = params['recipients'].split()
        sender = params['sender'].strip()
        subject = params['subject'].strip()
        message = params['message']
        from django.core.mail import send_mail
        send_mail(subject, message, sender, recipient_list)

Sample management command

from django_task.task_command import TaskCommand
from django.contrib.auth import get_user_model

class Command(TaskCommand):

    def add_arguments(self, parser):
        super(Command, self).add_arguments(parser)
        parser.add_argument('sender')
        parser.add_argument('subject')
        parser.add_argument('message')
        parser.add_argument('-r', '--recipients', nargs='*')
        parser.add_argument('-u', '--user', type=str, help="Specify username for 'created_by' task field")

    def handle(self, *args, **options):
        from tasks.models import SendEmailTask

        # transform the list of recipients into text
        # (one line for each recipient)
        options['recipients'] = '\n'.join(options['recipients']) if options['recipients'] is not None else ''

        # format multiline message
        options['message'] = options['message'].replace('\\n', '\n')

        if options.get('user'):
            created_by = get_user_model().objects.get(username=options['user'])
        else:
            created_by = None

        self.run_task(SendEmailTask, created_by=created_by, **options)
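
The two option transforms in handle() can be verified in isolation:

```python
# recipients list -> one address per line
recipients = ['a@example.com', 'b@example.com']
text = '\n'.join(recipients) if recipients is not None else ''
assert text == 'a@example.com\nb@example.com'

# a literal "\n" typed on the command line becomes a real newline
message = 'line one\\nline two'.replace('\\n', '\n')
assert message == 'line one\nline two'
```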

Deferred Task retrieval to avoid job vs. Task race condition

A helper Task.get_task_from_id() classmethod is supplied to retrieve the Task object from a task_id safely.

Task queues create a new type of race condition. Why? Because message queues are fast! How fast? Faster than databases.

See:

https://speakerdeck.com/siloraptor/django-tasty-salad-dos-and-donts-using-celery

A similar generic helper is available for Job-derived needs:

django_task.utils.get_model_from_id(model_cls, id, timeout=1000, retry_count=10)
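
A sketch of the retry idea behind such a helper (illustrative only; the parameter semantics assumed here are: timeout in milliseconds between attempts):

```python
import time

def get_object_with_retry(lookup, obj_id, timeout=1000, retry_count=10):
    """Illustrative stand-in (NOT the library source): retry `lookup`
    until it returns an object, sleeping `timeout` milliseconds between
    attempts, so the DB commit can catch up with the faster queue."""
    for _ in range(retry_count):
        obj = lookup(obj_id)
        if obj is not None:
            return obj
        time.sleep(timeout / 1000.0)
    return None

# simulate a row that only becomes visible on the third query
attempts = []
def lookup(obj_id):
    attempts.append(obj_id)
    return {'id': obj_id} if len(attempts) >= 3 else None

obj = get_object_with_retry(lookup, 42, timeout=1, retry_count=10)
```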

Howto separate jobs for different instances on the same machine

To separate jobs for different instances on the same machine (or, more precisely, for the same Redis connection), override the queue names for each instance;

for example:

# file "settings.py"

REDIS_URL = 'redis://localhost:6379/0'
...

#
# RQ config
#

RQ_PREFIX = "myproject_"
QUEUE_DEFAULT = RQ_PREFIX + 'default'
QUEUE_HIGH = RQ_PREFIX + 'high'
QUEUE_LOW = RQ_PREFIX + 'low'

RQ_QUEUES = {
    QUEUE_DEFAULT: {
        'URL': REDIS_URL,
        #'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
    },
    QUEUE_HIGH: {
        'URL': REDIS_URL,
        'DEFAULT_TIMEOUT': 500,
    },
    QUEUE_LOW: {
        'URL': REDIS_URL,
        #'ASYNC': False,
    },
}

RQ_SHOW_ADMIN_LINK = False
DJANGOTASK_LOG_ROOT = os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))
DJANGOTASK_ALWAYS_EAGER = False
DJANGOTASK_JOB_TRACE_ENABLED = False
DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE = True

then run worker as follows:

python manage.py rqworker myproject_default
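
The effect of the prefix on the queue names can be checked in plain Python:

```python
# same construction as in the settings fragment above
RQ_PREFIX = "myproject_"
QUEUE_DEFAULT = RQ_PREFIX + 'default'
QUEUE_HIGH = RQ_PREFIX + 'high'
QUEUE_LOW = RQ_PREFIX + 'low'

assert sorted([QUEUE_DEFAULT, QUEUE_HIGH, QUEUE_LOW]) == [
    'myproject_default', 'myproject_high', 'myproject_low']
```

which is why the worker is started with `rqworker myproject_default` rather than `rqworker default`.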

Howto schedule jobs with cron

Call the 'count_beans' management command, which in turn executes the required job.

For example:

SHELL=/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin

0 * * * *  {{username}}    timeout 55m {{django.pythonpath}}/python {{django.website_home}}/manage.py count_beans 1000 >> {{django.logto}}/cron.log 2>&1

A base class TaskCommand has been provided to simplify the creation of any specific task-related management command;

a derived management command is only responsible for:

  • defining suitable command-line parameters
  • selecting the specific Task class and job function

for example:

from django_task.task_command import TaskCommand


class Command(TaskCommand):

    def add_arguments(self, parser):
        super(Command, self).add_arguments(parser)
        parser.add_argument('num_beans', type=int)

    def handle(self, *args, **options):
        from tasks.models import CountBeansTask
        self.run_task(CountBeansTask, **options)

1.7   Javascript helpers

A few utility views have been supplied for interacting with tasks from javascript.

1.7.1   tasks_info_api

Retrieve information about a list of existing tasks.

Sample usage:

var tasks = [{
    id: 'c50bf040-a886-4aed-bf41-4ae794db0941',
    model: 'tasks.devicetesttask'
}, {
    id: 'e567c651-c8d5-4dc7-9cbf-860988f55022',
    model: 'tasks.devicetesttask'
}];

$.ajax({
    url: '/django_task/info/',
    data: JSON.stringify(tasks),
    cache: false,
    type: 'post',
    dataType: 'json',
    headers: {'X-CSRFToken': getCookie('csrftoken')}
}).done(function(data) {
    console.log('data: %o', data);
});

Result:

[
  {
    "id": "c50bf040-a886-4aed-bf41-4ae794db0941",
    "created_on": "2018-10-11T17:45:14.399491+00:00",
    "created_on_display": "10/11/2018 19:45:14",
    "created_by": "4f943f0b-f5a3-4fd8-bb2e-451d2be107e2",
    "started_on": null,
    "started_on_display": "",
    "completed_on": null,
    "completed_on_display": "",
    "job_id": "",
    "status": "PENDING",
    "status_display": "<div class=\"task_status\" data-task-model=\"tasks.devicetesttask\" data-task-id=\"c50bf040-a886-4aed-bf41-4ae794db0941\" data-task-status=\"PENDING\" data-task-complete=\"0\">PENDING</div>",
    "log_link_display": "",
    "failure_reason": "",
    "progress": null,
    "progress_display": "-",
    "completed": false,
    "duration": null,
    "duration_display": "",
    "extra_fields": {
    }
  },
  ...
]

1.7.2   task_add_api

Create and run a new task based on specified parameters

Expected parameters:

  • 'task-model' = "<app_name>.<model_name>"
  • … task parameters …

Returns the id of the new task.

Sample usage:

function exportAcquisition(object_id) {
    if (confirm('Do you want to export data ?')) {

        var url = '/django_task/add/';
        var data = JSON.stringify({
            'task-model': 'tasks.exportdatatask',
            'source': 'backend.acquisition',
            'object_id': object_id
        });

        $.ajax({
            type: 'POST',
            url: url,
            data: data,
            cache: false,
            crossDomain: true,
            dataType: 'json',
            headers: {'X-CSRFToken': getCookie('csrftoken')}
        }).done(function(data) {
            console.log('data: %o', data);
            alert('New task created: "' + data.task_id + '"');
        }).fail(function(jqXHR, textStatus, errorThrown) {
            console.log('ERROR: ' + jqXHR.responseText);
            alert(errorThrown);
        });
    }
    return;
}

1.7.3   task_run_api

Schedule execution of specified task.

Returns job.id or throws error (400).

Parameters:

  • app_label
  • model_name
  • pk
  • is_async (0 or 1, default=1)

Sample usage:

var task_id = 'c50bf040-a886-4aed-bf41-4ae794db0941';

$.ajax({
    url: sprintf('/django_task/tasks/devicetesttask/%s/run/', task_id),
    cache: false,
    type: 'get'
}).done(function(data) {
    console.log('data: %o', data);
}).fail(function(jqXHR, textStatus, errorThrown) {
    display_server_error(jqXHR.responseText);
});

1.8   Updating the tasks listing dynamically in the frontend

The list of Tasks in the admin changelist_view is automatically updated to refresh the progress and status of each running Task.

You can obtain the same result in the frontend by calling the DjangoTask.update_tasks() javascript helper, provided you’re listing the tasks in an HTML table with a similar layout.

The simplest way to do it is to use the render_task_column_names_as_table_row and render_task_as_table_row template tags.

Example:

{% load i18n django_task_tags %}

{% if not export_data_tasks %}
    <div>{% trans 'No recent jobs available' %}</div>
{% else %}
    <table id="export_data_tasks" class="table table-striped">
        {% with excluded='created_by,created_on,job_id,log_text,mode' %}
        <thead>
            <tr>
                {{ export_data_tasks.0|render_task_column_names_as_table_row:excluded }}
            </tr>
        </thead>
        <tbody>
            {% for task in export_data_tasks %}
            <tr>
                {{ task|render_task_as_table_row:excluded }}
            </tr>
            {% endfor %}
        </tbody>
    </table>
    {% endwith %}
{% endif %}


{% block extrajs %}
    {{ block.super }}
    <script type="text/javascript" src="{% static 'js/django_task.js' %}"></script>
    <script>
        $(document).ready(function() {
            DjangoTask.update_tasks(1000, '#export_data_tasks');
        });
    </script>
{% endblock extrajs %}

For each fieldname included in the table rows, render_task_as_table_row will check whether a FIELDNAME_display() method is available in the Task model and, if so, use it to render the field value; otherwise, the field value is simply converted to a string.

If the specific derived Task model defines some additional fields (unknown to the base Task model) which need to be updated regularly by DjangoTask.update_tasks(), include them as “extra_fields” as follows:

def as_dict(self):
    data = super(ExportDataTask, self).as_dict()
    data['extra_fields'] = {
        'result_display': mark_safe(self.result_display())
    }
    return data
(See example/etc/screenshot_003.png for a screenshot of the resulting listing.)

1.9   Example Project for django-task

An example project is provided as a convenience to let potential users try the app straight from the app repo, without having to create a Django project.

Please follow the instructions detailed in file example/README.rst.

2   History

2.1   1.5.0

  • Support for updating the tasks listing dynamically in the frontend
  • Example provided for task_add_api() javascript helper
  • POSSIBLY INCOMPATIBLE CHANGE: duration and duration_display are now methods rather than properties
  • Italian ('it') translation for UI messages

2.2   1.4.7

  • Added optional “created_by” parameter to TaskCommand utility

2.3   1.4.6

  • replace namespace “django.jQuery” with more generic “jQuery” in js helpers
  • update example project
  • unit tests added to “tasks” app in example project

2.4   1.4.5

  • Quickstart revised in README

2.5   1.4.4

  • Task.get_logger() is now publicly available

2.6   1.4.3

  • restore compatibility with Django 1.11; upgrade rq and django-rq requirements

2.7   1.4.2

  • tasks_info_api() optimized to use a single query

2.8   1.4.1

  • Cleanup: remove redundant REJECTED status

2.9   1.4.0

  • Update requirements (Django >= 2.0, django-rq>=2.0)

2.10   1.3.10

  • Use exceptions.TaskError class when raising specific exceptions

2.11   v1.3.9

  • removed forgotten pdb.set_trace() in revoke_pending_tasks()

2.14   v1.3.6

  • log queue name

2.15   v1.3.5

  • Readme updated

2.16   v1.3.4

  • javascript helper views
  • fix Task.set_progress(0)

2.17   v1.3.3

  • make sure fields are unique in TaskAdmin fieldsets

2.18   v1.3.1

  • unit tests verified with Python 2.7/3.6/3.7 and Django 1.10/2.0

2.19   v1.3.0

  • cleanup
  • classify as production/stable

2.20   v1.2.5

  • Tested with Django 2.0 and Python 3.7
  • Rename async to is_async to support Python 3.7
  • DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE app setting added
  • example cleanup

2.21   v1.2.4

  • API to create and run task via ajax

2.22   v1.2.3

  • TaskAdmin: postpone autorun to response_add() to have M2M task parameters (if any) ready
  • Task.clone() supports M2M parameters

2.23   v1.2.2

  • property to change verbosity dynamically

2.24   v1.2.1

  • util revoke_pending_tasks() added

2.25   v1.2.0

  • DJANGOTASK_JOB_TRACE_ENABLED setting added to enable low level tracing in Job.run()
  • Added missing import in utils.py

2.26   v1.1.3

  • cleanup: remove the get_child() method, since Task is an abstract class
  • fix: skip the (abstract) Task model in the dump_all_tasks and delete_all_tasks management commands
  • generic get_model_from_id() helper
  • Job.on_complete() callback

2.27   v1.1.2

  • provide list of pending and completed task status

2.28   v1.1.0

  • INCOMPATIBLE CHANGE: Make model Task abstract for better listing performance
  • redundant migrations removed
  • convert request.body to string for Python3
  • pretty print task params in log when task completes

2.29   v0.3.8

  • return verbose name as description

2.30   v0.3.7

  • description added to Task model

2.31   v0.3.6

  • More fixes

2.32   v0.3.5

  • log to field fix

2.33   v0.3.4

  • log quickview + view

2.34   v0.3.3

  • Optionally log to either file or text field
  • Management commands to dump and delete all tasks

2.35   v0.3.2

  • search by task.id and task.job_id

2.36   v0.3.1

  • Keep track of task mode (sync or async)

2.37   v0.3.0

  • new class Job provided to share task-related logic among job funcs

2.38   v0.2.0

  • fixes for django 2.x

2.39   v0.1.15

  • hack for prepopulated_fields

2.41   v0.1.13

  • minor fixes

2.42   v0.1.12

  • Deferred Task retrieval to avoid job vs. Task race condition
  • Improved Readme

2.43   v0.1.11

  • superuser can view all tasks, while other users have access to their own tasks only
  • js fix

2.44   v0.1.10

  • prevent task.failure_reason overflow

2.45   v0.1.9

  • app settings

2.46   v0.1.8

  • always start job from task.run() to prevent any possible race condition
  • task.run(async) can now accept async=False

2.47   v0.1.7

  • javascript: use POST to retrieve task states for UI updates, to avoid exceeding the URL length limit

2.48   v0.1.6

  • Improved ui for TaskAdmin
  • Fix unicode literals for Python3

2.49   v0.1.5

  • fixes for Django 1.10
  • send_email management command example added

2.50   v0.1.4

  • Fix OneToOneRel import for Django < 1.9

2.51   v0.1.3

  • Polymorphic behaviour or Task.get_child() restored

2.52   v0.1.2

  • TaskCommand.run_task() renamed as TaskCommand.run_job()
  • New TaskCommand.run_task() creates a Task, then runs it; this guarantees that something is traced even when background job will fail
