Skip to main content

Oozie To Airflow migration tool

Project description

Oozie to Airflow

Build Status codecov Code style: black License Updates Python 3

A tool to easily convert between Apache Oozie workflows and Apache Airflow workflows.

The program targets Apache Airflow >= 1.10 and Apache Oozie 1.0 XML schema.

If you want to contribute to the project, please take a look at CONTRIBUTING.md

Table of Contents

Created by gh-md-toc

Background

Apache Airflow is a workflow management system developed by AirBnB in 2014. It is a platform to programmatically author, schedule, and monitor workflows. Airflow workflows are designed as Directed Acyclic Graphs (DAGs) of tasks in Python. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Apache Oozie is a workflow scheduler system to manage Apache Hadoop jobs. Oozie workflows are also designed as Directed Acyclic Graphs (DAGs) in XML.

There are a few differences noted below:

Spec. Task Dependencies "Subworkflows" Parameterization Notification
Oozie XML Action Node Control Node Subworkflow EL functions/Properties file URL based callbacks
Airflow Python Operators Trigger Rules, set_downstream() SubDag jinja2 and macros Callbacks/Emails

Running the Program

Required Python Dependencies

Additionally the shell script included in the directory, init.sh, can be executed to set up the dependencies and have your local machine ready to convert the examples.

# Allow init.sh to execute
$ chmod +x init.sh
# Execute init.sh
$ ./init.sh

Adding bin directory to your PATH

You can add the bin subdirectory to your PATH, then all the scripts below can be run without adding bin path.

You can do it for example by adding similar line to your .bash_profile or bin/postactivate from your virtual environment:

export PATH=${PATH}:<INSERT_PATH_TO_YOUR_OOZIE_PROJECT>/bin

Otherwise you need to run them from the bin subdirectory - prepending it with the path, for example:

./bin/o2a --help

In all the examples below it is assumed that the bin directory is in your PATH.

Running the conversion

You can run the program (minimally) by calling: o2a -i <INPUT_APPLICATION_FOLDER> -o <OUTPUT_FOLDER_PATH>

Example: o2a -i examples/demo -o output/demo

This is the full usage guide, available by running o2a -h

usage: o2a [-h] -i INPUT_DIRECTORY_PATH -o OUTPUT_DIRECTORY_PATH [-d DAG_NAME]
           [-u USER] [-s START_DAYS_AGO] [-v SCHEDULE_INTERVAL]

Convert Apache Oozie workflows to Apache Airflow workflows.

optional arguments:
  -h, --help            show this help message and exit
  -i INPUT_DIRECTORY_PATH, --input-directory-path INPUT_DIRECTORY_PATH
                        Path to input directory
  -o OUTPUT_DIRECTORY_PATH, --output-directory-path OUTPUT_DIRECTORY_PATH
                        Desired output directory
  -d DAG_NAME, --dag-name DAG_NAME
                        Desired DAG name [defaults to input directory name]
  -u USER, --user USER  The user to be used in place of all ${user.name}
                        [defaults to user who ran the conversion]
  -s START_DAYS_AGO, --start-days-ago START_DAYS_AGO
                        Desired DAG start as number of days ago
  -v SCHEDULE_INTERVAL, --schedule-interval SCHEDULE_INTERVAL
                        Desired DAG schedule interval as number of days

Structure of the application folder

The application folder has to follow the structure defined as follows:

<APPLICATION>/
             |- job.properties        - job properties that are used to run the job
             |- hdfs                  - folder with application - should be copied to HDFS
             |     |- workflow.xml    - Oozie workflow xml (1.0 schema)
             |     |- ...             - additional folders required to be copied to HDFS
             |- configuration.template.properties - template of configuration values used during conversion
             |- configuration.properties          - generated properties for configuration values

Supported Oozie features

Control nodes

Fork

A fork node splits the path of execution into multiple concurrent paths of execution.

Join

A join node waits until every concurrent execution of the previous fork node arrives to it. The fork and join nodes must be used in pairs. The join node assumes concurrent execution paths are children of the same fork node.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="[FORK-NODE-NAME]">
        <path start="[NODE-NAME]" />
        ...
        <path start="[NODE-NAME]" />
    </fork>
    ...
    <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
    ...
</workflow-app>

Decision

A decision node enables a workflow to make a selection on the execution path to follow.

The behavior of a decision node can be seen as a switch-case statement.

A decision node consists of a list of predicates-transition pairs plus a default transition. Predicates are evaluated in order or appearance until one of them evaluates to true and the corresponding transition is taken. If none of the predicates evaluates to true the default transition is taken.

