
MCP-Airflow-API

Model Context Protocol (MCP) server for Apache Airflow API integration. This project provides natural-language MCP tools for essential Airflow cluster operations.

Tested and supported Airflow version: 2.10.2 (API Version: v1)

Features

  • List all DAGs in the Airflow cluster
  • Monitor running/failed DAG runs
  • Trigger DAG runs on demand
  • Check cluster health and version information
  • Minimal, LLM-friendly output for all tools
  • Easy integration with MCP Inspector, OpenWebUI, Smithery, etc.

Available MCP Tools

DAG Management

  • list_dags
    Returns all DAGs registered in the Airflow cluster.
    Output: dag_id, dag_display_name, is_active, is_paused, owners, tags

  • running_dags
    Returns all currently running DAG runs.
    Output: dag_id, run_id, state, execution_date, start_date, end_date

  • failed_dags
    Returns all recently failed DAG runs.
    Output: dag_id, run_id, state, execution_date, start_date, end_date

  • trigger_dag(dag_id)
    Immediately triggers the specified DAG.
    Output: dag_id, run_id, state, execution_date, start_date, end_date

  • pause_dag(dag_id)
    Pauses the specified DAG (prevents scheduling new runs).
    Output: dag_id, is_paused

  • unpause_dag(dag_id)
    Unpauses the specified DAG (allows scheduling new runs).
    Output: dag_id, is_paused
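
These tools wrap the Airflow REST API v1. As a rough sketch of the mapping (not the exact implementation; the base URL and credentials below are placeholders), trigger_dag and pause_dag correspond to the following endpoints:

import requests

AIRFLOW_API_URL = "http://localhost:8080/api/v1"  # placeholder base URL
AUTH = ("airflow", "airflow")                     # placeholder credentials

# trigger_dag: POST a new DAG run for the given dag_id
run = requests.post(
    f"{AIRFLOW_API_URL}/dags/example_dag/dagRuns",
    json={"conf": {}},
    auth=AUTH,
).json()
print(run["dag_run_id"], run["state"])

# pause_dag / unpause_dag: PATCH the DAG's is_paused flag
dag = requests.patch(
    f"{AIRFLOW_API_URL}/dags/example_dag",
    json={"is_paused": True},
    auth=AUTH,
).json()
print(dag["is_paused"])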

Cluster Management & Health

  • get_health
    Get the health status of the Airflow webserver instance.
    Output: metadatabase, scheduler, overall health status

  • get_version
    Get version information of the Airflow instance.
    Output: version, git_version, build_date, api_version
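
Both tools map onto single GET endpoints of the Airflow REST API v1. A minimal sketch (placeholder base URL and credentials):

import requests

AUTH = ("airflow", "airflow")          # placeholder credentials
BASE = "http://localhost:8080/api/v1"  # placeholder base URL

health = requests.get(f"{BASE}/health", auth=AUTH).json()
print(health["metadatabase"]["status"], health["scheduler"]["status"])

version = requests.get(f"{BASE}/version", auth=AUTH).json()
print(version["version"], version.get("git_version"))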

Pool Management

  • list_pools(limit=100, offset=0)
    List all pools in the Airflow instance with pagination support.
    Output: pools, total_entries, limit, offset, pool details with slots usage

  • get_pool(pool_name)
    Get detailed information about a specific pool.
    Output: name, slots, occupied_slots, running_slots, queued_slots, open_slots, description, utilization_percentage
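
The utilization_percentage field is presumably derived from the raw slot counters returned by the Airflow pools endpoint. A minimal sketch of that calculation, assuming occupied slots as a share of total slots (placeholder base URL and credentials):

import requests

pool = requests.get(
    "http://localhost:8080/api/v1/pools/default_pool",
    auth=("airflow", "airflow"),  # placeholder credentials
).json()

# Assumed formula: occupied slots as a percentage of total slots
slots = pool["slots"]
utilization = 100.0 * pool["occupied_slots"] / slots if slots else 0.0
print(f"{pool['name']}: {utilization:.1f}% of {slots} slots occupied")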

Task Instance Management

  • get_current_time_context()
    Returns the current time context for accurate relative date calculations.
    Output: current_date, current_time, reference_date, date_calculation_examples, message

  • list_task_instances_all(dag_id=None, dag_run_id=None, execution_date_gte=None, execution_date_lte=None, start_date_gte=None, start_date_lte=None, end_date_gte=None, end_date_lte=None, duration_gte=None, duration_lte=None, state=None, pool=None, queue=None, limit=100, offset=0)
Lists task instances across all DAGs, optionally filtered by date ranges, duration, state, pool, and queue.
    Output: task_instances, total_entries, limit, offset, applied_filters

  • get_task_instance_details(dag_id, dag_run_id, task_id)
    Retrieves detailed information about a specific task instance.
    Output: Comprehensive task instance details including execution info, state, timing, configuration, and metadata

  • list_task_instances_batch(dag_ids=None, dag_run_ids=None, task_ids=None, execution_date_gte=None, execution_date_lte=None, start_date_gte=None, start_date_lte=None, end_date_gte=None, end_date_lte=None, duration_gte=None, duration_lte=None, state=None, pool=None, queue=None)
    Lists task instances in batch with multiple filtering criteria for bulk operations.
    Output: task_instances, total_entries, applied_filters, batch processing results

  • get_task_instance_extra_links(dag_id, dag_run_id, task_id)
    Lists extra links for a specific task instance (e.g., monitoring dashboards, logs, external resources).
    Output: task_id, dag_id, dag_run_id, extra_links, total_links

  • get_task_instance_logs(dag_id, dag_run_id, task_id, try_number=1, full_content=False, token=None)
    Retrieves logs for a specific task instance and its try number with content and metadata.
    Output: task_id, dag_id, dag_run_id, try_number, content, continuation_token, metadata
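
The filter arguments of list_task_instances_all mirror the query parameters of Airflow's cross-DAG task-instance endpoint, where "~" is the API's wildcard for "all DAGs" and "all runs". A hedged sketch (placeholder base URL and credentials):

import requests

resp = requests.get(
    "http://localhost:8080/api/v1/dags/~/dagRuns/~/taskInstances",
    params={
        "state": "failed",
        "pool": "default_pool",
        "duration_gte": 300,  # seconds
        "limit": 100,
        "offset": 0,
    },
    auth=("airflow", "airflow"),  # placeholder credentials
)
data = resp.json()
print(data["total_entries"], len(data["task_instances"]))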

DAG Analysis & Monitoring

  • dag_details(dag_id)
    Retrieves comprehensive details for a specific DAG.
    Output: dag_id, description, schedule_interval, owners, tags, start_date, next_dagrun, etc.

  • dag_graph(dag_id)
    Retrieves task dependency graph structure for a specific DAG.
    Output: dag_id, tasks, dependencies, task relationships

  • list_tasks(dag_id)
    Lists all tasks for a specific DAG.
    Output: dag_id, tasks, task configuration details

  • dag_code(dag_id)
    Retrieves the source code for a specific DAG.
    Output: dag_id, file_token, source_code

  • list_event_logs(dag_id=None, task_id=None, run_id=None, limit=20, offset=0)
    Lists event log entries with optional filtering.
    Output: event_logs, total_entries, limit, offset

  • get_event_log(event_log_id)
    Retrieves a specific event log entry by ID.
    Output: event_log_id, when, event, dag_id, task_id, run_id, etc.

  • all_dag_event_summary()
    Retrieves event count summary for all DAGs.
    Output: dag_summaries, total_dags, total_events

  • list_import_errors(limit=20, offset=0)
    Lists import errors with optional filtering.
    Output: import_errors, total_entries, limit, offset

  • get_import_error(import_error_id)
    Retrieves a specific import error by ID.
    Output: import_error_id, filename, stacktrace, timestamp

  • all_dag_import_summary()
    Retrieves import error summary for all DAGs.
    Output: import_summaries, total_errors, affected_files

  • dag_run_duration(dag_id, limit=10)
    Retrieves run duration statistics for a specific DAG.
    Output: dag_id, runs, duration analysis, success/failure stats

  • dag_task_duration(dag_id, run_id=None)
    Retrieves task duration information for a specific DAG run.
    Output: dag_id, run_id, tasks, individual task performance

  • dag_calendar(dag_id, start_date=None, end_date=None)
    Retrieves calendar/schedule information for a specific DAG.
    Output: dag_id, schedule_interval, runs, upcoming executions
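
As an illustration of how these tools chain API calls, dag_code presumably follows the REST API's standard two-step flow: fetch the DAG's file_token, then exchange it for the source via the dagSources endpoint (a sketch with placeholder base URL and credentials):

import requests

AUTH = ("airflow", "airflow")          # placeholder credentials
BASE = "http://localhost:8080/api/v1"  # placeholder base URL

# Step 1: the DAG object carries a file_token for its source file
dag = requests.get(f"{BASE}/dags/data_pipeline", auth=AUTH).json()

# Step 2: exchange the token for the DAG's Python source
src = requests.get(
    f"{BASE}/dagSources/{dag['file_token']}",
    headers={"Accept": "text/plain"},  # request raw source text
    auth=AUTH,
)
print(src.text)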


Example Queries

Basic DAG Operations

  • list_dags: "List all DAGs."
  • running_dags: "Show running DAGs."
  • failed_dags: "Show failed DAGs."
  • trigger_dag: "Trigger DAG 'example_dag'."
  • pause_dag: "Pause DAG 'etl_job'."
  • unpause_dag: "Unpause DAG 'etl_job'."

Cluster Management & Health

  • get_health: "Check Airflow cluster health."
  • get_version: "Get Airflow version information."

Pool Management

  • list_pools: "List all pools."
  • list_pools: "Show pool usage statistics."
  • get_pool: "Get details for pool 'default_pool'."
  • get_pool: "Check pool utilization."

Task Instance Management

  • list_task_instances_all: "List all task instances for DAG 'conditional_dataset_and_time_based_timetable'."
  • list_task_instances_all: "Show running task instances."
  • list_task_instances_all: "Show task instances filtered by pool 'default_pool'."
  • list_task_instances_all: "List task instances with duration greater than 300 seconds."
  • list_task_instances_all: "Show failed task instances from last week."
  • list_task_instances_all: "List failed task instances from yesterday."
  • list_task_instances_all: "Show task instances that started after 9 AM today."
  • list_task_instances_all: "List task instances from the last 3 days with state 'failed'."
  • get_task_instance_details: "Get details for task 'conditional_dataset_and_time_based_timetable' in DAG 'conditional_dataset_and_time_based_timetable' run 'scheduled__2025-08-06T01:00:00+00:00'."
  • list_task_instances_batch: "List failed task instances from last month."
  • list_task_instances_batch: "Show task instances in batch for multiple DAGs from this week."
  • get_task_instance_extra_links: "Get extra links for task 'data_processing' in latest run."
  • get_task_instance_logs: "Retrieve logs for task 'etl_task' try number 2."

DAG Analysis & Monitoring

  • dag_details: "Get details for DAG 'my_dag'."
  • dag_graph: "Show task graph for DAG 'workflow_dag'."
  • list_tasks: "List all tasks in DAG 'data_pipeline'."
  • dag_code: "Get source code for DAG 'data_pipeline'."
  • list_event_logs: "List event logs for DAG 'etl_process'."
  • list_event_logs: "Show event logs from yesterday for all DAGs."
  • get_event_log: "Get event log entry with ID 12345."
  • all_dag_event_summary: "Show event count summary for all DAGs."
  • list_import_errors: "List import errors."
  • get_import_error: "Get import error with ID 67890."
  • all_dag_import_summary: "Show import error summary for all DAGs."
  • dag_run_duration: "Get run duration stats for DAG 'batch_job'."
  • dag_task_duration: "Show task durations for latest run of 'ml_pipeline'."
  • dag_calendar: "Get calendar info for DAG 'daily_report' from last month."
  • dag_calendar: "Show DAG schedule for 'weekly_report' from this week."

Prompt Template

The package exposes a tool get_prompt_template that returns either the entire template, a specific section, or just the headings. Three MCP prompts (prompt_template_full, prompt_template_headings, prompt_template_section) are also registered for discovery.

MCP Prompts

For easier discoverability in MCP clients (so prompts/list is not empty), the server now registers three prompts:

  • prompt_template_full – returns the full canonical template
  • prompt_template_headings – returns only the section headings
  • prompt_template_section – takes a section argument (number or keyword) and returns that section

You can still use the get_prompt_template tool for programmatic access or when you prefer tool invocation over prompt retrieval.

A single canonical English prompt template guides safe and efficient tool selection.

Files:
  • Packaged: src/mcp_airflow_api/prompt_template.md (distributed with the PyPI package)
  • An optional workspace-root copy, PROMPT_TEMPLATE.md, may exist for editing; the packaged copy is the one loaded at runtime.

Retrieve dynamically via the MCP tool:
  • get_prompt_template() – full template
  • get_prompt_template("tool map") – only the tool mapping section
  • get_prompt_template("3") – section 3 (tool map)
  • get_prompt_template(mode="headings") – list all section headings

Policy: Only English is stored; the LLM uses the English instructions for internal reasoning regardless of the user's query language, and generates user-facing responses in other languages as needed.


Main Tool Files

  • MCP tool definitions: src/mcp_airflow_api/airflow_api.py
  • Utility functions: src/mcp_airflow_api/functions.py

How To Use

  1. In your MCP Tools environment, configure mcp-config.json as follows:
{
	"mcpServers": {
		"airflow-api": {
			"command": "uvx",
			"args": ["--python", "3.11", "mcp-airflow-api"],
			"env": {
				"AIRFLOW_API_URL": "http://localhost:38080/api/v1",
				"AIRFLOW_API_USERNAME": "airflow",
				"AIRFLOW_API_PASSWORD": "airflow",
				"AIRFLOW_LOG_LEVEL": "INFO"
			}
		}
	}
}
  2. Register the MCP server in MCP Inspector, OpenWebUI, Smithery, etc., and use the tools.

QuickStart (Demo): Running MCP-Airflow-API with Docker

  1. Prepare an Airflow cluster

  2. Prepare MCP Tools environment

    • Install Docker and Docker Compose
    • Clone this project and run docker-compose up -d in the root directory
  3. Register the MCP server in MCP Inspector/Smithery

    • Example address: http://localhost:8000/airflow-api

Logging & Observability

  • Structured logs for all tool invocations and HTTP requests
  • Control log level via environment variable (AIRFLOW_LOG_LEVEL) or CLI flag (--log-level)
  • Supported levels: DEBUG, INFO, WARNING, ERROR, CRITICAL

Installing via Smithery

To install Airflow API Integration Server for Claude Desktop automatically via Smithery:

npx -y @smithery/cli install @call518/mcp-airflow-api --client claude

Roadmap

This project starts with a minimal set of essential Airflow management tools. Additional tools for Airflow cluster operations are planned, including advanced monitoring, DAG/task analytics, and scheduling controls. Contributions and suggestions are welcome!



License

This project is licensed under the MIT License.
