Start your Dataflow job execution directly from the Triggerer, without going through the Worker.
Airflow Custom Deferrable Dataflow Operator
Trigger Different: Cut Your Airflow Costs By Starting From the Triggerer!
Use this simple Airflow operator to start your Dataflow job execution directly from the Triggerer, without going through the Worker!
How It Works
The main idea of this approach is to start the task instance execution directly on the Triggerer component, bypassing the Worker entirely.
This strategy is effective because, in this case, the only thing the operator does is make an HTTP request to start the external processing service (Dataflow) and then wait for the job to complete.
For this reason, we can run this execution on the Triggerer and leverage its asynchronous design, which lets it wait on many jobs at once, significantly reducing resource consumption. With this architecture, the task never occupies a Worker slot: it is handed straight to the Triggerer, which starts the Dataflow job and waits asynchronously for it to finish.
To learn more about how the tool works, check out the Medium article.
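The asynchronous waiting described in How It Works can be illustrated with a small, self-contained asyncio sketch. It is not the package's trigger code and uses neither Airflow nor the Dataflow API: `FakeDataflowClient`, `run_job`, `wait_for_jobs`, and the job-state strings are hypothetical stand-ins. What it shows is why an event loop suits this workload: each task makes one start request and then mostly sleeps between status polls, so many jobs can be awaited concurrently on a single thread.

```python
from __future__ import annotations

import asyncio
import time


class FakeDataflowClient:
    """Hypothetical in-memory stand-in for the Dataflow API (not a real client)."""

    def __init__(self, seconds_to_finish: float) -> None:
        self._seconds_to_finish = seconds_to_finish
        self._started: dict[str, float] = {}

    async def launch_job(self, job_name: str) -> str:
        # Simulates the single HTTP request that starts the job.
        await asyncio.sleep(0)
        job_id = f"{job_name}-id"
        self._started[job_id] = time.monotonic()
        return job_id

    async def get_state(self, job_id: str) -> str:
        # Reports the job as done once enough wall-clock time has passed.
        elapsed = time.monotonic() - self._started[job_id]
        if elapsed >= self._seconds_to_finish:
            return "JOB_STATE_DONE"
        return "JOB_STATE_RUNNING"


async def run_job(client: FakeDataflowClient, job_name: str, poll_interval: float) -> str:
    """Start one job, then poll its state without blocking the event loop."""
    job_id = await client.launch_job(job_name)
    while True:
        state = await client.get_state(job_id)
        if state == "JOB_STATE_DONE":
            return state
        # Sleeping hands control back to the loop, so other jobs keep progressing.
        await asyncio.sleep(poll_interval)


async def wait_for_jobs(n_jobs: int) -> tuple[list[str], float]:
    """Run n_jobs waits concurrently on one thread; return final states and elapsed time."""
    client = FakeDataflowClient(seconds_to_finish=0.2)
    start = time.monotonic()
    states = await asyncio.gather(
        *(run_job(client, f"job-{i}", poll_interval=0.05) for i in range(n_jobs))
    )
    return list(states), time.monotonic() - start


if __name__ == "__main__":
    states, elapsed = asyncio.run(wait_for_jobs(50))
    print(f"{states.count('JOB_STATE_DONE')} jobs done in {elapsed:.2f}s")
```

Because the waits overlap, fifty simulated jobs finish in roughly the time of one job rather than fifty times that, with no extra threads or processes.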
Installation
The installation process will depend on your cloud provider or how you have set up your environment.
On Google Cloud Composer, for example, the DAGs folder is not synchronized with the Airflow Triggerer, as stated in the documentation.
Consequently, just uploading your code to the DAGs folder will not work, and you'll likely see an error like this:
ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/class
In this case, the missing code must be installed from PyPI instead, meaning that you'll need to install the operator/trigger as a new library.
To do so, you can use the following command:
pip install custom-deferrable-dataflow-operator
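The ImportError mentioned above comes from resolving a trigger class by its dotted classpath. The sketch below is modelled on Airflow's `import_string` helper (which produces that same message), not copied from it, and offers a quick way to check whether a classpath resolves in a given Python environment. `resolve_classpath` and `is_importable` are hypothetical names, and the test classpaths use the standard library because the package's actual trigger classpath is not documented here.

```python
from importlib import import_module
from typing import Any


def resolve_classpath(dotted_path: str) -> Any:
    """Resolve "package.module.ClassName" to the object it names."""
    try:
        module_path, class_name = dotted_path.rsplit(".", 1)
    except ValueError:
        raise ImportError(f"{dotted_path} doesn't look like a module path") from None
    # Raises ModuleNotFoundError (an ImportError) if the package isn't installed.
    module = import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError:
        raise ImportError(
            f'Module "{module_path}" does not define a "{class_name}" attribute/class'
        ) from None


def is_importable(dotted_path: str) -> bool:
    """Return True if the classpath resolves in the current environment."""
    try:
        resolve_classpath(dotted_path)
    except (ImportError, ValueError):  # ValueError covers e.g. an empty module name
        return False
    return True
```

Running `is_importable` with your trigger's classpath in the same Python environment as the Triggerer gives a rough indication of whether the library was installed where it is needed.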
Usage
After installing the library, you can successfully import and use the operator in your Airflow DAGs, as shown below:
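import pendulum
from airflow import DAG
from deferrable_dataflow_operator import DeferrableDataflowOperator

# Placeholder values: replace them with your own project and job settings
GCP_PROJECT_ID = "my-gcp-project"
GCP_REGION = "us-central1"
MY_JOB_NAME = "my-dataflow-job"
MY_PARAMETERS = "my-parameter-value"
TEMPLATE_GCS_PATH = "gs://my-bucket/templates/my-template.json"
MY_TASK_ID = "start_dataflow_job"
dataflow_env_vars = {}  # optional runtime environment settings for the job

with DAG(
    dag_id="deferrable_dataflow_example",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
):
    dataflow_triggerer_job = DeferrableDataflowOperator(
        trigger_kwargs={
            "project_id": GCP_PROJECT_ID,
            "region": GCP_REGION,
            "body": {
                "job_name": MY_JOB_NAME,
                "parameters": {"dataflow-parameters": MY_PARAMETERS},
                "environment": {**dataflow_env_vars},
                "container_spec_gcs_path": TEMPLATE_GCS_PATH,
            },
        },
        start_from_trigger=True,  # skip the Worker and start on the Triggerer
        task_id=MY_TASK_ID,
    )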
In the trigger_kwargs parameter, you must specify your GCP project ID and region through the project_id and region keys. The body key, in turn, should contain all the relevant information for your Dataflow job, as described in the official documentation.
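To catch an incomplete configuration while writing the DAG, you could check `trigger_kwargs` before passing it to the operator. The helper below is a hypothetical sketch, not part of the package: it treats `project_id`, `region`, and the `body` fields `job_name` and `container_spec_gcs_path` (all taken from the usage example) as required, which is an assumption. Consult the Dataflow documentation for the full set of fields a launch request accepts.

```python
from typing import Any, List, Mapping

# Assumption for this sketch: these keys, taken from the usage example,
# are treated as required. The package itself may validate differently.
REQUIRED_TOP_LEVEL = ("project_id", "region", "body")
REQUIRED_BODY = ("job_name", "container_spec_gcs_path")


def missing_trigger_kwargs(trigger_kwargs: Mapping[str, Any]) -> List[str]:
    """Return the dotted names of required keys that are absent or empty."""
    missing = [key for key in REQUIRED_TOP_LEVEL if not trigger_kwargs.get(key)]
    # An absent body is treated as empty, so its required fields are reported too.
    body = trigger_kwargs.get("body") or {}
    missing += [f"body.{key}" for key in REQUIRED_BODY if not body.get(key)]
    return missing
```

For example, a body that sets `job_name` but omits `container_spec_gcs_path` yields `["body.container_spec_gcs_path"]`, while a complete configuration yields an empty list.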
Contributing
This project is open to contributions! If you want to help improve the operator, please follow these steps:
- Open a new issue to discuss the feature or bug you want to address.
- Once approved, fork the repository and create a new branch.
- Implement the changes.
- Create a pull request with a detailed description of the changes.
Download files
File details
Details for the file custom_deferrable_dataflow_operator-1.0.0.tar.gz.
File metadata
- Download URL: custom_deferrable_dataflow_operator-1.0.0.tar.gz
- Upload date:
- Size: 8.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.9.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d30dae35f7ce85f0a18d8c450bcb86c417a0b73b12bc76d5aa66c3a5b3fcc716 |
| MD5 | cdc46765d40a6df6b59c18583290ef79 |
| BLAKE2b-256 | 57c88ac78e06a59ca75e6c93489a973d9a7b9a5e7d67d261b60e2a234617743e |
File details
Details for the file custom_deferrable_dataflow_operator-1.0.0-py3-none-any.whl.
File metadata
- Download URL: custom_deferrable_dataflow_operator-1.0.0-py3-none-any.whl
- Upload date:
- Size: 9.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.9.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1330400df6afeb051cbae46c8bb5a3f5cca62045783a730356117d11285eb171 |
| MD5 | 1f1e668918af566e83e3d8fe7dd6bca7 |
| BLAKE2b-256 | d85b2cf3b1440212f6a61324bc4ceb47a4042bdbfb5355ecbf4e7d724dec6279 |