Skip to main content

A simplistic task queue and cron-like scheduler for Django

Project description

Django Too Simple Queue

PyPI version Workflow

This packages provides a simplistic task queue and scheduler for Django.

It is geared towards basic apps, where simplicity primes. The package offers simple decorator syntax, including cron-like schedules.

Features :

  • no celery/redis/rabbitmq/whatever... just Django !
  • clean decorator syntax to register tasks and schedules
  • simple queuing syntax
  • cron-like scheduling
  • tasks.py autodiscovery
  • django admin integration
  • tasks results stored using the Django ORM

Limitations :

  • no multithreading yet (but running multiple workers should work)
  • not well suited for projects spawning a high volume of tasks

Compatibility:

  • Django 3.2 and 4.0
  • Python 3.8, 3.9, 3.10

Installation

Install the library :

$ pip install django-toosimple-q

Enable the app in settings.py :

INSTALLED_APPS = [
    ...
    'django_toosimple_q',
    ...
]

Quickstart

Tasks need to be registered using the @register_task() decorator. Once registered, they can be added to the queue by calling the .queue() function.

from django_toosimple_q.decorators import register_task

# Register a task
@register_task()
def my_task(name):
    return f"Hello {name} !"

# Enqueue tasks
my_task.queue("John")
my_task.queue("Peter")

Registered tasks can be scheduled from code using this cron-like syntax :

from django_toosimple_q.decorators import register_task, schedule

# Register and schedule tasks
@schedule(cron="30 8 * * *", args=['John'])
@register_task()
def morning_routine(name):
    return f"Good morning {name} !"

To consume the tasks, you need to run at least one worker :

$ python manage.py worker

The workers will take care of adding scheduled tasks to the queue when needed, and will execute the tasks.

The package autoloads tasks.py from all installed apps. While this is the recommended place to define your tasks, you can do so from anywhere in your code.

Advanced usage

Tasks

You can optionnaly give a custom name to your tasks. This is required when your task is defined in a local scope.

@register_task(name="my_favourite_task")
def my_task(name):
    return f"Good morning {name} !"

You can set task priorities.

@register_task(priority=0)
def my_favourite_task(name):
    return f"Good bye {name} !"

@register_task(priority=1)
def my_other_task(name):
    return f"Hello {name} !"

# Enqueue tasks
my_other_task.queue("John")
my_favourite_task.queue("Peter")  # will be executed before the other one

You can define retries=N and retry_delay=S to retry the task in case of failure. The delay (in second) will double on each failure.

@register_task(retries=10, retry_delay=60)
def send_email():
    ...

You can mark a task as unique=True if the task shouldn't be queued again if already queued with the same arguments. This is usefull for tasks such as cleaning or refreshing.

@register_task(unique=True)
def cleanup():
    ...

cleanup.queue()
cleanup.queue()  # this will be ignored as long as the first one is still queued

You can assign tasks to specific queues, and then have your worker only consume tasks from specific queues using --queue myqueue or --exclude_queue myqueue. By default, workers consume all tasks.

@register_task(queue='long_running')
def long_task():
    ...

@register_task()
def short_task():
    ...

# Then run those with these workers, so that long
# running tasks don't prevent short running tasks
# from being run :
# manage.py worker --exclude_queue long_running
# manage.py worker

You can enqueue tasks with a specific due date.

@register_task()
def my_task(name):
    return f"Hello {name} !"

# Enqueue tasks
my_task.queue("John", due=timezone.now() + timedelta(hours=1))

Schedules

By default, last_tick is set to now() on schedule creation. This means they will only run on next cron occurence. If you need your schedules to be run as soon as possible after initialisation, you can specify run_on_creation=True.

@schedule_task(cron="30 8 * * *", run_on_creation=True)
@register_task()
def my_task(name):
    return f"Good morning {name} !"

