A middleware for Dramatiq (for Django) that keeps track of task state only when you need it to.
Project description
dramatiq-taskstate
A middleware for Dramatiq (for Django) that keeps track of task state only when you need it to.
Note:
When using the term "task" in the documentation: that would generally refer
to the task model in this package. It has nothing to do with Dramatiq or the
django_dramatiq
package except that the Task
model is an abstraction of
a Dramatiq task. Therefore, this package only operates on the Task
model
and not Dramatiq tasks.
Quickstart
-
Install dramatiq-taskstate via pip:
pip install dramatiq-taskstate
-
Add
taskstate
anddjango.contrib.postgres
to yourINSTALLED_APPS
in your project settings.py file:INSTALLED_APPS = [ 'django_dramatiq', '...', 'django.contrib.postgres', '...', 'taskstate', ]
-
Run migrate:
python manage.py migrate
-
Include the middleware in your
django_dramatiq
configuration:DRAMATIQ_BROKER = { 'BROKER': '...', 'OPTIONS': { # ... }, 'MIDDLEWARE': [ # ... 'taskstate.middleware.StateMiddleware', ] }
-
Add a
for_state
parameter to Dramatiq actors that need task state to be tracked. The middleware will ignore any tasks that don't have this argument. Also remember that all values in thefor_state
dictionary must be JSON serializable.@dramatiq.actor def my_actor(arg, for_state={}): pass
-
Then, when sending a new task to Dramatiq: the
for_state
dictionary can contain any of the following keys:'for_state': { 'user_pk': user.pk, 'model_name': 'model', 'app_name': 'app', 'description': 'description', }
-
Each time a task's status is updated a
task_changed
signal is dispatched which can be handled like this:from django.dispatch import receiver from taskstate.middleware import StateMiddleware from taskstate.signals import task_changed @receiver(task_changed, sender=StateMiddleware) def handle_task_changed(sender, task, **kwargs): pass
Keep in mind that this is not a
post_save
signal -- it only fires for status updates.
Reporting task state to the UI
Of course, a common case with background tasks is that the progress/state of a
task needs to be displayed to a user somehow. This package includes a
WebsocketConsumer
that can be used with django-channels to check the
status of a task. Check the flowchart in the root of the repo for more
information on how this works.
Check the get_task_status.js
file in the taskstate/static
directory for
an example of how to send a request via websockets to get/monitor the task
status/progress. Also, as shown in the flowchart in the root of the repo,
both task_changed
-- which only handles when the task object's status is
updated, i.e, enqueued, done, etc. -- and post_save
signals are handled.
Routing is included for django-channels. Make sure to use the URLRouter for your django-channels configuration. You can send data for the websocket to the following route:
/ws/get-task-status/
Or, create your own route:
from django.urls import re_path
from taskstate.consumers import CheckTaskStatus
websocket_urlpatterns = [
re_path(r'^ws/custom-route-task-status/$', CheckTaskStatus.as_asgi()),
]
Also remember to add the routes to your django-channels router, for example:
application = ProtocolTypeRouter({
'http': django_asgi_app,
'websocket': AllowedHostsOriginValidator(
AuthMiddlewareStack(
URLRouter(
taskstate.routing.websocket_urlpatterns,
)
),
),
})
A default template is included to render tasks in the UI -- use the following in your templates (check the template to see which context variables to use):
{% include 'taskstate/task_list.html' %}
The above template will merely render a list of tasks, however, to check/monitor the statuses of those tasks include the default script in your HTML before the closing body tag:
<script src="{% static 'taskstate/get_task_status.js' %}" charset="utf-8"></script>
Updating progress percentage of a Task
See the task_progress_example.py
file in the examples directory in the root of
the repo for an example of how to update the progress of a task. Note that
some of the details there are specific to Dramatiq itself.
Seen status of a Task
A task can only be marked as seen when it is complete. The seen status of a set of tasks can be set through another django-channels route:
/ws/set-task-seen/
Sending a list of task ID's to this route will automatically mark all
completed tasks from the list of ID's as seen. This is handled in the default
JS script -- therefore, check the get_task_status.js
file for an example.
There is also an APS (Advanced Python Scheduler) periodic task that will
delete tasks older than 120 seconds for tasks that have been seen and
have a "final/completed" status like "skipped", "failed" or "done". To add
the cleanup_tasks
periodic job to APS:
from django.conf import settings
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from taskstate.tasks import cleanup_tasks
scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_job(
cleanup_tasks.send,
trigger=CronTrigger(second='*/240'), # Every 240 seconds
max_instances=1,
replace_existing=True,
)
Task statuses
- enqueued
- delayed
- running
- failed
- done
- skipped
Get all completed tasks
completed_tasks = Task.objects.completed()
Get tasks for display
To get all the tasks that have been recently seen and that have not been seen (including currently active tasks), use the following:
tasks = Task.objects.for_display()
This will show tasks that have been seen in the last 30 seconds. To only show tasks seen in the last 15 seconds use the following:
tasks = Task.objects.for_display(seconds_since_seen=15)
Management commands
The clear_tasks
management command will delete all Task
objects currently
in the database irrespective of status.
python manage.py clear_tasks
Compatibility
- Python 3.6+
- Django 3.2+
- Only supports PostgreSQL because
django.contrib.postgres.fields.ArrayField
is used. This could be looked at in future.
Versioning
This project follows semantic versioning (SemVer).
License and code of conduct
Check the root of the repo for these files.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file dramatiq-taskstate-0.3.4.tar.gz
.
File metadata
- Download URL: dramatiq-taskstate-0.3.4.tar.gz
- Upload date:
- Size: 25.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/29.0 requests/2.22.0 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.2 importlib-metadata/4.6.4 keyring/18.0.1 rfc3986/1.5.0 colorama/0.4.3 CPython/3.8.10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 199b881a6df7d2ee29303a6dde37a876f94aff86d8d445af323aa0650b5d2fff |
|
MD5 | 4582e2a1bbc67ea660bfc4d5d60aa9b9 |
|
BLAKE2b-256 | a50d48a750ba2740e76180b4c2505aced564baa9ca7554d0911dc702da995188 |
File details
Details for the file dramatiq_taskstate-0.3.4-py3-none-any.whl
.
File metadata
- Download URL: dramatiq_taskstate-0.3.4-py3-none-any.whl
- Upload date:
- Size: 29.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/29.0 requests/2.22.0 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.2 importlib-metadata/4.6.4 keyring/18.0.1 rfc3986/1.5.0 colorama/0.4.3 CPython/3.8.10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | e9ab1a0e6d18c8b102b1d2530bb7600d2ae9480a4ab22182412ded2c4b048acd |
|
MD5 | f4cce09338d5e273d62bb15c03cd8991 |
|
BLAKE2b-256 | 95ec2306470245b401d7893b955eb36acbbcb3b5810fd7f4ab5ea63a07f5721d |