Reference Implementation: GCS-to-BigQuery Ingestion Pipeline using gcp-pipeline-framework
Generic Ingestion
Unit 1 of Generic 3-Unit Deployment
ODP Ingestion Pipeline - reads mainframe extracts from GCS and loads to BigQuery.
Flow Diagram
```
GENERIC INGESTION FLOW
──────────────────────

GCS Landing          Trigger                Beam Pipeline           BigQuery ODP
───────────          ───────                ─────────────           ────────────
1. .csv splits ──┐   Pub/Sub Event          1. Read CSV             odp_generic.
2. .ok file    ──┴─► Orchestration ───────► 2. Parse HDR/TRL          - customers
                     (Unit 3)               3. Validate               - accounts
                                            4. Add Audit              - decision
                                            5. Write to BQ ────────►  - applications
                                                   │
                                                   ▼
                                            ┌─────────────┐
                                            │ Archive to  │
                                            │ GCS Archive │
                                            └─────────────┘
```
Pattern
JOIN + MAP: 4 entities → 4 ODP tables
| Entity | ODP Table | Key Fields |
|---|---|---|
| Customers | `odp_generic.customers` | customer_id, ssn, status, created_date |
| Accounts | `odp_generic.accounts` | account_id, customer_id, account_type, balance, open_date |
| Decision | `odp_generic.decision` | decision_id, customer_id, decision_code, score, decision_date |
| Applications | `odp_generic.applications` | application_id, customer_id, loan_amount, interest_rate, term_months, application_date, status, event_type, account_type |
Components
| Directory | Purpose |
|---|---|
| `data_ingestion/pipeline/` | Beam pipeline and transforms |
| `data_ingestion/config/` | System configuration |
| `data_ingestion/schema/` | Python schemas: source-of-truth `EntitySchema` definitions used for validation, PII masking logic, and BigQuery schema generation |
| `data_ingestion/validation/` | File and record validators |
Schema Architecture
The ingestion unit maintains Python schema definitions that serve as the single source of truth:
- Purpose: Used by the Beam pipeline for runtime CSV parsing, record-level validation, in-flight PII masking, and BigQuery schema generation.
- Feature: Can dynamically generate BigQuery schemas during job execution (`CREATE_IF_NEEDED`).
- Location: `data_ingestion/schema/`, with one Python module per entity (customers, accounts, decision, applications) plus `registry.py` for entity schema registration. A sketch of such a module appears below.
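The exact `EntitySchema` API lives in `gcp-pipeline-core` and is not documented here; the following is only a minimal, hypothetical sketch of what a per-entity module such as `data_ingestion/schema/customers.py` could look like, assuming a schema captures a field name, a BigQuery type, nullability, and a PII flag. Field names come from the Pattern table above; the `FieldSpec`/`EntitySchema` shapes are assumptions.

```python
# data_ingestion/schema/customers.py -- illustrative sketch only.
# The real EntitySchema is provided by gcp-pipeline-core; this stand-in
# shows how one schema definition can drive validation, PII masking,
# and BigQuery schema generation.
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class FieldSpec:
    name: str           # column name in the CSV extract and BigQuery table
    bq_type: str        # BigQuery type used for CREATE_IF_NEEDED schema generation
    nullable: bool = True
    pii: bool = False   # drives in-flight masking


@dataclass(frozen=True)
class EntitySchema:
    entity: str
    target_table: str
    fields: List[FieldSpec] = field(default_factory=list)

    def to_bigquery_schema(self) -> dict:
        """Render the schema in the dict format accepted by beam.io.WriteToBigQuery."""
        return {
            "fields": [
                {"name": f.name, "type": f.bq_type,
                 "mode": "NULLABLE" if f.nullable else "REQUIRED"}
                for f in self.fields
            ]
        }


CUSTOMERS_SCHEMA = EntitySchema(
    entity="customers",
    target_table="odp_generic.customers",
    fields=[
        FieldSpec("customer_id", "STRING", nullable=False),
        FieldSpec("ssn", "STRING", pii=True),
        FieldSpec("status", "STRING"),
        FieldSpec("created_date", "DATE"),
    ],
)
```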
Library-Driven Ease of Use
The Generic ingestion pipeline is a Lean Consumer of the library framework. It achieves complex mainframe ingestion with minimal custom code by leveraging:
- Metadata-Driven Schema: `data_ingestion/schema/customers.py` simply defines an `EntitySchema`; the library's `SchemaValidator` handles all type checking and PII masking automatically.
- Standardized Parsing: Uses the `HDRTRLParser` from `gcp-pipeline-beam` to validate mainframe headers/trailers without regex boilerplate.
- Audit Integrity: Automatically injects `_run_id` and `_processed_at` using the `AddAuditColumnsDoFn` library transform (illustrated in the sketch below).
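The actual `AddAuditColumnsDoFn` ships with `gcp-pipeline-beam` and its implementation is not reproduced here. As an orientation only, the sketch below shows the general shape of such a transform written in plain Apache Beam, using the `_run_id` and `_processed_at` field names from this document.

```python
# Illustrative only -- not the gcp-pipeline-beam implementation.
# A DoFn that stamps audit columns onto each record dict before it is
# written to BigQuery.
import datetime

import apache_beam as beam


class AddAuditColumns(beam.DoFn):
    def __init__(self, run_id: str):
        self.run_id = run_id

    def process(self, record: dict):
        enriched = dict(record)
        enriched["_run_id"] = self.run_id
        enriched["_processed_at"] = datetime.datetime.now(
            datetime.timezone.utc).isoformat()
        yield enriched


# Usage inside a pipeline:
#   records | "AddAudit" >> beam.ParDo(AddAuditColumns(run_id="20240101-001"))
```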
How to Replicate this JOIN Ingestion (4-to-4)
To create a new ingestion unit for a multi-entity system, follow the Creating New Deployment Guide.
Key steps for this JOIN pattern:
- Define Schema: Create a new schema file using `gcp_pipeline_core.schema.EntitySchema`.
- Configure Pipeline: Inherit from `gcp_pipeline_beam.pipelines.base.BasePipeline`.
- Plug in Transforms: Use the fluent `BeamPipelineBuilder` to chain `read_csv` -> `validate` -> `write_to_bigquery` (see the sketch after this list).
- CI/CD Config: Update your project and org identifiers in your CI/CD configuration.
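The `BeamPipelineBuilder` fluent API is part of the internal framework and its exact signatures are not shown in this README. As a rough equivalent, the sketch below expresses the same read → validate → write chain directly in the Apache Beam Python SDK; the column list, bucket path, and validation rule are placeholder assumptions, and HDR/TRL handling is omitted for brevity.

```python
# Equivalent pipeline expressed directly in Apache Beam -- an illustrative
# stand-in for the read_csv -> validate -> write_to_bigquery builder chain.
import csv

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def parse_csv_line(line: str) -> dict:
    # Hypothetical column list; in the real unit, parsing is driven by the EntitySchema.
    columns = ["customer_id", "ssn", "status", "created_date"]
    values = next(csv.reader([line]))
    return dict(zip(columns, values))


def is_valid(record: dict) -> bool:
    # Placeholder for the record-level validation the framework performs.
    return bool(record.get("customer_id"))


def run(argv=None):
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadCSV" >> beam.io.ReadFromText(
                "gs://landing-bucket/customers.csv", skip_header_lines=1)
            | "Parse" >> beam.Map(parse_csv_line)
            | "Validate" >> beam.Filter(is_valid)
            | "WriteToBQ" >> beam.io.WriteToBigQuery(
                "my-project:odp_generic.customers",
                schema="customer_id:STRING,ssn:STRING,status:STRING,created_date:DATE",
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == "__main__":
    run()
```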
Infrastructure & Configurations
Google Cloud Resources
This deployment requires the following GCP infrastructure, provisioned via Terraform:
- Storage: GCS buckets for `landing`, `archive`, and `error` files.
- Messaging: Pub/Sub topic `generic-file-notifications` and subscription `generic-file-notifications-sub`.
- Processing: Cloud Dataflow (Apache Beam) for running the ingestion pipeline.
- Data Warehouse: BigQuery dataset `odp_generic` for raw data storage.
For detailed infrastructure definitions, see infrastructure/terraform/systems/generic/ingestion/.
Pipeline Configuration (GenericPipelineOptions)
The ingestion pipeline accepts several command-line arguments to control its behavior:
| Argument | Description | Required |
|---|---|---|
| `--entity` | Generic entity to process (`customers`, `accounts`, `decision`, `applications`) | Yes |
| `--source_file` | GCS path to input CSV file | Yes |
| `--output_table` | Target BigQuery table (`project:dataset.table`) | Yes |
| `--error_table` | BigQuery table for failed records | Yes |
| `--run_id` | Unique identifier for tracking/auditing | Yes |
| `--extract_date` | Extract date in `YYYYMMDD` format | Yes |
| `--job_control_project` | GCP project for the job control table | No |
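The actual `GenericPipelineOptions` class is part of this package's source; as a sketch only, the arguments above could be declared with the standard Apache Beam custom-options pattern shown below. The help strings mirror the table; the class body itself is an assumption.

```python
# Illustrative declaration of the command-line arguments as a custom
# Apache Beam options class; the real GenericPipelineOptions may differ.
from apache_beam.options.pipeline_options import PipelineOptions


class GenericPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--entity", required=True,
            choices=["customers", "accounts", "decision", "applications"],
            help="Generic entity to process")
        parser.add_argument("--source_file", required=True,
                            help="GCS path to input CSV file")
        parser.add_argument("--output_table", required=True,
                            help="Target BigQuery table (project:dataset.table)")
        parser.add_argument("--error_table", required=True,
                            help="BigQuery table for failed records")
        parser.add_argument("--run_id", required=True,
                            help="Unique identifier for tracking/auditing")
        parser.add_argument("--extract_date", required=True,
                            help="Extract date in YYYYMMDD format")
        parser.add_argument("--job_control_project", default=None,
                            help="GCP project for the job control table")


# Usage: options = PipelineOptions(argv).view_as(GenericPipelineOptions)
```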
Technology Stack & Documentation
- Google Cloud Dataflow - Managed Apache Beam service
- Apache Beam Python SDK - Programming model for data processing
- Google Cloud Storage - Input file landing zone
- Google BigQuery - Data warehouse target
- Google Cloud Pub/Sub - Event-driven triggers
- Apache Beam BigQuery I/O - Connector for BQ
Dependencies
| Library | Purpose |
|---|---|
| `gcp-pipeline-core` | Audit, logging, error handling |
| `gcp-pipeline-beam` | Beam transforms, HDR/TRL parsing |
No Apache Airflow dependency: orchestration is handled by a separate unit (Unit 3).
Execution & Testing
1. Local Development Setup
Initialize the virtual environment:
```bash
./scripts/setup_deployment_venv.sh original-data-to-bigqueryload
source deployments/original-data-to-bigqueryload/venv/bin/activate
```
2. Unit Testing
Run the ingestion unit tests:
```bash
python -m pytest tests/unit/ -v
```
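The real unit tests exercise the framework's `HDRTRLParser` and schema validators. For orientation only, the hypothetical test below illustrates the kind of header/trailer check they perform; the pipe-delimited HDR/TRL record layout here is an assumption, not the actual extract format.

```python
# tests/unit/test_file_structure.py -- illustrative example only; record
# layout is assumed and does not reflect the real mainframe extract format.
def validate_hdr_trl(lines: list[str]) -> bool:
    """A file is well-formed if it starts with a HDR record, ends with a TRL
    record, and the TRL record count matches the number of data rows."""
    if not lines or not lines[0].startswith("HDR") or not lines[-1].startswith("TRL"):
        return False
    declared_count = int(lines[-1].split("|")[1])
    return declared_count == len(lines) - 2  # exclude HDR and TRL rows


def test_valid_file_passes():
    lines = ["HDR|20240101", "C001|123-45-6789|ACTIVE|2024-01-01", "TRL|1"]
    assert validate_hdr_trl(lines)


def test_bad_trailer_count_fails():
    lines = ["HDR|20240101", "C001|123-45-6789|ACTIVE|2024-01-01", "TRL|5"]
    assert not validate_hdr_trl(lines)
```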
3. Local Execution (DirectRunner)
Run the Beam pipeline locally to validate parsing and schema:
```bash
python -m data_ingestion.pipeline.runner \
  --input_file=tests/data/sample_customers.csv \
  --output_table=my-project:odp_generic.customers \
  --runner=DirectRunner \
  --temp_location=/tmp/beam-temp
```
4. Cloud Execution
The ingestion unit is typically triggered by Airflow. To trigger it manually for testing on GCP:
- Upload a data file to GCS.
- Upload a corresponding `.ok` file.
- The `generic_pubsub_trigger_dag` will detect the file and launch a Dataflow job using this ingestion code.
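For a quick manual test, the two uploads can be scripted with the `google-cloud-storage` client. The bucket and object names below are placeholders; substitute the landing bucket provisioned for your environment.

```python
# Manual trigger helper -- uploads a data file and its .ok marker to the
# landing bucket so the Pub/Sub notification fires. Names are placeholders.
from google.cloud import storage


def upload_extract(bucket_name: str, local_csv: str, object_name: str) -> None:
    client = storage.Client()
    bucket = client.bucket(bucket_name)

    # 1. Upload the data file.
    bucket.blob(object_name).upload_from_filename(local_csv)

    # 2. Upload the corresponding .ok marker to signal the file is complete.
    bucket.blob(object_name + ".ok").upload_from_string("")


if __name__ == "__main__":
    upload_extract("my-generic-landing-bucket",
                   "tests/data/sample_customers.csv",
                   "landing/customers_20240101.csv")
```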