
Reference Implementation: GCS-to-BigQuery Ingestion Pipeline using gcp-pipeline-framework

Project description

Generic Ingestion

Unit 1 of Generic 3-Unit Deployment

ODP Ingestion Pipeline - reads mainframe extracts from GCS and loads to BigQuery.


Flow Diagram

                         Generic INGESTION FLOW
                         ──────────────────────

  GCS Landing                  Beam Pipeline                    BigQuery ODP
  ───────────                  ─────────────                    ────────────

  1. .csv splits ──┐
  2. .ok file    ──┼──► Pub/Sub ──► Orchestration ──► 1. Read CSV      ┌──────────────┐
                   │      Event      (Unit 3)         2. Parse HDR/TRL │ odp_generic. │
                   │                                  3. Validate      │ - customers  │
                   │                                  4. Add Audit     │ - accounts   │
                   │                                  5. Write to BQ ─►│ - decision   │
                   └──────────────────────────────────┘                └──────────────┘
                               │
                               ▼
                        ┌─────────────┐
                        │ Archive to  │
                        │ GCS Archive │
                        └─────────────┘

Pattern

JOIN: 3 entities (Customers, Accounts, Decision) → 3 ODP tables

Entity     ODP Table              Key Fields
─────────  ─────────────────────  ────────────────────────────────────────────────────────
Customers  odp_generic.customers  customer_id, ssn, status, created_date
Accounts   odp_generic.accounts   account_id, customer_id, account_type, balance, open_date
Decision   odp_generic.decision   decision_id, customer_id, decision_code, score, decision_date
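The 3-to-3 mapping above can be captured as a small configuration dict. This is an illustrative sketch only, not the framework's actual configuration format; the table and field names are taken from the mapping above:

```python
# Illustrative 3-to-3 JOIN mapping: each source entity lands in its own
# ODP table, joined downstream on customer_id.
ENTITY_TABLES = {
    "customers": {
        "table": "odp_generic.customers",
        "key_fields": ["customer_id", "ssn", "status", "created_date"],
    },
    "accounts": {
        "table": "odp_generic.accounts",
        "key_fields": ["account_id", "customer_id", "account_type",
                       "balance", "open_date"],
    },
    "decision": {
        "table": "odp_generic.decision",
        "key_fields": ["decision_id", "customer_id", "decision_code",
                       "score", "decision_date"],
    },
}

def target_table(entity: str) -> str:
    """Resolve the ODP table for a given --entity argument."""
    return ENTITY_TABLES[entity]["table"]
```

Note that every entity carries customer_id, which is what makes the downstream JOIN across the three tables possible.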

Components

Directory                   Purpose
─────────                   ───────
data_ingestion/pipeline/    Beam pipeline and transforms
data_ingestion/config/      System configuration
data_ingestion/schema/      Python schemas: source-of-truth EntitySchema definitions used for validation and PII masking logic
data_ingestion/schemas/     JSON schemas: physical BigQuery table schemas used for manual loads, recovery, and external tool integration
data_ingestion/validation/  File and record validators

Schema Architecture

The ingestion unit maintains two types of schemas to separate logical rules from physical storage contracts:

  1. Logical Engine (schema/ Python):
    • Purpose: Used by the Beam pipeline for runtime CSV parsing, record-level validation, and in-flight PII masking.
    • Feature: Can dynamically generate BigQuery schemas during job execution (CREATE_IF_NEEDED).
  2. Physical Contract (schemas/ JSON):
    • Purpose: Provides a tool-agnostic representation of the BigQuery tables.
    • Usage: Consumed by Terraform to pre-provision infrastructure (with partitioning/clustering) and used for manual bq load recovery operations.

This dual-schema approach ensures that Infrastructure builds the container, but Ingestion carries the blueprint.
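A rough sketch of how the two layers relate: the logical definition is the source of truth, and the physical JSON contract can be rendered from it. The class shapes and the to_bigquery_schema helper are assumptions for illustration, not the library's actual API:

```python
import json
from dataclasses import dataclass

# Logical layer (schema/): field-level rules the pipeline enforces at runtime.
@dataclass
class FieldDef:
    name: str
    bq_type: str       # BigQuery type used in the physical contract
    pii: bool = False  # masked in-flight when True

@dataclass
class EntitySchema:
    entity: str
    fields: list

    def to_bigquery_schema(self) -> str:
        """Render the physical contract (schemas/ JSON) from the logical definition."""
        return json.dumps(
            [{"name": f.name, "type": f.bq_type, "mode": "NULLABLE"}
             for f in self.fields],
            indent=2,
        )

# Logical definition for the customers entity (fields from the JOIN table above).
customers = EntitySchema(
    entity="customers",
    fields=[
        FieldDef("customer_id", "STRING"),
        FieldDef("ssn", "STRING", pii=True),
        FieldDef("status", "STRING"),
        FieldDef("created_date", "DATE"),
    ],
)
```

Keeping PII flags only in the logical layer means the physical JSON stays a plain storage contract that Terraform and bq load can consume without knowing about masking.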


Library-Driven Ease of Use

The Generic ingestion pipeline is a Lean Consumer of the library framework. It achieves complex mainframe ingestion with minimal custom code by leveraging:

  1. Metadata-Driven Schema: data_ingestion/schema/customers.py simply defines an EntitySchema. The library's SchemaValidator handles all type checking and PII masking automatically.
  2. Standardized Parsing: Uses the HDRTRLParser from gcp-pipeline-beam to validate mainframe headers/trailers without regex boilerplate.
  3. Audit Integrity: Automatically injects _run_id and _processed_at using the AddAuditColumnsDoFn library transform.
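Conceptually, the header/trailer check in point 2 amounts to comparing the trailer's declared record count against the rows actually read. A simplified stand-in for what HDRTRLParser does; the HDR|/TRL| record layouts here are invented for illustration:

```python
def validate_hdr_trl(lines):
    """Simplified mainframe HDR/TRL check: first line is a header
    (HDR|<extract_date>), last line a trailer (TRL|<record_count>),
    everything between is data. Raises ValueError on any mismatch."""
    header, *data, trailer = lines
    if not header.startswith("HDR|") or not trailer.startswith("TRL|"):
        raise ValueError("missing HDR or TRL record")
    declared = int(trailer.split("|")[1])
    if declared != len(data):
        raise ValueError(f"trailer declares {declared} records, found {len(data)}")
    return data

# Two data rows, trailer declares 2 -> passes and returns the data rows.
rows = validate_hdr_trl(["HDR|20240101", "a,b", "c,d", "TRL|2"])
```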

How to Replicate this JOIN Ingestion (3-to-3)

To create a new ingestion unit for a multi-entity system, follow the Creating New Deployment Guide.

Key steps for this JOIN pattern:

  1. Define Schema: Create a new schema file using gcp_pipeline_core.schema.EntitySchema.
  2. Configure Pipeline: Inherit from gcp_pipeline_beam.pipelines.base.BasePipeline.
  3. Plug in Transforms: Use the fluent BeamPipelineBuilder to chain read_csv -> validate -> write_to_bigquery.
  4. CI/CD Config: Update your project and org identifiers in your CI/CD configuration.
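The fluent chaining in step 3 can be sketched with a toy builder; BeamPipelineBuilder's real interface may differ, so treat the class and method names here as placeholders rather than the library's API:

```python
class BeamPipelineBuilderSketch:
    """Toy stand-in for the fluent builder: each step records a transform,
    and build() returns the ordered chain (the real builder wires up
    Beam PTransforms instead of strings)."""

    def __init__(self):
        self.steps = []

    def read_csv(self, path):
        self.steps.append(("read_csv", path))
        return self  # returning self is what enables the fluent chain

    def validate(self, entity):
        self.steps.append(("validate", entity))
        return self

    def write_to_bigquery(self, table):
        self.steps.append(("write_to_bigquery", table))
        return self

    def build(self):
        return [name for name, _ in self.steps]

chain = (BeamPipelineBuilderSketch()
         .read_csv("gs://landing/customers.csv")
         .validate("customers")
         .write_to_bigquery("odp_generic.customers")
         .build())
```

The point of the fluent style is that a new entity's pipeline is a one-expression chain: swap the path, entity, and table, and the transforms stay identical.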

Infrastructure & Configurations

Google Cloud Resources

This deployment requires the following GCP infrastructure, provisioned via Terraform:

  • Storage: GCS buckets for landing, archive, and error files.
  • Messaging: Pub/Sub Topic generic-file-notifications and Subscription generic-file-notifications-sub.
  • Processing: Cloud Dataflow (Apache Beam) for running the ingestion pipeline.
  • Data Warehouse: BigQuery dataset odp_generic for raw data storage.

For detailed infrastructure definitions, see infrastructure/terraform/systems/generic/ingestion/.

Pipeline Configuration (EMPipelineOptions)

The ingestion pipeline accepts several command-line arguments to control its behavior:

Argument               Description                                                 Required
────────               ───────────                                                 ────────
--entity               Generic entity to process (customers, accounts, decision)   Yes
--input_file           GCS path to the input file                                  Yes
--output_table         Target BigQuery table (project:dataset.table)               Yes
--error_table          BigQuery table for failed records                           Yes
--run_id               Unique identifier for tracking/auditing                     Yes
--extract_date         Extract date in YYYYMMDD format                             Yes
--job_control_project  GCP project for the job control table                       No
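A minimal sketch of how these arguments might be declared. The real EMPipelineOptions extends Beam's PipelineOptions, but plain stdlib argparse shows the same contract:

```python
import argparse

def build_option_parser() -> argparse.ArgumentParser:
    """Mirror the EMPipelineOptions arguments with stdlib argparse (sketch only;
    the real class registers these on Beam's PipelineOptions)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--entity", required=True,
                        choices=["customers", "accounts", "decision"])
    parser.add_argument("--input_file", required=True)
    parser.add_argument("--output_table", required=True)
    parser.add_argument("--error_table", required=True)
    parser.add_argument("--run_id", required=True)
    parser.add_argument("--extract_date", required=True)  # YYYYMMDD
    parser.add_argument("--job_control_project")          # optional
    return parser

# Example invocation with placeholder values.
opts = build_option_parser().parse_args([
    "--entity", "customers",
    "--input_file", "gs://landing/customers_20240101.csv",
    "--output_table", "my-project:odp_generic.customers",
    "--error_table", "my-project:odp_generic.errors",
    "--run_id", "run-001",
    "--extract_date", "20240101",
])
```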

Technology Stack & Documentation


Dependencies

Library            Purpose
───────            ───────
gcp-pipeline-core  Audit, logging, error handling
gcp-pipeline-beam  Beam transforms, HDR/TRL parsing

No Apache Airflow dependency: orchestration lives in a separate unit (Unit 3).


Execution & Testing

1. Local Development Setup

Initialize the virtual environment:

./scripts/setup_deployment_venv.sh original-data-to-bigqueryload
source deployments/original-data-to-bigqueryload/venv/bin/activate

2. Unit Testing

Run the ingestion unit tests:

python -m pytest tests/unit/ -v

3. Local Execution (DirectRunner)

Run the Beam pipeline locally to validate parsing and schema:

python -m data_ingestion.pipeline.main \
    --entity=customers \
    --input_file=tests/data/sample_customers.csv \
    --output_table=my-project:odp_generic.customers \
    --error_table=my-project:odp_generic.errors \
    --run_id=local-test-001 \
    --extract_date=20240101 \
    --runner=DirectRunner \
    --temp_location=/tmp/beam-temp

4. Cloud Execution

The ingestion unit is typically triggered by the orchestration unit's Airflow DAG. To trigger it manually for testing on GCP:

  1. Upload a data file to GCS.
  2. Upload a corresponding .ok file.
  3. The generic_pubsub_trigger_dag will detect the file and launch a Dataflow job using this ingestion code.
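The data/.ok pairing in steps 1 and 2 implies a naming convention for the marker file. A hypothetical helper sketching that convention; the actual rule is defined by the orchestration unit, so the suffix logic here is an assumption:

```python
def ok_file_for(data_path: str) -> str:
    """Derive the companion .ok marker path for a data file, assuming the
    convention is the same name with the extension replaced by .ok."""
    base, _, _ext = data_path.rpartition(".")
    return f"{base}.ok" if base else f"{data_path}.ok"
```

Uploading the .ok file last matters: the DAG treats it as the signal that all .csv splits have finished landing, so the data files must already be in place when it appears.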

Download files

Source distribution: gcp_pipeline_ref_ingestion-1.0.7.tar.gz (24.3 kB)
Built distribution:  gcp_pipeline_ref_ingestion-1.0.7-py3-none-any.whl (29.6 kB)

