A simplistic task queue and cron-like scheduler for Django
Django Too Simple Queue
This package provides a simplistic task queue and scheduler for Django.
It is geared towards basic apps, where simplicity comes first. The package offers a simple decorator syntax, including cron-like schedules.
Features :
- no celery/redis/rabbitmq/whatever... just Django !
- clean decorator syntax to register tasks and schedules
- simple queuing syntax
- cron-like scheduling
- tasks.py autodiscovery
- django admin integration
- task results stored using the Django ORM
Limitations :
- no multithreading yet (but running multiple workers should work)
- not well suited for projects spawning a high volume of tasks
Compatibility:
- Django 3.2 and 4.0
- Python 3.8, 3.9, 3.10
Installation
Install the library :
pip install django-toosimple-q
Enable the app in settings.py:
INSTALLED_APPS = [
    # ...
    'django_toosimple_q',
    # ...
]
Quickstart
Tasks need to be registered using the @register_task() decorator. Once registered, they can be added to the queue by calling the .queue() function.
from django_toosimple_q.decorators import register_task
# Register a task
@register_task()
def my_task(name):
    return f"Hello {name} !"
# Enqueue tasks
my_task.queue("John")
my_task.queue("Peter")
Registered tasks can be scheduled from code using this cron-like syntax :
from django_toosimple_q.decorators import register_task, schedule_task
# Register and schedule tasks
@schedule_task(cron="30 8 * * *", args=['John'])
@register_task()
def morning_routine(name):
    return f"Good morning {name} !"
To consume the tasks, you need to run at least one worker :
python manage.py worker
The workers will take care of adding scheduled tasks to the queue when needed, and will execute the tasks.
The package autoloads tasks.py from all installed apps. While this is the recommended place to define your tasks, you can do so from anywhere in your code.
Advanced usage
Tasks
You can optionally give a custom name to your tasks. This is required when your task is defined in a local scope.
@register_task(name="my_favourite_task")
def my_task():
    ...
You can set task priorities.
@register_task(priority=0)
def my_favourite_task():
    ...
@register_task(priority=1)
def my_other_task():
    ...
# Enqueue tasks
my_other_task.queue()
my_favourite_task.queue() # will be executed before the other one
You can define retries=N and retry_delay=S to retry the task in case of failure. The delay (in seconds) will double on each failure.
@register_task(retries=10, retry_delay=60)
def download_data():
    ...
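To make the doubling concrete, here is a minimal sketch of the resulting delays. It is not the package's code: the helper name retry_delays is hypothetical, and it assumes that the first retry waits retry_delay seconds and that the delay doubles after each further failure.

```python
def retry_delays(retries, retry_delay):
    """Return the delay in seconds before each retry (hypothetical helper).

    Assumption: the first retry waits `retry_delay` seconds, and the delay
    doubles after every further failure.
    """
    return [retry_delay * 2**attempt for attempt in range(retries)]


delays = retry_delays(retries=4, retry_delay=60)
print(delays)       # [60, 120, 240, 480]
print(sum(delays))  # 900
```

Under the same assumption, retries=10 with retry_delay=60 would make the last retry wait 60 * 2**9 = 30720 seconds (about 8.5 hours), so the two values are worth choosing together.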
You can mark a task as unique=True if it shouldn't be queued again while it is already queued with the same arguments. This is useful for tasks such as cleaning or refreshing.
@register_task(unique=True)
def cleanup():
    ...
cleanup.queue()
cleanup.queue() # this will be ignored as long as the first one is still queued
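The deduplication rule can be pictured with a toy in-memory queue. This is only a sketch of the idea, not how the package does it (the package stores task executions through the Django ORM); queue_unique and pending are hypothetical names.

```python
def queue_unique(pending, task_name, args):
    """Append (task_name, args) to `pending` unless an identical entry is already queued."""
    entry = (task_name, tuple(args))
    if entry in pending:
        return False  # ignored: same task with the same arguments is still queued
    pending.append(entry)
    return True


pending = []
print(queue_unique(pending, "cleanup", []))       # True
print(queue_unique(pending, "cleanup", []))       # False
print(queue_unique(pending, "cleanup", ["tmp"]))  # True, the arguments differ
```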
You can assign tasks to specific queues, and then have your worker only consume tasks from specific queues using --queue myqueue or --exclude_queue myqueue. By default, workers consume all tasks.
@register_task(queue='long_running')
def long_task():
    ...
@register_task()
def short_task():
    ...
# Then run those with these workers, so that long
# running tasks don't prevent short running tasks
# from being run :
# manage.py worker --exclude_queue long_running
# manage.py worker
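As a rough illustration of the selection rule, the sketch below decides whether a worker would pick up a task from a given queue. The function worker_accepts and the queue name "default" are hypothetical; the real filtering happens inside the worker command.

```python
def worker_accepts(task_queue, queues=None, exclude_queues=None):
    """Return True if a worker started with these options would consume `task_queue`."""
    if queues:
        return task_queue in queues  # --queue: only the listed queues
    if exclude_queues:
        return task_queue not in exclude_queues  # --exclude_queue: everything else
    return True  # no option given: all queues are consumed


# Worker started with `--exclude_queue long_running`
print(worker_accepts("long_running", exclude_queues=["long_running"]))  # False
print(worker_accepts("default", exclude_queues=["long_running"]))       # True
# Worker started without options
print(worker_accepts("long_running"))                                   # True
```

In the two-worker setup above, the first worker never blocks on long tasks, while the second one can still run anything.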
You can enqueue tasks with a specific due date.
@register_task()
def my_task(name):
    ...
# Enqueue the task so that it runs no earlier than one hour from now
from datetime import datetime, timedelta
my_task.queue("John", due=datetime.now() + timedelta(hours=1))
The queue() function returns a TaskExec model instance, which holds information about the task execution, including the task result.
from django.core.management import call_command
from django_toosimple_q.models import TaskExec
@register_task()
def multiply(a, b):
    return a * b
t = multiply.queue(3, 4)
assert t.state == TaskExec.States.QUEUED
assert t.result is None
call_command("worker", "--until_done") # equivalent to `python manage.py worker --until_done`
t.refresh_from_db()
assert t.state == TaskExec.States.SUCCEEDED
assert t.result == 12
Schedules
You may define multiple schedules for the same task. In this case, it is mandatory to specify a unique name :
@schedule_task(name="afternoon_routine", cron="30 16 * * *", args=['afternoon'])
@schedule_task(name="morning_routine", cron="30 8 * * *", args=['morning'])
@register_task()
def my_task(time_of_day):
    return f"Good {time_of_day} John !"
By default, last_tick is set to now() on schedule creation. This means schedules will only run on the next cron occurrence. If you need your schedules to run as soon as possible after initialisation, you can specify run_on_creation=True.
@schedule_task(cron="30 8 * * *", run_on_creation=True)
@register_task()
def my_task():
    ...
By default, if some cron occurrences were missed (e.g. after a server shutdown or if the workers can't keep up with all tasks), the missed tasks will be lost. If you need the tasks to catch up, set catch_up=True.
@schedule_task(cron="30 8 * * *", catch_up=True)
@register_task()
def my_task():
    ...
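To see what "missed" means for the daily 08:30 schedule used above, the following sketch lists the occurrences that fall between the last tick and the current time. It is a simplified stand-in that only handles "every day at a fixed time", not a cron parser, and missed_daily_runs is a hypothetical helper rather than part of the package.

```python
from datetime import datetime, timedelta


def missed_daily_runs(last_tick, now, hour, minute):
    """List the daily occurrences of hour:minute in the interval (last_tick, now]."""
    candidate = last_tick.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= last_tick:
        candidate += timedelta(days=1)  # today's occurrence already passed
    runs = []
    while candidate <= now:
        runs.append(candidate)
        candidate += timedelta(days=1)
    return runs


# Workers stopped on Oct 25 at 12:00 and restarted on Oct 28 at 09:00
missed = missed_daily_runs(
    datetime(2022, 10, 25, 12, 0), datetime(2022, 10, 28, 9, 0), hour=8, minute=30
)
for run in missed:
    print(run.isoformat())
# 2022-10-26T08:30:00
# 2022-10-27T08:30:00
# 2022-10-28T08:30:00
```

These three occurrences are the ones that catch_up=True is meant to recover.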
You may get the schedule's cron datetime provided as a keyword argument to the task using the datetime_kwarg argument. This is often useful in combination with catch_up, for things like report generation.
@schedule_task(cron="30 8 * * *", datetime_kwarg="scheduled_on", catch_up=True)
@register_task()
def my_task(scheduled_on):
    return f"This was scheduled for {scheduled_on.isoformat()}."
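The mechanism amounts to passing the scheduled datetime under a keyword name chosen by the caller. The sketch below shows that idea in plain Python; call_with_schedule_datetime and daily_report are hypothetical names, and the package's own call path may differ.

```python
from datetime import datetime


def call_with_schedule_datetime(func, scheduled_on, datetime_kwarg=None, args=()):
    """Call func, passing the scheduled datetime under the name given by datetime_kwarg."""
    kwargs = {datetime_kwarg: scheduled_on} if datetime_kwarg else {}
    return func(*args, **kwargs)


def daily_report(scheduled_on):
    # The report covers the day it was scheduled for, even if it runs late
    return f"Report for {scheduled_on.date().isoformat()}"


result = call_with_schedule_datetime(
    daily_report, datetime(2022, 10, 27, 8, 30), datetime_kwarg="scheduled_on"
)
print(result)  # Report for 2022-10-27
```

This is why the option pairs well with catch_up: a report that runs a day late can still be labelled with the date it was due.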
Similarly to tasks, you can assign schedules to specific queues, and then have your worker only consume tasks from specific queues using --queue myqueue or --exclude_queue myqueue.
@schedule_task(cron="30 8 * * *", queue='scheduler')
@register_task(queue='worker')
def task():
    ...
# Then run those with these workers
# manage.py worker --queue scheduler
# manage.py worker --queue worker
Management command
Besides the standard Django management command arguments, the worker command supports the following arguments.
usage: manage.py worker [--queue QUEUE | --exclude_queue EXCLUDE_QUEUE]
                        [--tick TICK]
                        [--once | --until_done]
                        [--label LABEL]
                        [--timeout TIMEOUT]
optional arguments:
  --queue QUEUE         which queue to run (can be used several times, all
                        queues are run if not provided)
  --exclude_queue EXCLUDE_QUEUE
                        which queue not to run (can be used several times, all
                        queues are run if not provided)
  --tick TICK           frequency in seconds at which the database is checked
                        for new tasks/schedules
  --once                run once then exit (useful for debugging)
  --until_done          run until no tasks are available then exit (useful for
                        debugging)
  --label LABEL         the name of the worker to help identifying it ('{pid}'
                        will be replaced by the process id)
  --timeout TIMEOUT     the time in seconds after which this worker will be considered
                        offline (set this to a value higher than the longest tasks this
                        worker will execute)
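The usage line above implies two pairs of mutually exclusive options and two repeatable ones. The following standalone argparse sketch mirrors that shape so the rules can be tried out without a Django project. It is an approximation written for illustration, not the package's actual parser; in particular, the float types for --tick and --timeout are assumptions.

```python
import argparse
import os


def build_parser():
    """Build a parser with the same option layout as the usage text (illustrative only)."""
    parser = argparse.ArgumentParser(prog="manage.py worker")
    queues = parser.add_mutually_exclusive_group()
    queues.add_argument("--queue", action="append")          # can be used several times
    queues.add_argument("--exclude_queue", action="append")  # can be used several times
    parser.add_argument("--tick", type=float)                # assumed to accept fractions
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--once", action="store_true")
    mode.add_argument("--until_done", action="store_true")
    parser.add_argument("--label")
    parser.add_argument("--timeout", type=float)             # assumed to accept fractions
    return parser


opts = build_parser().parse_args(
    ["--queue", "long_running", "--queue", "mail", "--until_done", "--label", "worker-{pid}"]
)
print(opts.queue)  # ['long_running', 'mail']

# '{pid}' in the label is replaced by the process id
label = opts.label.replace("{pid}", str(os.getpid()))
print(label)
```

Combining --queue with --exclude_queue, or --once with --until_done, makes this parser exit with an error, which matches the `|` notation in the usage line.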
Demo project
A demo project with pre-configured tasks is provided.
python demoproject/manage.py migrate
python demoproject/manage.py createsuperuser
python demoproject/manage.py runserver
python demoproject/manage.py worker # from a different shell
Then open http://127.0.0.1:8000/admin in your browser
Contrib apps
django_toosimple_q.contrib.mail
A queued email backend to send emails asynchronously, preventing your website from failing completely in case the upstream backend is down.
Enable and configure the app in settings.py:
INSTALLED_APPS = [
    # ...
    'django_toosimple_q.contrib.mail',
    # ...
]
EMAIL_BACKEND = 'django_toosimple_q.contrib.mail.backends.QueueBackend'
# Actual Django email backend used, defaults to django.core.mail.backends.smtp.EmailBackend, see https://docs.djangoproject.com/en/3.2/ref/settings/#email-backend
TOOSIMPLEQ_EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
Head to the Django documentation for usage.
Dev
Tests
To run tests locally (by default, tests run against an in-memory sqlite database):
pip install -r requirements-dev.txt
python manage.py test
To run tests against postgres, first run the following commands:
# Start a local postgres database
docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres
# Set an env var
export TOOSIMPLEQ_TEST_DB=postgres # on Windows: `$Env:TOOSIMPLEQ_TEST_DB = "postgres"`
Tests are run automatically on GitHub.
Contribute
Code style is enforced with pre-commit:
pip install -r requirements-dev.txt
pre-commit install
Internals
Terms
Task: a callable with a known name in the registry. These are typically registered in tasks.py.
TaskExecution: a specific planned or past call of a task, including inputs (arguments) and outputs. This is a model, whose instances are typically created using mycallable.queue() or from schedules.
Schedule: a configuration for repeated execution of tasks. These are typically configured in tasks.py.
ScheduleExecution: the last execution of a schedule (e.g. keeps track of the last time a schedule actually led to generating a task execution). This is a model, whose instances are created by the worker.
Registry: a dictionary keeping all registered schedules and tasks.
Worker: a management command that executes schedules and tasks on a regular basis.
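The relationship between these terms can be sketched with a toy, in-memory version. Everything below is a simplified stand-in written for illustration: the real package keeps task executions in database models and runs them from the worker command, and the names registry, queued and run_worker_once are hypothetical.

```python
registry = {}  # task name -> callable (toy stand-in for the registry)
queued = []    # planned calls (toy stand-in for task executions in the database)


def register_task(name=None):
    """Toy decorator: store the callable in the registry and give it a queue() method."""
    def decorator(func):
        task_name = name or func.__name__
        registry[task_name] = func
        # queue() records a planned call instead of running the function
        func.queue = lambda *args, **kwargs: queued.append((task_name, args, kwargs))
        return func
    return decorator


def run_worker_once():
    """Execute every queued call by looking the task up by name in the registry."""
    results = []
    while queued:
        task_name, args, kwargs = queued.pop(0)
        results.append(registry[task_name](*args, **kwargs))
    return results


@register_task()
def my_task(name):
    return f"Hello {name} !"


my_task.queue("John")
my_task.queue("Peter")
print(run_worker_once())  # ['Hello John !', 'Hello Peter !']
```

Only the task name and arguments are stored with a queued call, which is why a task needs a known name in the registry for the worker to find it.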
Changelog
- 2022-10-28 : v1.0.0b ⚠ BACKWARDS INCOMPATIBLE RELEASE ⚠
  - feature: added workerstatus to the admin, allowing to monitor workers
  - feature: queue tasks for later (mytask.queue(due=now()+timedelta(hours=2)))
  - feature: assign queues to schedules (@schedule_task(queue="schedules"))
  - refactor: removed non-execution related data from the database (clarifying the fact that the source of truth is the registry)
  - refactor: better support for concurrent workers
  - refactor: better names for models and decorators
  - infra: included a demo project
  - infra: improved testing, including for concurrency behaviour
  - infra: updated compatibility to Django 3.2/4.1 and Python 3.8-3.10
  - quick migration guide:
    - rename @schedule -> @schedule_task
    - task name must now be provided as a kwarg: @register_task("mytask") -> @register_task(name="mytask")
    - replace @schedule_task(..., last_check=None) -> @schedule_task(..., run_on_creation=True)
    - models: Schedule -> ScheduleExec and Task -> TaskExec
- 2022-03-24 : v0.4.0
  - made last_check and last_run optional in the admin
  - defined id fields
- 2021-07-15 : v0.3.0
  - added contrib.mail
  - task replacement now tracked with a FK instead of a state
  - also run tests on postgres
  - added datetime_kwarg argument to schedules
- 2021-06-11 : v0.2.0
  - added retries, retry_delay options for tasks
  - improve logging
- 2020-11-12 : v0.1.0
  - fixed bug where updating schedule failed
  - fixed worker not doing all available tasks for each tick
  - added --tick argument
  - enforce uniqueness of schedule