Airflow provider for Arize AX: operators and hooks for datasets, experiments, projects, spans, and ML.
Project description
Arize AX Airflow Provider
The official Apache Airflow provider for Arize AX — schedule, automate, and orchestrate your LLMOps workflows directly from Airflow.
Build DAGs that evaluate prompts continuously, compare experiments before deploying, detect drift, curate datasets from production traces, and gate releases on evaluation scores, all using purpose-built operators that wrap the Arize AX platform.
Features
- 97 operators across 12 domains: datasets, experiments, projects, spans, evaluators, prompts, tasks, annotations, AI integrations, API keys, spaces, ML
- 8 sensors for waiting on dataset readiness, experiment completion, span ingestion, evaluation scores, task runs, and more
- Built-in gates
fail_on_regression=True,fail_on_drift=True,min_scorethresholds raiseAirflowExceptiondirectly, no glue code needed - Idempotent operations
if_exists="skip"on creates,ignore_if_missing=Trueon deletes - Clean XCom values operators return scalar IDs by default, full results available via named XCom keys
- Continuous evaluation tasks with Eval Hub for live production monitoring
- Human-in-the-loop annotations through annotation queues
- 20 example DAGs covering CI/CD gates, drift detection, prompt lifecycle, RAG evaluation, fine-tuning data pipelines, and a self-contained self-optimizing loop demo
Installation
pip install arize-ax-airflow-provider
Requires Python 3.10+, Apache Airflow 2.4+, and Arize SDK 8.27.0 (the
provider exact-pins arize==8.27.0 so installs only use the version we test
against; bump alongside the provider release when adopting a newer SDK).
Setup
- In Airflow UI, go to Admin → Connections → Add
- Set Connection Id to
arize_ax_default - Set Connection Type to
arize_ax - Set Password to your Arize API key
- (Optional) Set Extra to
{"space_id": "your-space-id"}to use it as the default space
Quick start
A server-side CI/CD gate: pull a candidate prompt from Prompt Hub, have Arize Eval Hub run it against an evaluation dataset (no LLM SDK or API key on the Airflow worker), compare against the production baseline, and promote on pass. Requires four Airflow Variables: arize_ax_prompt_name, arize_ax_eval_dataset_id, arize_ax_baseline_experiment_id, arize_ax_ai_integration_id.
from datetime import datetime
from typing import Any
from airflow import DAG
from airflow.models import Variable
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.arize_ax.operators.experiments import (
ArizeAxCompareExperimentsOperator,
)
from airflow.providers.arize_ax.operators.prompts import (
ArizeAxGetPromptOperator,
ArizeAxPromotePromptOperator,
)
from airflow.providers.arize_ax.operators.tasks import (
ArizeAxCreateRunExperimentTaskOperator,
ArizeAxGetTaskRunOperator,
ArizeAxTriggerTaskRunOperator,
)
from airflow.providers.arize_ax.sensors.arize_ax import ArizeAxTaskRunSensor
def build_run_config_from_prompt(**ctx) -> dict[str, Any]:
"""Materialize a Prompt Hub prompt version into a server-side run config."""
prompt = ctx["ti"].xcom_pull(task_ids="fetch_candidate_prompt")
version = prompt["version"]
return {
"experiment_type": "llm_generation",
"ai_integration_id": Variable.get("arize_ax_ai_integration_id"),
"model_name": version["model"],
"messages": version["messages"],
"input_variable_format": version["input_variable_format"],
"invocation_parameters": version.get("invocation_params") or {},
"provider_parameters": version.get("provider_params") or {},
}
with DAG(
dag_id="llm_cicd_gate",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
render_template_as_native_obj=True, # required: dict XCom must stay a dict
) as dag:
fetch_candidate_prompt = ArizeAxGetPromptOperator(
task_id="fetch_candidate_prompt",
prompt_name="{{ var.value.arize_ax_prompt_name }}",
version_label="staging",
)
build_run_config = PythonOperator(
task_id="build_run_config",
python_callable=build_run_config_from_prompt,
)
create_task = ArizeAxCreateRunExperimentTaskOperator(
task_id="create_candidate_task",
name="candidate-{{ ds_nodash }}",
dataset_id="{{ var.value.arize_ax_eval_dataset_id }}",
run_configuration="{{ ti.xcom_pull(task_ids='build_run_config') }}",
if_exists="skip",
)
trigger_run = ArizeAxTriggerTaskRunOperator(
task_id="trigger_candidate_run",
task_id_param="{{ ti.xcom_pull(task_ids='create_candidate_task') }}",
experiment_name="candidate-{{ ds_nodash }}",
)
wait_for_run = ArizeAxTaskRunSensor(
task_id="wait_for_candidate_run",
run_id="{{ ti.xcom_pull(task_ids='trigger_candidate_run') }}",
poke_interval=15,
timeout=900,
mode="reschedule",
)
get_result = ArizeAxGetTaskRunOperator(
task_id="get_candidate_result",
run_id="{{ ti.xcom_pull(task_ids='trigger_candidate_run') }}",
)
compare = ArizeAxCompareExperimentsOperator(
task_id="compare_to_baseline",
baseline_experiment_id="{{ var.value.arize_ax_baseline_experiment_id }}",
candidate_experiment_id="{{ ti.xcom_pull(task_ids='get_candidate_result')['experiment_id'] }}",
metric_names=["accuracy"],
pass_threshold=0.0,
fail_on_regression=True,
)
promote = ArizeAxPromotePromptOperator(
task_id="promote_to_production",
prompt_name="{{ var.value.arize_ax_prompt_name }}",
label="production",
)
fetch_candidate_prompt >> build_run_config >> create_task >> trigger_run
trigger_run >> wait_for_run >> get_result >> compare >> promote
That's a complete server-side CI/CD gate. The fail_on_regression=True flag means compare raises AirflowException when the candidate underperforms the baseline, so promote never runs. The same arize_ax_prompt_name Variable threads through fetch (at the staging label) and promote (to the production label), the gate is the only path that re-labels production.
Prefer running the LLM call yourself? Swap the server-side stack (Get/Create/Trigger/Wait/Get) for a single
ArizeAxRunExperimentOperatorwith atask=callable. Seeexample_arize_ax_llm_cicd_gate_dag.pyfor the client-side variant.
Operator domains
| Domain | Operators |
|---|---|
| Datasets | List, create, get, delete, list/append/annotate examples, export to file, health check, smart refresh |
| Experiments | List, create, get, delete, run, list/annotate runs, get score, compare, detect drift, calibration, behavioral regression, budget allocator |
| Projects | List, create, get, delete |
| Spans | List, log, update evaluations/annotations/metadata, export to DataFrame/Parquet, get metrics, curate to dataset, curate feedback dataset (self-learning agents), export annotated, export to fine-tuning, adaptive sampling |
| Evaluators | List, create (template or code), get, update, delete, list/get/add versions (template or code) |
| Prompts | List, create, get, delete, compare, promote, optimize (meta-prompt via Prompt Learning SDK) |
| Tasks | List, create (evaluation or run-experiment), get, update, delete, list runs, get run, trigger run, cancel run |
| Annotations | List/create/delete configs, list/get/create/update/delete queues, list/add/delete records, annotate, assign |
| AI Integrations | List, get, create, update, delete |
| API Keys | List, create, delete, refresh |
| Spaces | List, get, create, update, delete |
| ML | Log batch/stream, export to DataFrame/Parquet |
Sensors
| Sensor | Purpose |
|---|---|
ArizeAxExperimentCompleteSensor |
Wait until an experiment reaches a terminal state |
ArizeAxDatasetReadySensor |
Wait until a dataset has at least N examples |
ArizeAxSpanCountSensor |
Wait until span count in a project exceeds a threshold |
ArizeAxEvaluationScoreSensor |
Wait until evaluation score crosses a threshold |
ArizeAxExperimentRunCountSensor |
Wait until experiment has N runs |
ArizeAxSpanIngestionSensor |
Wait until span ingestion stabilizes |
ArizeAxAnnotationQueueSensor |
Wait until annotation queue is configured |
ArizeAxTaskRunSensor |
Wait until a task run reaches a terminal state |
Design patterns
The provider follows established Airflow operator conventions so DAGs read naturally and stay maintainable:
- Idempotent creates Set
if_exists="skip"to handle 409 conflicts by resolving the existing resource by name - Idempotent deletes All Delete operators accept
ignore_if_missing=True(default), logging on 404 instead of raising - Built-in gates Comparison operators (
CompareExperiments,DetectEvalDrift,EvaluatorCalibration,BehavioralRegression,GetExperimentScore) acceptfail_on_*/min_scoreparams that raiseAirflowExceptionon failure - Param validation Operators validate required
space_id/project_idinexecute()with clear error messages - Convenience XCom keys List operators push
first_idandfirst_namefor direct chaining via Jinja templates - Override evaluations
ArizeAxTriggerTaskRunOperator(override_evaluations=True)re-evaluates spans that already have labels
Example DAGs
Bundled in provider_pkg/example_dags/:
| Pattern | DAG |
|---|---|
| Self-contained smoke test | example_arize_ax_e2e_dag.py |
| LLM CI/CD gate | example_arize_ax_llm_cicd_gate_dag.py |
| Prompt lifecycle (staging → production) | example_arize_ax_prompt_lifecycle_dag.py |
| Prompt A/B testing | example_arize_ax_prompt_ab_test_dag.py |
| Drift detection with auto-rollback | example_arize_ax_drift_detection_dag.py |
| Behavioral regression detection | example_arize_ax_behavioral_regression_dag.py |
| Evaluator calibration vs human labels | example_arize_ax_evaluator_calibration_dag.py |
| RAG evaluation pipeline | example_arize_ax_rag_evaluation_dag.py |
| Production span curation into datasets | example_arize_ax_dataset_curation_dag.py |
| Fine-tuning data pipeline | example_arize_ax_finetune_data_pipeline_dag.py |
| Continuous evaluation tasks | example_arize_ax_tasks_dag.py |
| Annotation queues for HITL | example_arize_ax_annotation_queues_dag.py |
| Multi-model experiment matrix | example_arize_ax_llm_experiments_dag.py |
| Self-learning agent (multi-prompt optimization from production feedback) | example_arize_ax_prompt_optimization_with_feedback_dag.py |
| Self-optimizing loop (self-contained closed loop: baseline → optimize → candidate → gate → promote) | example_arize_ax_self_optimizing_loop_dag.py |
Plus dataset, experiment, evaluator, span, project, ML, and admin demos for individual domain walkthroughs.
The self-learning agent demo requires the
prompt-learning-enhancedSDK (available as a git source only, PyPI does not accept direct-URL deps, so it is not declared as an optional extra here). Install it separately on workers that run the optimization DAG:pip install 'arize-phoenix-evals>=2.0,<3.0' \ 'prompt-learning-enhanced @ git+https://github.com/Arize-ai/prompt-learning.git'The
arize-phoenix-evals<3.0pin is required because upstreamprompt-learning-enhancedimportsphoenix.evals.models, which was removed in 3.0.0.ArizeAxOptimizePromptOperatorraises a clearAirflowExceptionwith this exact install line when the SDK is missing.
Documentation
- Provider guide: Arize AX Airflow Provider — install, setup, design patterns, example DAGs
- Operator reference: Operators, sensors, and hooks — full reference for all 97 operators
- Arize AX docs: arize.com/docs/ax
- Arize SDK reference: Arize Python SDK v8
Support
- Help & support: arize.com/support
- Slack: Arize Community Slack
- Twitter/X: @ArizeAI
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file arize_ax_airflow_provider-1.3.0.tar.gz.
File metadata
- Download URL: arize_ax_airflow_provider-1.3.0.tar.gz
- Upload date:
- Size: 158.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5d58b6973f4317fb75e81bc051131f39bd7baa8f858a545c9e892dc3c3c28654
|
|
| MD5 |
f16819f444068c29f6627177154fd1e8
|
|
| BLAKE2b-256 |
ae81d055cc5d4d0bce7bc7246bc46c780d89e81c93ed5cd87af6cfe091916efe
|
Provenance
The following attestation bundles were made for arize_ax_airflow_provider-1.3.0.tar.gz:
Publisher:
publish.yml on Arize-ai/arize-ax-airflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
arize_ax_airflow_provider-1.3.0.tar.gz -
Subject digest:
5d58b6973f4317fb75e81bc051131f39bd7baa8f858a545c9e892dc3c3c28654 - Sigstore transparency entry: 1573582327
- Sigstore integration time:
-
Permalink:
Arize-ai/arize-ax-airflow@4b44a310398198c4986cde38226121d9feb8aefa -
Branch / Tag:
refs/tags/v1.3.0 - Owner: https://github.com/Arize-ai
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@4b44a310398198c4986cde38226121d9feb8aefa -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file arize_ax_airflow_provider-1.3.0-py3-none-any.whl.
File metadata
- Download URL: arize_ax_airflow_provider-1.3.0-py3-none-any.whl
- Upload date:
- Size: 174.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
08b73831a62b872366a086ef0206689ca5d0502cd8491bbd350c16bcf6e4b2c3
|
|
| MD5 |
ca1ff18f55a745e7d87914fc9372f000
|
|
| BLAKE2b-256 |
6a928a428cdaff19fc53d65d3cc67eff587d6d469538911daf4e0a9e65e08316
|
Provenance
The following attestation bundles were made for arize_ax_airflow_provider-1.3.0-py3-none-any.whl:
Publisher:
publish.yml on Arize-ai/arize-ax-airflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
arize_ax_airflow_provider-1.3.0-py3-none-any.whl -
Subject digest:
08b73831a62b872366a086ef0206689ca5d0502cd8491bbd350c16bcf6e4b2c3 - Sigstore transparency entry: 1573582372
- Sigstore integration time:
-
Permalink:
Arize-ai/arize-ax-airflow@4b44a310398198c4986cde38226121d9feb8aefa -
Branch / Tag:
refs/tags/v1.3.0 - Owner: https://github.com/Arize-ai
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@4b44a310398198c4986cde38226121d9feb8aefa -
Trigger Event:
workflow_dispatch
-
Statement type: