
Airflow Providers for Wherobots

Airflow providers to bring Wherobots Cloud's spatial compute to your data workflows and ETLs.

Installation

If you use Poetry in your project, add the dependency with poetry add:

$ poetry add airflow-providers-wherobots

Otherwise, just pip install it:

$ pip install airflow-providers-wherobots

Create an http connection

Create a Connection in Airflow. This can be done from Apache Airflow's Web UI, or from the command-line. The default Wherobots connection name is wherobots_default; if you use another name you must specify that name with the wherobots_conn_id parameter when initializing Wherobots operators.

The only required fields for the connection are:

  • the Wherobots API endpoint in the host field;
  • your Wherobots API key in the password field.
$ airflow connections add "wherobots_default" \
    --conn-type "generic" \
    --conn-host "api.cloud.wherobots.com" \
    --conn-password "$(< api.key)"
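Alternatively, Airflow can read connections from environment variables named AIRFLOW_CONN_<CONN_ID>. The sketch below defines the same wherobots_default connection using Airflow's JSON connection format (supported since Airflow 2.3); the API key value is a placeholder you must replace:

```shell
# Equivalent connection defined via an environment variable: Airflow maps
# AIRFLOW_CONN_WHEROBOTS_DEFAULT to the connection id "wherobots_default".
# The API key value is a placeholder.
export AIRFLOW_CONN_WHEROBOTS_DEFAULT='{
    "conn_type": "generic",
    "host": "api.cloud.wherobots.com",
    "password": "<your-api-key>"
}'
```

Connections defined this way are not visible in the Airflow Web UI, but operators resolve them by conn_id exactly as if they had been created with airflow connections add.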

Usage

Execute a Run on Wherobots Cloud

Wherobots allows users to upload their code (.py, .jar), execute it on the cloud, and monitor the status of the run. Each execution is called a Run.

The WherobotsRunOperator allows you to execute a Run on Wherobots Cloud. WherobotsRunOperator triggers the run according to the parameters you provide, and waits for the run to finish before completing the task.

Refer to the Wherobots Managed Storage Documentation to learn more about how to upload and manage your code on Wherobots Cloud.

Below is an example of using the WherobotsRunOperator:

from airflow_providers_wherobots.operators.run import WherobotsRunOperator

from wherobots.db.region import Region
from wherobots.db.runtime import Runtime

operator = WherobotsRunOperator(
    task_id="your_task_id",
    name="airflow_operator_test_run_{{ ts_nodash }}",
    region=Region.AWS_US_WEST_2,
    runtime=Runtime.TINY_A10_GPU,
    run_python={
        "uri": "s3://wbts-wbc-m97rcg45xi/42ly7mi0p1/data/shared/classification.py"
    },
    dag=dag,
    poll_logs=True,
)

Arguments

The arguments for the WherobotsRunOperator constructor:

  • region: Region: The Wherobots region where runs are hosted. The values available can be found in wherobots.db.region.Region.

  • name: str: The name of the run. If not specified, a default name will be generated.

  • runtime: Runtime: The runtime dictates the size and amount of resources powering the run. The default value is Runtime.TINY; the available values can be found in wherobots.db.runtime.Runtime.

  • version: str: The WherobotsDB version to use. Defaults to latest.

  • poll_logs: bool: If True, the operator polls the run logs and emits them at INFO level through the task logger until the run finishes. If False, the operator does not poll the logs and only tracks the status of the run.

  • polling_interval: int: The interval, in seconds, at which to poll the status of the run. The default value is 30.

  • timeout_seconds: int: This parameter sets a maximum run time (in seconds) to prevent runaway processes. If the specified value exceeds the Max Workload Alive Hours, the timeout will be capped at the maximum permissible limit. Defaults to 3600 seconds (1 hour).

  • run_python: dict: A dictionary with the following keys:

    • uri: str: The URI of the Python file to run.
    • args: list[str]: A list of arguments to pass to the Python file.
  • run_jar: dict: A dictionary with the following keys:

    • uri: str: The URI of the JAR file to run.
    • args: list[str]: A list of arguments to pass to the JAR file.
    • mainClass: str: The main class to run in the JAR file.
  • environment: dict: A dictionary with the following keys:

    • sparkDriverDiskGB: int: The disk size for the Spark driver.
    • sparkExecutorDiskGB: int: The disk size for the Spark executor.
    • sparkConfigs: dict: A dictionary of Spark configurations.
    • dependencies: list[dict]: A list of dependent libraries to install.
  • wait_post_run_logs_timeout_seconds: int: Maximum duration (in seconds) the system waits for logs to become queryable after job completion. The default value is 60 seconds.

    For more detailed information about run logs, refer to Get Run Logs in the Wherobots Documentation.
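Putting these arguments together, a JAR-based run might be configured as in the sketch below; the S3 URI, main class, and argument values are illustrative placeholders, not real Wherobots resources:

```python
# Illustrative keyword arguments for a JAR-based WherobotsRunOperator;
# the S3 URI, main class, and args are placeholders.
run_kwargs = {
    "name": "airflow_operator_jar_run_{{ ts_nodash }}",
    "run_jar": {
        "uri": "s3://your-bucket/path/to/spatial-job.jar",
        "args": ["--date", "{{ ds }}"],
        "mainClass": "com.example.SpatialJob",
    },
    "poll_logs": True,
    "polling_interval": 30,
    "timeout_seconds": 3600,
}

# The run_* arguments are mutually exclusive: exactly one may be present.
assert sum(key in run_kwargs for key in ("run_python", "run_jar")) == 1
```

These kwargs could then be unpacked into the constructor, e.g. WherobotsRunOperator(task_id="your_task_id", dag=dag, **run_kwargs).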

[!IMPORTANT] Today Wherobots Cloud offers free access to the "Tiny" runtime through the Community Edition Organization. If you need access to larger runtimes (including Memory-Optimized and GPU-Optimized runtimes), consider upgrading to a Professional Edition Organization, Wherobots Cloud's pay-as-you-go plan. For more information, refer to the Upgrade Organization guidance in the Wherobots Documentation.

[!WARNING] The run_* arguments are mutually exclusive; you can only specify one of them.

The dependencies argument is a list of dictionaries. There are two types of dependencies supported.

  1. PYPI dependencies:
{
    "sourceType": "PYPI",
    "libraryName": "package_name",
    "libraryVersion": "package_version"
}
  2. FILE dependencies:
{
    "sourceType": "FILE",
    "filePath": "s3://bucket/path/to/dependency.whl"
}

The file types supported are .whl, .zip, and .jar.
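Combined, an environment argument mixing both dependency types might look like the following sketch; the disk sizes, Spark setting, package name, version, and wheel path are all placeholder values:

```python
# Illustrative `environment` argument; disk sizes, Spark configs, package
# names, versions, and the wheel path are placeholders.
environment = {
    "sparkDriverDiskGB": 50,
    "sparkExecutorDiskGB": 50,
    "sparkConfigs": {"spark.sql.shuffle.partitions": "200"},
    "dependencies": [
        {"sourceType": "PYPI", "libraryName": "shapely", "libraryVersion": "2.0.4"},
        {"sourceType": "FILE", "filePath": "s3://your-bucket/libs/helpers.whl"},
    ],
}

# FILE dependencies must point at a .whl, .zip, or .jar artifact.
for dep in environment["dependencies"]:
    if dep["sourceType"] == "FILE":
        assert dep["filePath"].endswith((".whl", ".zip", ".jar"))
```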

Execute a SQL query

The WherobotsSqlOperator allows you to run SQL queries on Wherobots Cloud, from which you can build your ETLs and data transformation workflows by querying, manipulating, and producing datasets with WherobotsDB.

Refer to the Wherobots Documentation and this guidance to learn how to read data, transform data, and write results in Spatial SQL with WherobotsDB.

Refer to the Wherobots Apache Airflow Provider Documentation to get more detailed guidance about how to use the Wherobots Apache Airflow Provider.

Example

Below is an example Airflow DAG that executes a SQL query on Wherobots Cloud:

import datetime

from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

from wherobots.db.region import Region
from wherobots.db.runtime import Runtime

with DAG(
    dag_id="example_wherobots_sql_dag",
    start_date=datetime.datetime(2024, 1, 1),
    schedule="@hourly",
    catchup=False
):
    # Create a `wherobots.test.airflow_example` table with 100 records
    # from the OMF `places_place` dataset.
    operator = WherobotsSqlOperator(
        task_id="execute_query",
        return_last=False,
        runtime=Runtime.TINY,
        region=Region.AWS_US_WEST_2,
        sql="""
        INSERT INTO wherobots.test.airflow_example
        SELECT id, geometry, confidence, geohash
        FROM wherobots_open_data.overture.places_place
        LIMIT 100
        """,
    )

Arguments

  • region: Region: The Wherobots Compute Region where the SQL query executions are hosted. The values available can be found in wherobots.db.region.Region.
  • runtime: Runtime: The runtime dictates the size and amount of resources powering the query execution. The default value is Runtime.TINY; the available values can be found in wherobots.db.runtime.Runtime.
  • sql: str: The Spatial SQL query to execute.
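The sql argument accepts any Spatial SQL statement, not only INSERTs. As an illustration, the statement below creates a table from the same Overture dataset used in the DAG example above; the target table name is an assumption:

```python
# Illustrative CREATE TABLE AS SELECT statement for the `sql` argument;
# the target table name is a placeholder.
ctas_sql = """
CREATE TABLE IF NOT EXISTS wherobots.test.places_sample AS
SELECT id, geometry, confidence, geohash
FROM wherobots_open_data.overture.places_place
LIMIT 100
"""
```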
