Apache Airflow connector for Ocean for Apache Spark
An Airflow plugin and provider to launch and monitor Spark applications on Ocean for Apache Spark.
Installation
pip install ocean-spark-airflow-provider
Usage
For general usage of Ocean for Apache Spark, refer to the official documentation.
Setting up the connection
In the connection menu, register a new connection of type Ocean for Apache Spark. The default connection name is ocean_spark_default. You will need:
- The Ocean Spark cluster ID of the cluster you just created (of the format osc-e4089a00). You can find it in the Spot console in the list of clusters, or by using the Cluster List API.
- A Spot token to interact with the Spot API.
The Ocean for Apache Spark connection type is not available for Airflow 1. Instead, create an HTTP connection, fill in your cluster ID as the host, and your API token as the password.
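If you prefer to manage connections outside the UI, Airflow can also read a connection from an environment variable in its standard connection URI format. A minimal sketch, assuming the HTTP-style connection described above (the token is a placeholder and may need URL encoding):
AIRFLOW_CONN_OCEAN_SPARK_DEFAULT='http://:<your-spot-token>@osc-e4089a00'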
You will need to create a separate connection for each Ocean Spark cluster that you want to use with Airflow. In the OceanSparkOperator, you can select which Ocean Spark connection to use with the connection_name argument (defaults to ocean_spark_default). For example, you may choose to have one Ocean Spark cluster per environment (dev, staging, prod), and you can easily target an environment by picking the correct Airflow connection.
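For instance, here is a minimal sketch of submitting to a dedicated staging cluster through its own connection. The connection id ocean_spark_staging is hypothetical, and config_overrides is omitted for brevity (the full operator configuration is shown in the next section):

from datetime import datetime

from airflow import DAG
from ocean_spark.operators import OceanSparkOperator

dag = DAG(
    dag_id="spark-pi-staging",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
)

# Target the cluster registered under the hypothetical "ocean_spark_staging"
# connection instead of the default ocean_spark_default connection.
spark_pi_staging_task = OceanSparkOperator(
    job_id="spark-pi",
    task_id="compute-pi",
    connection_name="ocean_spark_staging",
    dag=dag,
)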
Using the Spark operator
from datetime import datetime

from airflow import DAG
from ocean_spark.operators import OceanSparkOperator

# DAG creation (a minimal definition so the example runs as-is; adjust to your needs)
dag = DAG(
    dag_id="spark-pi",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
)

spark_pi_task = OceanSparkOperator(
    job_id="spark-pi",
    task_id="compute-pi",
    dag=dag,
    config_overrides={
        "type": "Scala",
        "sparkVersion": "3.2.0",
        "image": "gcr.io/datamechanics/spark:platform-3.2-latest",
        "imagePullPolicy": "IfNotPresent",
        "mainClass": "org.apache.spark.examples.SparkPi",
        "mainApplicationFile": "local:///opt/spark/examples/jars/examples.jar",
        "arguments": ["10000"],
        "driver": {
            "cores": 1,
            "spot": False
        },
        "executor": {
            "cores": 4,
            "instances": 1,
            "spot": True,
            "instanceSelector": "r5"
        },
    },
)
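As a further sketch, here is a PySpark variant of the same task, reusing the dag object defined above. The "Python" type value, image tag, and example file path are illustrative assumptions rather than values confirmed by this provider's documentation:

# Hypothetical PySpark job: the type value and file path are assumptions.
spark_pi_python_task = OceanSparkOperator(
    job_id="pyspark-pi",
    task_id="compute-pi-python",
    dag=dag,
    config_overrides={
        "type": "Python",
        "sparkVersion": "3.2.0",
        "image": "gcr.io/datamechanics/spark:platform-3.2-latest",
        # pi.py ships with the standard Spark examples (assumed path inside the image)
        "mainApplicationFile": "local:///opt/spark/examples/src/main/python/pi.py",
        "arguments": ["10000"],
    },
)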
Using the Spark Connect operator (available since Airflow 2.6.2)
from airflow import DAG, utils
from ocean_spark.operators import (
    OceanSparkConnectOperator,
)

args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": utils.dates.days_ago(0, second=1),
}

dag = DAG(dag_id="spark-connect-task", default_args=args, schedule_interval=None)

spark_pi_task = OceanSparkConnectOperator(
    task_id="spark-connect",
    dag=dag,
)
Trigger the DAG with a config such as:

{
    "sql": "select random()"
}
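With the Airflow 2 CLI, for example, the DAG above can be triggered as follows (the DAG id matches the example):
airflow dags trigger spark-connect-task --conf '{"sql": "select random()"}'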
More examples are available for Airflow 2.
Test locally
You can test the plugin locally using the docker compose setup in this repository. Run make serve_airflow at the root of the repository to launch an instance of Airflow 2 with the provider already installed.