Skip to main content

Distributed tasks for Django Channels.

Project description


PyPI version


An attempt to implement a distributed task queue for use with Django channels. Modelled after RQ and Celery, complex task workflows are possible, all leveraging the Channels machinery.


There are three reasons:

  1. Aiming for more fault tolerant tasks. There are many occasions where information regarding how tests are progressing is needed to be stored persistently. For important tasks, this should be stored even in the case of a Redis fault, or if the worker goes down.

  2. Prefer to leverage the same machinery as channels.

  3. Would like to have a little extra functionality surrounding subtasks that didn't seem to be available via Celery or RQ.


There are two limitations involved:

  • REDIS must be used as the Django cache.

  • asgi_redis must be used as the channel backend.

There is work being done to remove these restrictions.


Use pip if you can:

pip install django-cq

or live on the edge:

pip install -e

Add the package to your settings file:


And include the routing information in your channel routing list:

channel_routing = [

You'll need to migrate to include the models:

./ migrate

You'll most likely want to create a new channel layer for your CQ tasks. The default layer has a short time-to-live on the channel messages, which causes slightly long running tasks to kill off any queued messages. Update your settings file to include the following:

    'default': {
    'long': {
        'BACKEND': 'asgi_redis.RedisChannelLayer',
        'CONFIG': {
            'hosts': [REDIS_URL],
            'expiry': 1800,
            'channel_capacity': {
                'cq-tasks': 1000
        'ROUTING': '',


In order to process messages sent on the "cq-tasks" channel a worker process needs to be launched:

./ cq_runworker


Basic task usage is straight forward:

def send_email(cqt, addr):
    return 'OK'

task = send_emails.delay('')
print(task.result)  # "OK"

Here, cqt is the task representation for the send_email task. This can be used to launch subtasks, chain subsequent tasks, amongst other things.

Tasks may also be run in serial by just calling them:

result = send_email('')
print(result)  # "OK"


For more complex workflows, subtasks may be launched from within parent tasks:

def send_emails(cqt):
    for addr in email_addresses:
        cqt.subtask(send_email, addr)
    return 'OK'

task = send_emails.delay()
print(task.result)  # "OK"

The difference between a subtask and another task launched using delay from within a task is that the parent task of a subtask will not be marked as complete until all subtasks are also complete.

from cq.models import Task

def parent(cqt):
    task_a.delay()  # not a subtask
    cqt.subtask(task_b)  # subtask

parent.status == Task.STATUS_WAITING  # True
# once task_b completes
parent.status == Task.STATUS_COMPLETE  # True

Chained Tasks


def calculate_something(cqt):
    return calc_a.delay(3).chain(add_a_to_4, (4,))

Non-atomic Tasks

By default every CQ task is atomic; no changes to the database will persist unless the task finishes without an exception. If you need to keep changes to the database, even in the event of an error, then use the atomic flag:

def unsafe_task(cqt):


For longer running tasks it's useful to be able to access an ongoing log of the task's progress. CQ tasks have a log method to send logging messages to both the standard Django log streams, and also cache them on the running task.

def long_task(cqt):
    cqt.log('standard old log')
    cqt.log('debugging log', logging.DEBUG)

If the current task is a subtask, the logs will go to the parent. This way there is a central task (the top-level task) which can be used to monitor the progress and status of a network of sub and chained tasks.


Due to the way logs are handled there can be issues with performance with a lot of frequent log messages. There are two ways to prevent this.

Reduce the frequency of logs by setting publish to False on as many log calls as you can. This will cache the logs locally and store them on the next publish=True call.

def long_task(cqt):
    for ii in range(100):
        cqt.log('iteration %d' % ii, publish=False)
    cqt.log('done')  # publish=True

Secondly, reducing the volume of logs may be accomplished by limiting the number of log lines that are kept. The limit option specifies this. The following will only keep 10 of the logged iterations:

def long_task(cqt):
    for ii in range(100):
        cqt.log('iteration %d' % ii, publish=False)
    cqt.log('done', limit=10)



Repeating Tasks

CQ comes with robust repeating tasks. There are two ways to create repeating tasks:

  1. From the Django admin.

  2. Using a data migration.

From the admin, click into cq and repeating tasks. From there you can create a new repeating task, specifying the background task to call, and a CRON time for repetition.

To create a repeating task from a migration, use the helper function schedule_task.

from django.db import migrations
from cq.models import schedule_task

from myapp.tasks import a_task

def add_repeating(apps, scema_editor):
    RepeatingTask = apps.get_model('cq.RepeatingTask')
        '* * * * *',

class Migration(migrations.Migration):
    operations = [
        migrations.RunPython(add_repeating, reverse_code=migrations.RunPython.noop)


Pending or queued instances of a coalescing task will prevent other instances of the task from running.

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

django_cq-0.3.3-py3-none-any.whl (32.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page