
Python library for dataflow programming with Amazon SWF


==========
simpleflow
==========

.. image:: https://badge.fury.io/py/simpleflow.png
    :target: http://badge.fury.io/py/simpleflow

.. image:: https://travis-ci.org/botify-labs/simpleflow.png?branch=master
    :target: https://travis-ci.org/botify-labs/simpleflow

.. image:: https://pypip.in/d/simpleflow/badge.png
    :target: https://crate.io/packages/simpleflow?version=latest


Simple Flow is a Python library that provides abstractions to write programs in
the `distributed dataflow paradigm
<https://en.wikipedia.org/wiki/Distributed_data_flow>`_. It relies on futures
to describe the dependencies between tasks. It coordinates the execution of
distributed tasks with Amazon `SWF <https://aws.amazon.com/swf/>`_.

A ``Future`` object models the asynchronous execution of a computation that may
end.

It tries to mimic the interface of the Python `concurrent.futures
<http://docs.python.org/3/library/concurrent.futures>`_ library.
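
For comparison, here is the same kind of dependency as in the quickstart below, written with the standard library's ``concurrent.futures``. This sketch uses only the Python 3 standard library (Python 2 needs the ``futures`` backport), not simpleflow. Note that ``result()`` is a method in ``concurrent.futures``, while the simpleflow quickstart reads it as an attribute (``z.result``).

.. code:: python

    from concurrent.futures import ThreadPoolExecutor


    def increment(x):
        return x + 1


    def double(x):
        return x * 2


    def compute(x):
        """Compute (x + 1) * 2 with two dependent tasks."""
        with ThreadPoolExecutor(max_workers=2) as executor:
            y = executor.submit(increment, x)        # returns a Future right away
            z = executor.submit(double, y.result())  # result() blocks until y is done
            return z.result()


    print(compute(1))  # 4

Here the dependency is resolved by blocking the calling thread; simpleflow instead records it and lets SWF schedule the dependent task later.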

Features
--------

- Provides a ``Future`` abstraction to define dependencies between tasks.
- Define asynchronous tasks from callables.
- Handle workflows with Amazon SWF.
- Implement replay behavior like the Amazon Flow framework.
- Handle retry of tasks that failed.
- Automatically register decorated tasks.
- Handle the completion of a decision with more than 100 tasks.
- Provides a local executor to check a workflow without Amazon SWF (see the
  ``--local`` option of ``simpleflow workflow.start``).
- Provides decider and activity worker processes for execution with Amazon SWF.
- Ships with the ``simpleflow`` command. Run ``simpleflow --help`` for more
  information about the commands it supports.
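
Automatic registration of decorated tasks follows a common decorator pattern. The following is a generic sketch of that pattern, not simpleflow's code: ``register_task`` and ``REGISTRY`` are hypothetical names, and whatever the real decorator (``activity.with_attributes`` in the quickstart) does with SWF is not modeled here. Keys use the dotted ``module.function`` form, like the ``examples.basic.increment`` task names shown by ``workflow.tasks``.

.. code:: python

    # Hypothetical registry: dotted task name -> (callable, attributes).
    REGISTRY = {}


    def register_task(**attributes):
        """Hypothetical decorator recording a callable and its attributes."""
        def decorator(func):
            name = '%s.%s' % (func.__module__, func.__name__)
            REGISTRY[name] = (func, attributes)
            return func  # the function itself is returned unchanged
        return decorator


    @register_task(task_list='quickstart', version='example')
    def increment(x):
        return x + 1


    func, attrs = REGISTRY[increment.__module__ + '.increment']
    print('%d %s' % (func(1), attrs['task_list']))  # 2 quickstart

Because the decorator returns the original function, the task can still be called directly, for instance in a local run.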

Quickstart
----------

Let's take a simple example that computes the result of ``(x + 1) * 2``. You
will find this example in ``examples/basic.py``.

We need to declare the functions as activities to make them available:

.. code:: python

    import time

    from simpleflow import (
        activity,
        Workflow,
        futures,
    )

    @activity.with_attributes(task_list='quickstart', version='example')
    def increment(x):
        return x + 1

    @activity.with_attributes(task_list='quickstart', version='example')
    def double(x):
        return x * 2

    @activity.with_attributes(task_list='quickstart', version='example')
    def delay(t, x):
        time.sleep(t)  # simulate a long-running activity
        return x

Then define the workflow itself in the same ``examples/basic.py`` file:

.. code:: python

    class BasicWorkflow(Workflow):
        name = 'basic'
        version = 'example'
        task_list = 'example'

        def run(self, x, t=30):
            y = self.submit(increment, x)
            yy = self.submit(delay, t, y)
            z = self.submit(double, y)

            print('({x} + 1) * 2 = {result}'.format(
                x=x,
                result=z.result))
            futures.wait(yy, z)
            return z.result

Now check that the workflow works locally with an integer ``x`` and a wait
value ``t``::

    $ simpleflow workflow.start --local examples.basic.BasicWorkflow --input '[1, 5]'
    (1 + 1) * 2 = 4

*input* is encoded in JSON. It can be either a list of *positional* arguments,
such as ``[1, 1]``, or a *dict* with the ``args`` and ``kwargs`` keys, such as
``{"args": [1], "kwargs": {}}``, ``{"kwargs": {"x": 1}}``, or
``{"args": [1], "kwargs": {"t": 5}}``.
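
Both shapes can be normalized into positional and keyword arguments before calling ``run``. A minimal sketch, assuming a hypothetical ``parse_input`` helper (not simpleflow's own parser) that treats a missing ``args`` or ``kwargs`` key as empty; ``run`` below mirrors the signature of ``BasicWorkflow.run`` without ``self``:

.. code:: python

    import json


    def parse_input(raw):
        """Hypothetical helper: split a JSON --input string into (args, kwargs)."""
        data = json.loads(raw)
        if isinstance(data, list):
            return data, {}                  # plain list: positional arguments
        if isinstance(data, dict):
            return data.get('args', []), data.get('kwargs', {})
        raise ValueError('input must be a JSON list or object')


    def run(x, t=30):
        # Same parameters as BasicWorkflow.run; just echoes what it receives.
        return x, t


    for raw in ['[1, 5]', '{"args": [1], "kwargs": {}}',
                '{"kwargs": {"x": 1}}', '{"args": [1], "kwargs": {"t": 5}}']:
        args, kwargs = parse_input(raw)
        print('%s -> %r' % (raw, run(*args, **kwargs)))

The second and third inputs leave ``t`` unset, so ``run`` falls back to its default of ``30``.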

Now that you are confident that the workflow should work, you can run it on
Amazon SWF with the ``standalone`` command::

    $ simpleflow standalone --domain TestDomain examples.basic.BasicWorkflow --input '[1, 5]'

The *standalone* command sets a unique task list and manages all the processes
that are needed to execute the workflow: decider, activity worker, and a client
that starts the workflow. This makes it convenient for testing a workflow
against SWF during development or in integration tests.

Let's take a closer look at the workflow definition.

It is a *class* that inherits from ``simpleflow.Workflow``:

.. code:: python

    class BasicWorkflow(Workflow):

It defines 3 class attributes:

- *name*, the name of the SWF workflow type.
- *version*, the version of the SWF workflow type. It is currently used only
  to label a workflow.
- *task_list*, the default task list (think of it as a dynamically created
  queue) where decision tasks for this workflow are sent. Any *decider* that
  listens on this task list can handle this workflow. This value can be
  overridden by the simpleflow commands and objects.

It also implements the ``run`` method that takes two arguments: ``x`` and
``t=30`` (i.e. ``t`` is optional and has the default value ``30``). These
arguments are passed with the ``--input`` option. The ``run`` method
describes the workflow and how its tasks should execute.

Each time a decider takes a decision task, it executes the ``run`` method
again from the start. When the workflow execution starts, it evaluates ``y =
self.submit(increment, x)`` for the first time. *y* holds a future in state
``PENDING``. The execution continues with the line ``yy = self.submit(delay, t,
y)``. *yy* holds another future in state ``PENDING``. This state means the task
has not been scheduled. Execution then continues in the ``run`` method with
the line ``z = self.submit(double, y)``. Here it needs the value of the *y*
future to evaluate the ``double`` activity. As the execution cannot continue,
the decider schedules the ``increment`` task. *yy* is not a dependency for any
task, so it is not scheduled.

Once the decider has scheduled the task for *y*, it sleeps and waits for an
event to wake it up. This happens when the ``increment`` task completes: SWF
schedules a decision task, and a decider takes it and executes the
``BasicWorkflow.run`` method again from the start. It evaluates the line ``y
= self.submit(increment, x)``. The task associated with the *y* future has
completed, hence *y* is in state ``FINISHED`` and contains the value ``2`` in
``y.result``. The execution continues until it blocks. It goes past ``yy =
self.submit(delay, t, y)``, which stays the same. Then it reaches ``z =
self.submit(double, y)``. It gets the value of ``y.result`` and *z* now holds a
future in state ``PENDING``. Execution reaches the line with the ``print``. It
blocks there because ``z.result`` is not available. The decider schedules the
task backed by the *z* future: ``double(y)``. The workflow execution goes on
like this, evaluating ``BasicWorkflow.run`` again from the start until it
finishes.
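
The replay mechanism described above can be modeled without SWF. The following is a simplified, in-memory sketch, not simpleflow's implementation: ``Blocked``, ``ReplayExecutor``, ``decide`` and ``run_to_completion`` are hypothetical names, the history is a plain dict of completed task results, and activity workers are replaced by direct function calls. Task IDs combine the function name with a per-function call counter, so each replay of the workflow maps the same ``submit`` call to the same task. In this sketch, execution blocks when a pending ``result`` is read.

.. code:: python

    class Blocked(Exception):
        """Raised when the workflow reads a result that is not available yet."""


    class Future(object):
        def __init__(self, task_id, state='PENDING', value=None):
            self.task_id, self.state, self._value = task_id, state, value

        @property
        def result(self):
            # Reading a pending result interrupts this replay of the workflow.
            if self.state != 'FINISHED':
                raise Blocked(self.task_id)
            return self._value


    class ReplayExecutor(object):
        def __init__(self, history):
            self.history = history   # task_id -> result of completed tasks
            self.to_schedule = {}    # task_id -> (func, resolved args)
            self.counters = {}       # func name -> number of submit() calls

        def submit(self, func, *args):
            # The n-th submit of a function gets the same ID on every replay.
            n = self.counters.get(func.__name__, 0) + 1
            self.counters[func.__name__] = n
            task_id = '%s-%d' % (func.__name__, n)
            if task_id in self.history:
                return Future(task_id, 'FINISHED', self.history[task_id])
            if any(isinstance(a, Future) and a.state != 'FINISHED' for a in args):
                return Future(task_id)  # a dependency is still pending
            resolved = [a.result if isinstance(a, Future) else a for a in args]
            self.to_schedule[task_id] = (func, resolved)
            return Future(task_id)


    def decide(workflow, history):
        """Replay the workflow from the start and return one decision."""
        executor = ReplayExecutor(history)
        try:
            return 'complete', workflow(executor)
        except Blocked:
            return 'schedule', executor.to_schedule


    def run_to_completion(workflow):
        """Alternate decisions with (simulated) activity execution."""
        history, decisions = {}, 0
        while True:
            decisions += 1
            kind, payload = decide(workflow, history)
            if kind == 'complete':
                return payload, decisions
            if not payload:
                raise RuntimeError('workflow is blocked with nothing to schedule')
            for task_id, (func, args) in payload.items():
                history[task_id] = func(*args)  # stands in for an activity worker


    def increment(x):
        return x + 1


    def double(x):
        return x * 2


    def delay(t, x):
        return x  # the quickstart activity also sleeps for t seconds


    def basic_workflow(ex, x=1, t=0):
        y = ex.submit(increment, x)
        yy = ex.submit(delay, t, y)
        z = ex.submit(double, y)
        value = z.result
        yy.result  # like futures.wait(yy, z): blocks until delay has finished
        return value


    print(run_to_completion(basic_workflow))  # (4, 3)

With this model the workflow takes three decisions: the first schedules ``increment``, the second schedules ``delay`` and ``double`` together (in the ``workflow.tasks`` output further down, both share the same scheduled time), and the third completes with ``4``.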

Commands
--------

Overview
~~~~~~~~

Read, and ideally run, the ``demo`` script to get a quick overview of the
``simpleflow`` commands. To run the ``demo``, you need to start decider and
activity worker processes.

Start a decider with::

    $ simpleflow decider.start --domain TestDomain --task-list test examples.basic.BasicWorkflow

Start an activity worker with::

    $ simpleflow worker.start --domain TestDomain --task-list quickstart

Then execute ``./extras/demo``.

Controlling SWF access
~~~~~~~~~~~~~~~~~~~~~~

The SWF region is controlled by the environment variable ``AWS_DEFAULT_REGION``. This variable
comes from the legacy "simple-workflow" project. The option might be exposed through a
``--region`` option in the future (if you want that, please open an issue).

The SWF domain is controlled by the ``--domain`` option on most simpleflow commands. It can also
be set via the ``SWF_DOMAIN`` environment variable. If both are supplied, the
command-line value takes precedence over the environment variable.

Note that some simpleflow commands expect the domain to be passed as a positional argument.
In that case the environment variable has no effect for now.

The number of retries for accessing SWF can be controlled via ``SWF_CONNECTION_RETRIES``
(defaults to 5).
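
The precedence rules above can be summarized in a few lines. A minimal sketch, assuming a hypothetical ``resolve_swf_settings`` helper (simpleflow does not necessarily expose such a function); it only illustrates the documented behavior: ``--domain`` wins over ``SWF_DOMAIN``, the region comes from ``AWS_DEFAULT_REGION``, and retries default to 5.

.. code:: python

    import os


    def resolve_swf_settings(cli_domain=None, environ=None):
        """Hypothetical helper resolving SWF settings from CLI and environment."""
        environ = os.environ if environ is None else environ
        return {
            # The command-line value takes precedence over SWF_DOMAIN.
            'domain': cli_domain or environ.get('SWF_DOMAIN'),
            'region': environ.get('AWS_DEFAULT_REGION'),
            'retries': int(environ.get('SWF_CONNECTION_RETRIES', 5)),
        }


    print(resolve_swf_settings('TestDomain', {'SWF_DOMAIN': 'OtherDomain'}))

Passing ``environ`` explicitly keeps the function easy to test without touching the real process environment.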

List Workflow Executions
~~~~~~~~~~~~~~~~~~~~~~~~

::

    $ simpleflow workflow.list TestDomain
    basic-example-1438722273	basic	OPEN

Workflow Execution Status
~~~~~~~~~~~~~~~~~~~~~~~~~

::

    $ simpleflow --header workflow.info TestDomain basic-example-1438722273
    domain	workflow_type.name	workflow_type.version	task_list	workflow_id	run_id	tag_list	execution_time	input
    TestDomain	basic	example		basic-example-1438722273	22QFVi362TnCh6BdoFgkQFlocunh24zEOemo1L12Yl5Go=		1.70	{u'args': [1], u'kwargs': {}}

Tasks Status
~~~~~~~~~~~~

You can check the status of the workflow execution with::

    $ simpleflow --header workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3
    $ simpleflow --header workflow.tasks TestDomain basic-example-1438722273
    Tasks	Last State	Last State Time	Scheduled Time
    examples.basic.increment	scheduled	2015-08-04 23:04:34.510000	2015-08-04 23:04:34.510000
    $ simpleflow --header workflow.tasks TestDomain basic-example-1438722273
    Tasks	Last State	Last State Time	Scheduled Time
    examples.basic.double	completed	2015-08-04 23:06:19.200000	2015-08-04 23:06:17.738000
    examples.basic.delay	completed	2015-08-04 23:08:18.402000	2015-08-04 23:06:17.738000
    examples.basic.increment	completed	2015-08-04 23:06:17.503000	2015-08-04 23:04:34.510000

Profiling
~~~~~~~~~

You can profile the execution of the workflow with::

    $ simpleflow --header workflow.profile TestDomain basic-example-1438722273
    Task	Last State	Scheduled	Time Scheduled	Start	Time Running	End	Percentage of total time
    activity-examples.basic.double-1	completed	2015-08-04 23:06	0.07	2015-08-04 23:06	1.39	2015-08-04 23:06	1.15
    activity-examples.basic.increment-1	completed	2015-08-04 23:04	102.20	2015-08-04 23:06	0.79	2015-08-04 23:06	0.65
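
The two timestamps printed by ``workflow.tasks`` are enough to estimate how long each task spent between being scheduled and reaching its last state. A stdlib-only sketch, assuming the timestamp format shown in the ``workflow.tasks`` output above; ``elapsed_seconds`` is a hypothetical helper and the rows are copied from that output:

.. code:: python

    from datetime import datetime

    TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'

    # (task, last state time, scheduled time), copied from the workflow.tasks output.
    TASKS = [
        ('examples.basic.double', '2015-08-04 23:06:19.200000', '2015-08-04 23:06:17.738000'),
        ('examples.basic.delay', '2015-08-04 23:08:18.402000', '2015-08-04 23:06:17.738000'),
        ('examples.basic.increment', '2015-08-04 23:06:17.503000', '2015-08-04 23:04:34.510000'),
    ]


    def elapsed_seconds(last_state_time, scheduled_time):
        """Seconds between a task being scheduled and reaching its last state."""
        end = datetime.strptime(last_state_time, TIMESTAMP_FORMAT)
        start = datetime.strptime(scheduled_time, TIMESTAMP_FORMAT)
        return (end - start).total_seconds()


    for name, last_state_time, scheduled_time in TASKS:
        print('%s: %.3f s' % (name, elapsed_seconds(last_state_time, scheduled_time)))

For ``increment`` this gives 102.993 s, which agrees with the profile's *Time Scheduled* plus *Time Running* (102.20 + 0.79); for ``double`` it gives 1.462 s (0.07 + 1.39).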


Controlling log verbosity
~~~~~~~~~~~~~~~~~~~~~~~~~

You can control log verbosity via the ``LOG_LEVEL`` environment variable. The default is ``INFO``. For instance,
the following command starts a decider with ``DEBUG`` logs::

    $ LOG_LEVEL=DEBUG simpleflow decider.start --domain TestDomain --task-list test examples.basic.BasicWorkflow
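
A common way to honor such a variable with the standard ``logging`` module is sketched below. This is an assumption about the general technique, not a copy of simpleflow's logging setup; ``log_level_from_env`` is a hypothetical name.

.. code:: python

    import logging
    import os


    def log_level_from_env(environ=None):
        """Map LOG_LEVEL (e.g. 'DEBUG') to a logging level, defaulting to INFO."""
        environ = os.environ if environ is None else environ
        name = environ.get('LOG_LEVEL', 'INFO').upper()
        level = getattr(logging, name, None)
        if not isinstance(level, int):
            raise ValueError('unknown LOG_LEVEL: %r' % name)
        return level


    logging.basicConfig(level=log_level_from_env())
    logging.getLogger(__name__).debug('only shown when LOG_LEVEL=DEBUG')

Rejecting unknown names early avoids silently running with an unexpected verbosity.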


Documentation
-------------

Full documentation (work-in-progress) is available at
https://simpleflow.readthedocs.org/.

Requirements
------------

- Python 2.6.x or 2.7.x
- Python 3.x compatibility is NOT guaranteed for now: https://github.com/botify-labs/simpleflow/issues/87


Development
-----------

A ``Dockerfile`` is provided to help development on non-Linux machines.

You can build a ``simpleflow`` image with::

    ./script/docker-build

And use it with::

    ./script/docker-run

It then mounts your current directory inside the container and passes the
most relevant variables (your ``AWS_*`` credentials for instance).


Running tests
~~~~~~~~~~~~~

You can run tests with::

    ./script/test

Any parameter passed to this script is propagated to the underlying call to ``py.test``.
This wrapper script sets some environment variables which control the behavior of
simpleflow during tests:

- ``SIMPLEFLOW_CLEANUP_PROCESSES``: set to ``"yes"`` in tests, so tests clean up child
  processes after each test case. Outside ``script/test``, you can set it to an empty
  string (``""``) or leave it unset if you want to debug things and take care of it yourself.
- ``SIMPLEFLOW_ENV``: set to ``"test"`` in tests, which changes some constants to ease or
  speed up tests.
- ``SWF_CONNECTION_RETRIES``: set to ``"1"`` in tests, which avoids having too many retries
  on the SWF API calls (5 by default in production).
- ``SIMPLEFLOW_VCR_RECORD_MODE``: set to ``"none"`` in tests, which avoids running requests
  against the real SWF endpoints in tests. If you need to update cassettes, see
  ``tests/integration/README.md``.


Release
-------

In order to release a new version, you'll need credentials on pypi.python.org for this
software, as well as write access to this repository. Ask via an issue if needed.
Rough process::

    git checkout master
    git pull --rebase
    v=0.10.0
    vi simpleflow/__init__.py  # bump the version to $v
    git add . && git commit -m "Bump version to $v"
    git tag $v
    git push --tags
    python setup.py sdist upload -r pypi


License
-------

MIT licensed. See the bundled `LICENSE <https://github.com/botify-labs/simpleflow/blob/master/LICENSE>`__ file for more details.


======================
Python Simple Workflow
======================

.. image:: https://travis-ci.org/botify-labs/python-simple-workflow.png?branch=develop

python-simple-workflow is a wrapper for the `Amazon Simple Workflow <http://aws.amazon.com/en/swf/>`_ service.
It aims to provide some abstractions over the web service concepts through the SWF API implementation of the `Boto <https://boto.readthedocs.org/en/latest/ref/swf.html>`_ library.

It aims to provide:

* **Modeling**: SWF entities and concepts are manipulated through *Models* and *QuerySets* (any resemblance to the Django API is not a coincidence).
* **High-level Events, History**: A higher level of abstraction over SWF *events* and *history*. Events are implemented as stateful objects aware of their own state and possible transitions. History enhances the description of the event flow, and can be compiled to check its integrity and the transitions between activity statuses.
* **Decisions**: Stateful abstractions above the SWF decision-making system.
* **Actors**: Base implementations of SWF actors, such as a *Decider* or an activity task processor *Worker*, from which users can easily inherit to implement their own decision/processing model.
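
Checking activity status transitions while compiling a history can be illustrated with a small transition table. This is a simplified sketch, not the package's event model: the lowercase state names loosely follow SWF's ``ActivityTaskScheduled``/``Started``/``Completed``/``Failed``/``TimedOut``/``Canceled`` events, other event types are ignored, and ``TRANSITIONS`` and ``check_history`` are hypothetical names.

.. code:: python

    # Allowed transitions for one activity task, keyed by its current state
    # (None = not seen yet). Terminal states have no outgoing transitions.
    TRANSITIONS = {
        None: {'scheduled'},
        'scheduled': {'started', 'timed_out', 'canceled'},
        'started': {'completed', 'failed', 'timed_out', 'canceled'},
    }


    def check_history(events):
        """Replay (activity_id, state) events; return each activity's final state."""
        states = {}
        for activity_id, new_state in events:
            current = states.get(activity_id)
            if new_state not in TRANSITIONS.get(current, set()):
                raise ValueError('%s: invalid transition %r -> %r'
                                 % (activity_id, current, new_state))
            states[activity_id] = new_state
        return states


    history = [
        ('increment-1', 'scheduled'), ('increment-1', 'started'),
        ('increment-1', 'completed'),
        ('double-1', 'scheduled'), ('double-1', 'started'),
        ('double-1', 'completed'),
    ]
    print(check_history(history))

Any event that does not follow the table, such as a task starting before it was scheduled, raises ``ValueError`` with the offending activity ID.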

It provides querysets and model objects over commonly used concepts: domains, workflow types, activity types, and so on.

It is under the MIT license, and any ideas, feature requests, patches, or pull requests to improve it are of course welcome.

Installation
------------

.. code-block:: shell

    pip install simple-workflow


Usage and all the rest
----------------------

Please refer to the `Documentation <http://python-simple-workflow.readthedocs.org>`_.


What's left?
------------

Amazon interface models implementation:

- ✔ Domain @done (13-04-02 10:01)
- ✔ Workflow Type @done (13-04-02 10:01)
- ✔ Workflow Execution @done (13-04-05 10:13)
- ☐ Activity Type
- ☐ Decider

Amazon interface querysets implementation:

- ✔ DomainQuery @done (13-04-02 10:02)
- ✔ WorkflowTypeQuery @done (13-04-02 10:03)
- ✔ Workflow Execution @done (13-04-05 10:13)
- ☐ Activity Type
- ☐ Decider

General:

- ☐ Add sphinx doc
- ☐ Document real world example
- ☐ TESTS TESTS TESTS!


Changelog
---------

0.12.4
~~~~~~

- Avoid returning too big responses to RespondDecisionTaskCompleted endpoint (#166)
- Worker: remove useless monitor_child (#168)

0.12.3
~~~~~~

- Add max_parallel option in Group (#164)

0.12.2
~~~~~~

- Make the dynamic dispatcher more flexible (#161)
- Fix README.rst format (#160)
- Tiny command-line usability fixes (#158)

0.12.1
~~~~~~

- Don't override passed "default" in json_dumps() (#155)
- Expose activity context (#156)

0.12.0
~~~~~~

- Improve process management (#142)

0.11.17
~~~~~~~

- Don't reap children in the back of multiprocessing (#141)
- Don't force to pass a workflow to activity workers (#133)
- Don't override the task list if not standalone (#139)
- Split FuncGroup submit (#146)
- CI: Test on python 3 (#144)
- Decider: use workflow's task list if unset (#148)

0.11.16
~~~~~~~

- Refactor: cleanups and many python 3 compatibility issues fixed (#135)
- Introduce AggregationException to inspect exceptions inside canvas.Group/Chain (#92)
- Improve heartbeating, now enabled by default on activity workers (#136)

0.11.15
~~~~~~~

- Fix tag_list declaration in case no tag is associated with the workflow
- Fix listing workflow tasks not handling "scheduled" (not started) tasks correctly
- Fix CSV formatter outputting an extra "None" at the end of the output
- Fix 'simpleflow activity.rerun' resolving the wrong function name if not the last event

0.11.14
~~~~~~~

- Various little fixes around process management, heartbeat, logging (#110)

0.11.13
~~~~~~~

- Add ability to provide a 'run ID' with 'simpleflow standalone --repair'

0.11.12
~~~~~~~

- Fix --tags argument for simpleflow standalone (#114)
- Improve tests and add integration tests (#116)
- Add 'simpleflow activity.rerun' command (#117)

0.11.11
~~~~~~~

- Fix a circular import on simpleflow.swf.executor

0.11.10
~~~~~~~

- Fix previous_history initialization (#106)
- Improve WorkflowExecutionQueryset default date values (#111)

0.11.9
~~~~~~

- Add a --repair option to simpleflow standalone (#100)

0.11.8
~~~~~~

- Retry boto.swf connection to avoid frequent errors when using IAM roles (#99)

0.11.7
~~~~~~

Same as 0.11.6 but the 0.11.6 on pypi is broken (pushed something similar to 0.11.5 by mistake)

0.11.6
~~~~~~

- Add ``issubclass_`` method (#96)
- Avoid duplicate logs if root logger has a handler (#97)
- Allow passing SWF domain via the SWF_DOMAIN environment variable (#98)

0.11.5
~~~~~~

- Don't mask activity cancel exception (#84)
- Propagate all decision response attributes up to Executor.replay() (#76, #94)

0.11.4
~~~~~~

- ISO dates in workflow history #91
- Fix potential infinite retry loop #90

0.11.3
~~~~~~

- Fix replay hooks introduced in 0.11.2 (#86)
- Remove python3 compatibility from README (which was not working for a long time)

0.11.2
~~~~~~

- Add new workflow hooks (#79)

0.11.1
~~~~~~

- Fix logging when an exception occurs

0.11.0
~~~~~~

- Merge ``swf`` package into simpleflow for easier maintenance.


0.10.4 and below
~~~~~~~~~~~~~~~~

Sorry, changes were not documented for simpleflow <= 0.10.x.
