
Apache Airflow plugin for orchestrating Spark Jobs on Data Platform.


DataPlatform Airflow Plugin

Apache Airflow plugin for triggering and managing Spark Jobs on GreenNode Data Platform (VNG Cloud).

The plugin provides DataPlatformOperator, which handles the full job lifecycle:

  1. Obtain an IAM access token (OAuth2 client credentials, auto-refreshed)
  2. Submit a Spark Job run
  3. Poll the run status until the job finishes
  4. Cancel the job if the Airflow task is killed
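The four lifecycle steps can be pictured as a plain polling loop. This is a minimal sketch, not the operator's actual code: the `client` object and its methods `get_token`, `submit_run`, `get_run_state` and `cancel_run` are hypothetical stand-ins for the calls the operator makes through VNGCloudHook, and the terminal state names come from the Spark Job states table. In Airflow the cancel step would live in the operator's `on_kill()`; here a `KeyboardInterrupt` stands in for the kill signal.

```python
import time
from typing import Protocol

# Terminal states, taken from the "Spark Job states" table.
FINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


class JobClient(Protocol):
    """Hypothetical client interface; not the plugin's real API."""

    def get_token(self) -> str: ...
    def submit_run(self, token: str, workspace_id: str, job_id: str, args: list[str]) -> str: ...
    def get_run_state(self, token: str, workspace_id: str, job_id: str, run_id: str) -> str: ...
    def cancel_run(self, token: str, workspace_id: str, job_id: str, run_id: str) -> None: ...


def run_job(client: JobClient, workspace_id: str, job_id: str, args: list[str],
            poll_seconds: float = 15.0) -> str:
    # Steps 1 + 2: obtain an IAM token and submit the run.
    run_id = client.submit_run(client.get_token(), workspace_id, job_id, args)
    try:
        while True:
            # Step 3: poll. get_token() is assumed to return a cached token
            # and refresh it transparently once it expires.
            state = client.get_run_state(client.get_token(), workspace_id, job_id, run_id)
            if state in FINAL_STATES:
                if state != "SUCCESS":
                    raise RuntimeError(f"run {run_id} ended in state {state}")
                return run_id
            time.sleep(poll_seconds)
    except KeyboardInterrupt:
        # Step 4: the task was killed, so cancel the remote run before propagating.
        client.cancel_run(client.get_token(), workspace_id, job_id, run_id)
        raise
```

Cancelling in the interrupt handler (rather than after the loop) ensures the remote Spark run does not keep consuming resources once Airflow stops tracking it.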

Installation

From PyPI (production):

pip install dataplatform-airflow-plugin

From source (local dev):

git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
pip install -e ".[dev]"

Verify the installation:

pip show dataplatform-airflow-plugin
python -c "from dataplatform_airflow_plugin import DataPlatformOperator, VNGCloudHook; print('OK')"

Or check via the Airflow CLI:

airflow plugins
# Expected line:
# dataplatform | dataplatform-airflow-plugin==0.1.0: EntryPoint(name='dataplatform', value='dataplatform_airflow_plugin.plugin:DataPlatformPlugin', group='airflow.plugins')

Installing on the Airflow Helm chart (dev)

In values.yaml / override.yaml:

env:
  - name: _PIP_ADDITIONAL_REQUIREMENTS
    value: "dataplatform-airflow-plugin==0.1.0"

helm upgrade <release> . -n <namespace> -f override.yaml

Helm performs a rolling restart of the pods (scheduler/worker/triggerer/api-server); each pod pip-installs the plugin from PyPI on startup.

Installing on the Airflow Helm chart (production)

Build a custom image; see Kubernetes (Production) below.


Configuration

The plugin supports three ways to configure credentials, resolved in the following priority order:

Connection → Variable → Env var

The plugin looks for credentials in the Connection first; if none are found, it falls back to Variables, and finally to environment variables. Pick whichever one fits your needs.
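The lookup order can be pictured as a first-match search across the three sources. A minimal sketch, assuming each source has already been read into a plain dict (in Airflow they would come from the Connection, from `Variable.get`, and from `os.environ`); `resolve_credentials` and the dict shapes are hypothetical, and whether the real plugin mixes fields across sources is not stated, so this sketch treats each source as all-or-nothing.

```python
from typing import Mapping, Optional


def resolve_credentials(
    connection: Optional[Mapping[str, str]],
    variables: Mapping[str, str],
    env: Mapping[str, str],
) -> tuple[str, str, str]:
    """Return (client_id, client_secret, source): Connection, then Variables, then env."""
    # Option 1: Airflow Connection (login = client ID, password = client secret)
    if connection and connection.get("login") and connection.get("password"):
        return connection["login"], connection["password"], "connection"
    # Option 2: Airflow Variables
    if variables.get("vng_client_id") and variables.get("vng_client_secret"):
        return variables["vng_client_id"], variables["vng_client_secret"], "variable"
    # Option 3: environment variables
    if env.get("VNG_CLIENT_ID") and env.get("VNG_CLIENT_SECRET"):
        return env["VNG_CLIENT_ID"], env["VNG_CLIENT_SECRET"], "env"
    raise LookupError("VNG Cloud credentials are not configured")
```

For example, `resolve_credentials(None, {}, os.environ)` only succeeds when both `VNG_CLIENT_ID` and `VNG_CLIENT_SECRET` are exported.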


Option 1: Airflow Connection ⭐ (recommended for production)

The password is Fernet-encrypted, Connections have a dedicated UI, and multiple environments are easy to manage (vng_dev, vng_prod, ...).

UI: Admin → Connections → Add

Field | Value
Connection Id | vng_cloud_default (default; configurable via vng_conn_id)
Connection Type | Generic
Host | https://dev-iam-proxy.dataplatform.vngcloud.tech
Login | <VNG_CLIENT_ID>
Password | <VNG_CLIENT_SECRET>
Extra (JSON) | (optional; see below)

Extra (JSON, optional; overrides the default URLs):

{
  "data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
  "token_path": "/accounts-api/v2/auth/token",
  "fe_url": "https://dev-app.dataplatform.vngcloud.tech"
}

Setting it via a Kubernetes Secret (Helm):

kubectl -n airflow create secret generic vng-cloud-conn \
  --from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT='{"conn_type":"generic","host":"https://dev-iam-proxy.dataplatform.vngcloud.tech","login":"<CID>","password":"<CSECRET>","extra":{"data_platform_url":"https://dev-backend-proxy.dataplatform.vngcloud.tech"}}'

# override.yaml
env:
  - name: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
    valueFrom:
      secretKeyRef:
        name: vng-cloud-conn
        key: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
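Hand-writing that JSON inside a shell string is error-prone (quoting, unbalanced braces). A small sketch that builds the same AIRFLOW_CONN_VNG_CLOUD_DEFAULT value with `json.dumps`, using the field names from the kubectl command above; `build_conn_json` is a hypothetical helper and `<CID>` / `<CSECRET>` remain placeholders.

```python
import json


def build_conn_json(host: str, client_id: str, client_secret: str, data_platform_url: str) -> str:
    """Serialize an Airflow connection in the JSON form accepted by AIRFLOW_CONN_* env vars."""
    conn = {
        "conn_type": "generic",
        "host": host,                 # IAM host used for token requests
        "login": client_id,           # VNG client ID
        "password": client_secret,    # VNG client secret
        "extra": {"data_platform_url": data_platform_url},
    }
    return json.dumps(conn)


if __name__ == "__main__":
    print(build_conn_json(
        "https://dev-iam-proxy.dataplatform.vngcloud.tech",
        "<CID>",
        "<CSECRET>",
        "https://dev-backend-proxy.dataplatform.vngcloud.tech",
    ))
```

The printed string can then be passed to kubectl, e.g. `--from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT="$(python build_conn.py)"`, where `build_conn.py` is whatever file you saved the sketch as.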

Option 2: Airflow Variables (simple, similar to iomete)

Quick to set up without learning how Connections work, but values are not encrypted by default. Suitable for dev or small teams.

UI: Admin → Variables → Add

Variable name | Required | Default
vng_client_id | Yes | -
vng_client_secret | Yes | -
vng_iam_host | No | https://dev-iam-proxy.dataplatform.vngcloud.tech
vng_token_path | No | /accounts-api/v2/auth/token
vng_data_platform_url | No | https://dev-backend-proxy.dataplatform.vngcloud.tech/
vng_fe_url | No | https://dev-app.dataplatform.vngcloud.tech

Set via the CLI:

airflow variables set vng_client_id "<CLIENT_ID>"
airflow variables set vng_client_secret "<CLIENT_SECRET>"

Option 3: Env vars (local dev; not recommended for production)

export VNG_CLIENT_ID="..."
export VNG_CLIENT_SECRET="..."

This method only covers the two core credentials. To override the URL endpoints, use a Connection or Variables.


Usage

from datetime import datetime
from airflow import DAG
from dataplatform_airflow_plugin import DataPlatformOperator

with DAG(
    dag_id="example_dataplatform_spark_job",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["dataplatform"],
) as dag:

    run_job = DataPlatformOperator(
        task_id="run_spark_job",
        workspace_id="ws-abc-123",
        job_id="job-xyz-456",
        application_args=["--date", "{{ ds }}", "--mode", "prod"],
        polling_period_seconds=15,
        do_xcom_push=True,
    )

See also dags/example_dataplatform.py (full) or dags/example_dataplatform_minimal.py (minimal).

DataPlatformOperator parameters

Parameter | Required | Default | Description
workspace_id | Yes | - | Workspace ID (templated)
job_id | Yes | - | Spark Job ID (templated)
application_args | No | [""] | list[str], dict, or JSON string (templated)
vng_conn_id | No | vng_cloud_default | Airflow Connection ID
token_url | No | from Connection / default | Override the IAM token endpoint
data_platform_url | No | from Connection / default | Override the Data Platform base URL
polling_period_seconds | No | 15 | Interval in seconds between status polls
do_xcom_push | No | False | Push workspace_id, job_id, run_id via XCom

Templated fields: workspace_id, job_id, application_args; Jinja such as {{ ds }}, {{ params.x }} and {{ var.value.* }} is supported.

Template extension: .json; a templated value ending in .json is loaded from that file automatically.

Logs URL: when the job finishes (success or failure), the operator logs a link to the Data Platform UI where the logs can be viewed:

[INFO] View logs on Data Platform UI: https://dev-app.dataplatform.vngcloud.tech/workspaces/<ws>/jobs/<job>

When moving to prod, override the base URL via the Connection extra fe_url or the Variable vng_fe_url.
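A sketch of how the logs link is composed from the UI base URL, following the pattern in the log line above. `build_logs_url` is a hypothetical helper, and stripping a trailing slash is an assumption meant to cope with base URLs configured with a final `/`.

```python
def build_logs_url(fe_url: str, workspace_id: str, job_id: str) -> str:
    """Return <fe_url>/workspaces/<workspace_id>/jobs/<job_id> without a doubled slash."""
    # rstrip("/") tolerates a base URL stored with or without a trailing slash.
    return f"{fe_url.rstrip('/')}/workspaces/{workspace_id}/jobs/{job_id}"
```

For example, `build_logs_url("https://dev-app.dataplatform.vngcloud.tech/", "ws-abc-123", "job-xyz-456")` returns `"https://dev-app.dataplatform.vngcloud.tech/workspaces/ws-abc-123/jobs/job-xyz-456"`.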

XCom keys

Key | Constant | Description
workspace_id | XCOM_WORKSPACE_ID_KEY | Workspace ID
job_id | XCOM_JOB_ID_KEY | Spark Job ID
run_id | XCOM_RUN_ID_KEY | Run ID
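With do_xcom_push=True, a downstream task can read these values with Airflow's standard `ti.xcom_pull(task_ids=..., key=...)`. A minimal callable for a PythonOperator, assuming the upstream task_id `run_spark_job` from the Usage example; `report_run` is a hypothetical name, and the keys are the literal strings from the table.

```python
def report_run(ti) -> str:
    """Read the values pushed by the DataPlatformOperator task and summarize them."""
    # Keys come from the XCom table; task_id matches the Usage example.
    workspace_id = ti.xcom_pull(task_ids="run_spark_job", key="workspace_id")
    job_id = ti.xcom_pull(task_ids="run_spark_job", key="job_id")
    run_id = ti.xcom_pull(task_ids="run_spark_job", key="run_id")
    summary = f"workspace={workspace_id} job={job_id} run={run_id}"
    print(summary)
    return summary
```

Wire it up as `PythonOperator(task_id="report_run", python_callable=report_run)` downstream of `run_job`; Airflow injects `ti` from the task context because it is named in the signature. In Airflow 3, PythonOperator is imported from `airflow.providers.standard.operators.python`.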

Spark Job states

from dataplatform_airflow_plugin import SparkJobState

SparkJobState.SUCCESS.is_final        # True
SparkJobState.SUCCESS.is_successful   # True
SparkJobState.RUNNING.is_final        # False

State | Final? | Success?
QUEUING | No | No
SCHEDULING | No | No
PENDING | No | No
RUNNING | No | No
SUCCESS | Yes | Yes
FAILED | Yes | No
CANCELLED | Yes | No
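The Final?/Success? columns map naturally onto two enum properties. This is an illustrative re-implementation for reasoning about the table, assuming a plain `str` Enum; the plugin's real SparkJobState lives in dataplatform_operator.py and may differ in detail.

```python
from enum import Enum


class SparkJobState(str, Enum):
    QUEUING = "QUEUING"
    SCHEDULING = "SCHEDULING"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

    @property
    def is_final(self) -> bool:
        # Terminal states: polling stops once one of these is reached.
        return self in _FINAL_STATES

    @property
    def is_successful(self) -> bool:
        return self is SparkJobState.SUCCESS


# Kept at module level: a set assigned inside the class body would become an enum member.
_FINAL_STATES = frozenset({SparkJobState.SUCCESS, SparkJobState.FAILED, SparkJobState.CANCELLED})
```

Because the enum subclasses `str`, a raw status string from the API can be converted with `SparkJobState("RUNNING")`.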

Kubernetes (Production)

Once a release is stable, avoid _PIP_ADDITIONAL_REQUIREMENTS (it slows pod startup and depends on the network on every start). Build a custom image instead:

FROM apache/airflow:3.2.0
RUN pip install --no-cache-dir dataplatform-airflow-plugin==0.1.0

docker build -t <registry>/airflow-dataplatform:0.1.0 .
docker push <registry>/airflow-dataplatform:0.1.0

# override.yaml
defaultAirflowRepository: <registry>/airflow-dataplatform
defaultAirflowTag: "0.1.0"

images:
  airflow:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.0"
    pullPolicy: IfNotPresent
  pod_template:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.0"
    pullPolicy: IfNotPresent

KubernetesExecutor: the pod_template image must also contain the plugin, because task pods are created from this template.


Development

git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
make install          # uv pip install -e ".[dev]"
make test             # pytest
make lint             # ruff + mypy
make format           # black + isort
make build            # build wheel

Project structure

dataplatform-airflow-plugin/
├── dataplatform_airflow_plugin/
│   ├── __init__.py                       # Package metadata, public exports
│   ├── plugin.py                         # AirflowPlugin registration (hooks + operators)
│   ├── dataplatform_operator.py          # DataPlatformOperator + SparkJobState
│   └── hook.py                           # VNGCloudHook (IAM, Data Platform API)
├── dags/
│   ├── example_dataplatform.py           # Full example (templating, Variables)
│   └── example_dataplatform_minimal.py   # Minimal example
├── tests/
│   ├── test_state.py
│   └── test_operator.py
├── Makefile
├── pyproject.toml
└── README.md

Release workflow

# 1. Bump the version in pyproject.toml (e.g. 0.1.0 → 0.1.1)
# 2. Build + publish
make build
uv publish --token "$PYPI_TOKEN"

# 3. Commit + tag + push
git commit -am "Release v0.1.1"
git tag v0.1.1
git push origin main && git push origin v0.1.1

Troubleshooting

ModuleNotFoundError: No module named 'dataplatform_airflow_plugin'

  1. Verify the package is installed:
    pip show dataplatform-airflow-plugin
    
  2. On Kubernetes, check whether the pod managed to install the plugin (pip install only runs at pod startup):
    kubectl -n airflow logs deploy/airflow-scheduler -c scheduler | grep -iE "pip|dataplatform"
    
  3. If you use _PIP_ADDITIONAL_REQUIREMENTS without bumping the version in pyproject.toml, pip may reuse a cached copy, so restarting the pod does not reinstall the new code. Bump the version (e.g. 0.1.0 → 0.1.1) or switch to a Docker image-based deployment (see Kubernetes (Production)).

Task pod (KubernetesExecutor) is missing the plugin

  • Make sure images.pod_template uses the same image as the scheduler/worker.
  • Or set _PIP_ADDITIONAL_REQUIREMENTS in the top-level env: so it propagates to task pods.

Token request fails (401/403)

  • Check that the credentials are correct for the method you set up (Connection / Variable / env). See the priority order under Configuration.
  • Verify client_id / client_secret in the VNG Cloud IAM console.
  • Confirm that client_id and client_secret belong to the same environment (dev / prod) and the same iam_host.
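To rule out the plugin itself, you can request a token by hand. A minimal sketch using only the standard library; it assumes the standard OAuth2 client-credentials form (RFC 6749 section 4.4) with HTTP Basic auth, which the VNG IAM endpoint may not follow exactly (it could expect a JSON body, for instance), so check the IAM documentation if the request format is rejected. `build_token_request` is a hypothetical helper.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(iam_host: str, token_path: str,
                        client_id: str, client_secret: str) -> urllib.request.Request:
    """Build (but do not send) a client-credentials token request.

    Assumption: RFC 6749 form body + HTTP Basic auth; the real endpoint may differ.
    """
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode()
    return urllib.request.Request(
        url=iam_host.rstrip("/") + token_path,
        data=body,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )
```

Send it with `urllib.request.urlopen(build_token_request("https://dev-iam-proxy.dataplatform.vngcloud.tech", "/accounts-api/v2/auth/token", cid, secret))`; a credential problem surfaces as `urllib.error.HTTPError` whose `.code` is 401 or 403.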

VNG Cloud credentials are not configured

The plugin could not find credentials via any of the three methods. Check them in priority order:

airflow connections get vng_cloud_default     # Option 1
airflow variables get vng_client_id           # Option 2
echo $VNG_CLIENT_ID                            # Option 3

DAG parse error: "Don't use runtime-varying value as argument in Dag constructor"

  • Do not use pendulum.today(), datetime.now(), or Variable.get(...) directly in DAG(...) / Operator(...) arguments.
  • Use static values (pendulum.datetime(2026, 1, 1, tz="UTC")) or Jinja templates ("{{ ds }}", "{{ var.value.x }}").

License

Apache 2.0



Download files


Source Distribution

dataplatform_airflow_plugin-0.1.0.tar.gz (177.9 kB)


Built Distribution


dataplatform_airflow_plugin-0.1.0-py3-none-any.whl (11.9 kB)


File details

Details for the file dataplatform_airflow_plugin-0.1.0.tar.gz.

File metadata

  • Download URL: dataplatform_airflow_plugin-0.1.0.tar.gz
  • Size: 177.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.22

File hashes

Hashes for dataplatform_airflow_plugin-0.1.0.tar.gz
Algorithm | Hash digest
SHA256 | 16f0f213f355e7b549997deb5ac914eed412f4ea78ea7fa4c91e559228097d8d
MD5 | 2f51a42f3d9123ebd62ba8253c687a24
BLAKE2b-256 | f8c260b8bc4946392fab7618d22dd641cefb56ddbf26eabfdb41150670471a23


File details

Details for the file dataplatform_airflow_plugin-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: dataplatform_airflow_plugin-0.1.0-py3-none-any.whl
  • Size: 11.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.22

File hashes

Hashes for dataplatform_airflow_plugin-0.1.0-py3-none-any.whl
Algorithm | Hash digest
SHA256 | 89154dbb6aab760cc12f6342b7a4182acb8869983f3c055421ff7d4a836eb375
MD5 | e584a05ebc6ca0104f11b8f1c43b85bd
BLAKE2b-256 | df06969532f2ddbf027604e316a1428fe3b80b6a79778e0fc6b20619a5a81044

