django-task
A Django app to run new background tasks from either admin or cron, and inspect task history from admin; based on django-rq
Quickstart
Install Django Task:
pip install django-task
Add it to your INSTALLED_APPS:
INSTALLED_APPS = (
    ...
    'django_rq',
    'django_task',
    ...
)
Add Django Task’s URL patterns:
urlpatterns = [
    ...
    url(r'^django_task/', include('django_task.urls', namespace='django_task')),
    ...
]
Features
Purposes
create async tasks either programmatically or from admin
monitor async tasks from admin
log all tasks in the database for later inspection
optionally save task-specific logs in a TextField and/or in a FileField
Details
each specific job is described by a Model derived from models.Task, which is responsible for:
selecting the name of the consumer queue among the available queues
collecting and saving all parameters required by the associated job
running the specific job asynchronously
a new job can be run either:
creating a Task from the Django admin
creating a Task from code, then calling Task.run()
job execution workflow:
job execution is triggered by task.run(is_async)
job will receive the task.id, and retrieve its parameters from it (task.retrieve_params_as_dict())
on start, job will update task status to ‘STARTED’ and save job.id for reference
during execution, the job can update the progress indicator
on completion, task status is finally updated to either ‘SUCCESS’ or ‘FAILURE’
See example.jobs.count_beans for an example
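The lifecycle above can be illustrated with a toy, framework-free stand-in. ToyTask below is not part of django-task; it only mimics the PENDING / STARTED / SUCCESS / FAILURE transitions described in the workflow:

```python
# Toy stand-in for the task lifecycle (illustrative only, not django-task code)
class ToyTask:
    def __init__(self, **params):
        self.params = params
        self.status = 'PENDING'      # a new task starts as PENDING
        self.progress = None
        self.failure_reason = ''

    def run(self, jobfunc):
        self.status = 'STARTED'      # the job marks the task as STARTED
        try:
            jobfunc(self)
            self.status = 'SUCCESS'  # completed without errors
        except Exception as e:
            self.status = 'FAILURE'  # any exception ends the task as FAILURE
            self.failure_reason = str(e)


def count_beans(task):
    n = task.params['num_beans']
    for i in range(n):
        task.progress = (i + 1) * 100 // n   # update the progress indicator


task = ToyTask(num_beans=5)
task.run(count_beans)
print(task.status, task.progress)  # SUCCESS 100
```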
Screenshots
App settings
- DJANGOTASK_LOG_ROOT
Path for log files.
Default: None
Example: os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))
- DJANGOTASK_ALWAYS_EAGER
When True, all tasks are executed synchronously (useful for debugging and unit testing).
Default: False
- DJANGOTASK_JOB_TRACE_ENABLED
Enables low level tracing in Job.run() - for debugging challenging race conditions
Default: False
- DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE
Rejects the task if no active worker is available for the specific task queue when task.run() is called
Default: False
- REDIS_URL
Redis server to connect to
Default: 'redis://localhost:6379/0'
Running Tests
Does the code actually work?
$ source <YOURVIRTUALENV>/bin/activate
(myenv) $ pip install tox
(myenv) $ tox
Support Job class
Starting from version 0.3.0, some conveniences have been added:
The @job decorator for job functions is no longer required, as Task.run() now uses queue.enqueue() instead of jobfunc.delay(), and retrieves the queue name directly from the Task itself
each Task can set its own TASK_TIMEOUT value (expressed in seconds), which, when provided, overrides the default queue timeout
a new Job class has been provided to share suggested common logic before and after jobfunc execution
class Job(object):

    @classmethod
    def run(job_class, task_class, task_id):
        job_trace('job.run() enter')
        task = None
        result = 'SUCCESS'
        failure_reason = ''
        try:
            # this raises a "Could not resolve a Redis connection" exception in sync mode
            #job = get_current_job()
            job = get_current_job(connection=redis.Redis.from_url(REDIS_URL))

            # Retrieve task obj and set as Started
            task = task_class.get_task_from_id(task_id)
            task.set_status(status='STARTED', job_id=job.get_id())

            # Execute job passing by task
            job_class.execute(job, task)

        except Exception as e:
            job_trace('ERROR: %s' % str(e))
            job_trace(traceback.format_exc())
            if task:
                task.log(logging.ERROR, str(e))
                task.log(logging.ERROR, traceback.format_exc())
            result = 'FAILURE'
            failure_reason = str(e)

        finally:
            if task:
                task.set_status(status=result, failure_reason=failure_reason)
                try:
                    job_class.on_complete(job, task)
                except Exception as e:
                    job_trace('NESTED ERROR: Job.on_complete() raises error "%s"' % str(e))
                    job_trace(traceback.format_exc())

        job_trace('job.run() leave')

    @staticmethod
    def on_complete(job, task):
        pass

    @staticmethod
    def execute(job, task):
        pass
so you can either override run() to implement different logic or, in most cases, just supply your own execute() method, and optionally override on_complete() to run cleanup actions after job completion;
example:
class CountBeansJob(Job):

    @staticmethod
    def execute(job, task):
        params = task.retrieve_params_as_dict()
        num_beans = params['num_beans']
        for i in range(0, num_beans):
            time.sleep(0.01)
            task.set_progress((i + 1) * 100 / num_beans, step=10)

    @staticmethod
    def on_complete(job, task):
        print('task "%s" completed with: %s' % (str(task.id), task.status))
        # A more realistic example from a real project ...
        # if task.status != 'SUCCESS' or task.error_counter > 0:
        #     task.alarm = BaseTask.ALARM_STATUS_ALARMED
        #     task.save(update_fields=['alarm', ])
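The step=10 argument suggests that progress is only reported when it crosses a new 10%-wide boundary, instead of on every iteration. This is a standalone sketch of that throttling idea, an assumption for illustration, not the library's actual implementation:

```python
# Sketch of step-based progress throttling (assumed behaviour, not library code):
# only emit an update when progress enters a new `step`-wide bucket.
def throttled_updates(total, step=10):
    """Return the list of progress values that would actually be reported."""
    reported = []
    last_bucket = None
    for i in range(total):
        progress = (i + 1) * 100 // total
        bucket = progress // step
        if bucket != last_bucket:      # crossed into a new step bucket
            reported.append(progress)
            last_bucket = bucket
    return reported


# 50 iterations collapse into 11 reported updates instead of 50
print(throttled_updates(50))
```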
Execute
Run the Django server:
python manage.py runserver
Run worker(s):
python manage.py rqworker low high default
python manage.py rqworker low high default
...
Sample Task
from django.db import models
from django.conf import settings
from django_task.models import Task


class SendEmailTask(Task):

    sender = models.CharField(max_length=256, null=False, blank=False)
    recipients = models.TextField(null=False, blank=False,
        help_text='put addresses in separate rows')
    subject = models.CharField(max_length=256, null=False, blank=False)
    message = models.TextField(null=False, blank=True)

    TASK_QUEUE = settings.QUEUE_LOW
    TASK_TIMEOUT = 60
    LOG_TO_FIELD = True
    LOG_TO_FILE = False
    DEFAULT_VERBOSITY = 2

    @staticmethod
    def get_jobfunc():
        from .jobs import SendEmailJob
        return SendEmailJob
You can change the verbosity dynamically by overriding the verbosity property:

class SendEmailTask(Task):

    @property
    def verbosity(self):
        #return self.DEFAULT_VERBOSITY
        return 1  # either 0, 1 or 2

When using LOG_TO_FILE = True, you might want to add a cleanup handler to remove the log file when the corresponding record is deleted:

import os

from django.db import models
from django.dispatch import receiver


@receiver(models.signals.post_delete, sender=SendEmailTask)
def on_sendemailtask_delete_cleanup(sender, instance, **kwargs):
    """ Autodelete logfile on Task delete """
    logfile = instance._logfile()
    if os.path.isfile(logfile):
        os.remove(logfile)
Sample Job
from __future__ import print_function
import redis
import logging
import traceback
from django.conf import settings
from .models import SendEmailTask
from django_task.job import Job


class SendEmailJob(Job):

    @staticmethod
    def execute(job, task):
        params = task.retrieve_params_as_dict()
        recipient_list = params['recipients'].split()
        sender = params['sender'].strip()
        subject = params['subject'].strip()
        message = params['message']

        from django.core.mail import send_mail
        send_mail(subject, message, sender, recipient_list)
Sample management command
from django_task.task_command import TaskCommand


class Command(TaskCommand):

    def add_arguments(self, parser):
        super(Command, self).add_arguments(parser)
        parser.add_argument('sender')
        parser.add_argument('subject')
        parser.add_argument('message')
        parser.add_argument('-r', '--recipients', nargs='*')

    def handle(self, *args, **options):
        from tasks.models import SendEmailTask

        # transform the list of recipients into text
        # (one line for each recipient)
        options['recipients'] = '\n'.join(options['recipients']) if options['recipients'] is not None else ''

        # format multiline message
        options['message'] = options['message'].replace('\\n', '\n')

        self.run_task(SendEmailTask, **options)
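The option massaging done in handle() can be isolated as a small plain-Python helper. normalize_options below is illustrative, not part of the package; it reproduces the two transformations the command performs:

```python
# Illustrative helper (not part of django-task): the two option
# transformations performed by the sample management command above.
def normalize_options(recipients, message):
    """Turn a recipients list into one-address-per-line text,
    and literal '\\n' escapes in the message into real newlines."""
    return {
        'recipients': '\n'.join(recipients) if recipients is not None else '',
        'message': message.replace('\\n', '\n'),
    }


opts = normalize_options(['a@example.com', 'b@example.com'], 'line1\\nline2')
```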
Deferred Task retrieval to avoid job vs. Task race condition
A helper classmethod, Task.get_task_from_id(), is supplied to retrieve the Task object from a task_id safely.
Task queues create a new type of race condition. Why? Because message queues are fast! How fast? Faster than databases.
See:
https://speakerdeck.com/siloraptor/django-tasty-salad-dos-and-donts-using-celery
A similar generic helper is available for Job-derived needs:
django_task.utils.get_model_from_id(model_cls, id, timeout=1000, retry_count=10)
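The helper presumably retries the database lookup until the row committed by the enqueuing process becomes visible, instead of failing on the first miss. A hedged, framework-free sketch of that polling pattern (get_with_retry and fetch are illustrative names, not the library's API):

```python
import time

# Illustrative sketch of the retry pattern behind a "get model from id" helper:
# poll until the object shows up, or give up after retry_count attempts.
# `fetch` is any callable returning the object or None (an assumption).
def get_with_retry(fetch, timeout_ms=1000, retry_count=10):
    delay = (timeout_ms / 1000.0) / retry_count   # spread the timeout over the retries
    for _ in range(retry_count):
        obj = fetch()
        if obj is not None:
            return obj
        time.sleep(delay)
    return None
```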
Howto separate jobs for different instances on the same machine
To separate jobs for different instances on the same machine (or, more precisely, for the same Redis connection), override the queue names for each instance;
for example:
# file "settings.py"

REDIS_URL = 'redis://localhost:6379/0'
...

#
# RQ config
#

RQ_PREFIX = "myproject_"

QUEUE_DEFAULT = RQ_PREFIX + 'default'
QUEUE_HIGH = RQ_PREFIX + 'high'
QUEUE_LOW = RQ_PREFIX + 'low'

RQ_QUEUES = {
    QUEUE_DEFAULT: {
        'URL': REDIS_URL,
        #'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
    },
    QUEUE_HIGH: {
        'URL': REDIS_URL,
        'DEFAULT_TIMEOUT': 500,
    },
    QUEUE_LOW: {
        'URL': REDIS_URL,
        #'ASYNC': False,
    },
}

RQ_SHOW_ADMIN_LINK = False

DJANGOTASK_LOG_ROOT = os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))
DJANGOTASK_ALWAYS_EAGER = False
DJANGOTASK_JOB_TRACE_ENABLED = False
DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE = True
then run worker as follows:
python manage.py rqworker myproject_default
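The prefixing scheme above can also be generated programmatically, so the per-instance prefix lives in exactly one place. A minimal sketch (make_queues is an illustrative helper, not part of django-task or django-rq):

```python
# Illustrative helper: build RQ_QUEUES with a per-instance prefix so two
# projects sharing one Redis server don't consume each other's jobs.
REDIS_URL = 'redis://localhost:6379/0'

def make_queues(prefix, names=('default', 'high', 'low')):
    return {prefix + name: {'URL': REDIS_URL} for name in names}

RQ_QUEUES = make_queues('myproject_')
```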
Howto schedule jobs with cron
Call the management command 'count_beans', which in turn executes the required job.
For example:
SHELL=/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin

0 * * * * {{username}} timeout 55m {{django.pythonpath}}/python {{django.website_home}}/manage.py count_beans 1000 >> {{django.logto}}/cron.log 2>&1
A base class TaskCommand has been provided to simplify the creation of any specific task-related management command;
a derived management command is only responsible for:
defining suitable command-line parameters
selecting the specific Task class and job function
for example:
from django_task.task_command import TaskCommand


class Command(TaskCommand):

    def add_arguments(self, parser):
        super(Command, self).add_arguments(parser)
        parser.add_argument('num_beans', type=int)

    def handle(self, *args, **options):
        from tasks.models import CountBeansTask
        self.run_task(CountBeansTask, **options)
Javascript helpers
A few utility views have been supplied for interacting with tasks from javascript.
tasks_info_api
Retrieves information about a list of existing tasks
Sample usage:
var tasks = [{
    id: 'c50bf040-a886-4aed-bf41-4ae794db0941',
    model: 'tasks.devicetesttask'
}, {
    id: 'e567c651-c8d5-4dc7-9cbf-860988f55022',
    model: 'tasks.devicetesttask'
}];

$.ajax({
    url: '/django_task/info/',
    data: JSON.stringify(tasks),
    cache: false,
    type: 'post',
    dataType: 'json',
    headers: {'X-CSRFToken': getCookie('csrftoken')}
}).done(function(data) {
    console.log('data: %o', data);
});
Result:
[
    {
        "id": "c50bf040-a886-4aed-bf41-4ae794db0941",
        "created_on": "2018-10-11T17:45:14.399491+00:00",
        "created_on_display": "10/11/2018 19:45:14",
        "created_by": "4f943f0b-f5a3-4fd8-bb2e-451d2be107e2",
        "started_on": null,
        "started_on_display": "",
        "completed_on": null,
        "completed_on_display": "",
        "job_id": "",
        "status": "PENDING",
        "status_display": "<div class=\"task_status\" data-task-model=\"tasks.devicetesttask\" data-task-id=\"c50bf040-a886-4aed-bf41-4ae794db0941\" data-task-status=\"PENDING\" data-task-complete=\"0\">PENDING</div>",
        "log_link_display": "",
        "failure_reason": "",
        "progress": null,
        "progress_display": "-",
        "completed": false,
        "duration": null,
        "duration_display": "",
        "extra_fields": {}
    },
    ...
]
task_add_api
Create and run a new task based on specified parameters
Expected parameters:
'task-model' = "<app_name>.<model_name>"
… task parameters …
Returns the id of the new task
TODO: provide a real usage example
task_run_api
Schedule execution of specified task.
Returns job.id or throws error (400).
Parameters:
app_label
model_name
pk
is_async (0 or 1, default=1)
Sample usage:
var task_id = 'c50bf040-a886-4aed-bf41-4ae794db0941';

$.ajax({
    url: sprintf('/django_task/tasks/devicetesttask/%s/run/', task_id),
    cache: false,
    type: 'get'
}).done(function(data) {
    console.log('data: %o', data);
}).fail(function(jqXHR, textStatus, errorThrown) {
    display_server_error(jqXHR.responseText);
});
Credits
History
v1.3.9
removed forgotten pdb.set_trace() in revoke_pending_tasks()
v1.3.8
cleanup
v1.3.7
cleanup
v1.3.6
log queue name
v1.3.5
Readme updated
v1.3.4
javascript helper views
fix Task.set_progress(0)
v1.3.3
make sure fields are unique in TaskAdmin fieldsets
v1.3.1
unit tests verified with Python 2.7/3.6/3.7 and Django 1.10/2.0
v1.3.0
cleanup
classify as production/stable
v1.2.5
Tested with Django 2.0 and Python 3.7
Rename async to is_async to support Python 3.7
DJANGOTASK_REJECT_IF_NO_WORKER_ACTIVE_FOR_QUEUE app setting added
example cleanup
v1.2.4
API to create and run task via ajax
v1.2.3
TaskAdmin: postpone autorun to response_add() to have M2M task parameters (if any) ready
Task.clone() supports M2M parameters
v1.2.2
property to change verbosity dynamically
v1.2.1
util revoke_pending_tasks() added
v1.2.0
DJANGOTASK_JOB_TRACE_ENABLED setting added to enable low level tracing in Job.run()
Added missing import in utils.py
v1.1.3
cleanup: remove get_child() method, since Task is an abstract class
fix: skip Task model (being abstract) in dump_all_tasks and delete_all_tasks management commands
generic get_model_from_id() helper
Job.on_complete() callback
v1.1.2
provide list of pending and completed task status
v1.1.0
INCOMPATIBLE CHANGE: make model Task abstract for better listing performance
redundant migrations removed
convert request.body to string for Python3
pretty print task params in log when task completes
v0.3.8
return verbose name as description
v0.3.7
description added to Task model
v0.3.6
More fixes
v0.3.5
log to field fix
v0.3.4
log quickview + view
v0.3.3
Optionally log to either file or text field
Management commands to dump and delete all tasks
v0.3.2
search by task.id and task.job_id
v0.3.1
Keep track of task mode (sync or async)
v0.3.0
new class Job provided to share task-related logic among job funcs
v0.2.0
fixes for django 2.x
v0.1.15
hack for prepopulated_fields
v0.1.14
css fix
v0.1.13
minor fixes
v0.1.12
Deferred Task retrieval to avoid job vs. Task race condition
Improved Readme
v0.1.11
superuser can view all tasks, while other users have access to their own tasks only
js fix
v0.1.10
prevent task.failure_reason overflow
v0.1.9
app settings
v0.1.8
always start job from task.run() to prevent any possible race condition
task.run(async) can now accept async=False
v0.1.7
javascript: use POST to retrieve tasks state for UI update to prevent URL length limit exceed
v0.1.6
Improved ui for TaskAdmin
Fix unicode literals for Python3
v0.1.5
fixes for Django 1.10
send_email management command example added
v0.1.4
Fix OneToOneRel import for Django < 1.9
v0.1.3
Polymorphic behaviour of Task.get_child() restored
v0.1.2
TaskCommand.run_task() renamed as TaskCommand.run_job()
New TaskCommand.run_task() creates a Task, then runs it; this guarantees that something is traced even when the background job fails