GCP Pipeline Reference: Generic Orchestration - Airflow DAGs for Cloud Composer (JOIN + MAP patterns)

Generic Orchestration

Unit 3 of Generic 3-Unit Deployment

Airflow DAGs for pipeline coordination and scheduling.


Flow Diagram

                         Generic ORCHESTRATION FLOW
                         ─────────────────────

  Pub/Sub                    Airflow DAGs                    External
  ───────                    ────────────                    ────────

  .ok file     ┌────────────────────────────────────────────────────────────┐
  arrives      │                                                            │
     │         │  ┌──────────────┐                                          │
     └────────►│  │ PubSub       │                                          │
               │  │ Sensor       │                                          │
               │  └──────┬───────┘                                          │
               │         │                                                  │
               │         ▼                                                  │
               │  ┌──────────────┐    ┌──────────────────┐    ┌───────────┐ │
               │  │ Trigger      │───►│ Dataflow         │───►│ Wait for  │ │
               │  │ Ingestion    │    │ (generic-ingest) │    │ Complete  │ │
               │  └──────────────┘    └──────────────────┘    └─────┬─────┘ │
               │                                                    │       │
               │  ┌─────────────────────────────────────────────────┘       │
               │  │                                                         │
               │  ▼                                                         │
               │  ┌──────────────┐                                          │
               │  │ Entity       │  Waits for ALL 3 JOIN entities:          │
               │  │ Dependency   │  - customers ✓                           │
               │  │ Checker      │  - accounts  ✓                           │
               │  └──────┬───────┘  - decision  ✓                           │
               │         │                                                  │
               │         ▼ (all ready)                                      │
               │  ┌──────────────┐    ┌──────────────┐                      │
               │  │ Trigger      │───►│ dbt run      │                      │
               │  │ dbt          │    │ (transform)  │                      │
               │  └──────────────┘    └──────────────┘                      │
               │                                                            │
               └────────────────────────────────────────────────────────────┘

Pattern

MULTI-TARGET: Orchestrates ingestion of 4 entities (3 JOIN + 1 MAP) → waits for JOIN entities → triggers transformation to 3 FDP targets

  Step  Description
  ────  ───────────
  1     Pub/Sub sensor detects .ok file
  2     Triggers Dataflow ingestion job
  3     EntityDependencyChecker waits for all 3 JOIN entities
  4     When all ready, triggers dbt transformation to event_transaction_excess, portfolio_account_excess, and portfolio_account_facility
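
Step 1 depends on telling a .ok marker apart from any other object notification and pulling the entity and run date out of its path. The following is a minimal sketch of that filtering, not the library's sensor code. It assumes the message carries `name` and `bucket` fields, as in the manual trigger example in this README, and that object paths follow `<system>/<entity>/<entity>_<YYYYMMDD>.ok`. The function name `parse_ok_notification` and the returned keys are hypothetical.

```python
import json
import re
from typing import Optional

# Assumed path layout, inferred from the sample message in this README:
#   <system>/<entity>/<entity>_<YYYYMMDD>.ok
OK_FILE_PATTERN = re.compile(
    r"^(?P<system>[^/]+)/(?P<entity>[^/]+)/(?P=entity)_(?P<run_date>\d{8})\.ok$"
)


def parse_ok_notification(message_data: str) -> Optional[dict]:
    """Return file metadata for a .ok notification, or None for anything else."""
    payload = json.loads(message_data)
    match = OK_FILE_PATTERN.match(payload.get("name", ""))
    if match is None:
        return None  # not a .ok marker, so a sensor would keep waiting
    # Keys: bucket, system, entity, run_date (hypothetical names)
    return {"bucket": payload.get("bucket"), **match.groupdict()}
```

In a real sensor, a dictionary like this is the kind of metadata that would be pushed to XCom for downstream tasks.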

Library-Driven Ease of Use

The Generic orchestration unit leverages the gcp-pipeline-orchestration library to coordinate a complex multi-entity arrival pattern:

  1. Event-Driven Triggering: Uses BasePubSubPullSensor with built-in .ok file filtering and metadata extraction to XCom.
  2. Cross-Entity Coordination: Uses EntityDependencyChecker to verify that all 3 JOIN entities (Customers, Accounts, Decision) are loaded before triggering the FDP transformation. The Applications MAP entity is handled independently and does not gate the JOIN.
  3. Local Development: All DAGs can be parsed and validated without a live Airflow environment thanks to the library's AIRFLOW_AVAILABLE stubbing mechanism.
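
The local-development point can be illustrated with a guarded import. This is only a sketch of the general stubbing technique, assuming the Airflow 2 import path `airflow.sensors.base`; how the library actually implements AIRFLOW_AVAILABLE is not shown in this README. The `OkFileSensor` class is hypothetical.

```python
try:
    # Real Airflow base class, used when running inside Cloud Composer.
    from airflow.sensors.base import BaseSensorOperator

    AIRFLOW_AVAILABLE = True
except ImportError:
    AIRFLOW_AVAILABLE = False

    class BaseSensorOperator:  # minimal stand-in so subclasses still import
        def __init__(self, *, task_id: str, **kwargs):
            self.task_id = task_id


class OkFileSensor(BaseSensorOperator):
    """Hypothetical sensor, here only to show the module imports either way."""

    def poke(self, context) -> bool:
        # A real sensor would pull from Pub/Sub here.
        return False
```

With this pattern the module can be imported, linted, and unit-tested on a machine without Airflow installed, while Composer picks up the real base class.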

How to Replicate this JOIN Orchestration

To create a new orchestration unit for a multi-entity system, follow the Creating New Deployment Guide and use the standardized DAG Templates.

Key steps for this JOIN pattern:

  1. Pub/Sub Sensor: Configure BasePubSubPullSensor for your system's notification topic.
  2. Job Control: Use JobControlRepository from core to track the state of each entity load.
  3. Dependency Check: Initialize EntityDependencyChecker with your list of REQUIRED_ENTITIES.
  4. DAG Triggering: Use TriggerDagRunOperator to chain the Ingestion and Transformation units.
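
Steps 2 and 3 can be sketched without Airflow or BigQuery. The class below is a hypothetical in-memory stand-in, not the real JobControlRepository or EntityDependencyChecker, whose signatures are not documented here. It assumes load state is tracked per run date and entity and that "SUCCESS" marks a completed load.

```python
from dataclasses import dataclass, field

REQUIRED_ENTITIES = ("customers", "accounts", "decision")  # the 3 JOIN entities


@dataclass
class InMemoryJobControl:
    """Hypothetical stand-in for a job-control store, keyed by (run_date, entity)."""

    statuses: dict = field(default_factory=dict)

    def record(self, run_date: str, entity: str, status: str) -> None:
        self.statuses[(run_date, entity)] = status

    def missing_entities(self, run_date: str, required=REQUIRED_ENTITIES) -> list:
        """Entities that have not yet loaded successfully for this run date."""
        return [e for e in required if self.statuses.get((run_date, e)) != "SUCCESS"]


job_control = InMemoryJobControl()
job_control.record("20260101", "customers", "SUCCESS")
job_control.record("20260101", "accounts", "SUCCESS")
job_control.record("20260101", "applications", "SUCCESS")  # MAP entity: not required
print(job_control.missing_entities("20260101"))  # ['decision']

job_control.record("20260101", "decision", "SUCCESS")
print(job_control.missing_entities("20260101"))  # []
```

An empty list is the condition under which the transformation DAG would be triggered. The MAP entity never appears in the required list, so its arrival neither blocks nor releases the JOIN.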

Infrastructure & Configurations

Google Cloud Resources

This deployment requires the following GCP infrastructure, provisioned via Terraform:

  • Orchestration: Cloud Composer (Managed Apache Airflow).
  • Messaging: Pub/Sub Topic generic-file-notifications and Subscription generic-file-notifications-sub.
  • Identity & Access: Service Account with roles for Dataflow, BigQuery, and GCS.

For detailed infrastructure definitions, see infrastructure/terraform/systems/generic/orchestration/.

Airflow Configuration

The DAGs use several Airflow variables and connections:

  Type        Name                         Description                           Default / Source
  ──────────  ───────────────────────────  ────────────────────────────────────  ──────────────────────────────
  Variable    gcp_project_id               Target GCP Project ID                 GCP_PROJECT_ID env var
  Variable    gcp_region                   GCP Region for Dataflow               europe-west2
  Variable    generic_pubsub_subscription  Pub/Sub subscription for file alerts  generic-file-notifications-sub
  Variable    generic_landing_bucket       GCS bucket for landing files          <project>-generic-dev-landing
  Variable    generic_error_bucket         GCS bucket for error files            <project>-generic-dev-error
  Connection  google_cloud_default         Connection for GCP resources          -
  Connection  bigquery_default             Connection for BigQuery               -
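
The defaults in the table can be expressed as a small resolution function. This is a sketch under assumptions: `resolve_settings` is a hypothetical helper, the `overrides` dictionary stands in for values read from Airflow Variables, and the bucket defaults are assumed to be built by substituting the project ID for `<project>`.

```python
import os


def resolve_settings(overrides: dict, env=os.environ) -> dict:
    """Merge explicit values (e.g. Airflow Variables) over the documented defaults."""
    project = overrides.get("gcp_project_id") or env.get("GCP_PROJECT_ID")
    if not project:
        raise ValueError("gcp_project_id is not set and GCP_PROJECT_ID is empty")
    defaults = {
        "gcp_project_id": project,
        "gcp_region": "europe-west2",
        "generic_pubsub_subscription": "generic-file-notifications-sub",
        "generic_landing_bucket": f"{project}-generic-dev-landing",
        "generic_error_bucket": f"{project}-generic-dev-error",
    }
    # Only non-empty overrides replace a default.
    return {**defaults, **{k: v for k, v in overrides.items() if v}}
```

For example, `resolve_settings({"gcp_region": "us-central1"}, env={"GCP_PROJECT_ID": "demo"})` keeps the overridden region and derives `demo-generic-dev-landing` as the landing bucket.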

DAGs

  DAG                            Purpose
  ─────────────────────────────  ───────
  generic_pubsub_trigger_dag.py  Triggered by Pub/Sub on .ok file arrival
  generic_odp_load_dag.py        Runs Dataflow for ODP load and checks entity dependencies
  generic_fdp_transform_dag.py   Runs dbt for FDP transformation
  generic_error_handling_dag.py  Error handling and DLQ

Dependencies

  Library                     Purpose
  ──────────────────────────  ───────
  gcp-pipeline-core           Audit, logging, error handling
  gcp-pipeline-orchestration  DAG factory, sensors, operators

No Apache Beam dependency: ingestion is a separate unit.


Execution & Testing

1. Local DAG Validation

You can validate DAG syntax locally:

# Setup venv
./scripts/setup_deployment_venv.sh data-pipeline-orchestrator
source deployments/data-pipeline-orchestrator/venv/bin/activate

# Validate syntax
python dags/data_ingestion_dag.py
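
To check every DAG file in one pass rather than one at a time, a small script can parse each file with the standard library. This is a sketch, not part of the package: `find_syntax_errors` is a hypothetical helper, and it only catches syntax errors. Unlike running a DAG file with `python`, it does not execute imports or build the DAG.

```python
import ast
from pathlib import Path


def find_syntax_errors(dag_dir: str) -> dict:
    """Map each DAG file that fails to parse to a short error message."""
    errors = {}
    for path in sorted(Path(dag_dir).glob("*.py")):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

Calling `find_syntax_errors("dags")` returns an empty dictionary when every file parses cleanly.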

2. Testing End-to-End Flow

Use the simulation script to trigger the full Generic flow:

./scripts/gcp/06_test_pipeline.sh generic

3. Manual Pub/Sub Trigger

Alternatively, you can manually publish a notification to trigger the DAG:

gcloud pubsub topics publish generic-file-notifications \
    --message='{"name": "generic/customers/customers_20260101.ok", "bucket": "my-landing-bucket"}'

4. Deployment to Composer

Deploy the DAGs to your Cloud Composer environment:

gsutil cp dags/*.py gs://<composer-bucket>/dags/
