
Declarative workflows for Celery.


.. highlight:: python

.. currentmodule:: capillary


:Status: Alpha (API feedback is welcome, API could break compatibility)
:Generated: |today|
:Version: |release|
:License: BSD 3 Clause
:Authors: Domen Kožar and Aaron McMillin

.. topic:: Introduction

    :mod:`capillary` is a small integration package for
    the :mod:`celery` Distributed Task Queue, aimed at designing
    `workflows (Canvas) <>`_
    in `a declarative manner <>`_
    using Python decorators.

    The main reason :mod:`capillary` exists is to eliminate manual
    tracking of how Celery tasks depend on each other.

:mod:`capillary` executes in two phases:

1. Scans for all Celery tasks in defined Python packages
2. Executes tasks based on metadata passed to :func:`@pipeline` decorators

:mod:`capillary` uses:

- :mod:`venusian` to discover Celery tasks using deferred decorators
- :mod:`networkx` to handle the graph operations for tracking
  dependencies between tasks in the workflow
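
To illustrate the deferred-decorator idea, here is a minimal, self-contained
sketch of how a ``@pipeline``-style decorator can record metadata into a
registry while leaving the function untouched, ready for a later scan. The
names (``_REGISTRY``, ``extract``, ``transform``) are illustrative
simplifications, not capillary's actual code:

.. code:: python

    _REGISTRY = []  # filled in as decorated functions are imported/scanned

    def pipeline(after=None, tags=None):
        def decorator(func):
            # Record metadata; return the function unchanged so it stays
            # directly callable (and unit-testable).
            _REGISTRY.append({
                'func': func,
                'after': after or [],
                'tags': tags or [],
            })
            return func
        return decorator

    @pipeline()
    def extract(celery_task):
        return 'raw'

    @pipeline(after=['extract'])
    def transform(celery_task, value):
        return value.upper()

Because the decorator returns ``func`` unchanged, importing the module only
populates the registry; nothing executes until a later phase walks it.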

User Guide
==========

.. _simple-example:

Simple Example
--------------

Below is the `first steps with Celery <>`_
tutorial expanded with :mod:`capillary` integration.

The ``tasks.py`` module contains the steps of the pipeline. Steps are
marked with the ``@pipeline`` decorator, which has optional parameters
to indicate that a step must be executed after some other step, or
that a step carries certain tags so that groups of tasks can be executed together.

.. code:: python

    from capillary import pipeline

    @pipeline()
    def foo(celery_task):
        return 'simple task flow'

    @pipeline(after=['foo'])
    def bar(celery_task, l):
        return l.upper()

The ``myapp.py`` module then creates a :class:`PipelineConfigurator`
instance which will assemble the declared steps:

.. code:: python

    from celery import Celery
    from capillary import PipelineConfigurator

    import tasks

    app = Celery('tasks', broker='redis://', backend='redis://')
    pc = PipelineConfigurator(app)

Start the worker with

.. code:: bash

    $ celery worker -A myapp -D

and execute the pipeline in a Python shell:

.. code:: python

    >>> from myapp import pc
    >>> asyncresult =
    >>> asyncresult.get()

*This example will be used throughout the user guide as a base.*

.. note::

    This example assumes the Redis broker is running with default settings, but any
    `Celery broker <>`_
    will do.

    ``backend`` is defined only for retrieving the result using ``.get()``; it is
    otherwise not required.

Core concept: Handling input and output parameters
--------------------------------------------------

Celery uses a concept called `partials <>`_
(sometimes also known as `currying <>`_) to
create function `signatures <>`_.

:mod:`capillary` reuses these concepts to execute tasks. The value returned
from task ``foo`` is passed into task ``bar``.
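
Celery's signatures behave much like :func:`functools.partial` from the
standard library. The standalone sketch below (using the ``foo``/``bar``
steps from the example, with ``celery_task`` stubbed as ``None``) shows how
each return value is fed into the next partial, which is roughly what a
Celery chain does; it is an analogy, not Celery's implementation:

.. code:: python

    from functools import partial

    def foo(celery_task):
        return 'simple task flow'

    def bar(celery_task, l):
        return l.upper()

    # A chain roughly does this: call each "signature" in order,
    # feeding the previous return value in as the next argument.
    sig_foo = partial(foo, None)  # celery_task stubbed with None
    sig_bar = partial(bar, None)

    result = sig_bar(sig_foo())  # 'SIMPLE TASK FLOW'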

It is possible to pass extra parameters to specific tasks, as described in
:ref:`extra-parameters`.
Core concept: Tagging pipelines
-------------------------------

By default :meth:`` will execute all scanned tasks
without tags in topological order.

If ``tags=['foobar']`` is passed to :func:`@pipeline`, the task will only be run
when ``tagged_as=['foobar']`` is passed to :meth:``.

See :ref:`predefined_defaults` for information on how to reduce boilerplate and
group pipelines per tag.

Aborting the Pipeline
---------------------

If a step needs to stop the current pipeline (meaning no further tasks
are processed in the pipeline), just raise :exc:`capillary.AbortPipeline`
anywhere in your pipeline tasks.
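
As a stand-in sketch of this behaviour, the local ``AbortPipeline`` class
below merely mimics :exc:`capillary.AbortPipeline`, and the runner loop is an
illustrative assumption, not capillary's implementation:

.. code:: python

    class AbortPipeline(Exception):
        """Raised by a step to stop the rest of the pipeline."""

    def step_one(value):
        return value + 1

    def step_two(value):
        if value > 1:
            raise AbortPipeline('nothing left to do')
        return value + 1

    def run_pipeline(steps, value):
        for step in steps:
            try:
                value = step(value)
            except AbortPipeline:
                break  # no further tasks are processed
        return value

    run_pipeline([step_one, step_two], 1)  # step_two aborts; returns 2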

.. _extra-parameters:

Passing extra parameters to a specific task
-------------------------------------------

Some :func:`@pipeline` elements might require extra arguments that are only
known when :meth:`` is called.

.. code:: python

    >>> @pipeline(
    ...     required_kwarg_names=['param'],
    ... )
    ... def foobar(celery_task, param=None):
    ...     print(param)
    ...     return 'simple task flow'

When :meth:`` is called, it will need ``param`` passed inside
``required_kwargs``; otherwise :exc:`MissingArgument` will be raised.
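
The check can be sketched as follows. The ``MissingArgument`` class and
``bind_required_kwargs`` helper here are hypothetical stand-ins that mirror
the behaviour described above, not capillary's actual code:

.. code:: python

    class MissingArgument(Exception):
        pass

    def bind_required_kwargs(required_kwarg_names, required_kwargs):
        """Return only the kwargs a task declared, or raise MissingArgument."""
        bound = {}
        for name in required_kwarg_names:
            if name not in required_kwargs:
                raise MissingArgument('missing required kwarg: %s' % name)
            bound[name] = required_kwargs[name]
        return bound

    bind_required_kwargs(['param'], {'param': 42})  # {'param': 42}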

Applying multiple :func:`@pipeline` decorators
----------------------------------------------

The most typical use case for two :func:`@pipeline` decorators is when
you'd like to reuse a function in two different pipelines, each tagged differently.

.. code:: python

    @pipeline(
        tags=['some_pipeline'],
        after=['first', 'second'],
    )
    @pipeline(
        tags=['other_pipeline'],
        after=['third'],
    )
    def foobar(celery_task):
        return 'simple task flow'

Executing ``'some_pipeline'])``
would run the ``foobar`` function as a task after the ``first`` and ``second`` tasks are done.

However, executing ``'other_pipeline'])``
would run the ``foobar`` function after the ``third`` task is done.

.. note::

    If both tags are used (e.g. ``'some_pipeline', 'other_pipeline'])``),
    then the order in which the tags are specified matters, and the latter overrides the former.

    If you specify a different ``name`` parameter for each, they will both be executed.

.. _predefined_defaults:

Create pipelines based on predefined defaults
---------------------------------------------

Often :func:`@pipeline` definitions will repeat the same arguments throughout your
application. :func:`make_pipeline_from_defaults` allows you to create a pipeline
decorator with customized predefined defaults. This example makes a ``foobar_pipeline``
decorator that will apply the same tag to each step:

.. code:: python

    >>> from capillary import make_pipeline_from_defaults
    >>> foobar_pipeline = make_pipeline_from_defaults(
    ...     tags=["foobar"],
    ... )

Then use ``@foobar_pipeline`` just as one would use :func:`@pipeline`, and all your
definitions will have ``foobar`` as a tag.
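
A minimal sketch of how such a defaults factory could work, assuming explicit
keyword arguments win over the stored defaults (the function bodies here are
illustrative, not capillary's implementation):

.. code:: python

    def make_pipeline_from_defaults(**defaults):
        """Return a @pipeline-like decorator with ``defaults`` pre-filled."""
        def pipeline_with_defaults(**overrides):
            merged = dict(defaults)
            merged.update(overrides)  # explicit kwargs win over the defaults
            def decorator(func):
                func.pipeline_options = merged  # illustrative: record metadata
                return func
            return decorator
        return pipeline_with_defaults

    foobar_pipeline = make_pipeline_from_defaults(tags=['foobar'])

    @foobar_pipeline()
    def step(celery_task):
        return 'tagged step'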

.. note::

    Passing ``tags`` to ``@foobar_pipeline`` will override the ``["foobar"]`` default.

Printing the task tree
----------------------

To actually see what kind of canvas will be executed, call
:meth:`PipelineConfigurator.prettyprint` with the same arguments as
:meth:``:

.. code:: python

    >>> pc.prettyprint(args=[], kwargs={})

The very last task in the pipeline
----------------------------------

Using the constant :class:`capillary.ALL`, it's possible to declare a task
as the last one in the pipeline:

.. code:: python

    >>> from capillary import ALL, pipeline
    >>> @pipeline(
    ...     after=ALL,
    ... )
    ... def last(celery_task, obj):
    ...     print('ALL DONE!')
    ...     return obj

.. note::

    Multiple tasks declared with ``after=ALL`` will be run
    in :class:`` as the last part of the pipeline.

Inner workings of :meth:``
-----------------------------------------------------

The following is a quick summary of what happens inside :meth:``:

- the task tree is generated using dependency information
- Celery signatures are created
- the task tree is reduced into a `chain <>`_
  using topological sort
- the chain is executed using :meth:``
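
As a sketch of the reduction step, here is a dependency-free topological sort
(Kahn's algorithm) over plain dicts. capillary itself does this with
:mod:`networkx`, so take the function below as an illustration of the idea
only:

.. code:: python

    from collections import deque

    def topological_chain(after):
        """``after`` maps each task to the tasks it must run after."""
        indegree = {task: len(deps) for task, deps in after.items()}
        dependents = {task: [] for task in after}
        for task, deps in after.items():
            for dep in deps:
                dependents[dep].append(task)
        # start from the tasks with no dependencies (sorted for determinism)
        ready = deque(sorted(t for t, d in indegree.items() if d == 0))
        chain = []
        while ready:
            task = ready.popleft()
            chain.append(task)
            for nxt in dependents[task]:
                indegree[nxt] -= 1
                if indegree[nxt] == 0:
                    ready.append(nxt)
        return chain

    topological_chain({'first': [], 'second': [], 'foobar': ['first', 'second']})
    # ['first', 'second', 'foobar']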

.. note::

    Currently the task tree is reduced into a linear chain of tasks, but
    in the future different "runners" could be implemented.

Unit Testing
------------

Functions marked as :func:`@pipeline` elements are still just plain, untouched
functions until :meth:`PipelineConfigurator.scan` is called. If the function body
doesn't use its first argument, ``celery_task``, just pass ``None`` as the value.

To unit test our two pipeline elements from :ref:`simple-example`:

.. code:: python

    import unittest

    from tasks import foo, bar

    class PipelineTestCase(unittest.TestCase):

        def test_bar(self):
            self.assertEqual(bar(None, 'test'), 'TEST')

        def test_foo(self):
            self.assertEqual(foo(None), 'simple task flow')


To run the tests, install ``py.test`` and run it:

.. code:: bash

    $ py.test tests/

Features to be considered
=========================

- Using a lot of tasks with large objects passed as arguments can be quite
  storage intensive. One alternative would be to generate signatures on-the-fly
  if Celery permits that.

API Reference
=============

.. automodule:: capillary
    :exclude-members: PipelineConfigurator

.. autoclass:: capillary.PipelineConfigurator
    :members: run, prettyprint, scan
