
Airflow Providers for Wherobots

Airflow providers to bring Wherobots Cloud's spatial compute to your data workflows and ETLs.

Installation

If you use Poetry in your project, add the dependency with poetry add:

$ poetry add airflow-providers-wherobots

Otherwise, just pip install it:

$ pip install airflow-providers-wherobots

Create an HTTP connection

Create a Connection in Airflow, either from the Apache Airflow web UI or from the command line. The default Wherobots connection name is wherobots_default; if you use another name, you must pass that name through the wherobots_conn_id parameter when initializing Wherobots operators.

The only required fields for the connection are:

  • the Wherobots API endpoint in the host field;
  • your Wherobots API key in the password field.
$ airflow connections add "wherobots_default" \
    --conn-type "generic" \
    --conn-host "api.cloud.wherobots.com" \
    --conn-password "$(< api.key)"
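
If you registered the connection under a different ID, point the operators at it through the wherobots_conn_id parameter. Below is a minimal sketch; the connection ID wherobots_prod and the task ID are hypothetical placeholders, not values defined by this provider.

from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

# Inside a DAG definition, reference the custom connection ID explicitly
# ("wherobots_prod" is a hypothetical ID created with `airflow connections add`).
operator = WherobotsSqlOperator(
    task_id="query_with_custom_connection",
    wherobots_conn_id="wherobots_prod",
    sql="SELECT 1",
)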

Usage

Execute a Run on Wherobots Cloud

Wherobots allows users to upload their code (.py, .jar), execute it on the cloud, and monitor the status of the run. Each execution is called a Run.

The WherobotsRunOperator allows you to execute a Run on Wherobots Cloud. WherobotsRunOperator triggers the run according to the parameters you provide, and waits for the run to finish before completing the task.

Refer to the Wherobots Managed Storage Documentation to learn more about how to upload and manage your code on Wherobots Cloud.

Below is an example of using the WherobotsRunOperator:

from airflow_providers_wherobots.operators.run import WherobotsRunOperator

operator = WherobotsRunOperator(
    task_id="your_task_id",
    name="airflow_operator_test_run_{{ ts_nodash }}",
    run_python={
        "uri": "s3://wbts-wbc-m97rcg45xi/42ly7mi0p1/data/shared/classification.py"
    },
    runtime="tiny-a10-gpu",
    dag=dag,
    poll_logs=True,
)

Arguments

The arguments for the WherobotsRunOperator constructor:

The run_* arguments are mutually exclusive; you can specify only one of them.

  • name: str: The name of the run. If not specified, a default name will be generated.

  • runtime: str: The runtime determines the size of the compute resources that execute the run. The default value is tiny. Find the list of available runtimes in your Wherobots Cloud account under Organization Settings -> General -> RUNTIME CONFIGURATION.

    The runtime IDs appear in brackets to the right of each runtime name.


  • poll_logs: bool: If True, the operator polls the run's logs until the run finishes. If False, the operator does not poll logs and only tracks the status of the run.

  • polling_interval: The interval, in seconds, at which to poll the status of the run. The default value is 30.

  • run_python: dict: A dictionary with the following keys:

    • uri: str: The URI of the Python file to run.
    • args: list[str]: A list of arguments to pass to the Python file.
  • run_jar: dict: A dictionary with the following keys:

    • uri: str: The URI of the JAR file to run.
    • args: list[str]: A list of arguments to pass to the JAR file.
    • mainClass: str: The main class to run in the JAR file.
  • environment: dict: A dictionary with the following keys:

    • sparkDriverDiskGB: int: The disk size for the Spark driver.
    • sparkExecutorDiskGB: int: The disk size for the Spark executor.
    • sparkConfigs: dict: A dictionary of Spark configurations.
    • dependencies: list[dict]: A list of dependent libraries to install.

The dependencies argument is a list of dictionaries. Two types of dependencies are supported; a combined usage sketch follows the examples below.

  1. PYPI dependencies:
{
    "sourceType": "PYPI",
    "libraryName": "package_name",
    "libraryVersion": "package_version"
}
  2. FILE dependencies:
{
    "sourceType": "FILE",
    "filePath": "s3://bucket/path/to/dependency.whl"
}

File types supported: .whl, .zip, .jar
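
For illustration, below is a hedged sketch of a run that combines the arguments above, including an environment block with both dependency types. The DAG ID, task ID, S3 URIs, runtime, Spark settings, and package versions are placeholders, not values from this provider; only the parameter names and dictionary keys come from the argument list above.

import datetime

from airflow import DAG
from airflow_providers_wherobots.operators.run import WherobotsRunOperator

with DAG(
    dag_id="example_wherobots_run_dag",
    start_date=datetime.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
):
    operator = WherobotsRunOperator(
        task_id="run_with_dependencies",
        run_python={
            # For a JAR job, pass run_jar={"uri": ..., "args": [...], "mainClass": ...} instead.
            "uri": "s3://your-bucket/path/to/job.py",
            "args": ["--date", "{{ ds }}"],
        },
        runtime="tiny",
        poll_logs=True,
        polling_interval=60,
        environment={
            "sparkDriverDiskGB": 50,
            "sparkExecutorDiskGB": 50,
            "sparkConfigs": {"spark.sql.shuffle.partitions": "200"},
            "dependencies": [
                {
                    "sourceType": "PYPI",
                    "libraryName": "shapely",
                    "libraryVersion": "2.0.4",
                },
                {
                    "sourceType": "FILE",
                    "filePath": "s3://your-bucket/libs/helpers.whl",
                },
            ],
        },
    )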

Execute a SQL query

The WherobotsSqlOperator allows you to run SQL queries on Wherobots Cloud, so you can build ETL and data transformation workflows by querying, manipulating, and producing datasets with WherobotsDB.

Refer to the Wherobots Documentation to learn how to read data, transform data, and write results in Spatial SQL with WherobotsDB.

Refer to the Wherobots Apache Airflow Provider Documentation to get more detailed guidance about how to use the Wherobots Apache Airflow Provider.

Example

Below is an example Airflow DAG that executes a SQL query on Wherobots Cloud:

import datetime

from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator


with DAG(
    dag_id="example_wherobots_sql_dag",
    start_date=datetime.datetime.now(),
    schedule="@hourly",
    catchup=False
):
    # Create a `wherobots.test.airflow_example` table with 100 records
    # from the OMF `places_place` dataset.
    operator = WherobotsSqlOperator(
        task_id="execute_query",
        sql=f"""
        INSERT INTO wherobots.test.airflow_example
        SELECT id, geometry, confidence, geohash
        FROM wherobots_open_data.overture.places_place
        LIMIT 100
        """,
        return_last=False,
    )
