Yet Another Workflow Engine, a subprocess-based DAG execution system

Project description

YAWN provides a framework for executing a set of shell commands with dependencies in a distributed manner and on a repeating schedule. Other tools do similar things and are inspirations for this one, in particular Celery and Airflow.

Principal Differences

YAWN is inspired by Celery and Airflow, but differs from them in that it:

  • Runs each task in a separate subprocess, like Airflow but unlike Celery. This avoids pollution of a shared Python interpreter and makes memory usage easier to reason about.

  • Uses PostgreSQL as the message broker and database, alleviating the need for a separate broker like Redis or RabbitMQ. This avoids the visibility timeout issue when using Redis as a Celery broker. YAWN uses the SELECT ... FOR UPDATE SKIP LOCKED statement (available since PostgreSQL 9.5) to efficiently select from the queue table.

  • Stores the command, environment variables, stdout and stderr for each task execution, so it is easier to see the logs and history of what happened. Re-running a task does not overwrite the previous run.

  • Does not support inputs or outputs other than the command line and environment variables, with the intention that client applications should handle state instead.
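The SKIP LOCKED dequeue pattern mentioned above can be sketched as follows. This is an illustration only, not YAWN's actual code: the table and column names are hypothetical, and the cursor would come from a PostgreSQL driver such as psycopg2.

```python
# Illustrative dequeue using SELECT ... FOR UPDATE SKIP LOCKED.
# Table and column names here are hypothetical, not YAWN's real schema.
DEQUEUE_SQL = """
    SELECT id, task_id
    FROM queue_message
    WHERE queue_id = %s
    ORDER BY id              -- first-in, first-out
    LIMIT 1
    FOR UPDATE SKIP LOCKED   -- skip rows already claimed by other workers
"""


def claim_next_message(cursor, queue_id):
    """Run inside a transaction; returns one unclaimed row or None.

    Because locked rows are skipped rather than waited on, many workers
    can poll the same table concurrently without blocking each other.
    """
    cursor.execute(DEQUEUE_SQL, (queue_id,))
    return cursor.fetchone()
```

The row stays locked until the transaction commits, so a worker that crashes mid-task releases its claim automatically when its connection drops.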

Components

Web Server

The website provides a user interface to view the workflows and tasks running within them. It allows you to run an existing workflow or re-run a failed task. The web server also provides a REST API to remotely create and run workflows.

Worker

The worker schedules and executes tasks. The worker uses subprocess.Popen to run tasks and capture stdout and stderr.
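A minimal sketch of that pattern, assuming nothing beyond the standard library (this is not YAWN's actual worker code):

```python
import subprocess


def run_task(command, env=None, timeout=None):
    """Run a shell command and capture its output, as the worker does.

    `env`, if given, replaces the inherited environment; `timeout` (in
    seconds) kills the process if it runs too long.
    """
    proc = subprocess.Popen(
        command,
        shell=True,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        stdout, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        stdout, stderr = proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()
```

Capturing the exit code alongside stdout and stderr is what lets an Execution record a complete history of each attempt.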

Concepts

Workflow

A set of Tasks that can depend on each other, forming what is popularly known as a directed acyclic graph (DAG). Workflows can be scheduled to run on a regular basis and they are versioned so they can change over time.

Run

An instance of a workflow, manually triggered or scheduled.

Task

A shell command that specifies the upstream tasks it depends on, the number of times to retry, and a timeout. The task is given the environment variables configured on the workflow and on the run.

Execution

A single execution of a Task’s command, capturing the exit code and standard output and error.

Queue

A first-in, first-out list of Tasks to execute.

Worker

A process that reads from a set of Queues and executes the associated Tasks, recording the results in an Execution.

Installation

To get started using YAWN:

# install the package - someone has yawn :-(
pip install yawns

# install postgres and create the yawn database
# the default settings are localhost and no password
createdb yawn

# setup the tables by running db migrations
yawn migrate

# create a user to login with
yawn createsuperuser

# create some sample workflows
yawn examples

# start the webserver on port 8000
yawn webserver

# open a new terminal and start the worker
yawn worker

Here is a screenshot of the page for a single workflow:

https://cloud.githubusercontent.com/assets/910316/21969288/fe40baf0-db51-11e6-97f2-7e6875c1e575.png

REST API

Browse the API by going to http://127.0.0.1:8000/api/ in a browser.

When creating a workflow, the format is (shown as YAML for readability):

name: Example
parameters:
  ENVIRONMENT: production
  CALCULATION_DATE: 2017-01-01
schedule: 0 0 *
schedule_active: True

tasks:
- name: task_1
  queue: default
  max_retries: 1
  timeout: 30
  command: python my_awesome_program.py $ENVIRONMENT
- name: task_2
  queue: default
  command: echo $CALCULATION_DATE | grep 2017
  upstream:
  - task_1
/api/workflows/

GET a list of versions or a single workflow version. POST to create or update a workflow using the schema shown above. PATCH to change the schedule, schedule_active, or parameters fields only.

  • POST - use the schema shown above

  • PATCH {"schedule_active": false}

/api/runs/

GET a list of runs, optionally filtering to a particular workflow using ?workflow=<id>. POST to create a new run. PATCH to change the parameters.

  • POST {"workflow_id": 1, "parameters": null}

  • PATCH {"parameters": {"ENVIRONMENT": "test"}}

/api/tasks/<id>/

GET a single task from a workflow run, and its executions with their status and logging information. PATCH to enqueue a task or kill a running execution.

  • PATCH {"enqueue": true}

  • PATCH {"terminate": <execution_id>}
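Putting the endpoints together, a small client sketch using only the standard library. The `build_post` helper is hypothetical; the base URL matches the server started in the Installation section, and the payload follows the /api/runs/ schema above.

```python
import json
from urllib.request import Request

# Base URL of the local YAWN webserver (see Installation section).
BASE = "http://127.0.0.1:8000/api"


def build_post(path, payload):
    """Build a JSON POST request for a YAWN endpoint (no network I/O here)."""
    return Request(
        BASE + path,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trigger a new run of workflow 1; send with urllib.request.urlopen(req)
# once the webserver is running.
req = build_post("/runs/", {"workflow_id": 1, "parameters": None})
```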

Python API

Import and use the Django models to create your workflow:

from yawn.workflow.models import WorkflowName
from yawn.task.models import Template

name, _ = WorkflowName.objects.get_or_create(name='Simple Workflow Example')
workflow = name.new_version(parameters={'MY_OBJECT_ID': '1', 'SOME_SETTING': 'false'})
task1 = Template.objects.create(workflow=workflow, name='start', command='echo Starting...')
task2 = Template.objects.create(workflow=workflow, name='task2', command='echo Working on $MY_OBJECT_ID')
task2.upstream.add(task1)
task3 = Template.objects.create(workflow=workflow, name='task3',
                                command='echo Another busy thing && sleep 20')
task3.upstream.add(task1)
task4 = Template.objects.create(workflow=workflow, name='done', command='echo Finished!')
task4.upstream.add(task2, task3)

workflow.submit_run(parameters={'child': 'true'})

Alternatively, use the serializer to provide tasks as a dictionary in the format used by the API. This method checks whether a version of the Workflow already exists with the same structure, and returns the existing version if so:

from yawn.workflow.serializers import WorkflowSerializer

# test_views.data() is a helper from YAWN's test suite that returns a
# workflow dictionary in the same format as the REST API example above
serializer = WorkflowSerializer(data=test_views.data())
serializer.is_valid(raise_exception=True)
workflow = serializer.save()
workflow.submit_run()
