Skip to main content

Celery tasks for Django made easy

Project description

Gonk

gonk

Setup

Install the library:

pip install gonk

You can add contribution add-ons:

For Mercure support:

pip install gonk[mercure]

For Django Rest Framework support:

pip install gonk[drf]

Or both of them:

pip install gonk[drf,mercure]

Add the application to INSTALLED_APPS in Django settings:

INSTALLED_APPS = [
    # ...
    'gonk',
]

Launch migrations:

python manage.py migrate

Usage

Create taskrunner

# taskrunners.py
from gonk.taskrunners import TaskRunner
from gonk.decorators import register, register_beat
from celery.schedules import crontab


# Register taskrunner
@register('my_taskrunner')
class MyTaskRunner(TaskRunner):
    def revert(self):
        # Specific implementation
    
    def run(self):
        # Specific implementation


# Register scheduled taskrunner
@register_beat('scheduled_taskrunner', crontab(minute='*'))
class ScheduledTaskRunner(TaskRunner):
    def revert(self):
        # Specific implementation
    
    def run(self):
        # Specific implementation

We have to import the taskrunner within every app. The best way to do so is in apps.py

class MyAppConfig(AppConfig):
    # ...

    def ready(self):
        from . import taskrunners

Launch task

from gonk.tasks import Task

args = {}
Task.create_task('my_taskrunner', args)

Revert task

from gonk.tasks import Task

t = Task.objects.last()
t.revert()

Cancel task

from gonk.tasks import Task

t = Task.objects.last()
terminate: bool = False
t.cancel(terminate=terminate)

Checkpoints

You can add checkpoints to register transcendent events within the task. Every checkpoint can send a notification to the user to get feedback of the status and progress of the task.

# taskrunners.py
from gonk.taskrunners import TaskRunner


class MyTaskRunner(TaskRunner):
    def run(self):
        # Specific implementation
        self.task.log_status('STARTED', checkpoint=False)
        self.task.log_status('Checkpoint 1', checkpoint=True)
        self.task.log_status('FINISHED')

Command to list registered taskrunners

We can list the registered taskrunner with the command list_taskrunners.

python manage.py list_taskrunners

Command to launch tasks manually

We can create tasks using the command create_tasks.

python manage.py create_task --help
usage: manage.py create_task [-h] [--input INPUT] [--raw-input RAW_INPUT] [--queue QUEUE] [--when WHEN] [--version] [-v {0,1,2,3}] [--settings SETTINGS] [--pythonpath PYTHONPATH] [--traceback] [--no-color] [--force-color]
                             [--skip-checks]
                             task_type

positional arguments:
  task_type             Task type identifier

options:
  -h, --help            show this help message and exit
  --input INPUT         File input -- can be redirected from standard output
  --raw-input RAW_INPUT
                        Raw string input -- Must be in json format
  --queue QUEUE         Celery queue name in which the task will be run
  --when WHEN           Scheduled task run date -- ISO Format

Examples:

python manage.py create_task <task_type> --raw-input='{}'
cat file.json | python manage.py create_task <task_type> --queue="celery" --input -

Setup

Environment variable Type Description
KEEP_TASK_HISTORY_DAYS int Number of days to keep the tasks
DEFAULT_NOTIFICATION_EMAIL str Default e-mail to notify

Django Rest Framework

To use Django Rest Framework extension we have to install with the drf extra.

In our project urls.py we have to add the Gonk urls:

from django.urls import path, include

urlpatterns = [
    # ...
    path('tasks/', include('gonk.contrib.rest_framework.urls')),
]

Notifications with Mercure

To use Mercure extension we have to install with the mercure extra.

To send notifications with Mercure we have to setup the following environment variables:

Variable Type Description
MERCURE_HUB_URL str Mercure service URL
MERCURE_JWT_KEY str Mercure's JWT Token to publish events
# taskrunners.py
from gonk.taskrunners import TaskRunner
from gonk.contrib.notifications.mercure import MercureNotificationMixin


class MyTaskRunner(MercureNotificationMixin, TaskRunner):
    # Specific implementation

Development

Clone repository

git clone git@github.com:kasfactory/gonk.git && cd gonk

Install poetry

pip install poetry

Install dependencies

poetry install

Run docker-compose

docker-compose up -d

Launch celery worker

poetry run celery -A test_app worker

Launch celery beat

poetry run celery -A test_app beat

At this point, we have to ensure that gonk.tasks.to_run, gonk.tasks.to_revert and gonk.tasks.to_schedule tasks are detected

Credits

Authors

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gonk-0.6.1.tar.gz (13.3 kB view details)

Uploaded Source

Built Distribution

gonk-0.6.1-py3-none-any.whl (19.1 kB view details)

Uploaded Python 3

File details

Details for the file gonk-0.6.1.tar.gz.

File metadata

  • Download URL: gonk-0.6.1.tar.gz
  • Upload date:
  • Size: 13.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.1 CPython/3.9.19 Linux/5.15.0-1059-azure

File hashes

Hashes for gonk-0.6.1.tar.gz
Algorithm Hash digest
SHA256 2cbcd46ad08cad491ae6ded7f87fca3ef2f80e359cb06037e824482292cc817e
MD5 9ce8d245235b4b3b8326e4a09505f28f
BLAKE2b-256 cb05a247cb0da6d3ccb49b2a80c381a905b38ae392124b8a735b29067082583d

See more details on using hashes here.

File details

Details for the file gonk-0.6.1-py3-none-any.whl.

File metadata

  • Download URL: gonk-0.6.1-py3-none-any.whl
  • Upload date:
  • Size: 19.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.1 CPython/3.9.19 Linux/5.15.0-1059-azure

File hashes

Hashes for gonk-0.6.1-py3-none-any.whl
Algorithm Hash digest
SHA256 145983dbcced1c0b77fa8a6e34b59e650367131d99a4d278708fd854320fc30a
MD5 35d2aa5d8a159f95fff5ec5b3ce21478
BLAKE2b-256 c354cf246955536def8940b3fd1e55224bd9dcb5d9750866378d898a83c10bc4

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page