Skip to main content

Celery Tasktree module

Project description

celery-tasktree is a module which helps to execute trees of celery tasks asynchronously in a particular order. Tasktree comes to the rescue when a number of tasks and dependencies grows and when naive callback-based approach becomes hard to understand and maintain.

Usage sample

from tasktree import task_with_callbacks, TaskTree

@task_with_callbacks
def some_action(...):
    ...

def execute_actions():
    tree = TaskTree()
    task0 = tree.add_task(some_action, args=[...], kwargs={...})
    task1 = tree.add_task(some_action, args=[...], kwargs={...})
    task10 = task1.add_task(some_action, args=[...], kwargs={...})
    task11 = task1.add_task(some_action, args=[...], kwargs={...})
    task110 = task11.add_task(some_action, args=[...], kwargs={...})
    async_result = tree.apply_async()
    return async_result

Decorator named task_with_callbacks should be used instead of simple celery task decorator.

According to the code:

  • task0 and task1 are executed simultaniously

  • task10 and task11 are executed simultaniously after task1

  • task110 is executed after task11

Things to be noted:

  • There is no way to stop propagation of the execution and there is no way to pass extra arguments from ancestor to child task. In short, there in only one kind of dependency between tasks: the dependency of execution order.

  • If subtask (function) return value is an object, then a property named “async_result” will be added to that object so that it will be possible to join() for. To extend previous example:

    async_result = execute_actions()
    task0_result, task1_result = async_result.join()
    task10_result, task11_result = task1_result.async_result.join()
    task110_result = task11_result.async_result.join()

Task with callbacks outside TaskTree

task_with_callbacks decorator can be useful in itself. It decorates functions the same way as ordinary task celery decorator does, but also adds an optional callback parameter.

Callback can be a subtask or a list of subtasks (not the TaskSet). Behind the scene, when a task with callback is invoked, it executes function’s main code, then builds a TaskSet, invokes it asynchronously and attaches the TaskSetResut as the attribute named async_result to function’s return value.

Simple example is provided below:

from tasktree import task_with_callbacks

@task_with_callbacks
def some_action(...):
    ...

cb1 = some_action.subtask(...)
cb2 = some_action.subtask(...)
async_result = some_action.delay(..., callback=[cb1, cb2])
main_result = async_result.wait()
cb1_result, cb2_result = main_result.async_result.join()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery-tasktree-0.2.tar.gz (2.5 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page