GCP Pipeline Reference: Generic Orchestration - Airflow DAGs for Cloud Composer (JOIN + MAP patterns)
Generic Orchestration
Unit 3 of Generic 3-Unit Deployment
Airflow DAGs for pipeline coordination and scheduling.
Flow Diagram
Generic ORCHESTRATION FLOW
──────────────────────────
Pub/Sub                                 Airflow DAGs                             External
───────                                 ────────────                             ────────
.ok file       ┌──────────────────────────────────────────────────────────────┐
arrives        │                                                              │
   │           │   ┌──────────────┐                                           │
   └──────────►│   │    PubSub    │                                           │
               │   │    Sensor    │                                           │
               │   └──────┬───────┘                                           │
               │          │                                                   │
               │          ▼                                                   │
               │   ┌──────────────┐    ┌──────────────────┐    ┌───────────┐  │
               │   │   Trigger    │───►│     Dataflow     │───►│ Wait for  │  │
               │   │  Ingestion   │    │ (generic-ingest) │    │ Complete  │  │
               │   └──────────────┘    └──────────────────┘    └─────┬─────┘  │
               │                                                     │        │
               │          ┌──────────────────────────────────────────┘        │
               │          │                                                   │
               │          ▼                                                   │
               │   ┌──────────────┐                                           │
               │   │    Entity    │   Waits for ALL 3 JOIN entities:          │
               │   │  Dependency  │     - customers ✓                         │
               │   │   Checker    │     - accounts ✓                          │
               │   └──────┬───────┘     - decision ✓                          │
               │          │                                                   │
               │          ▼ (all ready)                                       │
               │   ┌──────────────┐    ┌──────────────┐                       │
               │   │   Trigger    │───►│   dbt run    │                       │
               │   │     dbt      │    │ (transform)  │                       │
               │   └──────────────┘    └──────────────┘                       │
               │                                                              │
               └──────────────────────────────────────────────────────────────┘
Pattern
MULTI-TARGET: Orchestrates ingestion of 4 entities (3 JOIN + 1 MAP) → waits for JOIN entities → triggers transformation to 3 FDP targets
| Step | Description |
|---|---|
| 1 | Pub/Sub sensor detects .ok file |
| 2 | Triggers Dataflow ingestion job |
| 3 | EntityDependencyChecker waits for all 3 JOIN entities |
| 4 | When all are ready, triggers the dbt transformation to event_transaction_excess, portfolio_account_excess, and portfolio_account_facility |
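The gating rule in step 3 can be sketched in plain Python. This is a minimal illustration, not the library's EntityDependencyChecker. The function name check_join_readiness and the lowercase entity names are assumptions. The applications MAP entity is deliberately ignored by the gate because it is handled independently.

```python
from typing import FrozenSet, Iterable, Tuple

# Assumed identifiers for the three JOIN entities named in the pattern above.
REQUIRED_JOIN_ENTITIES: FrozenSet[str] = frozenset({"customers", "accounts", "decision"})


def check_join_readiness(loaded: Iterable[str]) -> Tuple[bool, FrozenSet[str]]:
    """Return (ready, missing) for the JOIN gate.

    Hypothetical helper: `loaded` holds the entities whose ODP load has
    completed for the current run. Entities outside the JOIN set (such as
    the applications MAP entity) do not affect the result.
    """
    missing = REQUIRED_JOIN_ENTITIES - set(loaded)
    return (not missing, frozenset(missing))


# Two of the three JOIN entities are loaded, plus the independent MAP entity.
print(check_join_readiness(["customers", "applications", "accounts"]))
# -> (False, frozenset({'decision'}))
```

Returning the missing set instead of a bare boolean lets the DAG log which entity it is still waiting for.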
Library-Driven Ease of Use
The Generic orchestration unit leverages the gcp-pipeline-orchestration library to coordinate a complex multi-entity arrival pattern:
- Event-Driven Triggering: Uses BasePubSubPullSensor with built-in .ok file filtering and metadata extraction to XCom.
- Cross-Entity Coordination: Uses EntityDependencyChecker to verify that all 3 JOIN entities (Customers, Accounts, Decision) are loaded before triggering the FDP transformation. The Applications MAP entity is handled independently.
- Local Development: All DAGs can be parsed and validated without a live Airflow environment, thanks to the library's AIRFLOW_AVAILABLE stubbing mechanism.
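The .ok filtering and metadata extraction described above can be approximated as follows. This sketch makes three assumptions:

- The message body is a JSON object with name and bucket keys, as in the manual trigger example in this README.
- Object paths follow `<system>/<entity>/<entity>_<YYYYMMDD>.ok`.
- The returned keys are hypothetical. They are not BasePubSubPullSensor's actual XCom schema.

```python
import json
import re
from typing import Optional

# Assumed object-name layout: <system>/<entity>/<entity>_<YYYYMMDD>.ok
OK_FILE_PATTERN = re.compile(
    r"^(?P<system>[^/]+)/(?P<entity>[^/]+)/(?P=entity)_(?P<run_date>\d{8})\.ok$"
)


def parse_ok_notification(data: bytes) -> Optional[dict]:
    """Return metadata for an .ok marker object, or None for any other object.

    Hypothetical helper that mirrors only the filtering idea.
    """
    payload = json.loads(data.decode("utf-8"))
    name = payload.get("name", "")
    match = OK_FILE_PATTERN.match(name)
    if match is None:
        return None  # not an .ok marker in the assumed layout: ignore it
    return {"bucket": payload.get("bucket"), "object_name": name, **match.groupdict()}


message = b'{"name": "generic/customers/customers_20260101.ok", "bucket": "my-landing-bucket"}'
print(parse_ok_notification(message))
```

Returning None for non-matching objects lets a sensor acknowledge and skip unrelated notifications instead of failing on them.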
How to Replicate this JOIN Orchestration
To create a new orchestration unit for a multi-entity system, follow the Creating New Deployment Guide and use the standardized DAG Templates.
Key steps for this JOIN pattern:
- Pub/Sub Sensor: Configure BasePubSubPullSensor for your system's notification subscription.
- Job Control: Use JobControlRepository from gcp-pipeline-core to track the state of each entity load.
- Dependency Check: Initialize EntityDependencyChecker with your list of REQUIRED_ENTITIES.
- DAG Triggering: Use TriggerDagRunOperator to chain the Ingestion and Transformation units.
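The Job Control and Dependency Check steps can be illustrated with an in-memory stand-in. This is not JobControlRepository's real interface, which is not documented here. The class, method names, and status values are assumptions. The sketch shows one idea: only successful loads for the same run date count toward the JOIN gate.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

REQUIRED_ENTITIES: List[str] = ["customers", "accounts", "decision"]


@dataclass
class InMemoryJobControl:
    """Hypothetical stand-in for a job-control table keyed by (entity, run_date)."""

    statuses: Dict[Tuple[str, str], str] = field(default_factory=dict)

    def record(self, entity: str, run_date: str, status: str) -> None:
        # A later record overwrites an earlier one, e.g. RUNNING -> SUCCESS.
        self.statuses[(entity, run_date)] = status

    def succeeded(self, run_date: str) -> Set[str]:
        # Entities whose latest status for this run date is SUCCESS.
        return {e for (e, d), s in self.statuses.items() if d == run_date and s == "SUCCESS"}


def ready_to_transform(repo: InMemoryJobControl, run_date: str) -> bool:
    """True once every required entity has a SUCCESS record for run_date."""
    return set(REQUIRED_ENTITIES) <= repo.succeeded(run_date)


repo = InMemoryJobControl()
repo.record("customers", "20260101", "SUCCESS")
repo.record("accounts", "20260101", "SUCCESS")
repo.record("decision", "20260101", "RUNNING")
print(ready_to_transform(repo, "20260101"))  # -> False: decision is still running
repo.record("decision", "20260101", "SUCCESS")
print(ready_to_transform(repo, "20260101"))  # -> True
```

Keying on run date keeps an entity from an earlier day from satisfying today's gate.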
Infrastructure & Configurations
Google Cloud Resources
This deployment requires the following GCP infrastructure, provisioned via Terraform:
- Orchestration: Cloud Composer (managed Apache Airflow).
- Messaging: Pub/Sub topic generic-file-notifications and subscription generic-file-notifications-sub.
- Identity & Access: Service account with roles for Dataflow, BigQuery, and GCS.
For detailed infrastructure definitions, see infrastructure/terraform/systems/generic/orchestration/.
Airflow Configuration
The DAGs use several Airflow variables and connections:
| Type | Name | Description | Default / Source |
|---|---|---|---|
| Variable | gcp_project_id | Target GCP project ID | GCP_PROJECT_ID env var |
| Variable | gcp_region | GCP region for Dataflow | europe-west2 |
| Variable | generic_pubsub_subscription | Pub/Sub subscription for file alerts | generic-file-notifications-sub |
| Variable | generic_landing_bucket | GCS bucket for landing files | <project>-generic-dev-landing |
| Variable | generic_error_bucket | GCS bucket for error files | <project>-generic-dev-error |
| Connection | google_cloud_default | Connection for GCP resources | - |
| Connection | bigquery_default | Connection for BigQuery | - |
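A DAG might resolve these settings using the defaults listed in the table. The sketch below keeps the lookup function pluggable so it runs without Airflow. The helper name is hypothetical. Deriving bucket names from the project ID is an assumption based on the <project>-generic-dev-* defaults above.

```python
import os
from typing import Callable, Dict, Mapping, Optional

# Looks up an Airflow variable by name and returns None when it is unset.
VariableGetter = Callable[[str], Optional[str]]


def resolve_generic_config(
    get_var: VariableGetter, env: Mapping[str, str] = os.environ
) -> Dict[str, str]:
    """Resolve orchestration settings, falling back to the defaults in the table."""
    project = get_var("gcp_project_id") or env.get("GCP_PROJECT_ID", "")
    if not project:
        raise ValueError("gcp_project_id is unset and GCP_PROJECT_ID is empty")

    def pick(name: str, default: str) -> str:
        # An unset or empty variable falls back to the documented default.
        return get_var(name) or default

    return {
        "gcp_project_id": project,
        "gcp_region": pick("gcp_region", "europe-west2"),
        "generic_pubsub_subscription": pick(
            "generic_pubsub_subscription", "generic-file-notifications-sub"
        ),
        # Assumed: bucket defaults follow the <project>-generic-dev-* pattern.
        "generic_landing_bucket": pick("generic_landing_bucket", f"{project}-generic-dev-landing"),
        "generic_error_bucket": pick("generic_error_bucket", f"{project}-generic-dev-error"),
    }


# Local run: no Airflow variables are set, so only GCP_PROJECT_ID is read from the environment.
config = resolve_generic_config(lambda name: None, env={"GCP_PROJECT_ID": "my-project"})
print(config["generic_landing_bucket"])  # -> my-project-generic-dev-landing
```

On Airflow 2.x, get_var could be `lambda name: Variable.get(name, default_var=None)`, with Variable imported from airflow.models.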
Technology Stack & Documentation
- Google Cloud Composer - Managed Apache Airflow
- Apache Airflow - Workflow orchestration
- Airflow Google Cloud Operators - GCP integration
- Cloud Pub/Sub - Messaging service for event triggers
- Airflow Cross-DAG Dependencies - Orchestrating complex flows
DAGs
| DAG | Purpose |
|---|---|
| generic_pubsub_trigger_dag.py | Triggered by Pub/Sub on .ok file arrival |
| generic_ingestion_dag.py | Runs Dataflow for ODP load and checks entity dependencies |
| generic_transformation_dag.py | Runs dbt for FDP transformation |
| generic_error_handling_dag.py | Error handling and DLQ |
| generic_pipeline_status_dag.py | Pipeline status monitoring |
Dependencies
| Library | Purpose |
|---|---|
| gcp-pipeline-core | Audit, logging, error handling |
| gcp-pipeline-orchestration | DAG factory, sensors, operators |
No Apache Beam dependency: ingestion is a separate unit.
Execution & Testing
1. Local DAG Validation
You can validate DAG syntax locally:
# Setup venv
./scripts/setup_deployment_venv.sh data-pipeline-orchestrator
source deployments/data-pipeline-orchestrator/venv/bin/activate
# Validate syntax
python dags/generic_pubsub_trigger_dag.py
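Running a DAG file directly without Airflow installed only works if its imports are guarded. The library's AIRFLOW_AVAILABLE mechanism is not shown in this README, so the following is a generic sketch of that kind of guard, not the library's actual stubs. The stub class is hypothetical. When real Airflow is present, the sketch assumes Airflow 2.4+ for the schedule argument.

```python
"""Minimal sketch of an import guard so a DAG module can be parsed without Airflow."""
from datetime import datetime

try:
    from airflow import DAG  # real Airflow, e.g. on Cloud Composer

    AIRFLOW_AVAILABLE = True
except ImportError:
    AIRFLOW_AVAILABLE = False

    class DAG:  # hypothetical stub: just enough to build the module locally
        def __init__(self, dag_id: str, **kwargs) -> None:
            self.dag_id = dag_id
            self.kwargs = kwargs

        def __enter__(self) -> "DAG":
            return self

        def __exit__(self, *exc) -> None:
            return None


with DAG(
    dag_id="generic_pubsub_trigger_dag",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    pass  # sensors and operators would be declared here

if __name__ == "__main__":
    print(f"Parsed {dag.dag_id} (AIRFLOW_AVAILABLE={AIRFLOW_AVAILABLE})")
```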
2. Testing End-to-End Flow
Use the simulation script to trigger the full Generic flow:
./scripts/gcp/06_test_pipeline.sh generic
3. Manual Pub/Sub Trigger
Alternatively, you can manually publish a notification to trigger the DAG:
gcloud pubsub topics publish generic-file-notifications \
--message='{"name": "generic/customers/customers_20260101.ok", "bucket": "my-landing-bucket"}'
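The same notification can be published from Python, which can be handy in test scripts. This sketch assumes the google-cloud-pubsub client library and application-default credentials. build_ok_message and publish_ok_message are hypothetical helper names. The payload mirrors only the two fields in the gcloud example above.

```python
import json


def build_ok_message(entity: str, run_date: str, bucket: str, system: str = "generic") -> bytes:
    """Build a JSON body matching the gcloud example (hypothetical helper)."""
    name = f"{system}/{entity}/{entity}_{run_date}.ok"
    return json.dumps({"name": name, "bucket": bucket}).encode("utf-8")


def publish_ok_message(
    project_id: str, data: bytes, topic: str = "generic-file-notifications"
) -> str:
    """Publish to Pub/Sub and return the server-assigned message ID."""
    # Imported lazily so building messages does not require GCP libraries.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic)
    return publisher.publish(topic_path, data).result()


if __name__ == "__main__":
    body = build_ok_message("customers", "20260101", "my-landing-bucket")
    print(body.decode("utf-8"))
    # publish_ok_message("my-project", body)  # needs credentials and an existing topic
```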
4. Deployment to Composer
Deploy the DAGs to your Cloud Composer environment:
gsutil cp dags/*.py gs://<composer-bucket>/dags/
Download files
File details
Details for the file gcp_pipeline_ref_orchestration-1.0.29.tar.gz.
File metadata
- Download URL: gcp_pipeline_ref_orchestration-1.0.29.tar.gz
- Upload date:
- Size: 23.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9570321e49ec25998060f6f6cbd557459931902cc9a7fcee231ba9f91028618c |
| MD5 | f48a964baf86ed50c19924e291b15058 |
| BLAKE2b-256 | f771f4d4cae5105bba719863aa3a6d72da627b79a66afde79fc49c355546350d |
File details
Details for the file gcp_pipeline_ref_orchestration-1.0.29-py3-none-any.whl.
File metadata
- Download URL: gcp_pipeline_ref_orchestration-1.0.29-py3-none-any.whl
- Upload date:
- Size: 37.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0a6a8f6ca5b78106707842f93ed9d321a4e04f285c5dd7ccaec983f5b67520b7 |
| MD5 | d9a6a23eb3e83b2bf502929e20039cec |
| BLAKE2b-256 | 165912896010140e9060de838abf14815acdaae46676599179161ea47f88d167 |