Reference Implementation: GCS-to-BigQuery Ingestion Pipeline using gcp-pipeline-framework

Generic Ingestion

Unit 1 of Generic 3-Unit Deployment

ODP ingestion pipeline: reads mainframe extracts from GCS and loads them into BigQuery.


Flow Diagram

                         GENERIC INGESTION FLOW
                         ──────────────────────

  GCS Landing                  Beam Pipeline                    BigQuery ODP
  ───────────                  ─────────────                    ────────────

  1. .csv splits ──┐
  2. .ok file    ──┼──► Pub/Sub ──► Orchestration ──► 1. Read CSV      ┌──────────────────┐
                   │      Event      (Unit 3)         2. Parse HDR/TRL │ odp_generic.      │
                   │                                  3. Validate      │ - customers       │
                   │                                  4. Add Audit     │ - accounts        │
                   │                                  5. Write to BQ ─►│ - decision        │
                   │                                                   │ - applications    │
                   └──────────────────────────────────┘                └──────────────────┘
                               │
                               ▼
                        ┌─────────────┐
                        │ Archive to  │
                        │ GCS Archive │
                        └─────────────┘
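Step 2 of the flow, "Parse HDR/TRL", follows the usual mainframe convention of a header record plus a trailer record carrying the detail count. The library's HDRTRLParser handles this in the real pipeline; the sketch below is only illustrative, and the `HDR|date` / `TRL|count` record layout is an assumption, not the actual extract format:

```python
def parse_hdr_trl(lines):
    """Split a mainframe extract into detail rows, verifying that the
    trailer's record count matches (illustrative layout only)."""
    header, trailer, details = lines[0], lines[-1], lines[1:-1]
    if not header.startswith("HDR") or not trailer.startswith("TRL"):
        raise ValueError("missing HDR/TRL records")
    # Assumed trailer layout: TRL|<detail record count>
    expected = int(trailer.split("|")[1])
    if expected != len(details):
        raise ValueError(f"trailer count {expected} != {len(details)} detail rows")
    return details

rows = parse_hdr_trl(["HDR|20240101", "C001,ACTIVE", "C002,CLOSED", "TRL|2"])
```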

Pattern

JOIN + MAP: 4 entities → 4 ODP tables

Entity         ODP Table                  Key Fields
──────         ─────────                  ──────────
Customers      odp_generic.customers      customer_id, ssn, status, created_date
Accounts       odp_generic.accounts       account_id, customer_id, account_type, balance, open_date
Decision       odp_generic.decision       decision_id, customer_id, decision_code, score, decision_date
Applications   odp_generic.applications   application_id, customer_id, loan_amount, interest_rate, term_months, application_date, status, event_type, account_type

Components

Directory                    Purpose
─────────                    ───────
data_ingestion/pipeline/     Beam pipeline and transforms
data_ingestion/config/       System configuration
data_ingestion/schema/       Python schemas: source-of-truth EntitySchema definitions used for validation, PII masking logic, and BigQuery schema generation
data_ingestion/validation/   File and record validators

Schema Architecture

The ingestion unit maintains Python schema definitions that serve as the single source of truth:

  • Purpose: Used by the Beam pipeline for runtime CSV parsing, record-level validation, in-flight PII masking, and BigQuery schema generation.
  • Feature: Can dynamically generate BigQuery schemas during job execution (CREATE_IF_NEEDED).
  • Location: data_ingestion/schema/ — one Python module per entity (customers, accounts, decision, applications) plus registry.py for entity schema registration.
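A stand-alone sketch of what one such entity module might look like. The FieldSpec/EntitySchema classes below are stand-ins for the real gcp_pipeline_core.schema types, whose exact API is not documented in this README; the customers field list follows the key-fields table above:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class FieldSpec:
    name: str
    bq_type: str       # e.g. STRING, NUMERIC, DATE
    pii: bool = False  # masked in flight when True

@dataclass(frozen=True)
class EntitySchema:
    entity: str
    fields: tuple

    def to_bigquery_schema(self):
        """Generate the schema dict used for CREATE_IF_NEEDED table creation."""
        return {"fields": [{"name": f.name, "type": f.bq_type, "mode": "NULLABLE"}
                           for f in self.fields]}

# data_ingestion/schema/customers.py would define something like:
CUSTOMERS = EntitySchema(
    entity="customers",
    fields=(
        FieldSpec("customer_id", "STRING"),
        FieldSpec("ssn", "STRING", pii=True),
        FieldSpec("status", "STRING"),
        FieldSpec("created_date", "DATE"),
    ),
)
```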

Library-Driven Ease of Use

The Generic ingestion pipeline is a lean consumer of the library framework. It achieves complex mainframe ingestion with minimal custom code by leveraging:

  1. Metadata-Driven Schema: data_ingestion/schema/customers.py simply defines an EntitySchema. The library's SchemaValidator handles all type checking and PII masking automatically.
  2. Standardized Parsing: Uses the HDRTRLParser from gcp-pipeline-beam to validate mainframe headers/trailers without regex boilerplate.
  3. Audit Integrity: Automatically injects _run_id and _processed_at using the AddAuditColumnsDoFn library transform.
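The audit injection in point 3 amounts to stamping each record before the BigQuery write. A pure-Python sketch of the per-record logic AddAuditColumnsDoFn plausibly applies (the _run_id and _processed_at column names come from this README; the DoFn's actual implementation lives in gcp-pipeline-beam):

```python
from datetime import datetime, timezone

def add_audit_columns(record: dict, run_id: str) -> dict:
    """Return a copy of the record stamped with pipeline audit columns."""
    return {
        **record,
        "_run_id": run_id,
        "_processed_at": datetime.now(timezone.utc).isoformat(),
    }

row = add_audit_columns({"customer_id": "C001"}, run_id="run-20240101-001")
```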

How to Replicate this JOIN Ingestion (4-to-4)

To create a new ingestion unit for a multi-entity system, follow the Creating New Deployment Guide.

Key steps for this JOIN pattern:

  1. Define Schema: Create a new schema file using gcp_pipeline_core.schema.EntitySchema.
  2. Configure Pipeline: Inherit from gcp_pipeline_beam.pipelines.base.BasePipeline.
  3. Plug in Transforms: Use the fluent BeamPipelineBuilder to chain read_csv -> validate -> write_to_bigquery.
  4. CI/CD Config: Update your project and org identifiers in your CI/CD configuration.
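The fluent chaining in step 3 can be sketched with a toy builder. The real BeamPipelineBuilder and BasePipeline APIs live in gcp-pipeline-beam and are not shown in this README, so every method below is an illustrative assumption about the shape of the chain, not the library's actual interface:

```python
class ToyPipelineBuilder:
    """Toy stand-in showing the fluent read -> validate -> write shape."""
    def __init__(self):
        self.steps = []

    def read_csv(self, path):
        self.steps.append(("read_csv", path))
        return self

    def validate(self, entity):
        self.steps.append(("validate", entity))
        return self

    def write_to_bigquery(self, table):
        self.steps.append(("write_to_bigquery", table))
        return self

builder = (ToyPipelineBuilder()
           .read_csv("gs://landing/customers.csv")
           .validate("customers")
           .write_to_bigquery("my-project:odp_generic.customers"))
```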

Infrastructure & Configurations

Google Cloud Resources

This deployment requires the following GCP infrastructure, provisioned via Terraform:

  • Storage: GCS buckets for landing, archive, and error files.
  • Messaging: Pub/Sub Topic generic-file-notifications and Subscription generic-file-notifications-sub.
  • Processing: Cloud Dataflow (Apache Beam) for running the ingestion pipeline.
  • Data Warehouse: BigQuery dataset odp_generic for raw data storage.

For detailed infrastructure definitions, see infrastructure/terraform/systems/generic/ingestion/.

Pipeline Configuration (GenericPipelineOptions)

The ingestion pipeline accepts several command-line arguments to control its behavior:

Argument                Description                                                                Required
────────                ───────────                                                                ────────
--entity                Generic entity to process (customers, accounts, decision, applications)   Yes
--source_file           GCS path to input CSV file                                                 Yes
--output_table          Target BigQuery table (project:dataset.table)                              Yes
--error_table           BigQuery table for failed records                                          Yes
--run_id                Unique identifier for tracking/auditing                                    Yes
--extract_date          Extract date in YYYYMMDD format                                            Yes
--job_control_project   GCP project for the job control table                                      No
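The argument surface in the table can be mirrored with stdlib argparse, as sketched below. The real GenericPipelineOptions would subclass Beam's PipelineOptions; this sketch only reproduces the table's flags and required/optional split:

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    """Mirror the GenericPipelineOptions arguments from the table above."""
    p = argparse.ArgumentParser()
    p.add_argument("--entity", required=True,
                   choices=["customers", "accounts", "decision", "applications"])
    p.add_argument("--source_file", required=True, help="GCS path to input CSV")
    p.add_argument("--output_table", required=True, help="project:dataset.table")
    p.add_argument("--error_table", required=True, help="BQ table for failed records")
    p.add_argument("--run_id", required=True, help="unique tracking/audit identifier")
    p.add_argument("--extract_date", required=True, help="YYYYMMDD")
    p.add_argument("--job_control_project")  # optional
    return p

opts = build_parser().parse_args([
    "--entity=customers",
    "--source_file=gs://landing/customers.csv",
    "--output_table=my-project:odp_generic.customers",
    "--error_table=my-project:odp_generic.errors",
    "--run_id=run-001",
    "--extract_date=20240101",
])
```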

Technology Stack & Documentation


Dependencies

Library             Purpose
───────             ───────
gcp-pipeline-core   Audit, logging, error handling
gcp-pipeline-beam   Beam transforms, HDR/TRL parsing

No Apache Airflow dependency: orchestration is handled by the separate orchestration unit (Unit 3).


Execution & Testing

1. Local Development Setup

Initialize the virtual environment:

./scripts/setup_deployment_venv.sh original-data-to-bigqueryload
source deployments/original-data-to-bigqueryload/venv/bin/activate

2. Unit Testing

Run the ingestion unit tests:

python -m pytest tests/unit/ -v

3. Local Execution (DirectRunner)

Run the Beam pipeline locally to validate parsing and schema:

python -m data_ingestion.pipeline.runner \
    --entity=customers \
    --source_file=tests/data/sample_customers.csv \
    --output_table=my-project:odp_generic.customers \
    --error_table=my-project:odp_generic.errors \
    --run_id=local-test-001 \
    --extract_date=20240101 \
    --runner=DirectRunner \
    --temp_location=/tmp/beam-temp

4. Cloud Execution

The ingestion unit is typically triggered by the orchestration unit (Airflow). To trigger it manually for testing on GCP:

  1. Upload a data file to GCS.
  2. Upload a corresponding .ok file.
  3. The generic_pubsub_trigger_dag will detect the file and launch a Dataflow job using this ingestion code.
