DataPlatform Airflow Plugin

An Apache Airflow plugin for orchestrating Spark Jobs on Data Platform (VNG Cloud).

The plugin exposes a single DataPlatformOperator that manages the full lifecycle of a Spark Job:

  1. Acquire an IAM access token via OAuth2 client credentials (with automatic refresh).
  2. Submit a Spark Job run to the Data Platform API.
  3. Poll the job state until it reaches a terminal state.
  4. Cancel the active run when the Airflow task is killed.
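
Step 3 can be pictured as a simple polling loop. The following is a minimal sketch, not the plugin's actual code: `poll_until_terminal` and `get_state` are hypothetical names, and the terminal states are taken from the Spark Job states table later in this README.

```python
import time
from typing import Callable

# Terminal states as listed in the "Spark Job states" section of this README.
TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


def poll_until_terminal(
    get_state: Callable[[], str],
    polling_period_seconds: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_state() repeatedly until it returns a terminal state.

    get_state is a hypothetical callable standing in for a request to the
    Data Platform API; sleep is injectable so the loop can be tested quickly.
    """
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(polling_period_seconds)


# A canned sequence of states stands in for the real API responses.
states = iter(["QUEUING", "RUNNING", "SUCCESS"])
final_state = poll_until_terminal(lambda: next(states), sleep=lambda _: None)
print(final_state)
```

Injecting `sleep` keeps the sketch testable without waiting for the real polling interval.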

Requirements

  • Python 3.10 or newer
  • Apache Airflow 2.8 or newer (Airflow 3 supported)
  • A Data Platform workspace with a registered Spark Job
  • VNG Cloud IAM client credentials (client ID and client secret)

Installation

From PyPI

pip install dataplatform-airflow-plugin

From source

git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
pip install -e ".[dev]"

Verify the installation

pip show dataplatform-airflow-plugin
python -c "from dataplatform_airflow_plugin import DataPlatformOperator, VNGCloudHook; print('OK')"

Confirm the plugin is registered with Airflow:

airflow plugins
# Expected output:
# dataplatform | dataplatform-airflow-plugin==<version>: EntryPoint(name='dataplatform', value='dataplatform_airflow_plugin.plugin:DataPlatformPlugin', group='airflow.plugins')

Install on the Airflow Helm chart (development)

In values.yaml or an override file:

env:
  - name: _PIP_ADDITIONAL_REQUIREMENTS
    value: "dataplatform-airflow-plugin==0.1.1"

Then apply the change:

helm upgrade <release> . -n <namespace> -f override.yaml

Helm performs a rolling restart of the scheduler, workers, triggerer, and api-server pods. Each pod runs pip install on startup and pulls the plugin from PyPI.

Install on the Airflow Helm chart (production)

Bake the plugin into a custom image — see Kubernetes (production).

Configuration

The plugin supports three credential sources, resolved in the following priority order:

  1. Airflow Connection
  2. Airflow Variables
  3. Environment variables

Pick the option that best fits your environment.
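
The priority order can be read as "first source that yields credentials wins". The sketch below illustrates that idea under stated assumptions: `resolve_credentials` and `from_environment` are hypothetical helpers, the Connection and Variables lookups are stubbed out, and the plugin's real resolution logic may differ.

```python
import os
from typing import Callable, Optional

Credentials = tuple[str, str]  # (client_id, client_secret)


def resolve_credentials(
    sources: list[tuple[str, Callable[[], Optional[Credentials]]]],
) -> tuple[str, Credentials]:
    """Return (source_name, credentials) from the first source that has them."""
    for name, lookup in sources:
        creds = lookup()
        if creds is not None:
            return name, creds
    raise RuntimeError("VNG Cloud credentials are not configured")


def from_environment() -> Optional[Credentials]:
    """Read the two core credentials from the environment, if both are set."""
    client_id = os.environ.get("VNG_CLIENT_ID")
    client_secret = os.environ.get("VNG_CLIENT_SECRET")
    if client_id and client_secret:
        return client_id, client_secret
    return None


# Demo values; the first two lookups are stubs standing in for Airflow lookups.
os.environ["VNG_CLIENT_ID"] = "demo-id"
os.environ["VNG_CLIENT_SECRET"] = "demo-secret"
source, creds = resolve_credentials([
    ("connection", lambda: None),
    ("variables", lambda: None),
    ("environment", from_environment),
])
print(source)
```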

Option 1 — Airflow Connection (recommended for production)

Credentials are Fernet-encrypted at rest, managed through the Airflow UI, and easy to scope per environment (vng_dev, vng_prod, ...).

UI: Admin → Connections → Add

Field | Value
--- | ---
Connection Id | vng_cloud_default (default; overridable via vng_conn_id)
Connection Type | Generic
Host | https://dev-iam-proxy.dataplatform.vngcloud.tech
Login | <VNG_CLIENT_ID>
Password | <VNG_CLIENT_SECRET>
Extra (JSON) | (optional; see below)

Extra (optional JSON to override default endpoints):

{
  "data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
  "token_path": "/accounts-api/v2/auth/token",
  "fe_url": "https://dev-app.dataplatform.vngcloud.tech"
}

Provisioning via a Kubernetes Secret:

kubectl -n airflow create secret generic vng-cloud-conn \
  --from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT='{"conn_type":"generic","host":"https://dev-iam-proxy.dataplatform.vngcloud.tech","login":"<CID>","password":"<CSECRET>","extra":{"data_platform_url":"https://dev-backend-proxy.dataplatform.vngcloud.tech"}}'

Then reference the Secret from the Helm values:

# override.yaml
env:
  - name: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
    valueFrom:
      secretKeyRef:
        name: vng-cloud-conn
        key: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
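
Hand-writing the JSON value for the Secret is error-prone, so it may help to generate it. The helper below is a hypothetical convenience, not part of the plugin; it only reproduces the JSON shape used in the kubectl command above.

```python
import json
from typing import Optional


def build_connection_json(
    client_id: str,
    client_secret: str,
    iam_host: str,
    data_platform_url: Optional[str] = None,
) -> str:
    """Serialize a generic Airflow connection in the JSON form shown above."""
    conn = {
        "conn_type": "generic",
        "host": iam_host,
        "login": client_id,
        "password": client_secret,
    }
    if data_platform_url:
        # Optional endpoint override, stored under the connection's extra field.
        conn["extra"] = {"data_platform_url": data_platform_url}
    # Compact separators keep the value on one line for --from-literal.
    return json.dumps(conn, separators=(",", ":"))


value = build_connection_json(
    "<CID>",
    "<CSECRET>",
    "https://dev-iam-proxy.dataplatform.vngcloud.tech",
    "https://dev-backend-proxy.dataplatform.vngcloud.tech",
)
print(value)
```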

Option 2 — Airflow Variables

Quicker to set up, but not encrypted by default. Suitable for development or small teams.

UI: Admin → Variables → Add

Variable name | Required | Default
--- | --- | ---
vng_client_id | Yes |
vng_client_secret | Yes |
vng_iam_host | No | https://dev-iam-proxy.dataplatform.vngcloud.tech
vng_token_path | No | /accounts-api/v2/auth/token
vng_data_platform_url | No | https://dev-backend-proxy.dataplatform.vngcloud.tech/
vng_fe_url | No | https://dev-app.dataplatform.vngcloud.tech

CLI:

airflow variables set vng_client_id "<CLIENT_ID>"
airflow variables set vng_client_secret "<CLIENT_SECRET>"

Option 3 — Environment variables (local development)

export VNG_CLIENT_ID="..."
export VNG_CLIENT_SECRET="..."

Only the two core credentials are read from the environment. To override endpoint URLs, use Option 1 or 2.

Usage

from datetime import datetime
from airflow import DAG
from dataplatform_airflow_plugin import DataPlatformOperator

with DAG(
    dag_id="example_dataplatform_spark_job",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["dataplatform"],
) as dag:

    run_job = DataPlatformOperator(
        task_id="run_spark_job",
        workspace_id="ws-abc-123",
        job_id="job-xyz-456",
        application_args=["--date", "{{ ds }}", "--mode", "prod"],
        polling_period_seconds=15,
        do_xcom_push=True,
    )

See dags/example_dataplatform.py for a complete example with templating and Variables, or dags/example_dataplatform_minimal.py for the minimal form.

DataPlatformOperator parameters

Parameter | Required | Default | Description
--- | --- | --- | ---
workspace_id | Yes | | Workspace identifier (templated).
job_id | Yes | | Spark Job identifier (templated).
application_args | No | [""] | list[str], dict, or JSON string (templated).
vng_conn_id | No | vng_cloud_default | Airflow Connection identifier.
token_url | No | From Connection / default | Overrides the IAM token endpoint.
data_platform_url | No | From Connection / default | Overrides the Data Platform base URL.
polling_period_seconds | No | 15 | Interval between job-state polls, in seconds.
do_xcom_push | No | False | Push workspace_id, job_id, and run_id to XCom.

Templated fields: workspace_id, job_id, application_args — Jinja expressions such as {{ ds }}, {{ params.x }}, and {{ var.value.* }} are supported.

Template extensions: .json files referenced through application_args are auto-loaded.

Job UI link: when the job reaches a terminal state, the operator logs a link to the Data Platform UI:

[INFO] View logs on Data Platform UI: https://dev-app.dataplatform.vngcloud.tech/workspaces/<ws>/jobs/<job>

Override the UI base URL via the Connection's fe_url extra or the vng_fe_url Variable when promoting to production.
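
To see how the base URL override affects the logged link, here is a small sketch that rebuilds the URL pattern from the log line above. The function name `job_ui_url` is hypothetical and the plugin may assemble the link differently.

```python
from urllib.parse import quote


def job_ui_url(fe_url: str, workspace_id: str, job_id: str) -> str:
    """Build a job link following the pattern shown in the log line above."""
    base = fe_url.rstrip("/")  # tolerate a trailing slash in the configured base URL
    ws = quote(workspace_id, safe="")
    job = quote(job_id, safe="")
    return f"{base}/workspaces/{ws}/jobs/{job}"


url = job_ui_url("https://dev-app.dataplatform.vngcloud.tech/", "ws-abc-123", "job-xyz-456")
print(url)
```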

XCom keys

Key | Constant | Description
--- | --- | ---
workspace_id | XCOM_WORKSPACE_ID_KEY | Workspace identifier.
job_id | XCOM_JOB_ID_KEY | Spark Job identifier.
run_id | XCOM_RUN_ID_KEY | Run identifier.

Spark Job states

from dataplatform_airflow_plugin import SparkJobState

SparkJobState.SUCCESS.is_final        # True
SparkJobState.SUCCESS.is_successful   # True
SparkJobState.RUNNING.is_final        # False

State | Final | Successful
--- | --- | ---
QUEUING | No |
SCHEDULING | No |
PENDING | No |
RUNNING | No |
SUCCESS | Yes | Yes
FAILED | Yes | No
CANCELLED | Yes | No
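
For readers who want to reason about these states without installing the plugin, the table can be mirrored with a plain Enum. This is an illustrative sketch only: `JobStateSketch` is a hypothetical stand-in, not the plugin's `SparkJobState` implementation, and it assumes non-final states report `is_successful` as False.

```python
from enum import Enum


class JobStateSketch(Enum):
    """Hypothetical stand-in that mirrors the state table above."""

    QUEUING = "QUEUING"
    SCHEDULING = "SCHEDULING"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

    @property
    def is_final(self) -> bool:
        # Only these three states end the polling loop.
        return self in {
            JobStateSketch.SUCCESS,
            JobStateSketch.FAILED,
            JobStateSketch.CANCELLED,
        }

    @property
    def is_successful(self) -> bool:
        # Assumption: only SUCCESS counts as successful.
        return self is JobStateSketch.SUCCESS


final_states = [s.name for s in JobStateSketch if s.is_final]
print(final_states)
```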

Kubernetes (production)

For stable releases, avoid _PIP_ADDITIONAL_REQUIREMENTS — it slows pod startup and depends on network availability at every restart. Build a custom image instead:

# Dockerfile
FROM apache/airflow:3.2.0
RUN pip install --no-cache-dir dataplatform-airflow-plugin==0.1.1

Build and push the image:

docker build -t <registry>/airflow-dataplatform:0.1.1 .
docker push <registry>/airflow-dataplatform:0.1.1

Point the Helm chart at the new image:

# override.yaml
defaultAirflowRepository: <registry>/airflow-dataplatform
defaultAirflowTag: "0.1.1"

images:
  airflow:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.1"
    pullPolicy: IfNotPresent
  pod_template:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.1"
    pullPolicy: IfNotPresent

KubernetesExecutor: the pod_template image must also include the plugin — task pods are spawned from this template.

Development

git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
make install          # uv pip install -e ".[dev]"
make test             # pytest
make lint             # ruff + mypy
make format           # black + isort
make build            # build the wheel

Project layout

dataplatform-airflow-plugin/
├── dataplatform_airflow_plugin/
│   ├── __init__.py                       # Package metadata and public exports
│   ├── plugin.py                         # AirflowPlugin registration (hooks + operators)
│   ├── dataplatform_operator.py          # DataPlatformOperator + SparkJobState
│   └── hook.py                           # VNGCloudHook (IAM, Data Platform API)
├── dags/
│   ├── example_dataplatform.py           # Full example with templating and Variables
│   └── example_dataplatform_minimal.py   # Minimal example
├── tests/
│   ├── test_state.py
│   └── test_operator.py
├── Makefile
├── pyproject.toml
└── README.md

Release workflow

# 1. Bump the version in pyproject.toml and dataplatform_airflow_plugin/__init__.py
# 2. Build and publish
make build
uv publish --token "$PYPI_TOKEN"

# 3. Commit, tag, and push
git commit -am "Release v0.1.1"
git tag v0.1.1
git push origin main && git push origin v0.1.1

Troubleshooting

ModuleNotFoundError: dataplatform_airflow_plugin

  1. Confirm the package is installed:
    pip show dataplatform-airflow-plugin
    
  2. On Kubernetes, check that the pod successfully ran pip install at startup:
    kubectl -n airflow logs deploy/airflow-scheduler -c scheduler | grep -iE "pip|dataplatform"
    
  3. When using _PIP_ADDITIONAL_REQUIREMENTS, pip may cache an older build if the version in pyproject.toml is unchanged. Bump the version (e.g. 0.1.1 → 0.1.2) or switch to a custom image; see Kubernetes (production).

Task pods (KubernetesExecutor) do not see the plugin

  • Ensure images.pod_template uses the same image as the scheduler and workers.
  • Alternatively, set _PIP_ADDITIONAL_REQUIREMENTS at the top-level env: so it propagates to task pods.

Token request fails (HTTP 401/403)

  • Confirm credentials are configured through the expected mechanism — see Configuration.
  • Verify the client_id and client_secret in the VNG Cloud IAM console.
  • Make sure the client_id, client_secret, and iam_host all belong to the same environment (dev or prod).

VNG Cloud credentials are not configured

The plugin could not locate credentials through any of the three mechanisms. Check each in priority order:

airflow connections get vng_cloud_default     # Option 1
airflow variables get vng_client_id           # Option 2
echo $VNG_CLIENT_ID                            # Option 3

DAG parse error: "Don't use runtime-varying value as argument in DAG constructor"

  • Avoid calls such as pendulum.today(), datetime.now(), or Variable.get(...) directly inside DAG(...) or Operator(...) arguments.
  • Use static values (pendulum.datetime(2026, 1, 1, tz="UTC")) or Jinja templates ("{{ ds }}", "{{ var.value.x }}").

License

Apache License 2.0
