
===========
django-task
===========

.. image:: https://badge.fury.io/py/django-task.svg
    :target: https://badge.fury.io/py/django-task

.. image:: https://travis-ci.org/morlandi/django-task.svg?branch=master
    :target: https://travis-ci.org/morlandi/django-task

.. image:: https://codecov.io/gh/morlandi/django-task/branch/master/graph/badge.svg
    :target: https://codecov.io/gh/morlandi/django-task

A Django app to run new background tasks from either admin or cron, and inspect task history from admin; based on django-rq

Quickstart
----------

Install Django Task::

    pip install django-task

Add it to your `INSTALLED_APPS`:

.. code-block:: python

    INSTALLED_APPS = (
        ...
        'django_rq',
        'django_task',
        ...
    )

Add Django Task's URL patterns:

.. code-block:: python

    urlpatterns = [
        ...
        url(r'^django_task/', include('django_task.urls', namespace='django_task')),
        ...
    ]

Features
--------

**Purposes**

- create async tasks either programmatically or from admin
- monitor async tasks from admin
- log all tasks in the database for later inspection
- optionally save task-specific logs in a TextField and/or in a FileField

**Details**

1. each specific job is described by a Model derived from models.Task, which
   is responsible for:

   - selecting the name of the consumer queue among the available queues
   - collecting and saving all parameters required by the associated job
   - running the specific job asynchronously

2. a new job can be run either by:

   - creating a Task from the Django admin
   - creating a Task from code, then calling Task.run()

3. job execution workflow:

   - job execution is triggered by task.run(is_async)
   - the job receives the task.id, and retrieves the task parameters from it (task.retrieve_params_as_dict())
   - on start, the job updates the task status to 'STARTED' and saves job.id for reference
   - during execution, the job can update the progress indicator
   - on completion, the task status is finally updated to either 'SUCCESS' or 'FAILURE'
   - see example.jobs.count_beans for a complete example
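The workflow above can be condensed into a plain-Python sketch; `FakeTask` and `count_beans_job` here are illustrative stand-ins, not part of django-task's API:

.. code:: python

    # Illustrative stand-in for the documented lifecycle:
    # PENDING -> STARTED -> SUCCESS / FAILURE.
    # FakeTask is NOT part of django-task; it only mirrors the
    # status transitions and progress updates described above.

    class FakeTask:
        def __init__(self, **params):
            self.params = params
            self.status = 'PENDING'
            self.job_id = None
            self.progress = None
            self.failure_reason = ''

        def retrieve_params_as_dict(self):
            return dict(self.params)

        def set_status(self, status, job_id=None, failure_reason=''):
            self.status = status
            if job_id is not None:
                self.job_id = job_id
            self.failure_reason = failure_reason

        def set_progress(self, value):
            self.progress = value

        def run(self, jobfunc, job_id='fake-job-1'):
            # on start: mark STARTED and record the job id
            self.set_status('STARTED', job_id=job_id)
            try:
                jobfunc(self)
                self.set_status('SUCCESS')
            except Exception as e:
                self.set_status('FAILURE', failure_reason=str(e))


    def count_beans_job(task):
        # the job retrieves its parameters from the task itself
        num_beans = task.retrieve_params_as_dict()['num_beans']
        for i in range(num_beans):
            task.set_progress((i + 1) * 100 // num_beans)


    task = FakeTask(num_beans=10)
    task.run(count_beans_job)
    print(task.status, task.progress)  # SUCCESS 100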


Screenshots
-----------

.. image:: example/etc/screenshot_001.png

.. image:: example/etc/screenshot_002.png


App settings
------------

DJANGOTASK_LOG_ROOT
    Path for log files.

    Default: None

    Example: os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))

DJANGOTASK_ALWAYS_EAGER
    When True, all tasks are executed synchronously (useful for debugging and unit testing).

    Default: False

DJANGOTASK_JOB_TRACE_ENABLED
    Enables low-level tracing in Job.run(), for debugging challenging race conditions.

    Default: False

DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE
    Rejects the task if no active worker is available for the specific task queue
    when task.run() is called.

    Default: False

REDIS_URL
    Redis server to connect to.

    Default: 'redis://localhost:6379/0'
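Taken together, these settings might appear in a project's settings.py as follows (the values shown are illustrative, not requirements):

.. code:: python

    # settings.py -- illustrative values for the app settings listed above
    import os

    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

    REDIS_URL = 'redis://localhost:6379/0'
    DJANGOTASK_LOG_ROOT = os.path.abspath(
        os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))
    DJANGOTASK_ALWAYS_EAGER = False
    DJANGOTASK_JOB_TRACE_ENABLED = False
    DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE = False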


Running Tests
-------------

Does the code actually work?

::

    source <YOURVIRTUALENV>/bin/activate
    (myenv) $ pip install tox
    (myenv) $ tox


Support Job class
-----------------

Starting from version 0.3.0, some conveniences have been added:

- The @job decorator for job functions is no longer required, as Task.run() now
  uses queue.enqueue() instead of jobfunc.delay(), and retrieves the queue
  name directly from the Task itself

- each Task can set its own TASK_TIMEOUT value (expressed in seconds),
  which, when provided, overrides the default queue timeout

- a new Job class has been provided to share suggested common logic before and
  after jobfunc execution

.. code:: python

    class Job(object):

        @classmethod
        def run(job_class, task_class, task_id):
            job_trace('job.run() enter')
            task = None
            result = 'SUCCESS'
            failure_reason = ''

            try:

                # this raises a "Could not resolve a Redis connection" exception in sync mode
                #job = get_current_job()
                job = get_current_job(connection=redis.Redis.from_url(REDIS_URL))

                # Retrieve task obj and set as Started
                task = task_class.get_task_from_id(task_id)
                task.set_status(status='STARTED', job_id=job.get_id())

                # Execute the job, passing the task along
                job_class.execute(job, task)

            except Exception as e:
                job_trace('ERROR: %s' % str(e))
                job_trace(traceback.format_exc())

                if task:
                    task.log(logging.ERROR, str(e))
                    task.log(logging.ERROR, traceback.format_exc())
                result = 'FAILURE'
                failure_reason = str(e)

            finally:
                if task:
                    task.set_status(status=result, failure_reason=failure_reason)
                    try:
                        job_class.on_complete(job, task)
                    except Exception as e:
                        job_trace('NESTED ERROR: Job.on_complete() raises error "%s"' % str(e))
                        job_trace(traceback.format_exc())
                job_trace('job.run() leave')

        @staticmethod
        def on_complete(job, task):
            pass

        @staticmethod
        def execute(job, task):
            pass

so you can either override `run()` to implement a different logic,
or (in most cases) just supply your own `execute()` method, and optionally
override `on_complete()` to execute cleanup actions after job completion;

example:

.. code:: python

    class CountBeansJob(Job):

        @staticmethod
        def execute(job, task):
            params = task.retrieve_params_as_dict()
            num_beans = params['num_beans']
            for i in range(0, num_beans):
                time.sleep(0.01)
                task.set_progress((i + 1) * 100 / num_beans, step=10)

        @staticmethod
        def on_complete(job, task):
            print('task "%s" completed with: %s' % (str(task.id), task.status))
            # A more realistic example from a real project ...
            # if task.status != 'SUCCESS' or task.error_counter > 0:
            #     task.alarm = BaseTask.ALARM_STATUS_ALARMED
            #     task.save(update_fields=['alarm', ])
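The `step=10` argument above hints that progress is persisted only at coarse intervals; a hypothetical plain-Python sketch of such throttling (an illustration of the idea, not django-task's actual implementation) could look like:

.. code:: python

    # Hypothetical sketch of step-based progress throttling: persist the
    # progress value only when it enters a new `step`-sized bucket, so
    # the database is not written on every single iteration.
    # NOT django-task's actual implementation.

    def make_progress_tracker(step=10):
        saved = []                 # stands in for database writes
        state = {'bucket': None}

        def set_progress(value):
            bucket = (value // step) * step
            if bucket != state['bucket']:
                saved.append(value)
                state['bucket'] = bucket

        return set_progress, saved


    set_progress, saved = make_progress_tracker(step=10)
    for i in range(100):
        set_progress(i + 1)        # progress runs 1..100
    print(saved)                   # [1, 10, 20, 30, ..., 90, 100]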


**Execute**

Run the Django development server:

.. code:: bash

    python manage.py runserver


Run one or more workers:

.. code:: bash

    python manage.py rqworker low high default
    python manage.py rqworker low high default
    ...

**Sample Task**

.. code:: python

    from django.db import models
    from django.conf import settings
    from django_task.models import Task


    class SendEmailTask(Task):

        sender = models.CharField(max_length=256, null=False, blank=False)
        recipients = models.TextField(null=False, blank=False,
            help_text='put addresses in separate rows')
        subject = models.CharField(max_length=256, null=False, blank=False)
        message = models.TextField(null=False, blank=True)

        TASK_QUEUE = settings.QUEUE_LOW
        TASK_TIMEOUT = 60
        LOG_TO_FIELD = True
        LOG_TO_FILE = False
        DEFAULT_VERBOSITY = 2

        @staticmethod
        def get_jobfunc():
            from .jobs import SendEmailJob
            return SendEmailJob

You can change the `verbosity` dynamically by overriding the verbosity property:

.. code:: python

    class SendEmailTask(Task):

        @property
        def verbosity(self):
            #return self.DEFAULT_VERBOSITY
            return 1  # either 0, 1 or 2

**Sample Job**

.. code:: python

    from __future__ import print_function
    import redis
    import logging
    import traceback
    from django.conf import settings
    from .models import SendEmailTask
    from django_task.job import Job


    class SendEmailJob(Job):

        @staticmethod
        def execute(job, task):
            params = task.retrieve_params_as_dict()
            recipient_list = params['recipients'].split()
            sender = params['sender'].strip()
            subject = params['subject'].strip()
            message = params['message']
            from django.core.mail import send_mail
            send_mail(subject, message, sender, recipient_list)

**Sample management command**

.. code:: python

    from django_task.task_command import TaskCommand


    class Command(TaskCommand):

        def add_arguments(self, parser):
            super(Command, self).add_arguments(parser)
            parser.add_argument('sender')
            parser.add_argument('subject')
            parser.add_argument('message')
            parser.add_argument('-r', '--recipients', nargs='*')

        def handle(self, *args, **options):
            from tasks.models import SendEmailTask

            # transform the list of recipients into text
            # (one line for each recipient)
            options['recipients'] = '\n'.join(options['recipients']) if options['recipients'] is not None else ''

            # format multiline message
            options['message'] = options['message'].replace('\\n', '\n')

            self.run_task(SendEmailTask, **options)
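The two option transformations performed in handle() can be pulled out and exercised as plain Python, with no Django required; `normalize_options` below is an illustrative helper, not part of the library:

.. code:: python

    # The option massaging done in the management command above,
    # extracted as a standalone function for illustration.

    def normalize_options(options):
        # join the list of recipients into one address per line
        options['recipients'] = (
            '\n'.join(options['recipients'])
            if options['recipients'] is not None else ''
        )
        # turn literal "\n" sequences typed on the command line
        # into real newlines
        options['message'] = options['message'].replace('\\n', '\n')
        return options


    opts = normalize_options({
        'recipients': ['alice@example.com', 'bob@example.com'],
        'message': 'hello\\nworld',
    })
    print(opts['recipients'])  # alice@example.com
                               # bob@example.com
    print(opts['message'])     # hello
                               # world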

**Deferred Task retrieval to avoid job vs. Task race condition**

A helper Task.get_task_from_id() classmethod is supplied to retrieve the Task object
from a task_id safely.

*Task queues create a new type of race condition. Why ?
Because message queues are fast !
How fast ?
Faster than databases.*

See:

https://speakerdeck.com/siloraptor/django-tasty-salad-dos-and-donts-using-celery

A similar generic helper is available for Job-derived needs::

    django_task.utils.get_model_from_id(model_cls, id, timeout=1000, retry_count=10)
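The pattern behind such a helper, retrying until the freshly created row becomes visible to the worker, can be sketched as follows (an illustration only; `get_with_retry` and its parameters are hypothetical, not the library's actual code):

.. code:: python

    import time

    # Illustrative "retry until the row is visible" helper: a worker may
    # dequeue a job before the transaction that created the Task row has
    # committed, so the first lookup can fail even though the task exists.
    # NOT django-task's actual implementation.

    def get_with_retry(fetch, retry_count=10, delay=0.1):
        """Call fetch() up to retry_count times; fetch() returns the
        object, or None while the row is not visible yet."""
        for attempt in range(retry_count):
            obj = fetch()
            if obj is not None:
                return obj
            time.sleep(delay)
        raise LookupError('object still not available after %d retries' % retry_count)


    # usage sketch: the "database" becomes consistent after a few attempts
    calls = {'n': 0}

    def fake_fetch():
        calls['n'] += 1
        return 'task-row' if calls['n'] >= 3 else None

    print(get_with_retry(fake_fetch, retry_count=10, delay=0))  # task-row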


**Howto separate jobs for different instances on the same machine**

To separate jobs for different instances on the same machine (or, more precisely,
for the same Redis connection), override the queue names for each instance;

for example:

.. code:: python

    # file "settings.py"

    REDIS_URL = 'redis://localhost:6379/0'
    ...

    #
    # RQ config
    #

    RQ_PREFIX = "myproject_"
    QUEUE_DEFAULT = RQ_PREFIX + 'default'
    QUEUE_HIGH = RQ_PREFIX + 'high'
    QUEUE_LOW = RQ_PREFIX + 'low'

    RQ_QUEUES = {
        QUEUE_DEFAULT: {
            'URL': REDIS_URL,
            #'PASSWORD': 'some-password',
            'DEFAULT_TIMEOUT': 360,
        },
        QUEUE_HIGH: {
            'URL': REDIS_URL,
            'DEFAULT_TIMEOUT': 500,
        },
        QUEUE_LOW: {
            'URL': REDIS_URL,
            #'ASYNC': False,
        },
    }

    RQ_SHOW_ADMIN_LINK = False
    DJANGOTASK_LOG_ROOT = os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))
    DJANGOTASK_ALWAYS_EAGER = False

then run the worker as follows:

.. code:: bash

    python manage.py rqworker myproject_default

**Howto schedule jobs with cron**

Call management command 'count_beans', which in turn executes the required job.

For example::

    SHELL=/bin/bash
    PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin

    0 * * * * {{username}} timeout 55m {{django.pythonpath}}/python {{django.website_home}}/manage.py count_beans 1000 >> {{django.logto}}/cron.log 2>&1

A base class TaskCommand has been provided to simplify the creation of any specific
task-related management command;

a derived management command is only responsible for:

- defining suitable command-line parameters
- selecting the specific Task class and job function

for example:

.. code:: python

    from django_task.task_command import TaskCommand


    class Command(TaskCommand):

        def add_arguments(self, parser):
            super(Command, self).add_arguments(parser)
            parser.add_argument('num_beans', type=int)

        def handle(self, *args, **options):
            from tasks.models import CountBeansTask
            self.run_task(CountBeansTask, **options)


Javascript helpers
------------------

A few utility views have been supplied for interacting with tasks from javascript.

tasks_info_api
..............

Retrieve information about a list of existing tasks.

Sample usage:

.. code:: javascript

    var tasks = [{
        id: 'c50bf040-a886-4aed-bf41-4ae794db0941',
        model: 'tasks.devicetesttask'
    }, {
        id: 'e567c651-c8d5-4dc7-9cbf-860988f55022',
        model: 'tasks.devicetesttask'
    }];

    $.ajax({
        url: '/django_task/info/',
        data: JSON.stringify(tasks),
        cache: false,
        type: 'post',
        dataType: 'json',
        headers: {'X-CSRFToken': getCookie('csrftoken')}
    }).done(function(data) {
        console.log('data: %o', data);
    });

Result::

    [
        {
            "id": "c50bf040-a886-4aed-bf41-4ae794db0941",
            "created_on": "2018-10-11T17:45:14.399491+00:00",
            "created_on_display": "10/11/2018 19:45:14",
            "created_by": "4f943f0b-f5a3-4fd8-bb2e-451d2be107e2",
            "started_on": null,
            "started_on_display": "",
            "completed_on": null,
            "completed_on_display": "",
            "job_id": "",
            "status": "PENDING",
            "status_display": "<div class=\"task_status\" data-task-model=\"tasks.devicetesttask\" data-task-id=\"c50bf040-a886-4aed-bf41-4ae794db0941\" data-task-status=\"PENDING\" data-task-complete=\"0\">PENDING</div>",
            "log_link_display": "",
            "failure_reason": "",
            "progress": null,
            "progress_display": "-",
            "completed": false,
            "duration": null,
            "duration_display": "",
            "extra_fields": {
            }
        },
        ...
    ]

task_add_api
............

Create and run a new task based on specified parameters

Expected parameters:

- 'task-model' = "<app_name>.<model_name>"
- ... task parameters ...

Returns the id of the new task

TODO: provide a real usage example

task_run_api
............

Schedule execution of specified task.

Returns job.id or throws error (400).

Parameters:

- app_label
- model_name
- pk
- is_async (0 or 1, default=1)

Sample usage:

.. code:: javascript

    var task_id = 'c50bf040-a886-4aed-bf41-4ae794db0941';

    $.ajax({
        url: sprintf('/django_task/tasks/devicetesttask/%s/run/', task_id),
        cache: false,
        type: 'get'
    }).done(function(data) {
        console.log('data: %o', data);
    }).fail(function(jqXHR, textStatus, errorThrown) {
        display_server_error(jqXHR.responseText);
    });

Credits
-------

References:

- `A simple app that provides django integration for RQ (Redis Queue) <https://github.com/ui/django-rq>`_
- `Asynchronous tasks in django with django-rq <https://spapas.github.io/2015/01/27/async-tasks-with-django-rq/>`_
- `django-rq redux: advanced techniques and tools <https://spapas.github.io/2015/09/01/django-rq-redux/>`_
- `Benchmark: Shared vs. Dedicated Redis Instances <https://redislabs.com/blog/benchmark-shared-vs-dedicated-redis-instances/>`_
- `Django tasty salad - DOs and DON'Ts using Celery by Roberto Rosario <https://speakerdeck.com/siloraptor/django-tasty-salad-dos-and-donts-using-celery>`_

Tools used in rendering this package:

* Cookiecutter_
* `cookiecutter-djangopackage`_

.. _Cookiecutter: https://github.com/audreyr/cookiecutter
.. _`cookiecutter-djangopackage`: https://github.com/pydanny/cookiecutter-djangopackage




History
=======

v1.3.4
------
* javascript helper views
* fix Task.set_progress(0)

v1.3.3
------
* make sure fields are unique in TaskAdmin fieldsets

v1.3.1
------
* unit tests verified with Python 2.7/3.6/3.7 and Django 1.10/2.0

v1.3.0
------
* cleanup
* classify as production/stable

v1.2.5
------
* Tested with Django 2.0 and Python 3.7
* Rename `async` to `is_async` to support Python 3.7
* DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE app setting added
* example cleanup

v1.2.4
------
* API to create and run task via ajax

v1.2.3
------
* TaskAdmin: postpone autorun to response_add() to have M2M task parameters (if any) ready
* Task.clone() supports M2M parameters

v1.2.2
------
* property to change verbosity dynamically

v1.2.1
------
* util revoke_pending_tasks() added

v1.2.0
------
* DJANGOTASK_JOB_TRACE_ENABLED setting added to enable low level tracing in Job.run()
* Added missing import in utils.py

v1.1.3
------
* cleanup: remove get_child() method, since Task is an abstract class
* fix: skip Task model (being abstract) in dump_all_tasks and delete_all_tasks management commands
* generic get_model_from_id() helper
* Job.on_complete() callback

v1.1.2
------
* provide list of pending and completed task status

v1.1.0
------
* INCOMPATIBLE CHANGE: make model Task abstract for better listing performance
* redundant migrations removed
* convert request.body to string for Python3
* pretty print task params in log when task completes

v0.3.8
------
* return verbose name as description

v0.3.7
------
* description added to Task model

v0.3.6
------
* More fixes

v0.3.5
------
* log to field fix

v0.3.4
------
* log quickview + view

v0.3.3
------
* Optionally log to either file or text field
* Management commands to dump and delete all tasks

v0.3.2
------
* search by task.id and task.job_id

v0.3.1
------
* Keep track of task mode (sync or async)

v0.3.0
------
* new class Job provided to share task-related logic among job funcs

v0.2.0
------
* fixes for django 2.x

v0.1.15
-------
* hack for prepopulated_fields

v0.1.14
-------
* css fix

v0.1.13
-------
* minor fixes

v0.1.12
-------
* Deferred Task retrieval to avoid job vs. Task race condition
* Improved Readme

v0.1.11
-------
* superuser can view all tasks, while other users have access to their own tasks only
* js fix

v0.1.10
-------
* prevent task.failure_reason overflow

v0.1.9
------
* app settings

v0.1.8
------
* always start job from task.run() to prevent any possible race condition
* task.run(async) can now accept async=False

v0.1.7
------
* javascript: use POST to retrieve tasks state for UI update to prevent URL length limit exceed

v0.1.6
------
* Improved ui for TaskAdmin
* Fix unicode literals for Python3

v0.1.5
------
* fixes for Django 1.10
* send_email management command example added

v0.1.4
------
* Fix OneToOneRel import for Django < 1.9

v0.1.3
------
* Polymorphic behaviour of Task.get_child() restored

v0.1.2
------
* TaskCommand.run_task() renamed as TaskCommand.run_job()
* New TaskCommand.run_task() creates a Task, then runs it;
this guarantees that something is traced even when background job will fail

