Reference Implementation: GCS-to-BigQuery Ingestion Pipeline using gcp-pipeline-framework

Generic Ingestion

Unit 1 of Generic 3-Unit Deployment

ODP Ingestion Pipeline: reads mainframe extracts from GCS and loads them into BigQuery.


Flow Diagram

                         GENERIC INGESTION FLOW
                         ──────────────────────

  GCS Landing                  Beam Pipeline                    BigQuery ODP
  ───────────                  ─────────────                    ────────────

  1. .csv splits ──┐
  2. .ok file    ──┼──► Pub/Sub ──► Orchestration ──► 1. Read CSV      ┌──────────────────┐
                   │      Event      (Unit 3)         2. Parse HDR/TRL │ odp_generic.      │
                   │                                  3. Validate      │ - customers       │
                   │                                  4. Add Audit     │ - accounts        │
                   │                                  5. Write to BQ ─►│ - decision        │
                   │                                                   │ - applications    │
                   └──────────────────────────────────┘                └──────────────────┘
                               │
                               ▼
                        ┌─────────────┐
                        │ Archive to  │
                        │ GCS Archive │
                        └─────────────┘

Pattern

JOIN + MAP: 4 entities → 4 ODP tables

| Entity | ODP Table | Key Fields |
| --- | --- | --- |
| Customers | odp_generic.customers | customer_id, ssn, status, created_date |
| Accounts | odp_generic.accounts | account_id, customer_id, account_type, balance, open_date |
| Decision | odp_generic.decision | decision_id, customer_id, decision_code, score, decision_date |
| Applications | odp_generic.applications | application_id, customer_id, loan_amount, interest_rate, term_months, application_date, status, event_type, account_type |
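
The MAP half of the pattern is a one-to-one routing of each entity to its ODP table. A minimal sketch of that routing, assuming a simple module-level constant (the name ENTITY_TABLES is hypothetical; the table names come from the table above):

```python
# Illustrative entity-to-table routing for the 4-to-4 MAP pattern.
# The constant itself is an assumption, not taken from the project source.
ENTITY_TABLES = {
    "customers": "odp_generic.customers",
    "accounts": "odp_generic.accounts",
    "decision": "odp_generic.decision",
    "applications": "odp_generic.applications",
}
```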

Components

| Directory | Purpose |
| --- | --- |
| data_ingestion/pipeline/ | Beam pipeline and transforms |
| data_ingestion/config/ | System configuration |
| data_ingestion/schema/ | Python schemas: source-of-truth EntitySchema definitions used for validation and PII masking logic |
| data_ingestion/schemas/ | JSON schemas: physical BigQuery table schemas used for manual loads, recovery, and external tool integration |
| data_ingestion/validation/ | File and record validators |

Schema Architecture

The ingestion unit maintains two types of schemas to separate logical rules from physical storage contracts:

  1. Logical Engine (schema/ Python):
    • Purpose: Used by the Beam pipeline for runtime CSV parsing, record-level validation, and in-flight PII masking.
    • Feature: Can dynamically generate BigQuery schemas during job execution (CREATE_IF_NEEDED).
  2. Physical Contract (schemas/ JSON):
    • Purpose: Provides a tool-agnostic representation of the BigQuery tables.
    • Usage: Consumed by Terraform to pre-provision infrastructure (with partitioning/clustering) and used for manual bq load recovery operations.

This dual-schema approach ensures that Infrastructure builds the container while Ingestion carries the blueprint.
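
As a concrete illustration, a logical schema for the customers entity might look like the following. This is a hedged sketch: the EntitySchema constructor arguments and the pii flag are assumptions, not the library's confirmed API; the field names come from the pattern table above.

```python
# data_ingestion/schema/customers.py -- illustrative sketch only.
# Constructor arguments and the pii flag are assumed, not confirmed API.
from gcp_pipeline_core.schema import EntitySchema

CUSTOMERS_SCHEMA = EntitySchema(
    name="customers",
    table="odp_generic.customers",
    fields=[
        {"name": "customer_id", "type": "STRING", "required": True},
        {"name": "ssn", "type": "STRING", "pii": True},  # masked in flight by SchemaValidator
        {"name": "status", "type": "STRING"},
        {"name": "created_date", "type": "DATE"},
    ],
)
```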


Library-Driven Ease of Use

The Generic ingestion pipeline is a Lean Consumer of the library framework. It achieves complex mainframe ingestion with minimal custom code by leveraging:

  1. Metadata-Driven Schema: data_ingestion/schema/customers.py simply defines an EntitySchema. The library's SchemaValidator handles all type checking and PII masking automatically.
  2. Standardized Parsing: Uses the HDRTRLParser from gcp-pipeline-beam to validate mainframe headers/trailers without regex boilerplate.
  3. Audit Integrity: Automatically injects _run_id and _processed_at using the AddAuditColumnsDoFn library transform. (All three pieces are wired together in the sketch below.)
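
Composed in a Beam pipeline, the three pieces look roughly like this. Only the class names appear in this document; the transform module path and constructor arguments are assumptions for illustration.

```python
# Hedged composition sketch: HDRTRLParser and AddAuditColumnsDoFn are named in
# this README, but their module path and signatures below are assumptions.
import apache_beam as beam
from gcp_pipeline_beam.transforms import AddAuditColumnsDoFn, HDRTRLParser  # path assumed


def build_ingestion(p, options, schema):
    """Read, parse, validate, and stamp audit columns onto mainframe records."""
    return (
        p
        | "ReadCSV" >> beam.io.ReadFromText(options.source_file)
        | "ParseHdrTrl" >> beam.ParDo(HDRTRLParser(schema))              # HDR/TRL checks, no regex boilerplate
        | "AddAudit" >> beam.ParDo(AddAuditColumnsDoFn(options.run_id))  # injects _run_id, _processed_at
    )
```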

How to Replicate this JOIN Ingestion (4-to-4)

To create a new ingestion unit for a multi-entity system, follow the Creating New Deployment Guide.

Key steps for this JOIN pattern:

  1. Define Schema: Create a new schema file using gcp_pipeline_core.schema.EntitySchema.
  2. Configure Pipeline: Inherit from gcp_pipeline_beam.pipelines.base.BasePipeline.
  3. Plug in Transforms: Use the fluent BeamPipelineBuilder to chain read_csv -> validate -> write_to_bigquery (sketched after this list).
  4. CI/CD Config: Update your project and org identifiers in your CI/CD configuration.
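
A minimal sketch of steps 2 and 3, assuming BeamPipelineBuilder lives at the module path shown and that its fluent methods take the arguments below (only the class and method names are confirmed by this document):

```python
# Sketch only: the BasePipeline import path is from step 2; the builder's
# module path and method signatures are assumptions.
from gcp_pipeline_beam.pipelines.base import BasePipeline
from gcp_pipeline_beam.builder import BeamPipelineBuilder  # path assumed


class NewSystemPipeline(BasePipeline):
    def build(self, pipeline, options):
        return (
            BeamPipelineBuilder(pipeline, options)
            .read_csv(options.source_file)
            .validate(self.schema)  # SchemaValidator handles types + PII masking
            .write_to_bigquery(options.output_table, error_table=options.error_table)
        )
```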

Infrastructure & Configurations

Google Cloud Resources

This deployment requires the following GCP infrastructure, provisioned via Terraform:

  • Storage: GCS buckets for landing, archive, and error files.
  • Messaging: Pub/Sub Topic generic-file-notifications and Subscription generic-file-notifications-sub.
  • Processing: Cloud Dataflow (Apache Beam) for running the ingestion pipeline.
  • Data Warehouse: BigQuery dataset odp_generic for raw data storage.

For detailed infrastructure definitions, see infrastructure/terraform/systems/generic/ingestion/.

Pipeline Configuration (GenericPipelineOptions)

The ingestion pipeline accepts several command-line arguments to control its behavior:

| Argument | Description | Required |
| --- | --- | --- |
| --entity | Generic entity to process (customers, accounts, decision, applications) | Yes |
| --source_file | GCS path to input CSV file | Yes |
| --output_table | Target BigQuery table (project:dataset.table) | Yes |
| --error_table | BigQuery table for failed records | Yes |
| --run_id | Unique identifier for tracking/auditing | Yes |
| --extract_date | Extract date in YYYYMMDD format | Yes |
| --job_control_project | GCP project for the job control table | No |
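
A plausible shape for GenericPipelineOptions, following Apache Beam's standard custom-options pattern (the _add_argparse_args hook is real Beam API; the class body is an illustration, not the actual source):

```python
from apache_beam.options.pipeline_options import PipelineOptions


class GenericPipelineOptions(PipelineOptions):
    """Illustrative sketch; the real class lives in the ingestion unit."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # All of these except --job_control_project are required per the table
        # above; presence is assumed to be validated at pipeline start-up.
        parser.add_argument("--entity", help="Entity to process")
        parser.add_argument("--source_file", help="GCS path to input CSV file")
        parser.add_argument("--output_table", help="Target BigQuery table (project:dataset.table)")
        parser.add_argument("--error_table", help="BigQuery table for failed records")
        parser.add_argument("--run_id", help="Unique identifier for tracking/auditing")
        parser.add_argument("--extract_date", help="Extract date in YYYYMMDD format")
        parser.add_argument("--job_control_project", help="GCP project for the job control table")
```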

Technology Stack & Documentation


Dependencies

| Library | Purpose |
| --- | --- |
| gcp-pipeline-core | Audit, logging, error handling |
| gcp-pipeline-beam | Beam transforms, HDR/TRL parsing |

No Apache Airflow dependency: orchestration is handled by a separate unit.


Execution & Testing

1. Local Development Setup

Initialize the virtual environment:

```bash
./scripts/setup_deployment_venv.sh original-data-to-bigqueryload
source deployments/original-data-to-bigqueryload/venv/bin/activate
```

2. Unit Testing

Run the ingestion unit tests:

```bash
python -m pytest tests/unit/ -v
```

3. Local Execution (DirectRunner)

Run the Beam pipeline locally to validate parsing and schema:

```bash
python -m data_ingestion.pipeline.main \
    --entity=customers \
    --source_file=tests/data/sample_customers.csv \
    --output_table=my-project:odp_generic.customers \
    --error_table=my-project:odp_generic.ingestion_errors \
    --run_id=local-test-001 \
    --extract_date=20240101 \
    --runner=DirectRunner \
    --temp_location=/tmp/beam-temp
```

4. Cloud Execution

The ingestion unit is typically triggered by the orchestration unit (Unit 3). To trigger it manually for testing on GCP:

  1. Upload a data file to GCS.
  2. Upload a corresponding .ok file.
  3. The generic_pubsub_trigger_dag will detect the file and launch a Dataflow job using this ingestion code.
