Skip to main content

A middleware for Dramatiq (for Django) that keeps track of task state only when you need it to.

Project description

dramatiq-taskstate

A middleware for Dramatiq (for Django) that keeps track of task state only when you need it to.

Note:

When using the term "task" in the documentation: that would generally refer to the task model in this package. It has nothing to do with Dramatiq or the django_dramatiq package except that the Task model is an abstraction of a Dramatiq task. Therefore, this package only operates on the Task model and not Dramatiq tasks.

Quickstart

  1. Install dramatiq-taskstate via pip:

    pip install dramatiq-taskstate
    
  2. Add taskstate and django.contrib.postgres to your INSTALLED_APPS in your project settings.py file:

    INSTALLED_APPS = [
        'django_dramatiq',
        '...',
        'django.contrib.postgres',
        '...',
        'taskstate',
    ]
    
  3. Run migrate:

    python manage.py migrate
    
  4. Include the middleware in your django_dramatiq configuration:

    DRAMATIQ_BROKER = {
        'BROKER': '...',
        'OPTIONS': {
            # ...
        },
        'MIDDLEWARE': [
            # ...
            'taskstate.middleware.StateMiddleware',
        ]
    }
    
  5. Add a for_state parameter to Dramatiq actors that need task state to be tracked. The middleware will ignore any tasks that don't have this argument. Also remember that all values in the for_state dictionary must be JSON serializable.

    @dramatiq.actor
    def my_actor(arg, for_state={}):
        pass
    
  6. Then, when sending a new task to Dramatiq: the for_state dictionary can contain any of the following keys:

    'for_state': {
        'user_pk': user.pk,
        'model_name': 'model',
        'app_name': 'app',
        'description': 'description',
    }
    
  7. Each time a task's status is updated a task_changed signal is dispatched which can be handled like this:

    from django.dispatch import receiver
    
    from taskstate.middleware import StateMiddleware
    from taskstate.signals import task_changed
    
    @receiver(task_changed, sender=StateMiddleware)
    def handle_task_changed(sender, task, **kwargs):
        pass
    

    Keep in mind that this is not a post_save signal -- it only fires for status updates.

Reporting task state to the UI

Of course, a common case with background tasks is that the progress/state of a task needs to be displayed to a user somehow. This package includes a WebsocketConsumer that can be used with django-channels to check the status of a task. Check the flowchart in the root of the repo for more information on how this works.

Check the get_task_status.js file in the taskstate/static directory for an example of how to send a request via websockets to get/monitor the task status/progress. Also, as shown in the flowchart in the root of the repo, both task_changed -- which only handles when the task object's status is updated, i.e, enqueued, done, etc. -- and post_save signals are handled.

Routing is included for django-channels. Make sure to use the URLRouter for your django-channels configuration. You can send data for the websocket to the following route:

/ws/get-task-status/

Or, create your own route:

from django.urls import re_path

from taskstate.consumers import CheckTaskStatus

websocket_urlpatterns = [
    re_path(r'^ws/custom-route-task-status/$', CheckTaskStatus.as_asgi()),
]

Also remember to add the routes to your django-channels router, for example:

application = ProtocolTypeRouter({
    'http': django_asgi_app,
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                taskstate.routing.websocket_urlpatterns,
            )
        ),
    ),
})

A default template is included to render tasks in the UI -- use the following in your templates (check the template to see which context variables to use):

{% include 'taskstate/task_list.html' %}

The above template will merely render a list of tasks, however, to check/monitor the statuses of those tasks include the default script in your HTML before the closing body tag:

<script src="{% static 'taskstate/get_task_status.js' %}" charset="utf-8"></script>

Updating progress percentage of a Task

See the task_progress_example.py file in the examples directory in the root of the repo for an example of how to update the progress of a task. Note that some of the details there are specific to Dramatiq itself.

Seen status of a Task

A task can only be marked as seen when it is complete. The seen status of a set of tasks can be set through another django-channels route:

/ws/set-task-seen/

Sending a list of task ID's to this route will automatically mark all completed tasks from the list of ID's as seen. This is handled in the default JS script -- therefore, check the get_task_status.js file for an example.

There is also an APS (Advanced Python Scheduler) periodic task that will delete tasks older than 120 seconds for tasks that have been seen and have a "final/completed" status like "skipped", "failed" or "done". To add the cleanup_tasks periodic job to APS:

from django.conf import settings
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from taskstate.tasks import cleanup_tasks


scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_job(
    cleanup_tasks.send,
    trigger=CronTrigger(second='*/240'), # Every 240 seconds
    max_instances=1,
    replace_existing=True,
)

Task statuses

  • enqueued
  • delayed
  • running
  • failed
  • done
  • skipped

Get all completed tasks

completed_tasks = Task.objects.completed()

Get tasks for display

To get all the tasks that have been recently seen and that have not been seen (including currently active tasks), use the following:

tasks = Task.objects.for_display()

This will show tasks that have been seen in the last 30 seconds. To only show tasks seen in the last 15 seconds use the following:

tasks = Task.objects.for_display(seconds_since_seen=15)

Management commands

The clear_tasks management command will delete all Task objects currently in the database irrespective of status.

python manage.py clear_tasks

Compatibility

  • Python 3.6+
  • Django 3.2+
  • Only supports PostgreSQL because django.contrib.postgres.fields.ArrayField is used. This could be looked at in future.

Versioning

This project follows semantic versioning (SemVer).

License and code of conduct

Check the root of the repo for these files.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dramatiq-taskstate-0.3.4.tar.gz (25.0 kB view details)

Uploaded Source

Built Distribution

dramatiq_taskstate-0.3.4-py3-none-any.whl (29.0 kB view details)

Uploaded Python 3

File details

Details for the file dramatiq-taskstate-0.3.4.tar.gz.

File metadata

  • Download URL: dramatiq-taskstate-0.3.4.tar.gz
  • Upload date:
  • Size: 25.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/29.0 requests/2.22.0 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.2 importlib-metadata/4.6.4 keyring/18.0.1 rfc3986/1.5.0 colorama/0.4.3 CPython/3.8.10

File hashes

Hashes for dramatiq-taskstate-0.3.4.tar.gz
Algorithm Hash digest
SHA256 199b881a6df7d2ee29303a6dde37a876f94aff86d8d445af323aa0650b5d2fff
MD5 4582e2a1bbc67ea660bfc4d5d60aa9b9
BLAKE2b-256 a50d48a750ba2740e76180b4c2505aced564baa9ca7554d0911dc702da995188

See more details on using hashes here.

File details

Details for the file dramatiq_taskstate-0.3.4-py3-none-any.whl.

File metadata

  • Download URL: dramatiq_taskstate-0.3.4-py3-none-any.whl
  • Upload date:
  • Size: 29.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/29.0 requests/2.22.0 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.2 importlib-metadata/4.6.4 keyring/18.0.1 rfc3986/1.5.0 colorama/0.4.3 CPython/3.8.10

File hashes

Hashes for dramatiq_taskstate-0.3.4-py3-none-any.whl
Algorithm Hash digest
SHA256 e9ab1a0e6d18c8b102b1d2530bb7600d2ae9480a4ab22182412ded2c4b048acd
MD5 f4cce09338d5e273d62bb15c03cd8991
BLAKE2b-256 95ec2306470245b401d7893b955eb36acbbcb3b5810fd7f4ab5ea63a07f5721d

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page