Apache Airflow plugin for orchestrating Spark Jobs on Data Platform.
DataPlatform Airflow Plugin
Apache Airflow plugin to trigger and manage Spark Jobs on GreenNode Data Platform (VNG Cloud).
The plugin provides DataPlatformOperator, which handles the entire job lifecycle:
- Obtains an IAM access token (OAuth2 client credentials, auto-refresh)
- Submits a Spark Job run
- Polls the run status until the job finishes
- Cancels the job if the Airflow task is killed
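The lifecycle above can be pictured as a submit / poll / cancel loop. The sketch below is a minimal illustration under stated assumptions, not the plugin's code: `FakeDataPlatformClient` and its `submit_run` / `get_run_state` / `cancel_run` methods are hypothetical stand-ins for the real API calls, token handling is omitted, and the Airflow kill signal (which a real operator would receive through `on_kill`) is approximated by an exception raised while waiting.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Tuple

# Terminal states, taken from the "Spark Job states" table in this README.
FINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


@dataclass
class FakeDataPlatformClient:
    """Hypothetical in-memory client; NOT the plugin's VNGCloudHook."""

    states: List[str]  # states returned by successive polls; the last one repeats
    cancelled: List[str] = field(default_factory=list)

    def submit_run(self, workspace_id: str, job_id: str, args: List[str]) -> str:
        return "run-001"  # hypothetical run id

    def get_run_state(self, run_id: str) -> str:
        return self.states.pop(0) if len(self.states) > 1 else self.states[0]

    def cancel_run(self, run_id: str) -> None:
        self.cancelled.append(run_id)


def run_job_lifecycle(
    client: FakeDataPlatformClient,
    workspace_id: str,
    job_id: str,
    args: List[str],
    poll_seconds: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[str, str]:
    """Submit a run, poll until it reaches a final state, cancel it if interrupted."""
    run_id = client.submit_run(workspace_id, job_id, args)
    try:
        while True:
            state = client.get_run_state(run_id)
            if state in FINAL_STATES:
                return run_id, state
            sleep(poll_seconds)  # plays the role of polling_period_seconds
    except BaseException:
        # Interrupted (e.g. the task was killed): cancel the remote run, then re-raise.
        client.cancel_run(run_id)
        raise
```

The injectable `sleep` keeps the loop testable without real waiting; in an operator the interval would come from `polling_period_seconds`.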
Installation
From PyPI (production):
pip install dataplatform-airflow-plugin
From source (local dev):
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
pip install -e ".[dev]"
Verify the installation:
pip show dataplatform-airflow-plugin
python -c "from dataplatform_airflow_plugin import DataPlatformOperator, VNGCloudHook; print('OK')"
Or check via the Airflow CLI:
airflow plugins
# The output should include:
# dataplatform | dataplatform-airflow-plugin==0.1.0: EntryPoint(name='dataplatform', value='dataplatform_airflow_plugin.plugin:DataPlatformPlugin', group='airflow.plugins')
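If the Airflow CLI is not at hand (for example in a CI job), the same check can be done from Python with the standard-library `importlib.metadata`. The helper name `installed_version` is illustrative; the distribution name is the one used by `pip show` above.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name: str = "dataplatform-airflow-plugin") -> str | None:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    if found is None:
        print("dataplatform-airflow-plugin is NOT installed")
    else:
        print(f"dataplatform-airflow-plugin {found}")
```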
Installing on the Airflow Helm chart (dev)
In values.yaml / override.yaml:
env:
  - name: _PIP_ADDITIONAL_REQUIREMENTS
    value: "dataplatform-airflow-plugin==0.1.0"
helm upgrade <release> . -n <namespace> -f override.yaml
Helm performs a rolling restart of the pods (scheduler/worker/triggerer/api-server), and each pod pip-installs the plugin from PyPI when it starts.
Installing on the Airflow Helm chart (production)
Build a custom image; see Kubernetes (Production) below.
Configuration
The plugin supports 3 ways to set up credentials, resolved in this priority order:
Connection → Variable → Env var
The plugin looks for credentials in the Connection first, falls back to Variables if none are found, and finally to environment variables. Choose whichever of the 3 fits your needs.
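The priority order can be sketched as a small resolver. This is a minimal illustration, not the plugin's implementation: the Connection and Variable lookups are passed in as plain callables (in a DAG they would wrap Airflow's Connection lookup for `vng_conn_id` and a Variable lookup that returns None when unset), and requiring both values to come from the same source is an assumption. The error text mirrors the message listed under Troubleshooting.

```python
from __future__ import annotations

import os
from typing import Callable, Mapping, Optional, Tuple

Credentials = Tuple[str, str]


def resolve_credentials(
    get_connection: Callable[[], Optional[Credentials]],
    get_variable: Callable[[str], Optional[str]],
    environ: Mapping[str, str] = os.environ,
) -> Credentials:
    """Return (client_id, client_secret) using Connection -> Variable -> env var priority."""
    # 1. Airflow Connection: login = client id, password = client secret.
    conn = get_connection()
    if conn and all(conn):
        return conn
    # 2. Airflow Variables.
    client_id, secret = get_variable("vng_client_id"), get_variable("vng_client_secret")
    if client_id and secret:
        return client_id, secret
    # 3. Environment variables.
    client_id, secret = environ.get("VNG_CLIENT_ID"), environ.get("VNG_CLIENT_SECRET")
    if client_id and secret:
        return client_id, secret
    raise ValueError("VNG Cloud credentials are not configured")
```

Passing the lookups in as callables keeps the sketch free of Airflow imports and makes each fallback step easy to test in isolation.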
Option 1: Airflow Connection ⭐ (recommended for production)
The password is Fernet-encrypted, Connections have their own UI, and managing multiple environments is easy (vng_dev, vng_prod, ...).
UI: Admin → Connections → Add
| Field | Value |
|---|---|
| Connection Id | vng_cloud_default (default; can be changed via vng_conn_id) |
| Connection Type | Generic |
| Host | https://dev-iam-proxy.dataplatform.vngcloud.tech |
| Login | <VNG_CLIENT_ID> |
| Password | <VNG_CLIENT_SECRET> |
| Extra (JSON) | (optional; see below) |
Extra (JSON, optional; overrides the default URLs):
{
"data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
"token_path": "/accounts-api/v2/auth/token",
"fe_url": "https://dev-app.dataplatform.vngcloud.tech"
}
Set via a Kubernetes Secret (Helm):
kubectl -n airflow create secret generic vng-cloud-conn \
--from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT='{"conn_type":"generic","host":"https://dev-iam-proxy.dataplatform.vngcloud.tech","login":"<CID>","password":"<CSECRET>","extra":{"data_platform_url":"https://dev-backend-proxy.dataplatform.vngcloud.tech"}}'
# override.yaml
env:
  - name: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
    valueFrom:
      secretKeyRef:
        name: vng-cloud-conn
        key: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
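Hand-writing the JSON for `--from-literal` is error-prone (nested quotes, missing braces). Airflow 2.3 and later accept a JSON-serialized connection in `AIRFLOW_CONN_*` variables, so the value can be generated with `json.dumps` instead. `build_conn_env_value` is a hypothetical helper; its field names follow the Connection table and the Extra JSON above.

```python
import json
from typing import Optional


def build_conn_env_value(
    client_id: str,
    client_secret: str,
    iam_host: str = "https://dev-iam-proxy.dataplatform.vngcloud.tech",
    data_platform_url: Optional[str] = None,
    token_path: Optional[str] = None,
    fe_url: Optional[str] = None,
) -> str:
    """Serialize a Generic connection as JSON for AIRFLOW_CONN_VNG_CLOUD_DEFAULT."""
    # Only include the optional Extra keys that were actually provided.
    extra = {
        key: value
        for key, value in {
            "data_platform_url": data_platform_url,
            "token_path": token_path,
            "fe_url": fe_url,
        }.items()
        if value
    }
    conn = {
        "conn_type": "generic",
        "host": iam_host,
        "login": client_id,
        "password": client_secret,
    }
    if extra:
        conn["extra"] = extra
    return json.dumps(conn)


if __name__ == "__main__":
    print(build_conn_env_value(
        "<CID>",
        "<CSECRET>",
        data_platform_url="https://dev-backend-proxy.dataplatform.vngcloud.tech",
    ))
```

The printed string can then be passed to `kubectl create secret generic vng-cloud-conn --from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT='...'` as shown above.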
Option 2: Airflow Variables (simple, similar to iomete)
Quick to set up without having to understand Connections, but values are not encrypted by default. Suitable for dev or small teams.
UI: Admin → Variables → Add
| Variable name | Required | Default |
|---|---|---|
| vng_client_id | ✅ | — |
| vng_client_secret | ✅ | — |
| vng_iam_host | ❌ | https://dev-iam-proxy.dataplatform.vngcloud.tech |
| vng_token_path | ❌ | /accounts-api/v2/auth/token |
| vng_data_platform_url | ❌ | https://dev-backend-proxy.dataplatform.vngcloud.tech/ |
| vng_fe_url | ❌ | https://dev-app.dataplatform.vngcloud.tech |
Set via the CLI:
airflow variables set vng_client_id "<CLIENT_ID>"
airflow variables set vng_client_secret "<CLIENT_SECRET>"
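The defaults in this table are not written consistently: `vng_iam_host` has no trailing slash, `vng_token_path` starts with one, and `vng_data_platform_url` ends with one. When you override these values, naive string concatenation can yield a missing or doubled slash. Whether the plugin normalizes URLs is not documented here, so the sketch below is only a hypothetical way to check how your Variables combine; `join_url` and `token_url` are illustrative names.

```python
from typing import Mapping

# Defaults copied from the Variables table above.
DEFAULTS = {
    "vng_iam_host": "https://dev-iam-proxy.dataplatform.vngcloud.tech",
    "vng_token_path": "/accounts-api/v2/auth/token",
    "vng_data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech/",
    "vng_fe_url": "https://dev-app.dataplatform.vngcloud.tech",
}


def join_url(base: str, path: str = "") -> str:
    """Join a base URL and a path with exactly one slash between them."""
    base = base.rstrip("/")
    return f"{base}/{path.lstrip('/')}" if path else base


def token_url(variables: Mapping[str, str]) -> str:
    """Build the IAM token URL from Variables, falling back to the documented defaults."""
    host = variables.get("vng_iam_host") or DEFAULTS["vng_iam_host"]
    path = variables.get("vng_token_path") or DEFAULTS["vng_token_path"]
    return join_url(host, path)
```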
Option 3: Environment variables (local dev; not recommended for production)
export VNG_CLIENT_ID="..."
export VNG_CLIENT_SECRET="..."
Only the 2 core credentials can be overridden this way. To override the URL endpoints, use a Connection or Variables.
Usage
from datetime import datetime

from airflow import DAG
from dataplatform_airflow_plugin import DataPlatformOperator

with DAG(
    dag_id="example_dataplatform_spark_job",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["dataplatform"],
) as dag:
    run_job = DataPlatformOperator(
        task_id="run_spark_job",
        workspace_id="ws-abc-123",
        job_id="job-xyz-456",
        application_args=["--date", "{{ ds }}", "--mode", "prod"],
        polling_period_seconds=15,
        do_xcom_push=True,
    )
See dags/example_dataplatform.py (full) or dags/example_dataplatform_minimal.py (minimal) for more.
DataPlatformOperator parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| workspace_id | ✅ | — | Workspace ID (templated) |
| job_id | ✅ | — | Spark Job ID (templated) |
| application_args | ❌ | [""] | list[str], dict, or JSON string (templated) |
| vng_conn_id | ❌ | vng_cloud_default | Airflow Connection ID |
| token_url | ❌ | from Connection / default | Override the IAM token endpoint |
| data_platform_url | ❌ | from Connection / default | Override the Data Platform base URL |
| polling_period_seconds | ❌ | 15 | Interval between status polls, in seconds |
| do_xcom_push | ❌ | False | Push workspace_id, job_id, run_id to XCom |
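Templated fields: workspace_id, job_id, application_args. Jinja expressions such as {{ ds }}, {{ params.x }}, {{ var.value.* }} are supported.
Template extension: .json. A templated value ending in .json is treated as a file path, and the file's contents are loaded and rendered automatically.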
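Because `application_args` is templated, a JSON-string value can only be parsed after Jinja rendering, so the rendered text must be valid JSON (a plain string such as `--date 2026-01-01` is not). The sketch below is a hypothetical normalization written only to illustrate that rule: the function name is invented, and forwarding a dict unchanged is an assumption, since this README does not say how dicts are sent to the API.

```python
import json
from typing import Any, Dict, List, Union

ArgsValue = Union[List[str], Dict[str, Any]]


def normalize_application_args(value: Union[str, List[str], Dict[str, Any]]) -> ArgsValue:
    """Hypothetical: turn an already-rendered application_args value into a list or dict."""
    if isinstance(value, str):
        # Raises json.JSONDecodeError (a ValueError subclass) if the string is not JSON.
        value = json.loads(value)
    if isinstance(value, list):
        if not all(isinstance(item, str) for item in value):
            raise TypeError("application_args list items must be strings")
        return value
    if isinstance(value, dict):
        return value  # assumption: dicts are forwarded unchanged
    raise TypeError(f"unsupported application_args type: {type(value).__name__}")
```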
Logs URL: when the job finishes (success/failed), the operator logs a link to the Data Platform UI where you can view the logs:
[INFO] View logs on Data Platform UI: https://dev-app.dataplatform.vngcloud.tech/workspaces/<ws>/jobs/<job>
When moving to prod, override the base URL via the Connection extra fe_url or the Variable vng_fe_url.
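The link above follows the pattern `<fe_url>/workspaces/<workspace_id>/jobs/<job_id>`. A minimal sketch of building it, assuming the `fe_url` override is looked up in the same order as credentials (Connection extra, then Variable, then the dev default); that ordering and both helper names are assumptions, not documented plugin behavior.

```python
from typing import Mapping
from urllib.parse import quote

DEFAULT_FE_URL = "https://dev-app.dataplatform.vngcloud.tech"


def resolve_fe_url(conn_extra: Mapping[str, str], variables: Mapping[str, str]) -> str:
    """Assumed order: Connection extra fe_url, then Variable vng_fe_url, then the dev default."""
    return conn_extra.get("fe_url") or variables.get("vng_fe_url") or DEFAULT_FE_URL


def logs_url(fe_url: str, workspace_id: str, job_id: str) -> str:
    """Build the Data Platform UI link for a job; IDs are percent-encoded defensively."""
    return (
        f"{fe_url.rstrip('/')}/workspaces/{quote(workspace_id, safe='')}"
        f"/jobs/{quote(job_id, safe='')}"
    )
```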
XCom keys
| Key | Constant | Description |
|---|---|---|
| workspace_id | XCOM_WORKSPACE_ID_KEY | Workspace ID |
| job_id | XCOM_JOB_ID_KEY | Spark Job ID |
| run_id | XCOM_RUN_ID_KEY | Run ID |
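With `do_xcom_push=True`, a downstream task can read these values through the standard `TaskInstance.xcom_pull(task_ids=..., key=...)` call. The sketch assumes the upstream task is `run_spark_job` from the Usage example and uses the literal key strings from the table rather than importing the constants, whose import path is not documented here. `summarize_run` is a hypothetical callable you could wrap in a `@task` or `PythonOperator`, where Airflow passes `ti` from the task context.

```python
from typing import Any, Dict

UPSTREAM_TASK_ID = "run_spark_job"  # task_id from the Usage example
XCOM_KEYS = ("workspace_id", "job_id", "run_id")  # keys from the table above


def summarize_run(ti: Any) -> Dict[str, Any]:
    """Collect the IDs pushed by DataPlatformOperator; `ti` is Airflow's TaskInstance."""
    return {key: ti.xcom_pull(task_ids=UPSTREAM_TASK_ID, key=key) for key in XCOM_KEYS}
```

In a templated field the same value is available as `"{{ ti.xcom_pull(task_ids='run_spark_job', key='run_id') }}"`.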
Spark Job states
from dataplatform_airflow_plugin import SparkJobState
SparkJobState.SUCCESS.is_final # True
SparkJobState.SUCCESS.is_successful # True
SparkJobState.RUNNING.is_final # False
| State | Final? | Success? |
|---|---|---|
| QUEUING | ❌ | — |
| SCHEDULING | ❌ | — |
| PENDING | ❌ | — |
| RUNNING | ❌ | — |
| SUCCESS | ✅ | ✅ |
| FAILED | ✅ | ❌ |
| CANCELLED | ✅ | ❌ |
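When unit-testing DAG logic without the plugin installed, a stand-in that mirrors the table can be handy. `SparkJobStateSketch` below is hypothetical and only reproduces the `is_final` / `is_successful` behavior documented above; the member values (plain state-name strings) are an assumption, and the real `SparkJobState` may differ.

```python
from enum import Enum


class SparkJobStateSketch(Enum):
    """Hypothetical mirror of the state table; not the plugin's SparkJobState."""

    QUEUING = "QUEUING"
    SCHEDULING = "SCHEDULING"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

    @property
    def is_final(self) -> bool:
        return self in _FINAL_STATES

    @property
    def is_successful(self) -> bool:
        return self is SparkJobStateSketch.SUCCESS


# Defined after the class because it references the enum members.
_FINAL_STATES = frozenset(
    {SparkJobStateSketch.SUCCESS, SparkJobStateSketch.FAILED, SparkJobStateSketch.CANCELLED}
)
```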
Kubernetes (Production)
Once a release is stable, avoid _PIP_ADDITIONAL_REQUIREMENTS (it slows pod startup and depends on the network at every start). Build a custom image instead:
FROM apache/airflow:3.2.0
RUN pip install --no-cache-dir dataplatform-airflow-plugin==0.1.0
docker build -t <registry>/airflow-dataplatform:0.1.0 .
docker push <registry>/airflow-dataplatform:0.1.0
# override.yaml
defaultAirflowRepository: <registry>/airflow-dataplatform
defaultAirflowTag: "0.1.0"
images:
  airflow:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.0"
    pullPolicy: IfNotPresent
  pod_template:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.0"
    pullPolicy: IfNotPresent
KubernetesExecutor: the images.pod_template image must also contain the plugin (task pods are created from this template).
Development
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
make install # uv pip install -e ".[dev]"
make test # pytest
make lint # ruff + mypy
make format # black + isort
make build # build wheel
Project structure
dataplatform-airflow-plugin/
├── dataplatform_airflow_plugin/
│ ├── __init__.py # Package metadata, public exports
│ ├── plugin.py # AirflowPlugin registration (hooks + operators)
│ ├── dataplatform_operator.py # DataPlatformOperator + SparkJobState
│ └── hook.py # VNGCloudHook (IAM, Data Platform API)
├── dags/
│ ├── example_dataplatform.py # Full example (templating, Variables)
│ └── example_dataplatform_minimal.py # Minimal example
├── tests/
│ ├── test_state.py
│ └── test_operator.py
├── Makefile
├── pyproject.toml
└── README.md
Release workflow
# 1. Bump the version in pyproject.toml (e.g. 0.1.0 → 0.1.1)
# 2. Build + publish
make build
uv publish --token "$PYPI_TOKEN"
# 3. Commit + tag + push
git commit -am "Release v0.1.1"
git tag v0.1.1
git push origin main && git push origin v0.1.1
Troubleshooting
ModuleNotFoundError: dataplatform_airflow_plugin
- Verify the package is installed:
pip show dataplatform-airflow-plugin
- On Kubernetes, check whether the pod managed to install it (pip install only runs at pod start):
kubectl -n airflow logs deploy/airflow-scheduler -c scheduler | grep -iE "pip|dataplatform"
- If you use _PIP_ADDITIONAL_REQUIREMENTS and the version in pyproject.toml was not bumped, pip may use a cached copy, so restarting the pod does not reinstall the plugin. Bump the version (e.g. 0.1.0 → 0.1.1) or switch to an image-based deployment (see Kubernetes (Production)).
Task pods (KubernetesExecutor) do not have the plugin
- Make sure images.pod_template uses the same image as the scheduler/worker.
- Or set _PIP_ADDITIONAL_REQUIREMENTS in the top-level env: so that it propagates to task pods.
Token request fails (401/403)
- Check that the credentials are correct for the method you set up (Connection / Variable / env). See the priority order under Configuration.
- Verify client_id/client_secret in the VNG Cloud IAM console.
- Confirm that client_id and client_secret belong to the same environment (dev / prod) and match the same iam_host.
VNG Cloud credentials are not configured
The plugin could not find credentials through any of the 3 methods. Check them in priority order:
airflow connections get vng_cloud_default   # Option 1
airflow variables get vng_client_id         # Option 2
echo $VNG_CLIENT_ID                         # Option 3
DAG parse error: "Don't use runtime-varying value as argument in Dag constructor"
- Do not use pendulum.today(), datetime.now(), or Variable.get(...) directly in DAG(...) / Operator(...) arguments.
- Use static values (pendulum.datetime(2026, 1, 1, tz="UTC")) or Jinja templates ("{{ ds }}", "{{ var.value.x }}").
License
Apache 2.0