By default, if some crons where missed (e.g. after a server shutdown or if the workers can't keep up with all tasks), the missed tasks will be lost. If you need the tasks to catch up, set catch_up=True.

@schedule_task(cron="30 8 * * *", catch_up=True)
@register_task()
def my_task(name):
    ...

You may define multiple schedules for the same task. In this case, it is mandatory to specify a unique name :

@schedule_task(name="morning_routine", cron="30 16 * * *", args=['morning'])
@schedule_task(name="afternoon_routine", cron="30 8 * * *", args=['afternoon'])
@register_task()
def my_task(time):
    return f"Good {time} John !"

You may get the schedule's cron datetime provided as a keyword argument to the task using the datetime_kwarg argument :

@schedule_task(cron="30 8 * * *", datetime_kwarg="scheduled_on")
@register_task()
def my_task(scheduled_on):
    return f"This was scheduled for {scheduled_on.isoformat()}."

Similarly to tasks, you can assign schedules to specific queues, and then have your worker only consume tasks from specific queues using --queue myqueue or --exclude_queue myqueue.

@register_schedule(queue='scheduler')
@register_task(queue='worker')
def task():
    ...

# Then run those with these workers
# manage.py worker --queue scheduler
# manage.py worker --queue worker

Management comment

Besides standard django management commands arguments, the management command supports following arguments.

usage: manage.py worker [--queue QUEUE | --exclude_queue EXCLUDE_QUEUE]
                        [--tick TICK]
                        [--once | --until_done]
                        [--label LABEL]
                        [--timeout TIMEOUT]

optional arguments:
  --queue QUEUE         which queue to run (can be used several times, all
                        queues are run if not provided)
  --exclude_queue EXCLUDE_QUEUE
                        which queue not to run (can be used several times, all
                        queues are run if not provided)
  --tick TICK           frequency in seconds at which the database is checked
                        for new tasks/schedules
  --once                run once then exit (useful for debugging)
  --until_done          run until no tasks are available then exit (useful for
                        debugging)
  --label LABEL         the name of the worker to help identifying it ('{pid}'
                        will be replaced by the process id)
  --timeout TIMEOUT     the time in seconds after which this worker will be considered
                        offline (set this to a value higher than the longest tasks this
                        worker will execute)

Demo project

A demo project with pre-configured tasks is provided.

python demoproject/manage.py migrate
python demoproject/manage.py createsuperuser
python demoproject/manage.py runserver
python demoproject/manage.py worker  # from a different shell

Then open http://127.0.0.1:8000/admin in your browser

Contrib apps

django_toosimple_q.contrib.mail

A queued email backend to send emails asynchronously, preventing your website from failing completely in case the upstream backend is down.

Installation

Enable and configure the app in settings.py :

INSTALLED_APPS = [
    ...
    'django_toosimple_q.contrib.mail',
    ...
]

EMAIL_BACKEND = 'django_toosimple_q.contrib.mail.backends.QueueBackend'

# Actual Django email backend used, defaults to django.core.mail.backends.smtp.EmailBackend, see https://docs.djangoproject.com/en/3.2/ref/settings/#email-backend
TOOSIMPLEQ_EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'

Dev

Tests

To run tests locally (by default, tests runs against an in-memory sqlite database):

$ pip install -r requirements-dev.txt
$ python manage.py test

To run tests against postgres, run the following commands before :

# Start a local postgres database
$ docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres
# Set and env var
$ export TOOSIMPLEQ_TEST_DB=postgres # on Windows: `$Env:TOOSIMPLEQ_TEST_DB = "postgres"`

Tests are run automatically on github.

Contribute

Code style is done with pre-commit :

$ pip install -r requirements-dev.txt
$ pre-commit install

Internals

Terms

Task: a callable with a known name in the registry. These are typically registered in tasks.py.

TaskExecution: a specific planned or past call of a task, including inputs (arguments) and outputs. This is a model, whose instanced are typically created using mycallable.queue() or from schedules.

Schedule: a configuration for repeated execution of tasks. These are typically configured in tasks.py.

ScheduleExecution: the last execution of a schedule (e.g. keeps track of the last time a schedule actually lead to generate a task execution). This is a model, whose instances are created by the worker.

Registry: a dictionary keeping all registered schedules and tasks.

Worker: a management command that executes schedules and tasks on a regular basis.

Changelog

  • 2022-03-28 : v1.0.0b ⚠ BACKWARDS INCOMPATIBLE RELEASE ⚠

    • feature: added workerstatus to the admin, allowing to monitor workers
    • feature: queue tasks for later (mytask.queue(due=now()+timedelta(hours=2)))
    • feature: assign queues to schedules (@register_schedule(queue="schedules"))
    • refactor: removed non-execution related data from the database (clarifying the fact tha the source of truth is the registry)
    • refactor: better support for concurrent workers
    • refactor: better names for models and decorators
    • infra: included a demo project
    • infra: improved testing, including for concurrency behaviour
    • infra: updated compatibility to Django 3.2/4.0 and Python 3.8-3.18
    • quick migration guide:
      • rename @schedule -> @schedule_task
      • task name must now be provided as a kwarg: @register_task("mytask") -> @register_task(name="mytask"))
      • replace @schedule_task(..., last_check=None) -> @schedule_task(..., run_on_creation=True)
      • models: Schedule -> ScheduleExec and Task -> TaskExec
  • 2022-03-24 : v0.4.0

    • made last_check and last_run optional in the admin
    • defined id fields
  • 2021-07-15 : v0.3.0

    • added contrib.mail
    • task replacement now tracked with a FK instead of a state
    • also run tests on postgres
    • added datetime_kwarg argument to schedules
  • 2021-06-11 : v0.2.0

    • added retries, retry_delay options for tasks
    • improve logging
  • 2020-11-12 : v0.1.0

    • fixed bug where updating schedule failed
    • fixed worker not doing all available tasks for each tick
    • added --tick argument
    • enforce uniqueness of schedule

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django-toosimple-q-1.0.0b0.tar.gz (30.5 kB view details)

Uploaded Source

Built Distribution

django_toosimple_q-1.0.0b0-py3-none-any.whl (41.3 kB view details)

Uploaded Python 3

File details

Details for the file django-toosimple-q-1.0.0b0.tar.gz.

File metadata

  • Download URL: django-toosimple-q-1.0.0b0.tar.gz
  • Upload date:
  • Size: 30.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.1 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.7.12

File hashes

Hashes for django-toosimple-q-1.0.0b0.tar.gz
Algorithm Hash digest
SHA256 e25151262cdf85f346870a85e94d3fce6c2ca3d191e0b87aace148678eb0d64f
MD5 9f0390d7a8efdedc74716d53471fb741
BLAKE2b-256 1352a309a16cc3cf13f135a1daf97c53089fabcf4da0837abfc9562d938c7ad9

See more details on using hashes here.

File details

Details for the file django_toosimple_q-1.0.0b0-py3-none-any.whl.

File metadata

  • Download URL: django_toosimple_q-1.0.0b0-py3-none-any.whl
  • Upload date:
  • Size: 41.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.1 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.7.12

File hashes

Hashes for django_toosimple_q-1.0.0b0-py3-none-any.whl
Algorithm Hash digest
SHA256 d84c7dc842167080a4d5b96441db5f816c034561ee7435e522539e56a30400ec
MD5 90c3cad6018b88cb739a3caede8f1c11
BLAKE2b-256 6383b9ca2addea488138cd45a1ba4176255d5e4d9a41ccd2c90072639ea69b7c

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page