Airflow connector to Ocean for Apache Spark

An Airflow plugin and provider to launch and monitor Spark applications on Ocean for Apache Spark.

Installation

pip install ocean-spark-airflow-provider

Usage

For general usage of Ocean for Apache Spark, refer to the official documentation.

Setting up the connection

In the connection menu, register a new connection of type Ocean for Apache Spark. The default connection name is ocean_spark_default. You will need to have:

  • The Ocean Spark cluster ID of the cluster you just created (of the format osc-e4089a00). You can find it in the Spot console in the list of clusters, or by using the Cluster List API.
  • A Spot token to interact with the Spot API.

[Screenshot: connection setup dialog]

The Ocean for Apache Spark connection type is not available for Airflow 1. Instead, create an HTTP connection, use your cluster ID as the host, and your Spot API token as the password.

You will need to create a separate connection for each Ocean Spark cluster that you want to use with Airflow. In the OceanSparkOperator, you can select which Ocean Spark connection to use with the connection_name argument (defaults to ocean_spark_default). For example, you may choose to have one Ocean Spark cluster per environment (dev, staging, prod), and you can easily target an environment by picking the correct Airflow connection.
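If you prefer to manage connections from the command line rather than the UI, you can also register them with the Airflow CLI. The commands below are a minimal sketch, not the provider's documented setup: they reuse the generic HTTP connection form described above for Airflow 1 (cluster ID as host, Spot token as password), the cluster IDs are placeholders, and ocean_spark_prod and the SPOT_TOKEN variables are hypothetical names. On Airflow 2 you would normally pick the dedicated Ocean for Apache Spark connection type in the UI instead.

airflow connections add ocean_spark_default \
    --conn-type http \
    --conn-host osc-xxxxxxxx \
    --conn-password "$SPOT_TOKEN"

# One connection per environment, e.g. a hypothetical production cluster
airflow connections add ocean_spark_prod \
    --conn-type http \
    --conn-host osc-yyyyyyyy \
    --conn-password "$SPOT_TOKEN_PROD"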

Using the Spark operator

from airflow import DAG
from airflow.utils.dates import days_ago
from ocean_spark.operators import OceanSparkOperator

# DAG creation (a minimal example; adjust dag_id, schedule and start_date to your needs)
dag = DAG(
    dag_id="spark-pi-example",
    schedule_interval=None,
    start_date=days_ago(1),
)

spark_pi_task = OceanSparkOperator(
    job_id="spark-pi",
    task_id="compute-pi",
    dag=dag,
    config_overrides={
        "type": "Scala",
        "sparkVersion": "3.2.0",
        "image": "gcr.io/datamechanics/spark:platform-3.2-latest",
        "imagePullPolicy": "IfNotPresent",
        "mainClass": "org.apache.spark.examples.SparkPi",
        "mainApplicationFile": "local:///opt/spark/examples/jars/examples.jar",
        "arguments": ["10000"],
        "driver": {
            "cores": 1,
            "spot": False,
        },
        "executor": {
            "cores": 4,
            "instances": 1,
            "spot": True,
            "instanceSelector": "r5",
        },
    },
)
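
If you registered one connection per environment as suggested above, you can point the operator at a specific cluster with the connection_name argument. The snippet below is a minimal sketch reusing the DAG and config style from the example above; ocean_spark_prod is a hypothetical connection ID:

spark_pi_prod_task = OceanSparkOperator(
    job_id="spark-pi",
    task_id="compute-pi-prod",
    connection_name="ocean_spark_prod",  # hypothetical per-environment connection; defaults to "ocean_spark_default"
    dag=dag,  # DAG defined in the previous example
    config_overrides={
        "type": "Scala",
        "mainClass": "org.apache.spark.examples.SparkPi",
        "mainApplicationFile": "local:///opt/spark/examples/jars/examples.jar",
    },
)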

Using the Spark Connect operator (available since Airflow 2.6.2)

from airflow import DAG
from airflow.utils.dates import days_ago
from ocean_spark.operators import OceanSparkConnectOperator

args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(0, second=1),
}

dag = DAG(dag_id="spark-connect-task", default_args=args, schedule_interval=None)

spark_pi_task = OceanSparkConnectOperator(
    task_id="spark-connect",
    dag=dag,
)

Trigger the DAG with a config, such as:

{
  "sql": "select random()"
}
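
For example, you can trigger it from the Airflow CLI, passing the config as JSON:

airflow dags trigger spark-connect-task --conf '{"sql": "select random()"}'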

More examples are available for Airflow 2.

Test locally

You can test the plugin locally using the Docker Compose setup in this repository. Run make serve_airflow at the root of the repository to launch an instance of Airflow 2 with the provider already installed.

