
Pentaho Airflow plugin


This plugin runs Jobs and Transformations through Carte servers. It allows orchestrating a massive number of transformations and jobs, taking care of the dependencies between them, even across different instances. This is done using CarteJobOperator and CarteTransOperator.

It also runs Pan (transformations) and Kitchen (jobs) in local mode, both from a repository and from local XML files. For this approach, use KitchenOperator and PanOperator.

Requirements

  1. An Apache Airflow deployment.
  2. One or more working PDI CE installations.
  3. A Carte server for the Carte operators.

Setup

The same setup process must be performed on the webserver, the scheduler, and every worker that runs these tasks. If you want to deploy dedicated workers for this kind of task, see Queues in the Airflow Concepts section.

Pip package

First of all, install the package with pip.

pip install airflow-pentaho-plugin

Airflow connection

Then, a new connection needs to be added to Airflow Connections. To do this, go to the Airflow web UI and click Admin -> Connections in the top menu, then click the Create tab.

Use the HTTP connection type. Enter the Conn Id (this plugin uses pdi_default by default), and the username and password for your Pentaho Repository.

At the bottom of the form, fill the Extra field with pentaho_home (the path where your pdi-ce installation is placed) and rep (the repository name for this connection), as a JSON-formatted string like the following.

{
    "pentaho_home": "/opt/pentaho",
    "rep": "Default"
}
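The same connection can also be created from the command line instead of the web UI. A sketch, assuming an Airflow 2.x CLI and placeholder credentials (replace `repo_user` and `repo_password` with your own):

```shell
# Hypothetical credentials and paths: substitute your own values.
airflow connections add 'pdi_default' \
    --conn-type 'http' \
    --conn-login 'repo_user' \
    --conn-password 'repo_password' \
    --conn-extra '{"pentaho_home": "/opt/pentaho", "rep": "Default"}'
```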

Carte

In order to use CarteJobOperator, the connection should be set up differently. Fill in host (including http:// or https://) and port with the Carte hostname and port, username and password with the PDI repository credentials, and extra as follows.

{
    "rep": "Default",
    "carte_username": "cluster",
    "carte_password": "cluster"
}
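Carte itself is reached over plain HTTP with basic authentication, so a quick connectivity check is to request its status page. A minimal sketch, assuming a local Carte server on port 8080 with the default `cluster`/`cluster` credentials:

```python
import base64
from urllib.request import Request

# Assumed values: a local Carte server with the default credentials.
host, port = "http://localhost", 8080
user, password = "cluster", "cluster"

# Carte serves an XML status report at /kettle/status/
url = f"{host}:{port}/kettle/status/?xml=Y"
token = base64.b64encode(f"{user}:{password}".encode()).decode()
req = Request(url, headers={"Authorization": f"Basic {token}"})

print(req.full_url)
# urllib.request.urlopen(req) would return the status XML if Carte is up.
```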

Usage

CarteJobOperator

CarteJobOperator is responsible for running jobs on remote slave servers. Here is an example of CarteJobOperator usage.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import CarteJobOperator

from airflow_pentaho.operators.carte import CarteJobOperator

# ... #

# Define the task using the CarteJobOperator
avg_spent = CarteJobOperator(
    conn_id='pdi_default',
    task_id="average_spent",
    job="/home/bi/average_spent",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> avg_spent >> another_task
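The values in `params` are ordinary Airflow templated strings: before the job runs, Airflow renders macros such as `{{ ds }}` into the logical date in yyyy-mm-dd format. A minimal illustration of that substitution, using a plain string replace and a fixed date instead of Airflow's real Jinja engine:

```python
import datetime

# Hypothetical stand-in for Airflow's template rendering of {{ ds }}.
params = {"date": "{{ ds }}"}
logical_date = datetime.date(2023, 1, 15)

rendered = {k: v.replace("{{ ds }}", logical_date.isoformat())
            for k, v in params.items()}

print(rendered)  # {'date': '2023-01-15'}
```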

KitchenOperator

The Kitchen operator is responsible for running Jobs. Let's suppose that we have a Job saved at /home/bi/average_spent in our repository, with the argument date as an input parameter. Let's define the task using the KitchenOperator.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import KitchenOperator

from airflow_pentaho.operators.kettle import KitchenOperator

# ... #

# Define the task using the KitchenOperator
avg_spent = KitchenOperator(
    conn_id='pdi_default',
    queue="pdi",
    task_id="average_spent",
    directory="/home/bi",
    job="average_spent",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> avg_spent >> another_task
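Under the hood, the Kitchen operator shells out to the kitchen.sh launcher under pentaho_home. The plugin assembles the real command; the sketch below only illustrates the general flag shape of the Kitchen CLI, with the install layout, repository name, and date value all assumed:

```python
# Sketch only: the plugin builds the actual kitchen.sh invocation.
pentaho_home = "/opt/pentaho"            # from the connection's Extra field
args = {
    "rep": "Default",                    # repository name from the connection
    "dir": "/home/bi",
    "job": "average_spent",
}
params = {"date": "2023-01-15"}          # a rendered {{ ds }} value

# Assumed install layout: PDI CE unpacked under pentaho_home.
cmd = [f"{pentaho_home}/data-integration/kitchen.sh"]
cmd += [f"-{k}={v}" for k, v in args.items()]
cmd += [f"-param:{k}={v}" for k, v in params.items()]

print(" ".join(cmd))
```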

CarteTransOperator

CarteTransOperator is responsible for running transformations on remote slave servers. Here is an example of CarteTransOperator usage.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import CarteTransOperator

from airflow_pentaho.operators.carte import CarteTransOperator

# ... #

# Define the task using the CarteTransOperator
enrich_customers = CarteTransOperator(
    conn_id='pdi_default',
    task_id="enrich_customer_data",
    trans="/home/bi/enrich_customer_data",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> enrich_customers >> another_task

PanOperator

The Pan operator is responsible for running transformations. Let's suppose that we have one saved at /home/bi/clean_somedata. Let's define the task using the PanOperator. In this case, the transformation receives a parameter that determines the file to be cleaned.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import PanOperator

from airflow_pentaho.operators.kettle import PanOperator

# ... #

# Define the task using the PanOperator
clean_input = PanOperator(
    conn_id='pdi_default',
    queue="pdi",
    task_id="cleanup",
    directory="/home/bi",
    trans="clean_somedata",
    params={"file": "/tmp/input_data/{{ ds }}/sells.csv"},
    dag=dag)

# ... #

some_task >> clean_input >> another_task

For more information, see sample_dags/pdi_flow.py.
