A Django app to run new background tasks from either admin or cron, and inspect task history from admin
Project description
A Django app to run new background tasks from either admin or cron, and inspect task history from admin
Quickstart
Install Django Task:
pip install django-task
Add it to your INSTALLED_APPS:
INSTALLED_APPS = (
...
'django_rq',
'django_task',
...
)
Add Django Task’s URL patterns:
urlpatterns = [
...
url(r'^django_task/', include('django_task.urls', namespace='django_task')),
...
]
Features
Purposes
create async tasks programmatically
create and monitor async tasks from admin
log all tasks in the database for later inspection
Details
each specific job is described my a Model derived from models.Task, which is responsible for:
selecting the name for the consumer queue among available queues
collecting and saving all parameters required by the associated job
running the specific job asyncronously
a new job can be run either:
creating a Task from the Django admin
creating a Task from code, then calling Task.run()
job execution workflow:
job execution is triggered by task.run(async)
job will receive the task.id, and retrieve paramerts from it (task.retrieve_params_as_dict())
on start, job will update task status to ‘STARTED’ and save job.id for reference
during execution, the job can update the progress indicator
on completion, task status is finally updated to either ‘SUCCESS’ or ‘FAILURE’
See example.jobs.count_beans for an example
Execute
Run consumer:
python manage.py runserver
Run worker(s):
python manage.py rqworker low high default
python manage.py rqworker low high default
...
Sample Task
from django.db import models
from django.conf import settings
from django_task.models import Task
class SendEmailTask(Task):
sender = models.CharField(max_length=256, null=False, blank=False)
recipients = models.TextField(null=False, blank=False,
help_text='put addresses in separate rows')
subject = models.CharField(max_length=256, null=False, blank=False)
message = models.TextField(null=False, blank=True)
TASK_QUEUE = settings.QUEUE_LOW
DEFAULT_VERBOSITY = 2
@staticmethod
def get_jobfunc():
from .jobs import send_email
return send_email
Sample Job
from __future__ import print_function
import redis
import logging
from django.conf import settings
from .models import SendEmailTask
from rq import get_current_job
from django_rq import job
@job(SendEmailTask.TASK_QUEUE)
def send_email(task_id):
task = None
result = 'SUCCESS'
failure_reason = ''
try:
# this raises a "Could not resolve a Redis connection" exception in sync mode
#job = get_current_job()
job = get_current_job(connection=redis.Redis.from_url(settings.REDIS_URL))
#task = SendEmailTask.objects.get(id=task_id)
task = SendEmailTask.get_task_from_id(task_id)
task.set_status(status='STARTED', job_id=job.get_id())
params = task.retrieve_params_as_dict()
recipient_list = params['recipients'].split()
sender = params['sender'].strip()
subject = params['subject'].strip()
message = params['message']
from django.core.mail import send_mail
send_mail(subject, message, sender, recipient_list)
except Exception as e:
if task:
task.log(logging.ERROR, str(e))
result = 'FAILURE'
failure_reason = str(e)
finally:
if task:
task.set_status(status=result, failure_reason=failure_reason)
Sample management command
from django_task.task_command import TaskCommand
class Command(TaskCommand):
def add_arguments(self, parser):
super(Command, self).add_arguments(parser)
parser.add_argument('sender')
parser.add_argument('subject')
parser.add_argument('message')
parser.add_argument('-r', '--recipients', nargs='*')
def handle(self, *args, **options):
from tasks.models import SendEmailTask
# transform the list of recipents into text
# (one line for each recipient)
options['recipients'] = '\n'.join(options['recipients']) if options['recipients'] is not None else ''
# format multiline message
options['message'] = options['message'].replace('\\n', '\n')
self.run_task(SendEmailTask, **options)
Deferred Task retrieval to avoid job vs. Task race condition
An helper Task.get_task_from_id() classmethod is supplied to retrieve Task object from task_id safely.
Task queues create a new type of race condition. Why ? Because message queues are fast ! How fast ? Faster than databases.
See:
https://speakerdeck.com/siloraptor/django-tasty-salad-dos-and-donts-using-celery
Howto separate jobs for different instances on the same machine
To sepatare jobs for different instances on the same machine (or more precisely for the same redis connection), override queues names for each instance;
for example:
# file "settings.py"
REDIS_URL = 'redis://localhost:6379/0'
...
#
# RQ config
#
RQ_PREFIX = "myproject_"
QUEUE_DEFAULT = RQ_PREFIX + 'default'
QUEUE_HIGH = RQ_PREFIX + 'high'
QUEUE_LOW = RQ_PREFIX + 'low'
RQ_QUEUES = {
QUEUE_DEFAULT: {
'URL': REDIS_URL,
#'PASSWORD': 'some-password',
'DEFAULT_TIMEOUT': 360,
},
QUEUE_HIGH: {
'URL': REDIS_URL,
'DEFAULT_TIMEOUT': 500,
},
QUEUE_LOW: {
'URL': REDIS_URL,
#'ASYNC': False,
},
}
RQ_SHOW_ADMIN_LINK = False
DJANGOTASK_LOG_ROOT = os.path.abspath(os.path.join(BASE_DIR, '..', 'protected', 'tasklog'))
DJANGOTASK_ALWAYS_EAGER = False
then run worker as follows:
python manage.py rqworker myproject_default
Howto schedule jobs with cron
Call management command ‘count_beans’, which in turn executes the required job.
For example:
SHELL=/bin/bash PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin 0 * * * * {{username}} timeout 55m {{django.pythonpath}}/python {{django.website_home}}/manage.py count_beans 1000 >> {{django.logto}}/cron.log 2>&1
A base class TaskCommand has been provided to simplify the creation of any specific task-related management commad;
a derived management command is only responsible for:
defining suitable command-line parameters
selecting the specific Task class and job function
for example:
from django_task.task_command import TaskCommand
class Command(TaskCommand):
def add_arguments(self, parser):
super(Command, self).add_arguments(parser)
parser.add_argument('num_beans', type=int)
def handle(self, *args, **options):
from tasks.models import CountBeansTask
self.run_task(CountBeansTask, **options)
Screenshots
App settings
- DJANGOTASK_LOG_ROOT
Path for log files.
Default: None
Example: os.path.abspath(os.path.join(BASE_DIR, ‘..’, ‘protected’, ‘tasklog’))
DJANGOTASK_ALWAYS_EAGER
When True, all task are execute syncronously (useful for debugging and unit testing).
Default: False
Running Tests
TODO
Does the code actually work?
source <YOURVIRTUALENV>/bin/activate (myenv) $ pip install tox (myenv) $ tox
Credits
References:
A simple app that provides django integration for RQ (Redis Queue)
Django tasty salad - DOs and DON’Ts using Celery by Roberto Rosario
Tools used in rendering this package:
History
0.1.12
Deferred Task retrieval to avoid job vs. Task race condition
Improved Readme
0.1.11
superuser can view all tasks, while other users have access to their own tasks only
js fix
0.1.10
prevent task.failure_reason overflow
0.1.9
app settings
0.1.8
always start job from task.run() to prevent any possible race condition
task.run(async) can now accept async=False
0.1.7
javascript: use POST to retrieve tasks state for UI update to prevent URL length limit exceed
0.1.6
Improved ui for TaskAdmin
Fix unicode literals for Python3
0.1.5
fixes for Django 1.10
send_email management command example added
0.1.4
Fix OneToOneRel import for Django < 1.9
0.1.3
Polymorphic behaviour or Task.get_child() restored
0.1.2
TaskCommand.run_task() renamed as TaskCommand.run_job()
New TaskCommand.run_task() creates a Task, then runs it; this guarantees that something is traced even when background job will fail
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Hashes for django_task-0.1.14-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 446645db15f5369c105449b4bc3a7a6b6fc8b976134ed4a3fbeac3924edaec52 |
|
MD5 | 4d3aa19f589973f95614409971d85423 |
|
BLAKE2b-256 | 3e46a1553bf590af02705dc9c70c86fc72f4befd5e2e47b87fe93ce5ba48a9c2 |