Predicates are JSP Expression Language (EL) expressions (refer to section 4.2 of this document) that resolve into a boolean value, true or false . For example: ${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="[NODE-NAME]">
        <switch>
            <case to="[NODE_NAME]">[PREDICATE]</case>
            ...
            <case to="[NODE_NAME]">[PREDICATE]</case>
            <default to="[NODE_NAME]"/>
        </switch>
    </decision>
    ...
</workflow-app>

Start

The start node is the entry point for a workflow job, it indicates the first workflow node the workflow job must transition to.

When a workflow is started, it automatically transitions to the node specified in the start .

A workflow definition must have one start node.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
  ...
  <start to="[NODE-NAME]"/>
  ...
</workflow-app>

End

The end node is the end for a workflow job, it indicates that the workflow job has completed successfully.

When a workflow job reaches the end it finishes successfully (SUCCEEDED).

If one or more actions started by the workflow job are executing when the end node is reached, the actions will be killed. In this scenario the workflow job is still considered as successfully run.

A workflow definition must have one end node.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <end name="[NODE-NAME]"/>
    ...
</workflow-app>

Kill

The kill node allows a workflow job to exit with an error.

When a workflow job reaches the kill it finishes in error (KILLED).

If one or more actions started by the workflow job are executing when the kill node is reached, the actions will be killed.

A workflow definition may have zero or more kill nodes.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <kill name="[NODE-NAME]">
        <message>[MESSAGE-TO-LOG]</message>
    </kill>
    ...
</workflow-app>

Known Limitations

The goal of this program is to mimic both the actions and control flow that is outlined by the Oozie workflow file. Unfortunately there are some limitations as of now that have not been worked around regarding the execution flow. The situation where the execution path might not execute correctly is when there are 4 nodes, A, B, C, D, with the following Oozie specified execution paths:

A executes ok to C
B executes error to C

A executes error to D
B executes ok to D

In this situation Airflow does not have enough fine grained node execution control. The converter should be able to handle this situation in the future, but it is not currently guaranteed to work.

This is because if goes from A to C on ok, and B goes to C on error, C's trigger rule will have to be set to DUMMY, but this means that if A goes to error, and B goes to ok C will then execute incorrectly.

This limitation is temporary and will be removed in a future version of Oozie-2-Airflow converter.

EL Functions

As of now, a very minimal set of Oozie EL functions are supported. The way they work is that there exists a dictionary mapping from each Oozie EL function string to the corresponding Python function. This is in utils/el_utils.py. This design allows for custom EL function mapping if one so chooses. By default everything gets mapped to the module o2a_libs. This means in order to use EL function mapping, the folder o2a_libs should be copied over to the Airflow DAG folder. This should then be picked up and parsed by the Airflow workers and then available to all DAGs.

Examples

All examples can be found in the examples directory.

Demo Example

The demo example contains several action and control nodes. The control nodes are fork, join, decision, start, end, and kill. As far as action nodes go, there are fs, map-reduce, and pig.

Most of these are already supported, but when the program encounters a node it does not know how to parse, it will perform a sort of "skeleton transformation" - it will convert all the unknown nodes to dummy nodes. This will allow users to manually parse the nodes if they so wish as the control flow is there.

The demo can be run as:

o2a -i examples/demo -o output/demo

This will parse and write to an output file in the output/demo directory.

Current limitations

The decision node is not fully functional as there is not currently support for all EL functions. So in order for it to run in Airflow you must edit the Python output file and change the decision node expression.

Output

In this example the output will appear in /output/ssh/test_demo_dag.py. Additionally subworkflow is generated in /output/ssh/subdag_test.py.

Childwf Example

The childwf example is sub-workflow for the demo example. It can be run as:

o2a -i examples/childwf -o output/childwf

Make sure to first copy examples/subwf/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in output/childwf/test_childwf_dag.py.

Current limitations

No known limitations.

SSH Example

The ssh example can be run as:

o2a -i examples/ssh -o output/ssh

This will convert the specified Oozie XML and write the output into the specified output directory, in this case output/ssh/test_ssh_dag.py.

There are some differences between Apache Oozie and Apache Airflow as far as the SSH specification goes. In Airflow you will have to add/edit an SSH-specific connection that contains the credentials required for the specified SSH action. For example, if the SSH node looks like:

<action name="ssh">
    <ssh xmlns="uri:oozie:ssh-action:0.1">
        <host>user@apache.org</host>
        <command>echo</command>
        <args>"Hello Oozie!"</args>
    </ssh>
    <ok to="end"/>
    <error to="fail"/>
</action>

Then the default Airflow SSH connection, ssh_default should have at the very least a password set. This can be found in the Airflow Web UI under Admin > Connections. From the command line it is impossible to edit connections so you must add one like:

airflow connections --add --conn_id <SSH_CONN_ID> --conn_type SSH --conn_password <PASSWORD>

More information can be found in Airflow's documentation.

Output

In this example the output will appear in /output/ssh/test_ssh_dag.py.

The converted DAG uses the SSHOperator in Airflow.

Current limitations

No known limitations.

MapReduce Example

The MapReduce example can be run as:

o2a -i examples/mapreduce -o output/mapreduce

Make sure to first copy examples/mapreduce/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in /output/mapreduce/test_mapreduce_dag.py.

The converted DAG uses the DataProcHadoopOperator in Airflow.

Current limitations

1. Exit status not available

From the Oozie documentation:

The counters of the Hadoop job and job exit status (FAILED, KILLED or SUCCEEDED) must be available to the workflow job after the Hadoop jobs ends. This information can be used from within decision nodes and other actions configurations.

Currently we use the DataProcHadoopOperator which does not store the job exit status in an XCOM for other tasks to use.

2. Configuration options

From the Oozie documentation (the strikethrough is from us):

Hadoop JobConf properties can be specified as part of

  • the config-default.xml or
  • JobConf XML file bundled with the workflow application or
  • <global> tag in workflow definition or
  • Inline map-reduce action configuration or
  • An implementation of OozieActionConfigurator specified by the tag in workflow definition.

Currently the only supported way of configuring the map-reduce action is with the inline action configuration, i.e. using the <configuration> tag in the workflow's XML file definition.

3. Streaming and pipes

Streaming and pipes are currently not supported.

FS Example

The FS example can be run as:

o2a -i examples/fs -o output/fs

Make sure to first copy examples/fs/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in /output/fs/test_fs_dag.py.

The converted DAG uses the BashOperator in Airflow.

Current limitations

Not all FS operations are currently idempotent. This will be fixed.

Pig Example

The Pig example can be run as:

o2a -i examples/pig -o output/pig

Make sure to first copy examples/pig/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in output/pig/test_pig_dag.py.

The converted DAG uses the DataProcPigOperator in Airflow.

Current limitations

1. Configuration options

From the Oozie documentation (the strikethrough is from us):

Hadoop JobConf properties can be specified as part of

  • the config-default.xml or
  • JobConf XML file bundled with the workflow application or
  • <global> tag in workflow definition or
  • Inline pig action configuration.

Currently the only supported way of configuring the pig action is with the inline action configuration, i.e. using the <configuration> tag in the workflow's XML file definition.

Shell Example

The Shell example can be run as:

o2a -i examples/shell -o output/shell

Make sure to first copy examples/shell/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in output/shell/test_shell_dag.py.

The converted DAG uses the BashOperator in Airflow, which executes the desired shell action with Pig by invoking gcloud dataproc jobs submit pig --cluster=<cluster> --region=<region> --execute 'sh <action> <args>'.

Current limitations

1. Exit status not available

From the Oozie documentation:

The output (STDOUT) of the Shell job can be made available to the workflow job after the Shell job ends. This information could be used from within decision nodes.

Currently we use the BashOperator which can store only the last line of the job output in an XCOM. In this case the line is not helpful as it relates to the Dataproc job submission status and not the Shell action's result.

2. No Shell launcher configuration

From the Oozie documentation:

Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements.

Currently there is no way specify the shell launcher configuration (it is ignored).

Spark Example

The Shell example can be run as:

o2a -i examples/spark -o output/spark

Make sure to first copy /examples/spark/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in /output/spark/spark.py.

The converted DAG uses the DataProcSparkOperator in Airflow.

Current limitations

1. Ony tasks written in Java are supported

From the Oozie documentation:

The jar element indicates a comma separated list of jars or python files.

The solution was tested with only a single Jar file.

2. No Spark launcher configuration

From the Oozie documentation:

Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements.

Currently there is no way to specify the Spark launcher configuration (it is ignored).

3. Not all elements are supported

The following elements are not supported: job-tracker, name-node, master, mode.

Sub-workflow Example

The Sub-workflow example can be run as:

o2a -i examples/subwf -o output/subwf

Make sure to first copy examples/subwf/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in output/subwf/test_subwf_dag.py. Additionally, a subdag_test.py (name to be changed soon) file is generated in the same directory, which contains the factory method sub_dag() returning the actual Airflow subdag.

The converted DAG uses the SubDagOperator in Airflow.

Current limitations

Currently generated name of the sub-workflow is fixed which means that only one subworkflow is supported per DAG folder. This will be fixed soon.

Decision Example

The decision example can be run as:

o2a -i examples/decision -o output/decision

Make sure to first copy examples/decision/configuration.template.properties, rename it as configuration.properties and fill in with configuration data.

Output

In this example the output will appear in output/decision/test_decision_dag.py.

The converted DAG uses the BranchPythonOperator in Airflow.

Current limitations

Decision example is not yet fully functional as EL functions are not yet fully implemented so condition is hard-coded for now. Once EL functions are implemented, the condition in the example will be updated.

EL Example

The Oozie Expression Language (EL) example can be run as: o2a -i examples/el -o output/el

This will showcase the ability to use the o2a_libs directory to map EL functions to Python methods. This example assumes that the user has a valid Apache Airflow SSH connection set up and the o2a_libs directory has been copied to the dags folder.

Please keep in mind that as of the current version only a single EL variable or single EL function. Variable/function chaining is not currently supported.

Output

In this example the output will appear in output/el/test_el_dag.py.

Current limitations

Decision example is not yet fully functional as EL functions are not yet fully implemented so condition is hard-coded for now. Once EL functions are implemented, the condition in the example will be updated.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

o2a-potiuk-0.0.1.tar.gz (61.3 kB view hashes)

Uploaded Source

Built Distribution

o2a_potiuk-0.0.1-py3-none-any.whl (111.4 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page