
Airflow provider for Arize AX: operators and hooks for datasets, experiments, projects, spans, and ML.


Arize AX Airflow Provider


The official Apache Airflow provider for Arize AX: schedule, automate, and orchestrate your LLMOps workflows directly from Airflow.

Build DAGs that evaluate prompts continuously, compare experiments before deploying, detect drift, curate datasets from production traces, and gate releases on evaluation scores, all using purpose-built operators that wrap the Arize AX platform.

Features

  • 97 operators across 12 domains: datasets, experiments, projects, spans, evaluators, prompts, tasks, annotations, AI integrations, API keys, spaces, ML
  • 8 sensors for waiting on dataset readiness, experiment completion, span ingestion, evaluation scores, task runs, and more
  • Built-in gates: fail_on_regression=True, fail_on_drift=True, and min_score thresholds raise AirflowException directly, with no glue code needed
  • Idempotent operations: if_exists="skip" on creates, ignore_if_missing=True on deletes
  • Clean XCom values: operators return scalar IDs by default, with full results available via named XCom keys
  • Continuous evaluation tasks with Eval Hub for live production monitoring
  • Human-in-the-loop annotations through annotation queues
  • 20 example DAGs covering CI/CD gates, drift detection, prompt lifecycle, RAG evaluation, fine-tuning data pipelines, and a self-contained self-optimizing loop demo

Installation

pip install arize-ax-airflow-provider

Requires Python 3.10+, Apache Airflow 2.4+, and Arize SDK 8.27.0. The provider pins arize==8.27.0 exactly so that installs only use the SDK version it is tested against; the pin is bumped together with a provider release whenever a newer SDK is adopted.

Setup

  1. In Airflow UI, go to Admin → Connections → Add
  2. Set Connection Id to arize_ax_default
  3. Set Connection Type to arize_ax
  4. Set Password to your Arize API key
  5. (Optional) Set Extra to {"space_id": "your-space-id"} to use it as the default space

Quick start

A server-side CI/CD gate: pull a candidate prompt from Prompt Hub, have Arize Eval Hub run it against an evaluation dataset (so the Airflow worker needs no LLM SDK and no LLM provider API key), compare the result against the production baseline, and promote the prompt if it passes. The DAG requires four Airflow Variables: arize_ax_prompt_name, arize_ax_eval_dataset_id, arize_ax_baseline_experiment_id, and arize_ax_ai_integration_id.
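Outside the UI, Airflow's environment-variable backend resolves Variable.get("some_key") and {{ var.value.some_key }} from AIRFLOW_VAR_SOME_KEY, with the key upper-cased. A minimal sketch that prints export lines for the four Variables; every value is a placeholder to replace with your own prompt name and Arize IDs, and variable_exports is a hypothetical helper.

```python
import shlex

# Placeholder values: replace with your own prompt name and Arize IDs.
QUICKSTART_VARIABLES = {
    "arize_ax_prompt_name": "my-support-prompt",
    "arize_ax_eval_dataset_id": "YOUR_EVAL_DATASET_ID",
    "arize_ax_baseline_experiment_id": "YOUR_BASELINE_EXPERIMENT_ID",
    "arize_ax_ai_integration_id": "YOUR_AI_INTEGRATION_ID",
}


def variable_exports(variables: dict[str, str]) -> list[str]:
    """Render shell export lines that Airflow's env-var Variable backend can read.

    Variable.get("arize_ax_prompt_name") checks AIRFLOW_VAR_ARIZE_AX_PROMPT_NAME,
    so each key is upper-cased behind the AIRFLOW_VAR_ prefix.
    """
    return [
        f"export AIRFLOW_VAR_{key.upper()}={shlex.quote(value)}"
        for key, value in variables.items()
    ]


if __name__ == "__main__":
    print("\n".join(variable_exports(QUICKSTART_VARIABLES)))
```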

from datetime import datetime
from typing import Any

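from airflow import DAG
from airflow.models import Variable

try:  # Airflow 3, or Airflow 2 with apache-airflow-providers-standard installed
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:  # Airflow 2.x without the standard provider
    from airflow.operators.python import PythonOperator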

from airflow.providers.arize_ax.operators.experiments import (
    ArizeAxCompareExperimentsOperator,
)
from airflow.providers.arize_ax.operators.prompts import (
    ArizeAxGetPromptOperator,
    ArizeAxPromotePromptOperator,
)
from airflow.providers.arize_ax.operators.tasks import (
    ArizeAxCreateRunExperimentTaskOperator,
    ArizeAxGetTaskRunOperator,
    ArizeAxTriggerTaskRunOperator,
)
from airflow.providers.arize_ax.sensors.arize_ax import ArizeAxTaskRunSensor


def build_run_config_from_prompt(**ctx) -> dict[str, Any]:
    """Materialize a Prompt Hub prompt version into a server-side run config."""
    prompt = ctx["ti"].xcom_pull(task_ids="fetch_candidate_prompt")
    version = prompt["version"]
    return {
        "experiment_type": "llm_generation",
        "ai_integration_id": Variable.get("arize_ax_ai_integration_id"),
        "model_name": version["model"],
        "messages": version["messages"],
        "input_variable_format": version["input_variable_format"],
        "invocation_parameters": version.get("invocation_params") or {},
        "provider_parameters": version.get("provider_params") or {},
    }


with DAG(
    dag_id="llm_cicd_gate",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    render_template_as_native_obj=True,  # required: dict XCom must stay a dict
) as dag:

    fetch_candidate_prompt = ArizeAxGetPromptOperator(
        task_id="fetch_candidate_prompt",
        prompt_name="{{ var.value.arize_ax_prompt_name }}",
        version_label="staging",
    )

    build_run_config = PythonOperator(
        task_id="build_run_config",
        python_callable=build_run_config_from_prompt,
    )

    create_task = ArizeAxCreateRunExperimentTaskOperator(
        task_id="create_candidate_task",
        name="candidate-{{ ds_nodash }}",
        dataset_id="{{ var.value.arize_ax_eval_dataset_id }}",
        run_configuration="{{ ti.xcom_pull(task_ids='build_run_config') }}",
        if_exists="skip",
    )

    trigger_run = ArizeAxTriggerTaskRunOperator(
        task_id="trigger_candidate_run",
        task_id_param="{{ ti.xcom_pull(task_ids='create_candidate_task') }}",
        experiment_name="candidate-{{ ds_nodash }}",
    )

    wait_for_run = ArizeAxTaskRunSensor(
        task_id="wait_for_candidate_run",
        run_id="{{ ti.xcom_pull(task_ids='trigger_candidate_run') }}",
        poke_interval=15,
        timeout=900,
        mode="reschedule",
    )

    get_result = ArizeAxGetTaskRunOperator(
        task_id="get_candidate_result",
        run_id="{{ ti.xcom_pull(task_ids='trigger_candidate_run') }}",
    )

    compare = ArizeAxCompareExperimentsOperator(
        task_id="compare_to_baseline",
        baseline_experiment_id="{{ var.value.arize_ax_baseline_experiment_id }}",
        candidate_experiment_id="{{ ti.xcom_pull(task_ids='get_candidate_result')['experiment_id'] }}",
        metric_names=["accuracy"],
        pass_threshold=0.0,
        fail_on_regression=True,
    )

    promote = ArizeAxPromotePromptOperator(
        task_id="promote_to_production",
        prompt_name="{{ var.value.arize_ax_prompt_name }}",
        label="production",
    )

    fetch_candidate_prompt >> build_run_config >> create_task >> trigger_run
    trigger_run >> wait_for_run >> get_result >> compare >> promote

That's a complete server-side CI/CD gate. Because fail_on_regression=True, compare raises AirflowException when the candidate underperforms the baseline, so promote never runs. The same arize_ax_prompt_name Variable threads through fetch (at the staging label) and promote (to the production label), so passing the gate is the only way this DAG re-labels a prompt version as production.
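To make the gate's decision concrete, here is a stand-alone sketch of the kind of check compare_to_baseline encodes. It assumes, hypothetically, that a metric "regresses" when the candidate falls more than pass_threshold below the baseline, so pass_threshold=0.0 requires the candidate to match or beat the baseline. gate_on_regression and the score dicts are illustrative only; this is not the provider's implementation.

```python
try:
    from airflow.exceptions import AirflowException
except ImportError:  # lets the sketch run without Airflow installed

    class AirflowException(Exception):
        """Stand-in for airflow.exceptions.AirflowException."""


def gate_on_regression(
    baseline: dict[str, float],
    candidate: dict[str, float],
    metric_names: list[str],
    pass_threshold: float = 0.0,
) -> dict[str, float]:
    """Return per-metric deltas (candidate - baseline), raising on any regression.

    Hypothetical semantics: a metric regresses when its delta < -pass_threshold.
    """
    deltas = {m: candidate[m] - baseline[m] for m in metric_names}
    regressed = {m: d for m, d in deltas.items() if d < -pass_threshold}
    if regressed:
        # Raising fails the task, so downstream tasks such as promote do not run.
        raise AirflowException(f"Candidate regressed on: {regressed}")
    return deltas
```

Under these assumed semantics, a non-zero pass_threshold acts as a tolerance: with pass_threshold=0.02, a 0.01 drop in accuracy would pass while a 0.05 drop would fail.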

Prefer running the LLM call yourself? Swap the server-side stack (Get/Create/Trigger/Wait/Get) for a single ArizeAxRunExperimentOperator with a task= callable. See example_arize_ax_llm_cicd_gate_dag.py for the client-side variant.

Operator domains

  • Datasets: list, create, get, delete, list/append/annotate examples, export to file, health check, smart refresh
  • Experiments: list, create, get, delete, run, list/annotate runs, get score, compare, detect drift, calibration, behavioral regression, budget allocator
  • Projects: list, create, get, delete
  • Spans: list, log, update evaluations/annotations/metadata, export to DataFrame/Parquet, get metrics, curate to dataset, curate feedback dataset (self-learning agents), export annotated, export to fine-tuning, adaptive sampling
  • Evaluators: list, create (template or code), get, update, delete, list/get/add versions (template or code)
  • Prompts: list, create, get, delete, compare, promote, optimize (meta-prompt via Prompt Learning SDK)
  • Tasks: list, create (evaluation or run-experiment), get, update, delete, list runs, get run, trigger run, cancel run
  • Annotations: list/create/delete configs, list/get/create/update/delete queues, list/add/delete records, annotate, assign
  • AI Integrations: list, get, create, update, delete
  • API Keys: list, create, delete, refresh
  • Spaces: list, get, create, update, delete
  • ML: log batch/stream, export to DataFrame/Parquet

Sensors

  • ArizeAxExperimentCompleteSensor: wait until an experiment reaches a terminal state
  • ArizeAxDatasetReadySensor: wait until a dataset has at least N examples
  • ArizeAxSpanCountSensor: wait until the span count in a project exceeds a threshold
  • ArizeAxEvaluationScoreSensor: wait until an evaluation score crosses a threshold
  • ArizeAxExperimentRunCountSensor: wait until an experiment has N runs
  • ArizeAxSpanIngestionSensor: wait until span ingestion stabilizes
  • ArizeAxAnnotationQueueSensor: wait until an annotation queue is configured
  • ArizeAxTaskRunSensor: wait until a task run reaches a terminal state
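These sensors follow Airflow's usual sensor contract: each poke runs one check, and the sensor pokes again every poke_interval seconds until the check passes or timeout expires (with mode="reschedule", as in the quick start, the worker slot is released between pokes instead of sleeping). The sketch below approximates that loop in plain Python for a dataset-readiness check. count_examples is a hypothetical stand-in for whatever Arize lookup the real ArizeAxDatasetReadySensor performs, and SensorTimeout stands in for Airflow's AirflowSensorTimeout.

```python
import time
from collections.abc import Callable


class SensorTimeout(Exception):
    """Stand-in for airflow.exceptions.AirflowSensorTimeout."""


def dataset_ready(count_examples: Callable[[], int], min_examples: int) -> bool:
    """One poke: True once the dataset holds at least min_examples examples."""
    return count_examples() >= min_examples


def wait_until(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poke until poke() returns True and return the number of pokes used.

    Raises SensorTimeout once another wait would exceed the timeout.
    """
    waited = 0.0
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if waited + poke_interval > timeout:
            raise SensorTimeout(f"condition not met within {timeout}s")
        sleep(poke_interval)
        waited += poke_interval
```

With the quick start's poke_interval=15 and timeout=900, this sketch pokes at most 61 times (at t = 0, 15, ..., 900) before giving up.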

Design patterns

The provider follows established Airflow operator conventions so DAGs read naturally and stay maintainable:

  • Idempotent creates: set if_exists="skip" to handle a 409 Conflict by resolving the existing resource by name
  • Idempotent deletes: all Delete operators accept ignore_if_missing=True (the default) and log a 404 instead of raising
  • Built-in gates: comparison operators (CompareExperiments, DetectEvalDrift, EvaluatorCalibration, BehavioralRegression, GetExperimentScore) accept fail_on_* / min_score parameters that raise AirflowException on failure
  • Param validation: operators validate the required space_id / project_id in execute() with clear error messages
  • Convenience XCom keys: List operators push first_id and first_name for direct chaining via Jinja templates
  • Override evaluations: ArizeAxTriggerTaskRunOperator(override_evaluations=True) re-evaluates spans that already have labels
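The two idempotency conventions can be sketched in plain Python, assuming a hypothetical API client whose calls raise an error carrying the HTTP status code and which can look up a resource by name. ApiError, create_idempotent, and delete_idempotent are illustrative names, not the provider's API.

```python
import logging
from collections.abc import Callable
from typing import Any

log = logging.getLogger(__name__)


class ApiError(Exception):
    """Hypothetical client error that carries an HTTP status code."""

    def __init__(self, status: int, message: str = "") -> None:
        super().__init__(message or f"HTTP {status}")
        self.status = status


def create_idempotent(
    create: Callable[[str], dict[str, Any]],
    find_by_name: Callable[[str], dict[str, Any]],
    name: str,
    if_exists: str = "error",
) -> str:
    """Create a named resource and return its ID.

    With if_exists="skip", a 409 Conflict is treated as "already there" and the
    existing resource's ID is looked up by name instead of failing the task.
    """
    try:
        return create(name)["id"]
    except ApiError as err:
        if err.status == 409 and if_exists == "skip":
            return find_by_name(name)["id"]
        raise


def delete_idempotent(
    delete: Callable[[str], None],
    resource_id: str,
    ignore_if_missing: bool = True,
) -> bool:
    """Delete a resource; return False instead of raising on 404 when allowed."""
    try:
        delete(resource_id)
        return True
    except ApiError as err:
        if err.status == 404 and ignore_if_missing:
            log.info("Resource %s not found; nothing to delete", resource_id)
            return False
        raise
```

Because both helpers return a plain ID or boolean on the "already done" path, a retried task run produces the same result as the first successful one.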

Example DAGs

Bundled in provider_pkg/example_dags/:

  • Self-contained smoke test: example_arize_ax_e2e_dag.py
  • LLM CI/CD gate: example_arize_ax_llm_cicd_gate_dag.py
  • Prompt lifecycle (staging → production): example_arize_ax_prompt_lifecycle_dag.py
  • Prompt A/B testing: example_arize_ax_prompt_ab_test_dag.py
  • Drift detection with auto-rollback: example_arize_ax_drift_detection_dag.py
  • Behavioral regression detection: example_arize_ax_behavioral_regression_dag.py
  • Evaluator calibration vs human labels: example_arize_ax_evaluator_calibration_dag.py
  • RAG evaluation pipeline: example_arize_ax_rag_evaluation_dag.py
  • Production span curation into datasets: example_arize_ax_dataset_curation_dag.py
  • Fine-tuning data pipeline: example_arize_ax_finetune_data_pipeline_dag.py
  • Continuous evaluation tasks: example_arize_ax_tasks_dag.py
  • Annotation queues for HITL: example_arize_ax_annotation_queues_dag.py
  • Multi-model experiment matrix: example_arize_ax_llm_experiments_dag.py
  • Self-learning agent (multi-prompt optimization from production feedback): example_arize_ax_prompt_optimization_with_feedback_dag.py
  • Self-optimizing loop (self-contained closed loop: baseline → optimize → candidate → gate → promote): example_arize_ax_self_optimizing_loop_dag.py

Plus dataset, experiment, evaluator, span, project, ML, and admin demos for individual domain walkthroughs.

The self-learning agent demo requires the prompt-learning-enhanced SDK. It is available only as a git source, and because PyPI does not accept direct-URL dependencies, it is not declared as an optional extra of this package. Install it separately on workers that run the optimization DAG:

pip install 'arize-phoenix-evals>=2.0,<3.0' \
            'prompt-learning-enhanced @ git+https://github.com/Arize-ai/prompt-learning.git'

The arize-phoenix-evals<3.0 pin is required because upstream prompt-learning-enhanced imports phoenix.evals.models, which was removed in 3.0.0. ArizeAxOptimizePromptOperator raises a clear AirflowException with this exact install line when the SDK is missing.
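A worker-side preflight check along these lines can surface both requirements before the optimization DAG runs. It is a sketch using only importlib.metadata from the standard library: the distribution names come from the install line above, preflight_prompt_learning and phoenix_evals_major_ok are hypothetical helpers, and this is not the check ArizeAxOptimizePromptOperator itself performs.

```python
from importlib.metadata import PackageNotFoundError, version

INSTALL_HINT = (
    "pip install 'arize-phoenix-evals>=2.0,<3.0' "
    "'prompt-learning-enhanced @ git+https://github.com/Arize-ai/prompt-learning.git'"
)


def phoenix_evals_major_ok(installed: str) -> bool:
    """True for a 2.x release, i.e. within >=2.0,<3.0 where phoenix.evals.models exists."""
    return int(installed.split(".")[0]) == 2


def preflight_prompt_learning() -> list[str]:
    """Return problems with the optimization dependencies (an empty list means OK)."""
    problems: list[str] = []
    for dist in ("prompt-learning-enhanced", "arize-phoenix-evals"):
        try:
            installed = version(dist)
        except PackageNotFoundError:
            problems.append(f"{dist} is not installed")
            continue
        if dist == "arize-phoenix-evals" and not phoenix_evals_major_ok(installed):
            problems.append(f"arize-phoenix-evals {installed} is outside >=2.0,<3.0")
    if problems:
        problems.append(f"install with: {INSTALL_HINT}")
    return problems
```

Run as an early task, it lets the DAG fail fast with one readable message when the returned list is non-empty.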



Download files


Source Distribution

arize_ax_airflow_provider-1.3.0.tar.gz (158.2 kB)

Uploaded Source

Built Distribution


arize_ax_airflow_provider-1.3.0-py3-none-any.whl (174.6 kB)

Uploaded Python 3

File details

Details for the file arize_ax_airflow_provider-1.3.0.tar.gz.


File hashes

Hashes for arize_ax_airflow_provider-1.3.0.tar.gz
Algorithm Hash digest
SHA256 5d58b6973f4317fb75e81bc051131f39bd7baa8f858a545c9e892dc3c3c28654
MD5 f16819f444068c29f6627177154fd1e8
BLAKE2b-256 ae81d055cc5d4d0bce7bc7246bc46c780d89e81c93ed5cd87af6cfe091916efe


Provenance

The following attestation bundles were made for arize_ax_airflow_provider-1.3.0.tar.gz:

Publisher: publish.yml on Arize-ai/arize-ax-airflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file arize_ax_airflow_provider-1.3.0-py3-none-any.whl.


File hashes

Hashes for arize_ax_airflow_provider-1.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 08b73831a62b872366a086ef0206689ca5d0502cd8491bbd350c16bcf6e4b2c3
MD5 ca1ff18f55a745e7d87914fc9372f000
BLAKE2b-256 6a928a428cdaff19fc53d65d3cc67eff587d6d469538911daf4e0a9e65e08316


Provenance

The following attestation bundles were made for arize_ax_airflow_provider-1.3.0-py3-none-any.whl:

Publisher: publish.yml on Arize-ai/arize-ax-airflow

