DataPlatform Airflow Plugin
An Apache Airflow plugin for orchestrating Spark Jobs on Data Platform (VNG Cloud).
The plugin exposes a single DataPlatformOperator that manages the full lifecycle of a Spark Job:
- Acquire an IAM access token via OAuth2 client credentials (with automatic refresh).
- Submit a Spark Job run to the Data Platform API.
- Poll the job state until it reaches a terminal state.
- Cancel the active run when the Airflow task is killed.
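Taken together, the four steps above form a submit, poll, and cancel loop. The sketch below shows that control flow only. It is not the plugin's implementation. `submit`, `get_state`, and `cancel` are hypothetical callables standing in for the Data Platform API calls, and the terminal states come from the Spark Job states table. The task kill is modelled here as an exception, whereas in Airflow it normally arrives through the operator's `on_kill()` method.

```python
import time
from typing import Callable

# Terminal states, taken from the Spark Job states table in this README.
FINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


def run_job_lifecycle(
    submit: Callable[[], str],        # hypothetical: submits a run, returns its run_id
    get_state: Callable[[str], str],  # hypothetical: returns the current state name
    cancel: Callable[[str], None],    # hypothetical: cancels an active run
    polling_period_seconds: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Submit a run, poll until it reaches a terminal state, cancel it if interrupted.

    Returns the final state name, or raises RuntimeError if the run did not succeed.
    """
    run_id = submit()
    try:
        while True:
            state = get_state(run_id)
            if state in FINAL_STATES:
                break
            sleep(polling_period_seconds)
    except BaseException:
        # Anything that interrupts polling (kill signal, error) cancels the active run first.
        cancel(run_id)
        raise
    if state != "SUCCESS":
        raise RuntimeError(f"Run {run_id} finished in state {state}")
    return state
```

Injecting `sleep` keeps the loop testable without real waiting. The same design lets the polling interval map directly onto the operator's `polling_period_seconds` parameter.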
Requirements
- Python 3.10 or newer
- Apache Airflow 2.8 or newer (Airflow 3 supported)
- A Data Platform workspace with a registered Spark Job
- VNG Cloud IAM client credentials (client ID and client secret)
Installation
From PyPI
pip install dataplatform-airflow-plugin
From source
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
pip install -e ".[dev]"
Verify the installation
pip show dataplatform-airflow-plugin
python -c "from dataplatform_airflow_plugin import DataPlatformOperator, VNGCloudHook; print('OK')"
Confirm the plugin is registered with Airflow:
airflow plugins
# Expected output:
# dataplatform | dataplatform-airflow-plugin==<version>: EntryPoint(name='dataplatform', value='dataplatform_airflow_plugin.plugin:DataPlatformPlugin', group='airflow.plugins')
Install on the Airflow Helm chart (development)
In values.yaml or an override file:
env:
  - name: _PIP_ADDITIONAL_REQUIREMENTS
    value: "dataplatform-airflow-plugin==0.1.1"
helm upgrade <release> . -n <namespace> -f override.yaml
Helm performs a rolling restart of the scheduler, workers, triggerer, and api-server pods. Each pod runs pip install on startup and pulls the plugin from PyPI.
Install on the Airflow Helm chart (production)
Bake the plugin into a custom image — see Kubernetes (production).
Configuration
The plugin supports three credential sources, resolved in the following priority order:
Airflow Connection → Airflow Variables → Environment variables
Pick the option that best fits your environment.
Option 1 — Airflow Connection (recommended for production)
Credentials are Fernet-encrypted at rest, managed through the Airflow UI, and easy to scope per environment (vng_dev, vng_prod, ...).
UI: Admin → Connections → Add
| Field | Value |
|---|---|
| Connection Id | vng_cloud_default (default — overridable via vng_conn_id) |
| Connection Type | Generic |
| Host | https://dev-iam-proxy.dataplatform.vngcloud.tech |
| Login | <VNG_CLIENT_ID> |
| Password | <VNG_CLIENT_SECRET> |
| Extra (JSON) | (optional — see below) |
Extra (optional JSON to override default endpoints):
{
"data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
"token_path": "/accounts-api/v2/auth/token",
"fe_url": "https://dev-app.dataplatform.vngcloud.tech"
}
Provisioning via a Kubernetes Secret:
kubectl -n airflow create secret generic vng-cloud-conn \
--from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT='{"conn_type":"generic","host":"https://dev-iam-proxy.dataplatform.vngcloud.tech","login":"<CID>","password":"<CSECRET>","extra":{"data_platform_url":"https://dev-backend-proxy.dataplatform.vngcloud.tech"}}'
# override.yaml
env:
  - name: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
    valueFrom:
      secretKeyRef:
        name: vng-cloud-conn
        key: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
Option 2 — Airflow Variables
Quicker to set up, but not encrypted by default. Suitable for development or small teams.
UI: Admin → Variables → Add
| Variable name | Required | Default |
|---|---|---|
| vng_client_id | Yes | — |
| vng_client_secret | Yes | — |
| vng_iam_host | No | https://dev-iam-proxy.dataplatform.vngcloud.tech |
| vng_token_path | No | /accounts-api/v2/auth/token |
| vng_data_platform_url | No | https://dev-backend-proxy.dataplatform.vngcloud.tech/ |
| vng_fe_url | No | https://dev-app.dataplatform.vngcloud.tech |
CLI:
airflow variables set vng_client_id "<CLIENT_ID>"
airflow variables set vng_client_secret "<CLIENT_SECRET>"
Option 3 — Environment variables (local development)
export VNG_CLIENT_ID="..."
export VNG_CLIENT_SECRET="..."
Only the two core credentials are read from the environment. To override endpoint URLs, use Option 1 or 2.
Usage
from datetime import datetime

from airflow import DAG

from dataplatform_airflow_plugin import DataPlatformOperator

with DAG(
    dag_id="example_dataplatform_spark_job",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["dataplatform"],
) as dag:
    run_job = DataPlatformOperator(
        task_id="run_spark_job",
        workspace_id="ws-abc-123",
        job_id="job-xyz-456",
        application_args=["--date", "{{ ds }}", "--mode", "prod"],
        polling_period_seconds=15,
        do_xcom_push=True,
    )
See dags/example_dataplatform.py for a complete example with templating and Variables, or dags/example_dataplatform_minimal.py for the minimal form.
DataPlatformOperator parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| workspace_id | Yes | — | Workspace identifier (templated). |
| job_id | Yes | — | Spark Job identifier (templated). |
| application_args | No | [""] | list[str], dict, or JSON string (templated). |
| vng_conn_id | No | vng_cloud_default | Airflow Connection identifier. |
| token_url | No | From Connection / default | Overrides the IAM token endpoint. |
| data_platform_url | No | From Connection / default | Overrides the Data Platform base URL. |
| polling_period_seconds | No | 15 | Interval between job-state polls, in seconds. |
| do_xcom_push | No | False | Push workspace_id, job_id, and run_id to XCom. |
Templated fields: workspace_id, job_id, application_args — Jinja expressions such as {{ ds }}, {{ params.x }}, and {{ var.value.* }} are supported.
Template extensions: .json files referenced through application_args are auto-loaded.
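application_args accepts a list, a dict, or a JSON string, and Jinja rendering always produces a string. This README does not document how the plugin converts a dict into Spark arguments. The sketch below assumes a `{"--flag": "value"}` dict is flattened into `["--flag", "value"]` in insertion order and that a string is parsed as JSON first. `normalize_application_args` is hypothetical, and in this sketch a plain string that is not valid JSON raises `ValueError`.

```python
import json
from typing import Any


def normalize_application_args(args: Any) -> list[str]:
    """Coerce list / dict / JSON-string arguments into a flat list of strings."""
    if isinstance(args, str):
        # Rendered templates arrive as strings; parse a JSON list or object.
        args = json.loads(args)
    if isinstance(args, dict):
        # Assumption: keys are flags and values are their arguments, kept in insertion order.
        flat: list[str] = []
        for key, value in args.items():
            flat.extend([str(key), str(value)])
        return flat
    if isinstance(args, list):
        return [str(a) for a in args]
    raise TypeError(f"unsupported application_args type: {type(args).__name__}")
```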
Job UI link: when the job reaches a terminal state, the operator logs a link to the Data Platform UI:
[INFO] View logs on Data Platform UI: https://dev-app.dataplatform.vngcloud.tech/workspaces/<ws>/jobs/<job>
Override the UI base URL via the Connection's fe_url extra or the vng_fe_url Variable when promoting to production.
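The logged link follows the pattern shown above. The sketch below builds it and picks the base URL in the same priority order used for credentials: the Connection's `fe_url` extra, then the `vng_fe_url` Variable, then the dev default. That ordering for `fe_url` is an assumption, and both helper names are hypothetical.

```python
DEFAULT_FE_URL = "https://dev-app.dataplatform.vngcloud.tech"


def resolve_fe_url(conn_extra: dict[str, str], variables: dict[str, str]) -> str:
    """Assumed order: Connection extra 'fe_url' -> Variable 'vng_fe_url' -> dev default."""
    return conn_extra.get("fe_url") or variables.get("vng_fe_url") or DEFAULT_FE_URL


def job_ui_url(fe_url: str, workspace_id: str, job_id: str) -> str:
    """Build the Data Platform UI link in the format of the log line above."""
    return f"{fe_url.rstrip('/')}/workspaces/{workspace_id}/jobs/{job_id}"
```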
XCom keys
| Key | Constant | Description |
|---|---|---|
| workspace_id | XCOM_WORKSPACE_ID_KEY | Workspace identifier. |
| job_id | XCOM_JOB_ID_KEY | Spark Job identifier. |
| run_id | XCOM_RUN_ID_KEY | Run identifier. |
Spark Job states
from dataplatform_airflow_plugin import SparkJobState
SparkJobState.SUCCESS.is_final # True
SparkJobState.SUCCESS.is_successful # True
SparkJobState.RUNNING.is_final # False
| State | Final | Successful |
|---|---|---|
| QUEUING | No | — |
| SCHEDULING | No | — |
| PENDING | No | — |
| RUNNING | No | — |
| SUCCESS | Yes | Yes |
| FAILED | Yes | No |
| CANCELLED | Yes | No |
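The table can be expressed as a small `Enum`. The version below is an illustrative re-implementation that shows what `is_final` and `is_successful` mean. It is not the plugin's source, so DAG code should import the real `SparkJobState` from `dataplatform_airflow_plugin`. The string values are assumed to equal the state names, and non-final states report `is_successful` as `False` where the table shows "—".

```python
from enum import Enum


class SparkJobStateSketch(Enum):
    """Illustrative stand-in for SparkJobState, mirroring the table above."""

    QUEUING = "QUEUING"
    SCHEDULING = "SCHEDULING"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

    @property
    def is_final(self) -> bool:
        # Terminal states: polling stops once one of these is reached.
        return self in {SparkJobStateSketch.SUCCESS, SparkJobStateSketch.FAILED,
                        SparkJobStateSketch.CANCELLED}

    @property
    def is_successful(self) -> bool:
        return self is SparkJobStateSketch.SUCCESS
```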
Kubernetes (production)
For stable releases, avoid _PIP_ADDITIONAL_REQUIREMENTS — it slows pod startup and depends on network availability at every restart. Build a custom image instead:
FROM apache/airflow:3.2.0
RUN pip install --no-cache-dir dataplatform-airflow-plugin==0.1.1
docker build -t <registry>/airflow-dataplatform:0.1.1 .
docker push <registry>/airflow-dataplatform:0.1.1
# override.yaml
defaultAirflowRepository: <registry>/airflow-dataplatform
defaultAirflowTag: "0.1.1"
images:
  airflow:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.1"
    pullPolicy: IfNotPresent
  pod_template:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.1"
    pullPolicy: IfNotPresent
KubernetesExecutor: the pod_template image must also include the plugin, because task pods are spawned from this template.
Development
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
make install # uv pip install -e ".[dev]"
make test # pytest
make lint # ruff + mypy
make format # black + isort
make build # build the wheel
Project layout
dataplatform-airflow-plugin/
├── dataplatform_airflow_plugin/
│ ├── __init__.py # Package metadata and public exports
│ ├── plugin.py # AirflowPlugin registration (hooks + operators)
│ ├── dataplatform_operator.py # DataPlatformOperator + SparkJobState
│ └── hook.py # VNGCloudHook (IAM, Data Platform API)
├── dags/
│ ├── example_dataplatform.py # Full example with templating and Variables
│ └── example_dataplatform_minimal.py # Minimal example
├── tests/
│ ├── test_state.py
│ └── test_operator.py
├── Makefile
├── pyproject.toml
└── README.md
Release workflow
# 1. Bump the version in pyproject.toml and dataplatform_airflow_plugin/__init__.py
# 2. Build and publish
make build
uv publish --token "$PYPI_TOKEN"
# 3. Commit, tag, and push
git commit -am "Release v0.1.1"
git tag v0.1.1
git push origin main && git push origin v0.1.1
Troubleshooting
ModuleNotFoundError: dataplatform_airflow_plugin
- Confirm the package is installed:
pip show dataplatform-airflow-plugin
- On Kubernetes, check that the pod successfully ran pip install at startup:
kubectl -n airflow logs deploy/airflow-scheduler -c scheduler | grep -iE "pip|dataplatform"
- When using _PIP_ADDITIONAL_REQUIREMENTS, pip may cache an older build if the version in pyproject.toml is unchanged. Bump the version (e.g. 0.1.1 → 0.1.2) or switch to a custom image; see Kubernetes (production).
Task pods (KubernetesExecutor) do not see the plugin
- Ensure images.pod_template uses the same image as the scheduler and workers.
- Alternatively, set _PIP_ADDITIONAL_REQUIREMENTS at the top-level env: so it propagates to task pods.
Token request fails (HTTP 401/403)
- Confirm credentials are configured through the expected mechanism — see Configuration.
- Verify the client_id and client_secret in the VNG Cloud IAM console.
- Make sure the client_id, client_secret, and iam_host all belong to the same environment (dev or prod).
VNG Cloud credentials are not configured
The plugin could not locate credentials through any of the three mechanisms. Check each in priority order:
airflow connections get vng_cloud_default # Option 1
airflow variables get vng_client_id # Option 2
echo $VNG_CLIENT_ID # Option 3
DAG parse error: "Don't use runtime-varying value as argument in DAG constructor"
- Avoid calls such as pendulum.today(), datetime.now(), or Variable.get(...) directly inside DAG(...) or Operator(...) arguments.
- Use static values (pendulum.datetime(2026, 1, 1, tz="UTC")) or Jinja templates ("{{ ds }}", "{{ var.value.x }}").
License
Apache License 2.0