Apache Airflow Provider for Onehouse

Project description

This is the Apache Airflow provider for Onehouse. It supplies operators and sensors for managing Onehouse clusters and jobs from your Airflow DAGs.

Requirements

  • Apache Airflow >= 2.9.2
  • Python >= 3.10

Installation

You can install this provider package via pip:

pip install apache-airflow-providers-onehouse
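
To install the exact release documented on this page, pin the version:

pip install "apache-airflow-providers-onehouse==0.2.5"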

Configuration

  1. Set up an Airflow connection with the following details:

    • Connection Id: onehouse_default (or your custom connection id)
    • Connection Type: Generic
    • Host: https://api.onehouse.ai
    • Extra: Configure the following JSON:
      {
        "project_uid": "your-project-uid",
        "user_id": "your-user-id",
        "api_key": "your-api-key",
        "api_secret": "your-api-secret",
        "link_uid": "your-link-uid",
        "region": "your-region"
      }
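
As an alternative to creating the connection in the UI, Airflow can read connections from environment variables named AIRFLOW_CONN_<CONN_ID>. A minimal sketch, assuming the Generic connection type maps to conn_type "generic" (the JSON value format requires Airflow 2.3+, which the 2.9.2 minimum above satisfies; all field values are placeholders):

import json
import os

# Airflow resolves the connection id "onehouse_default" from the
# environment variable AIRFLOW_CONN_ONEHOUSE_DEFAULT. Since Airflow 2.3
# the value may be a JSON document instead of a URI.
os.environ["AIRFLOW_CONN_ONEHOUSE_DEFAULT"] = json.dumps({
    "conn_type": "generic",
    "host": "https://api.onehouse.ai",
    "extra": {
        "project_uid": "your-project-uid",
        "user_id": "your-user-id",
        "api_key": "your-api-key",
        "api_secret": "your-api-secret",
        "link_uid": "your-link-uid",
        "region": "your-region",
    },
})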
      

Usage

Basic Example DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow_providers_onehouse.operators.clusters import (
    OnehouseCreateClusterOperator,
    OnehouseDeleteClusterOperator,
)
from airflow_providers_onehouse.operators.jobs import (
    OnehouseCreateJobOperator,
    OnehouseRunJobOperator,
    OnehouseDeleteJobOperator,
)
from airflow_providers_onehouse.sensors.onehouse import OnehouseJobRunSensor, OnehouseCreateClusterSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

cluster_name = "cluster_1"
job_name = "job_1"

bucket_name = "bucket-name"
job_path = f"s3a://{bucket_name}/path/to/hello_world_job.py"
venv_path = f"s3a://{bucket_name}/path/to/venv.tar.gz"

with DAG(
    dag_id="example_dag",
    default_args=default_args,
    description="Example DAG",
    schedule=None,  # run only when triggered manually
    start_date=datetime(2025, 4, 28),
    catchup=False,
    tags=["onehouse", "example", "dag"],
) as dag:

    create_cluster = OnehouseCreateClusterOperator(
        task_id="create_onehouse_cluster",
        cluster_name=cluster_name,
        cluster_type="Spark",
        max_ocu=1,
        min_ocu=1,
        conn_id="onehouse_default",
    )

    # Poll until the cluster is ready; the cluster name is pulled from the
    # XCom value returned by the create task.
    wait_for_cluster_ready = OnehouseCreateClusterSensor(
        task_id="wait_for_cluster_ready",
        cluster_name="{{ ti.xcom_pull(task_ids='create_onehouse_cluster') }}",
        conn_id="onehouse_default",
        poke_interval=30,
        timeout=60 * 30,  # give up after 30 minutes
    )

    # Register a PySpark job; the packed virtualenv is shipped to the
    # cluster via spark.archives and used as the Python interpreter.
    create_onehouse_job = OnehouseCreateJobOperator(
        task_id="create_onehouse_job",
        job_name=job_name,
        job_type="PYTHON",
        parameters=[
            "--conf", f"spark.archives={venv_path}#environment",
            "--conf", "spark.pyspark.python=./environment/bin/python",
            job_path,
        ],
        cluster_name=cluster_name,
        conn_id="onehouse_default",
    )

    run_onehouse_job = OnehouseRunJobOperator(
        task_id="run_onehouse_job",
        job_name=job_name,
        conn_id="onehouse_default",
    )

    # Block until the job run completes; the run id is pulled from the
    # XCom value returned by the run task.
    wait_for_job = OnehouseJobRunSensor(
        task_id="wait_for_job_completion",
        job_name=job_name,
        job_run_id="{{ ti.xcom_pull(task_ids='run_onehouse_job') }}",
        conn_id="onehouse_default",
        poke_interval=30,
        timeout=60 * 60,  # give up after one hour
    )

    delete_onehouse_job = OnehouseDeleteJobOperator(
        task_id="delete_onehouse_job",
        job_name=job_name,
        conn_id="onehouse_default",
    )

    delete_onehouse_cluster = OnehouseDeleteClusterOperator(
        task_id="delete_onehouse_cluster",
        cluster_name=cluster_name,
        conn_id="onehouse_default",
    )

    (
        create_cluster
        >> wait_for_cluster_ready
        >> create_onehouse_job
        >> run_onehouse_job
        >> wait_for_job
        >> delete_onehouse_job
        >> delete_onehouse_cluster
    )
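
As written, the teardown tasks only run if every upstream task succeeds, so a failed job run would leave the cluster behind. A minimal sketch of one way to make cleanup unconditional, assuming the delete operators accept the standard BaseOperator arguments:

from airflow.utils.trigger_rule import TriggerRule

delete_onehouse_cluster = OnehouseDeleteClusterOperator(
    task_id="delete_onehouse_cluster",
    cluster_name=cluster_name,
    conn_id="onehouse_default",
    # Run whether upstream tasks succeeded or failed, so the cluster
    # is always deleted.
    trigger_rule=TriggerRule.ALL_DONE,
)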


Download files

Source Distribution

apache_airflow_providers_onehouse-0.2.5.tar.gz (10.4 kB)

Built Distribution

apache_airflow_providers_onehouse-0.2.5-py3-none-any.whl

File details

Details for the file apache_airflow_providers_onehouse-0.2.5.tar.gz.

File hashes

Algorithm    Hash digest
SHA256       674aac8baa883d49e99a4a534513615fea608c36fe3d438b4c62befc2cb001ac
MD5          40f4a9ef44e519573a2ca4ef01452c41
BLAKE2b-256  37f929c13e205bca5edd6f573da3aa68ed16099cf2122ad80f894ea5ec2c2ff7

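To check a downloaded file against the published digests before installing, a minimal sketch using Python's standard library:

import hashlib

# Hash the downloaded sdist and compare against the SHA256 digest
# published above (the file is small enough to read in one pass).
with open("apache_airflow_providers_onehouse-0.2.5.tar.gz", "rb") as f:
    digest = hashlib.sha256(f.read()).hexdigest()

assert digest == "674aac8baa883d49e99a4a534513615fea608c36fe3d438b4c62befc2cb001ac"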

File details

Details for the file apache_airflow_providers_onehouse-0.2.5-py3-none-any.whl.

File hashes

Algorithm    Hash digest
SHA256       9c246c025d7dc8afc9672ea1f3e3772abbcd688f2b07dca52d4bf7b4107f8ccd
MD5          91f4507d2a90a47b0c3f5ddb3a9bbbce
BLAKE2b-256  2635b402f879eec10ce98781f432213797eef9ff01956016fcc51e10cd13c907
