Submission and monitoring of jobs and notebooks using the Yeedu API in Apache Airflow.
Airflow Yeedu Operator
Note: This version of airflow-yeedu-operator is compatible only with Apache Airflow 3.x. Apache Airflow 2.x is no longer supported.
Installation
To install the Yeedu Operator in your Airflow environment, run:
pip3 install airflow-yeedu-operator
Overview
The YeeduOperator enables Apache Airflow users to easily submit and monitor Spark jobs and notebooks in the Yeedu platform. This operator acts as a bridge between Airflow workflows and Yeedu's computing capabilities.
Key Features
- Submit both notebooks and Spark jobs to Yeedu from your Airflow DAGs
- Monitor job progress and completion status in real-time
- Automatically capture and display logs in Airflow UI for easy troubleshooting
- Support for multiple authentication methods (LDAP, AAD, SSO)
Prerequisites
- Apache Airflow 3.x environment
- Valid credentials to interact with the Yeedu API.
- Yeedu Authentication (LDAP, AAD, or SSO)
- Valid certificate for SSL if applicable
Airflow Connection Setup
Step 1: Create Airflow Connection
- In the Airflow UI, go to Admin > Connections
- Click the + Add Connection button to create a new connection
Fill in the following fields:
| Field | Value / Example |
|---|---|
| Conn Id | yeedu_connection |
| Conn Type | HTTP |
| Login | Your LDAP/AAD username (if applicable) |
| Password | Your password (if applicable) |
| Extra | JSON with SSL options (see below) |
Extra JSON Field
{
"YEEDU_AIRFLOW_VERIFY_SSL": "true",
"YEEDU_SSL_CERT_FILE": "/path/to/cert/file"
}
Replace /path/to/cert/file with the actual path to your certificate file.
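As a rough sketch, the same connection can also be written as a JSON document, which Airflow accepts in an `AIRFLOW_CONN_<CONN_ID>` environment variable. The helper name `build_yeedu_connection_json` and the sample credentials are illustrative assumptions and are not part of the package. Only the two Extra keys come from this section, and the `"false"` value for disabling verification is also an assumption.

```python
import json


def build_yeedu_connection_json(login=None, password=None,
                                verify_ssl=True, cert_file=None):
    """Return a JSON string describing an HTTP connection for Yeedu.

    Hypothetical helper: only the two Extra keys are taken from this README.
    """
    # "true" is the documented value; "false" is assumed to disable verification.
    extra = {"YEEDU_AIRFLOW_VERIFY_SSL": "true" if verify_ssl else "false"}
    if cert_file:
        extra["YEEDU_SSL_CERT_FILE"] = cert_file

    conn = {"conn_type": "http", "extra": extra}
    # For SSO, leave login/password unset; the token lives in an Airflow Variable.
    if login is not None:
        conn["login"] = login
    if password is not None:
        conn["password"] = password
    return json.dumps(conn)


if __name__ == "__main__":
    # The printed value could be exported as AIRFLOW_CONN_YEEDU_CONNECTION.
    print(build_yeedu_connection_json("alice", "s3cret",
                                      cert_file="/path/to/cert/file"))
```

Calling the helper with no arguments yields a connection without Login or Password, which matches the SSO setup described in the next section.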
SSO Token Setup (Only for SSO auth)
If your Yeedu authentication method is SSO, follow these steps:
- Go to Admin > Variables
- Click + Add Variable
- Enter:
  - Key: e.g., yeedu_sso_token
  - Value: your Yeedu login token
You will refer to this variable in your DAG using token_variable_name.
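Airflow can also read Variables from environment variables named `AIRFLOW_VAR_<KEY>`, with the key upper-cased. The sketch below shows that naming rule for the token variable; the helper names are hypothetical, and whether an environment variable suits your deployment better than the UI is a judgment call.

```python
import os


def airflow_variable_env_name(key: str) -> str:
    """Return the environment variable name Airflow checks for a Variable key."""
    return "AIRFLOW_VAR_" + key.upper()


def export_sso_token(token: str, key: str = "yeedu_sso_token") -> str:
    """Place the token in this process's environment and return the name used.

    Hypothetical helper for local testing; production setups would normally
    inject the variable through the deployment's secret management.
    """
    name = airflow_variable_env_name(key)
    os.environ[name] = token
    return name
```

With the default key, the DAG would still pass `token_variable_name='yeedu_sso_token'`, since the Variable key itself does not change.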
Example DAG
DAG Definition
from datetime import datetime, timedelta

from airflow import DAG
from yeedu.operators.yeedu import YeeduOperator

# Defaults applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'yeedu_job_execution',
    default_args=default_args,
    description='DAG to execute jobs using Yeedu API',
    schedule='@once',  # Airflow 3.x uses schedule; schedule_interval was removed
    catchup=False,
)
Task Configuration
Authentication Options
The Yeedu Operator supports two authentication options: connection credentials (LDAP or AAD) and a stored token (SSO).
Option 1: LDAP / AAD Authentication
For LDAP or Azure Active Directory authentication, provide your credentials in the Airflow connection:
submit_job_task = YeeduOperator(
    task_id='ldap_task',
    job_url='https://hostname:{restapi_port}/tenant/tenant_id/workspace/workspace_id/spark/notebook/notebook_id',
    connection_id='yeedu_connection',
    dag=dag,
)
Tip: Copy the Job/Notebook URL directly from the Yeedu UI. Make sure to replace the port in the URL with your actual restapi_port value.
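The port swap described in the tip can be scripted. The following is a minimal sketch using only the standard library; `with_restapi_port` is a hypothetical helper, and the hostname and port in the usage note are placeholders.

```python
from urllib.parse import urlsplit, urlunsplit


def with_restapi_port(job_url: str, restapi_port: int) -> str:
    """Return job_url with its port replaced (or added) as restapi_port.

    Assumes a plain hostname without embedded credentials or IPv6 brackets.
    """
    parts = urlsplit(job_url)
    if not parts.hostname:
        raise ValueError(f"URL has no hostname: {job_url!r}")
    netloc = f"{parts.hostname}:{restapi_port}"
    return urlunsplit((parts.scheme, netloc, parts.path,
                       parts.query, parts.fragment))
```

For example, `with_restapi_port('https://yeedu.example.com:8080/tenant/t1', 8443)` keeps the path and only changes the port, and the result can be passed as `job_url`.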
Option 2: SSO Authentication
For Single Sign-On authentication, use a token stored in Airflow Variables:
submit_job_task = YeeduOperator(
    task_id='sso_task',
    job_url='https://hostname:{restapi_port}/tenant/tenant_id/workspace/workspace_id/spark/notebook/notebook_id',
    connection_id='yeedu_connection',
    token_variable_name='yeedu_sso_token',  # Reference to your Airflow Variable
    dag=dag,
)
Important: When using SSO authentication, do not set the Login and Password fields in the Airflow connection.
Quick Start Guide
Follow these steps to get your Yeedu jobs running in Airflow:
- Install the package: pip3 install airflow-yeedu-operator
- Configure authentication: Set up either the connection (for LDAP/AAD) or the token variable (for SSO)
- Create your DAG file: Copy the example code from above and modify it for your specific use case
- Deploy your DAG: Place the DAG file in your Airflow DAGs folder or use the Airflow UI Code Editor
- Verify configuration: Ensure connections and variables are properly set
- Run your DAG: Trigger manually or let it run on schedule
- Monitor execution: Track progress in both Airflow UI and Yeedu UI
Troubleshooting Tip: If you encounter connection issues, verify your SSL settings and credentials first.
Advanced Configuration
The YeeduOperator supports additional parameters for more complex use cases:
Optional Parameters
| Parameter | Description | Example |
|---|---|---|
| arguments | Arguments to pass to the job run | arguments="--input /data/input.csv --output /data/output" |
| conf | Configuration list for Spark job runs (key=value format) | conf=["spark.driver.memory=4g", "spark.executor.memory=8g"] |
| cluster_ids | Fallback cluster IDs to try if the current notebook execution fails for any reason (e.g., OOM) | cluster_ids=[1,2,3] |
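Because `conf` entries are plain strings in key=value format, a malformed entry is easy to miss until the run starts. As a sketch, a DAG file could check the list before handing it to the operator. `parse_spark_conf` is a hypothetical helper; whether the operator performs similar validation itself is not stated here.

```python
def parse_spark_conf(conf):
    """Split 'key=value' entries into a dict, rejecting malformed items."""
    parsed = {}
    for item in conf:
        # Split on the first '=' only, since Spark values may contain '='.
        key, sep, value = item.partition("=")
        if not sep or not key.strip() or not value.strip():
            raise ValueError(f"Expected 'key=value', got {item!r}")
        parsed[key.strip()] = value.strip()
    return parsed
```

The original list, not the parsed dict, is still what gets passed as `conf=[...]`; the dict is only useful for checks or logging.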
Example with Advanced Configuration
# Example with Spark configuration
spark_job_task = YeeduOperator(
    task_id='spark_job_with_config',
    job_url='https://hostname:{restapi_port}/tenant/tenant_id/workspace/workspace_id/spark/job/job_id',
    connection_id='yeedu_connection',
    # Pass arguments to the job
    arguments="--date {{ ds }}",
    # Set Spark configuration
    conf=[
        "spark.driver.memory=4g",
        "spark.executor.memory=8g",
        "spark.executor.instances=2",
    ],
    cluster_ids=[1, 2, 3],
    dag=dag,
)
Looping / Dynamic Task Mapping
The YeeduOperator also supports looping over inputs using dynamic task mapping in Airflow. This is useful when you want to run the same notebook or job multiple times with different inputs.
Example: Iterating Over Inputs
from airflow.utils.trigger_rule import TriggerRule

workflow_notebook = YeeduOperator.partial(
    task_id='workflow_notebook_iteration',
    job_url='https://hostname:{restapi_port}/tenant/tenant_id/workspace/workspace_id/notebook/notebook_id',
    connection_id='yeedu_connection',
    max_active_tis_per_dag=1,
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
).expand(
    loop_input=['1', '2', '3', '4', '5'],  # List of values to loop over
)
Explanation
- YeeduOperator.partial(...): Creates a template for the task.
- .expand(loop_input=[...]): Dynamically generates multiple task instances with different loop_input values.
- max_active_tis_per_dag (optional): Limits parallelism. For example, max_active_tis_per_dag=2 allows only 2 iterations at once.
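For readers new to dynamic task mapping, the partial/expand split can be pictured with plain Python. This is only an analogy that runs without Airflow: `run_notebook` is a hypothetical stand-in for one mapped task instance, and the URL is a placeholder.

```python
from functools import partial


def run_notebook(job_url, connection_id, loop_input):
    """Stand-in for one mapped task instance; it only reports its inputs."""
    return {
        "job_url": job_url,
        "connection_id": connection_id,
        "loop_input": loop_input,
    }


# Like YeeduOperator.partial(...): fix the arguments shared by every run.
template = partial(
    run_notebook,
    job_url="https://hostname/placeholder",
    connection_id="yeedu_connection",
)

# Like .expand(loop_input=[...]): one call per element of the list.
mapped_runs = [template(loop_input=value) for value in ["1", "2", "3"]]
```

In Airflow the scheduler creates the mapped task instances at run time rather than in a list comprehension, and `max_active_tis_per_dag` controls how many of them run concurrently.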
Email Notifications
The package includes an EmailNotificationHook that enables sending styled email notifications about DAG and task statuses via Microsoft Graph API. This is useful for alerting stakeholders about job completions or failures.
Email notifications include:
- Direct links to DAG runs and task logs in the Airflow UI
- Professional HTML styling with color-coded status indicators
- Run information (duration, owner, execution date)
- Error details for failed runs
- Cluster bump information (when applicable)
Setup Email Notifications
- Required Airflow Variables - Set up these variables for Microsoft Graph authentication:
  - AIRFLOW_VAR_TENANT_ID: Your Microsoft Azure tenant ID
  - AIRFLOW_VAR_CLIENT_ID: Your Microsoft application client ID
  - AIRFLOW_VAR_CLIENT_SECRET: Your Microsoft application client secret
  - AIRFLOW_VAR_SENDER_EMAIL: Email address that will send notifications
- Optional Airflow Configuration - For clickable links in emails, configure the base URL:
  - Set base_url in the [api] section of airflow.cfg:
    [api]
    base_url = https://your-airflow-domain.com
  - Alternative: Set the Airflow variable AIRFLOW_VAR_BASE_URL:
    Key: AIRFLOW_VAR_BASE_URL
    Value: https://your-airflow-domain.com
  Note: If not configured, links will default to http://localhost:8080
- Use the hook in your DAG:
from yeedu.hooks.email_notification import EmailNotificationHook


def success_callback(context):
    email_hook = EmailNotificationHook()
    ti = context['task_instance']
    # Pull Yeedu URL from XCom (automatically populated by YeeduOperator)
    yeedu_url = ti.xcom_pull(key='yeedu_run_url')
    email_hook.notify_task(
        recipients=["recipient@example.com"],
        task_id=ti.task_id,
        run_id=context['run_id'],
        status="success",
        context=context,
        extra_info=yeedu_url,
    )


def failure_callback(context):
    email_hook = EmailNotificationHook()
    ti = context['task_instance']
    # Pull execution details from XCom (automatically populated by YeeduOperator)
    yeedu_url = ti.xcom_pull(key='yeedu_run_url')
    error_summary = ti.xcom_pull(key='yeedu_error_summary') or str(context.get('exception', ''))
    cluster_attempts = ti.xcom_pull(key='yeedu_cluster_attempts')
    # Format cluster bump info if clusters were attempted
    cluster_info = None
    if cluster_attempts:
        cluster_info = f"Attempted clusters: {cluster_attempts}"
    email_hook.notify_task(
        recipients=["recipient@example.com"],
        task_id=ti.task_id,
        run_id=context['run_id'],
        status="failed",
        context=context,
        extra_info=yeedu_url,
        error_summary=error_summary,
        cluster_bump_info=cluster_info,
    )


# Add callbacks to your DAG
dag = DAG(
    'yeedu_job_execution',
    default_args=default_args,
    on_success_callback=success_callback,
    on_failure_callback=failure_callback,
    # other DAG parameters
)
Parameters
| Parameter | Description | Example |
|---|---|---|
| context | Airflow context for links and detailed info | context=context |
| extra_info | Additional information or Yeedu URL | extra_info=ti.xcom_pull(key='yeedu_run_url') |
| error_summary | Error details for failed runs | error_summary=ti.xcom_pull(key='yeedu_error_summary') |
| cluster_bump_info | Cluster failover information | cluster_bump_info="Attempted clusters: [10, 20]" |
XCom Keys
The YeeduOperator automatically pushes these values to XCom for use in callbacks:
| Key | Description |
|---|---|
| yeedu_run_url | Direct URL to the Yeedu run metrics page |
| yeedu_error_summary | Extracted error details from job/notebook stderr |
| yeedu_cluster_attempts | List of cluster IDs attempted during bump |
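The three keys map directly onto the notification parameters, so the lookup logic can be kept in one small, testable function. The sketch below takes a plain dict standing in for the pulled XCom values; `summarize_failure` is a hypothetical helper and is not part of the package.

```python
def summarize_failure(xcom: dict, exception=None) -> dict:
    """Build notification keyword arguments from pulled XCom values.

    `xcom` is a plain dict keyed by the XCom keys in the table above; any
    of them may be missing if the task failed before they were pushed.
    """
    attempts = xcom.get("yeedu_cluster_attempts")
    return {
        "extra_info": xcom.get("yeedu_run_url"),
        # Fall back to the raised exception when no stderr summary exists.
        "error_summary": xcom.get("yeedu_error_summary") or str(exception or ""),
        "cluster_bump_info": f"Attempted clusters: {attempts}" if attempts else None,
    }
```

Inside a failure callback, the dict could be filled with `ti.xcom_pull(key=...)` for each key and the result unpacked into `notify_task(...)` alongside the recipients, task ID, run ID, status, and context.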
Visual References
Connection Configuration Example
SSO Token Variable Example