Vineyard provider for apache-airflow
Project description
Apache Airflow Provider for Vineyard
The apache airflow provider for vineyard contains components to sharing intermediate data among tasks in Airflow workflows using Vineyard.
Vineyard works as a XCom backend for airflow workers to allow transferring large-scale data objects between tasks that cannot be fit into the Airflow's database backend without involving external storage systems like HDFS. The Vineyard XCom backend handles object migration as well when the required inputs is not located on where the task is scheduled to execute.
Table of Contents
Requirements
The following packages are needed to run Airflow on Vineyard,
- airflow >= 2.1.0
- vineyard >= 0.2.12
Configuration and Usage
-
Install required packages:
pip3 install airflow-provider-vineyard
-
Configure Vineyard locally
The vineyard server can be easier launched locally with the following command:
python3 -m vineyard --socket=/tmp/vineyard.sock
-
Configure Airflow to use the vineyard XCom backend by specifying the environment variable
export AIRFLOW__CORE__XCOM_BACKEND=vineyard.contrib.airflow.xcom.VineyardXCom
and configure the location of UNIX-domain IPC socket for vineyard client by
export AIRFLOW__VINEYARD__IPC_SOCKET=/tmp/vineyard.sock
or
export VINEYARD_IPC_SOCKET=/tmp/vineyard.sock
-
Launching your airflow scheduler and workers, and run the following DAG as example,
```python import numpy as np import pandas as pd from airflow.decorators import dag, task from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', } @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def taskflow_etl_pandas(): @task() def extract(): order_data_dict = pd.DataFrame({ 'a': np.random.rand(100000), 'b': np.random.rand(100000) }) return order_data_dict @task(multiple_outputs=True) def transform(order_data_dict: dict): return {"total_order_value": order_data_dict["a"].sum()} @task() def load(total_order_value: float): print(f"Total order value is: {total_order_value:.2f}") order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value"]) taskflow_etl_pandas_dag = taskflow_etl_pandas() ```
In above example, task :code:extract
and task :code:transform
shares a
:code:pandas.DataFrame
as the intermediate data, which is impossible as
it cannot be pickled and when the data is large, it cannot be fit into the
table in backend databases of Airflow.
The example is adapted from the documentation of Airflow, see also Tutorial on the Taskflow API.
Run the tests
-
Start your vineyardd with the following command,
python3 -m vineyard
-
Set airflow to use the vineyard XCom backend, and run tests with pytest,
export AIRFLOW__CORE__XCOM_BACKEND=vineyard.contrib.airflow.xcom.VineyardXCom pytest -s -vvv python/vineyard/contrib/airflow/tests/test_python_dag.py pytest -s -vvv python/vineyard/contrib/airflow/tests/test_pandas_dag.py
The pandas test suite is not possible to run with the default XCom backend, vineyard enables airflow to exchange complex and big data without modify the DAG and tasks!
Deploy on Kubernetes
We provide a reference settings (see values.yaml) for deploying Airflow with vineyard as the XCom backend on Kubernetes, based on the official helm charts.
Deploying vineyard requires etcd, to ease to deploy process, you first need to setup a standalone etcd cluster. A test etcd cluster with only one instance can be deployed by
$ kubectl create -f etcd.yaml
The values.yaml mainly tweaks the following settings:
- Installing vineyard dependency to the containers using pip before start workers
- Adding a vineyardd container to the airflow pods
- Mounting the vineyardd's UNIX-domain socket and shared memory to the airflow worker pods
Note that the values.yaml
may doesn't work in your environment, as airflow requires
other settings like postgresql database, presistance volumes, etc. You can combine
the reference values.yaml
with your own specific Airflow settings.
The values.yaml for Airflow's helm chart can be used as
# add airflow helm stable repo
$ helm repo add apache-airflow https://airflow.apache.org
$ helm repo update
# deploy airflow
$ helm install -f values.yaml $RELEASE_NAME apache-airflow/airflow --namespace $NAMESPACE
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Hashes for airflow_provider_vineyard-0.5.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5b223efdc0b89d58b662d476026d15989b360e83048b0493b2bf842e3defc3c6 |
|
MD5 | e141f06150a87abc6d220ea9556601a6 |
|
BLAKE2b-256 | 9f88971b4641576239936911a9daa824fa36437dfebaff5674ee9a657b780f63 |