Apache Airflow Provider for Onehouse

This package is the Apache Airflow provider for Onehouse. It supplies operators and sensors for managing Onehouse clusters and jobs from your Airflow DAGs.

Requirements

  • Apache Airflow >= 2.9.2
  • Python >= 3.10

Installation

You can install this provider package via pip:

pip install apache-airflow-providers-onehouse

Configuration

  1. Set up an Airflow connection with the following details:

    • Connection Id: onehouse_default (or your custom connection id)
    • Connection Type: Generic
    • Host: https://api.onehouse.ai
    • Extra: Configure the following JSON:
      {
        "project_uid": "your-project-uid",
        "user_id": "your-user-id",
        "api_key": "your-api-key",
        "api_secret": "your-api-secret",
        "link_uid": "your-link-uid",
        "region": "your-region"
      }
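
As an alternative to the UI, Airflow can also read connections from `AIRFLOW_CONN_<CONN_ID>` environment variables (the JSON form shown here is supported in Airflow 2.3+). A sketch with placeholder values, written in Python for concreteness — in practice you would export the variable in the scheduler's shell environment:

```python
import json
import os

# Equivalent of the onehouse_default connection above, defined via an
# environment variable. All field values are placeholders.
os.environ["AIRFLOW_CONN_ONEHOUSE_DEFAULT"] = json.dumps({
    "conn_type": "generic",
    "host": "https://api.onehouse.ai",
    "extra": {
        "project_uid": "your-project-uid",
        "user_id": "your-user-id",
        "api_key": "your-api-key",
        "api_secret": "your-api-secret",
        "link_uid": "your-link-uid",
        "region": "your-region",
    },
})
```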
      

Usage

Basic Example DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow_providers_onehouse.operators.clusters import (
    OnehouseCreateClusterOperator,
    OnehouseDeleteClusterOperator,
)
from airflow_providers_onehouse.operators.jobs import (
    OnehouseCreateJobOperator,
    OnehouseRunJobOperator,
    OnehouseDeleteJobOperator,
)
from airflow_providers_onehouse.sensors.onehouse import OnehouseJobRunSensor, OnehouseCreateClusterSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

cluster_name = "cluster_1"
job_name = "job_1"

bucket_name = "bucket-name"
job_path = f"s3a://{bucket_name}/path/to/hello_world_job.py"
venv_path = f"s3a://{bucket_name}/path/to/venv.tar.gz"

with DAG(
        dag_id="example_dag",
        default_args=default_args,
        description="Example DAG",
        schedule=None,
        start_date=datetime(2025, 4, 28),
        catchup=False,
        tags=["onehouse", "example", "dag"],
) as dag:

    create_cluster = OnehouseCreateClusterOperator(
        task_id="create_onehouse_cluster",
        cluster_name=cluster_name,
        cluster_type="Spark",
        max_ocu=1,
        min_ocu=1,
        conn_id="onehouse_default",
    )

    wait_for_cluster_ready = OnehouseCreateClusterSensor(
        task_id="wait_for_cluster_ready",
        cluster_name="{{ ti.xcom_pull(task_ids='create_onehouse_cluster') }}",
        conn_id="onehouse_default",
        poke_interval=30,
        timeout=60 * 30,
    )

    create_onehouse_job = OnehouseCreateJobOperator(
        task_id="create_onehouse_job",
        job_name=job_name,
        job_type="PYTHON",
        parameters=[
            "--conf", f"spark.archives={venv_path}#environment",
            "--conf", "spark.pyspark.python=./environment/bin/python",
            job_path,
        ],
        cluster_name=cluster_name,
        conn_id="onehouse_default",
    )

    run_onehouse_job = OnehouseRunJobOperator(
        task_id="run_onehouse_job",
        job_name=job_name,
        conn_id="onehouse_default",
    )

    wait_for_job = OnehouseJobRunSensor(
        task_id="wait_for_job_completion",
        job_name=job_name,
        job_run_id="{{ ti.xcom_pull(task_ids='run_onehouse_job') }}",
        conn_id="onehouse_default",
        poke_interval=30,
        timeout=60 * 60,
    )

    delete_onehouse_job = OnehouseDeleteJobOperator(
        task_id="delete_onehouse_job",
        job_name=job_name,
        conn_id="onehouse_default",
    )

    delete_onehouse_cluster = OnehouseDeleteClusterOperator(
        task_id="delete_onehouse_cluster",
        cluster_name=cluster_name,
        conn_id="onehouse_default",
    )

    (
        create_cluster
        >> wait_for_cluster_ready
        >> create_onehouse_job
        >> run_onehouse_job
        >> wait_for_job
        >> delete_onehouse_job
        >> delete_onehouse_cluster
    )
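
The two sensors in this DAG poll on a fixed cadence: `poke_interval` is the delay between status checks, and `timeout` caps the total time spent waiting. A minimal, library-free sketch of that semantics (an illustration only, not the provider's actual implementation):

```python
import time


def poke_until(check, poke_interval: float, timeout: float) -> bool:
    """Call check() every poke_interval seconds until it returns True,
    giving up once timeout seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(poke_interval)
    return False


# Example: a condition that becomes true on the third poll.
state = {"polls": 0}

def job_finished() -> bool:
    state["polls"] += 1
    return state["polls"] >= 3

print(poke_until(job_finished, poke_interval=0.01, timeout=1.0))  # → True
```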

Download files

Download the file for your platform.

Source Distribution

apache_airflow_providers_onehouse-0.2.4.tar.gz (10.3 kB)

Built Distribution

apache_airflow_providers_onehouse-0.2.4-py3-none-any.whl

File details

Details for the file apache_airflow_providers_onehouse-0.2.4.tar.gz.

File hashes

Hashes for apache_airflow_providers_onehouse-0.2.4.tar.gz

  Algorithm    Hash digest
  SHA256       c442f11638851ed6933e02fe244b58e3e59301fc7d656cbd080e74363f0ee7de
  MD5          94707fe8e5dd6bfc83370046f294f2ea
  BLAKE2b-256  37bf9e7912cc29feadb18bb3420835d8ff683ac4f4990f95ac503628c9fc7a8d


File details

Details for the file apache_airflow_providers_onehouse-0.2.4-py3-none-any.whl.

File hashes

Hashes for apache_airflow_providers_onehouse-0.2.4-py3-none-any.whl

  Algorithm    Hash digest
  SHA256       49265ee08112846937c32d43fab68fc53b4c490a4d0724784ac86f4b6d852f19
  MD5          c9ba7c8d1394341a97b74b96be995320
  BLAKE2b-256  321a5ceee052a2528397a6f7c0558146756a5b427d5aad38c0e9de6e30c0b3ff

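
The digests above can be verified locally after downloading a file. A small sketch using only the standard library (the filename in the comment is this release's sdist):

```python
import hashlib


def sha256_of(path: str) -> str:
    """Return the hex SHA-256 digest of a file, read in 8 KiB chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


# Compare against the SHA256 value published above, e.g.:
# expected = "c442f11638851ed6933e02fe244b58e3e59301fc7d656cbd080e74363f0ee7de"
# assert sha256_of("apache_airflow_providers_onehouse-0.2.4.tar.gz") == expected
```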
