
Start your Dataflow jobs directly from the Triggerer, without going through a Worker.


Airflow Custom Deferrable Dataflow Operator

Trigger Different: Cut Your Airflow Costs by Starting from the Triggerer!

Use this simple Airflow operator to start your Dataflow jobs directly from the Triggerer, without going through a Worker!


How It Works

The main idea is to start the task instance's execution directly on the Triggerer, bypassing the Worker entirely.

This works because the operator's only job is to send an HTTP request that starts the job on an external processing service (Dataflow) and then wait for that job to complete.

Since the task spends almost all of its time waiting, it is a natural fit for the Triggerer's asynchronous design, where a single process can wait on many jobs at once instead of each task holding a Worker slot. This significantly reduces resource consumption. With this architecture, the Airflow task execution process looks like this:

[Diagram: Airflow task execution process when starting from the Triggerer]
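To illustrate why waiting on the Triggerer is cheap, here is a plain `asyncio` sketch. It is not the operator's actual code: `launch_job` and `job_is_done` are hypothetical stand-ins that only simulate the Dataflow API with sleeps. Because each coroutine yields control while it waits, one event loop in one thread can watch several jobs at once, which is the property the Triggerer relies on.

```python
import asyncio
import time
from typing import List


# Hypothetical stand-ins for the Dataflow API: they only simulate latency.
# A real trigger would call the Dataflow service instead.
async def launch_job(job_name: str) -> str:
    """Simulate the HTTP request that starts a job and returns its ID."""
    await asyncio.sleep(0.01)
    return f"{job_name}-id"


async def job_is_done(job_id: str, started_at: float, duration: float) -> bool:
    """Simulate polling the job state; the job 'finishes' after `duration` seconds."""
    await asyncio.sleep(0.01)
    return time.monotonic() - started_at >= duration


async def run_job(job_name: str, duration: float, poll_interval: float = 0.05) -> str:
    """Start one job, then wait for it without blocking the event loop."""
    started_at = time.monotonic()
    job_id = await launch_job(job_name)
    while not await job_is_done(job_id, started_at, duration):
        # Awaiting here yields control, so the other jobs are polled meanwhile.
        await asyncio.sleep(poll_interval)
    return f"{job_id}: JOB_STATE_DONE"


async def main() -> List[str]:
    # Three simulated jobs monitored concurrently by a single thread.
    return await asyncio.gather(
        run_job("job-a", 0.2),
        run_job("job-b", 0.2),
        run_job("job-c", 0.2),
    )


if __name__ == "__main__":
    start = time.monotonic()
    print(asyncio.run(main()))
    # Typically about 0.2-0.3 s, because the three waits overlap.
    print(f"elapsed: {time.monotonic() - start:.2f}s")
```

The total runtime stays close to the length of a single job instead of adding up, since none of the waits blocks the others.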

To learn more about how the tool works, check out the Medium article.

Installation

The installation process depends on your cloud provider and on how your environment is set up.

On Google Cloud Composer, for example, the DAGs folder is not synchronized with the Airflow Triggerer, as stated in the Composer documentation.

Consequently, simply uploading your code to the DAGs folder will not work, and you'll likely get an error like this:

ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/class
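The error above comes from how the Triggerer loads code: it does not parse your DAG files, but rebuilds each trigger from the dotted classpath stored in the Airflow metadata database and imports that path in its own Python environment. The sketch below is closely modeled on Airflow's `import_string` helper (`airflow.utils.module_loading`); treat it as an illustration of the mechanism, not Airflow's exact code.

```python
import importlib


def import_string(dotted_path: str):
    """Resolve "package.module.ClassName" to the object it names.

    Illustrative sketch of classpath-based loading, as done for triggers.
    """
    try:
        module_path, class_name = dotted_path.rsplit(".", 1)
    except ValueError:
        raise ImportError(f"{dotted_path} doesn't look like a module path") from None

    # Raises ModuleNotFoundError if the module is not importable in this environment.
    module = importlib.import_module(module_path)

    try:
        return getattr(module, class_name)
    except AttributeError:
        # The same message shape as the error shown above.
        raise ImportError(
            f'Module "{module_path}" does not define a "{class_name}" attribute/class'
        ) from None


if __name__ == "__main__":
    print(import_string("collections.OrderedDict"))
    try:
        import_string("collections.NotAClass")
    except ImportError as err:
        print(err)
```

Whatever the exact failure, the fix is the same: the trigger's module must be installed in the Triggerer's environment, not just present in the DAGs folder.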

In this case, the missing code has to come from PyPI: you need to install the operator and its trigger as a library in your environment.

To do so, you can use the following command:

pip install custom-deferrable-dataflow-operator
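After installing, you may want to confirm that the package is visible to the Python environment the Triggerer uses. Below is a minimal standard-library sketch; the helper name `installed_version` is hypothetical, while the distribution name and import name are the ones used in this README.

```python
from importlib import metadata
from importlib.util import find_spec
from typing import Optional


def installed_version(dist_name: str, import_name: str) -> Optional[str]:
    """Return the installed distribution version, or None if it is unavailable."""
    if find_spec(import_name) is None:
        return None  # the top-level module is not importable in this environment
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None  # importable, but not installed under this distribution name


if __name__ == "__main__":
    # Run with the same Python environment as the Triggerer.
    version = installed_version(
        "custom-deferrable-dataflow-operator", "deferrable_dataflow_operator"
    )
    print(version or "custom-deferrable-dataflow-operator is NOT installed")
```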

Usage

After installing the library, you can successfully import and use the operator in your Airflow DAGs, as shown below:

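from deferrable_dataflow_operator import DeferrableDataflowOperator

# Placeholder values (hypothetical): replace them with your own settings.
GCP_PROJECT_ID = "my-gcp-project"
GCP_REGION = "us-central1"
MY_JOB_NAME = "my-dataflow-job"
MY_TASK_ID = "start_dataflow_from_triggerer"
TEMPLATE_GCS_PATH = "gs://my-bucket/templates/my-template.json"
MY_PARAMETERS = "my-parameter-value"
# Runtime environment options for the job (example field, adjust as needed).
dataflow_env_vars = {"max_workers": 2}

# Define this task inside your DAG (e.g., within a `with DAG(...):` block).
dataflow_triggerer_job = DeferrableDataflowOperator(
    # Arguments handed to the trigger that starts the job and waits for it.
    trigger_kwargs={
        "project_id": GCP_PROJECT_ID,
        "region": GCP_REGION,
        # Dataflow launch request: job name, parameters, environment, template path.
        "body": {
            "job_name": MY_JOB_NAME,
            "parameters": {
                "dataflow-parameters": MY_PARAMETERS
            },
            "environment": {**dataflow_env_vars},
            "container_spec_gcs_path": TEMPLATE_GCS_PATH,
        }
    },
    # Run the task directly on the Triggerer instead of a Worker.
    start_from_trigger=True,
    task_id=MY_TASK_ID
)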

In trigger_kwargs, you need to specify your GCP project ID and region. The body entry holds the configuration of your Dataflow job (job name, parameters, runtime environment, and template path), following the request format described in the official Dataflow documentation.
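If you define several of these tasks, a small helper can keep trigger_kwargs consistent. This is a hypothetical convenience function, not part of the library: it only assembles the same dictionary shape used in the example above, and its two sanity checks (non-empty project and region, and a `gs://` template path) are assumptions of this sketch.

```python
from typing import Any, Dict, Optional


def build_trigger_kwargs(
    project_id: str,
    region: str,
    job_name: str,
    container_spec_gcs_path: str,
    parameters: Optional[Dict[str, str]] = None,
    environment: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble trigger_kwargs in the shape shown in the usage example (hypothetical helper)."""
    if not project_id or not region:
        raise ValueError("project_id and region are required")
    if not container_spec_gcs_path.startswith("gs://"):
        raise ValueError("container_spec_gcs_path must be a gs:// URI")
    return {
        "project_id": project_id,
        "region": region,
        "body": {
            "job_name": job_name,
            # Copy the dicts so callers can reuse their originals safely.
            "parameters": dict(parameters or {}),
            "environment": dict(environment or {}),
            "container_spec_gcs_path": container_spec_gcs_path,
        },
    }


if __name__ == "__main__":
    kwargs = build_trigger_kwargs(
        project_id="my-gcp-project",
        region="us-central1",
        job_name="my-dataflow-job",
        container_spec_gcs_path="gs://my-bucket/templates/my-template.json",
        parameters={"input": "gs://my-bucket/input/data.csv"},  # hypothetical parameter
    )
    # Pass it as DeferrableDataflowOperator(trigger_kwargs=kwargs, start_from_trigger=True, ...)
    print(kwargs)
```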

Contributing

This project is open to contributions! If you'd like to help improve the operator, please follow these steps:

  1. Open a new issue to discuss the feature or bug you want to address.
  2. Once approved, fork the repository and create a new branch.
  3. Implement the changes.
  4. Create a pull request with a detailed description of the changes.
