gcp-pipeline-beam
Beam ingestion library for GCP data pipelines: Apache Beam pipelines, transforms, and file management.
Depends on: gcp-pipeline-core
NO Apache Airflow dependency.
Architecture
GCP-PIPELINE-BEAM
─────────────────
┌───────────────────────────────────────────────────────┐
│ INGESTION LAYER                                       │
│                                                       │
│ ┌───────────────────────────────────────────────────┐ │
│ │ File Management                                   │ │
│ │ • HDR/TRL Parser (header/trailer validation)      │ │
│ │ • Split File Handler (reassemble split files)     │ │
│ │ • File Archiver (move to archive bucket)          │ │
│ └───────────────────────────────────────────────────┘ │
│                           │                           │
│                           ▼                           │
│ ┌───────────────────────────────────────────────────┐ │
│ │ Validators                                        │ │
│ │ • SchemaValidator (validate against EntitySchema) │ │
│ │ • SSN, Date, Numeric validators                   │ │
│ └───────────────────────────────────────────────────┘ │
│                           │                           │
│                           ▼                           │
│ ┌───────────────────────────────────────────────────┐ │
│ │ Beam Transforms                                   │ │
│ │ • RobustCsvParseDoFn (parse CSV to dict)          │ │
│ │ • SchemaValidateRecordDoFn (schema validation)    │ │
│ │ • EnrichWithMetadataDoFn (add _run_id, etc.)      │ │
│ └───────────────────────────────────────────────────┘ │
│                           │                           │
│                           ▼                           │
│ ┌───────────────────────────────────────────────────┐ │
│ │ Base Pipeline                                     │ │
│ │ • BasePipeline (abstract class)                   │ │
│ │ • PipelineConfig, PipelineOptions                 │ │
│ └───────────────────────────────────────────────────┘ │
│                                                       │
└───────────────────────────────────────────────────────┘
                            │
                            ▼
                  Uses: gcp-pipeline-core
Ingestion Flow
GCS Landing          Beam Pipeline                    BigQuery
───────────          ─────────────                    ────────
file.csv    ──────►  ┌─────────────────────┐
file.csv.ok          │                     │
                     │ 1. HDRTRLParser     │
                     │    • Validate HDR   │
                     │    • Validate TRL   │
                     │    • Check count    │
                     │                     │
                     │ 2. CSV Parser       │
                     │    • CSV to dict    │
                     │                     │
                     │ 3. SchemaValidator  │
                     │    • Required       │────► Valid records ──► BigQuery
                     │    • Types          │
                     │    • Allowed vals   │────► Invalid ──► Error bucket
                     │                     │
                     │ 4. EnrichMetadata   │
                     │    • _run_id        │
                     │    • _source_file   │
                     │    • _processed_at  │
                     │                     │
                     └─────────────────────┘
                                │
                                ▼
                     ┌─────────────────────┐
                     │   Archive to GCS    │
                     └─────────────────────┘
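The per-record flow above can be sketched in plain Python. This is a simplified stand-in for the Beam DoFns, not the library's actual implementation; the function bodies are illustrative assumptions.

```python
import csv
import io
import uuid
from datetime import datetime, timezone

def parse_csv(text, fieldnames):
    # Stand-in for RobustCsvParseDoFn: each data row becomes a dict.
    return list(csv.DictReader(io.StringIO(text), fieldnames=fieldnames))

def validate(record, required):
    # Stand-in for SchemaValidateRecordDoFn: check required fields are present.
    missing = [f for f in required if not record.get(f)]
    if missing:
        return None, {"record": record, "missing": missing}
    return record, None

def enrich(record, run_id, source_file):
    # Stand-in for EnrichWithMetadataDoFn: attach audit metadata.
    return {**record,
            "_run_id": run_id,
            "_source_file": source_file,
            "_processed_at": datetime.now(timezone.utc).isoformat()}

run_id = str(uuid.uuid4())
rows = parse_csv("1,Ada\n2,\n", fieldnames=["id", "name"])

valid, invalid = [], []
for row in rows:
    rec, err = validate(row, required=["id", "name"])
    if rec:
        valid.append(enrich(rec, run_id, "gs://landing/file.csv"))
    else:
        invalid.append(err)
# The complete row is enriched and kept; the row missing "name" is
# routed to the error output, mirroring the valid/invalid split above.
```

In the real pipeline each step would be a `beam.ParDo` stage with valid and invalid records emitted to separate outputs.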
Split File Handling
The system supports processing files that have been split into multiple parts. The .ok file signals that ALL splits are ready.
GCS Landing Bucket       Pub/Sub & Processing
──────────────────       ────────────────────
customers_1.csv ──┐
customers_2.csv ──┼── (data files)
customers_3.csv ──┘
        │
        │
customers.csv.ok ─────►  Pub/Sub Notification
        │                        │
        │                        ▼
        │               ┌─────────────────┐
        │               │ Airflow Sensor  │
        │               │ (detects .ok)   │
        │               └────────┬────────┘
        │                        │
        │                        ▼
        │               ┌─────────────────┐
        │               │ File Discovery  │
        │               │ • List bucket   │
        │               │ • Find splits:  │
        │               │ customers_*.csv │
        │               └────────┬────────┘
        │                        │
        └────────────────────────┘
                    │
                    ▼
           ┌─────────────────┐
           │ Process ALL     │
           │ split files     │
           │ in single job   │
           └─────────────────┘
Split File Discovery Logic
```python
# 1. Pub/Sub receives a notification for the .ok file
#    Message: {"name": "application1/customers/customers.csv.ok", "bucket": "landing"}

# 2. Sensor extracts the entity name from the .ok file
#    entity = "customers" (from customers.csv.ok)

# 3. File discovery finds all matching splits
#    pattern = f"gs://landing/application1/customers/customers*.csv"
#    files = [
#        "gs://landing/application1/customers/customers_1.csv",
#        "gs://landing/application1/customers/customers_2.csv",
#        "gs://landing/application1/customers/customers_3.csv",
#    ]

# 4. All files are processed in a single Dataflow job
#    pipeline.read_from_gcs(files)  # reads all splits
```
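The discovery steps can be expressed with the standard library alone. The helper name below is hypothetical, not part of the library's API:

```python
import fnmatch
import posixpath

def splits_for_ok_file(ok_object, objects):
    # Derive the entity name from the .ok object, e.g.
    # "application1/customers/customers.csv.ok" -> "customers".
    prefix = posixpath.dirname(ok_object)
    entity = posixpath.basename(ok_object)[: -len(".csv.ok")]
    # Match every split sharing the entity prefix: {entity}*.csv
    pattern = posixpath.join(prefix, f"{entity}*.csv")
    return sorted(o for o in objects if fnmatch.fnmatch(o, pattern))

listing = [
    "application1/customers/customers_1.csv",
    "application1/customers/customers_2.csv",
    "application1/customers/customers_3.csv",
    "application1/customers/customers.csv.ok",
    "application1/orders/orders_1.csv",
]
files = splits_for_ok_file("application1/customers/customers.csv.ok", listing)
# The three customers_*.csv splits match; the .ok file itself and
# objects from other entities are excluded.
```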
Key Points
| Aspect | Behavior |
|---|---|
| Trigger | Only the `.ok` file triggers processing |
| Discovery | Pattern match: `{entity}*.csv` or `{entity}_*.csv` |
| Processing | All splits processed in a single Dataflow job |
| Validation | Each split has its own HDR/TRL; all are validated |
| Audit | All records get the same `_run_id` |
Modules
| Module | Purpose | Key Classes |
|---|---|---|
| `file_management/` | HDR/TRL parsing, archival | `HDRTRLParser`, `FileArchiver` |
| `validators/` | Schema-driven validation | `SchemaValidator`, `ValidationError` |
| `pipelines/base/` | Base classes | `BasePipeline`, `PipelineConfig` |
| `pipelines/beam/transforms/` | Beam DoFns | `RobustCsvParseDoFn`, `SchemaValidateRecordDoFn` |
Key Findings
1. Advanced HDR/TRL Parsing
- Configurable Parser: Highly flexible regex-based parsing for header and trailer validation.
- Support: Handles custom patterns, prefixes, and multi-field extraction for diverse source systems.
- Validation: Automated record count and checksum verification against trailer values.
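A minimal sketch of what regex-based HDR/TRL validation can look like. The patterns below (a header carrying a source system and business date, a trailer carrying the expected record count) are assumptions for illustration; the library's real patterns are configurable per source.

```python
import re

# Hypothetical patterns; real sources would supply their own.
HDR_RE = re.compile(r"^HDR\|(?P<source>\w+)\|(?P<business_date>\d{8})$")
TRL_RE = re.compile(r"^TRL\|(?P<record_count>\d+)$")

def check_hdr_trl(lines):
    hdr = HDR_RE.match(lines[0])
    trl = TRL_RE.match(lines[-1])
    if not hdr or not trl:
        raise ValueError("malformed HDR or TRL line")
    data = lines[1:-1]
    expected = int(trl.group("record_count"))
    # Verify the data record count against the trailer's declared count.
    if len(data) != expected:
        raise ValueError(f"trailer declares {expected} records, found {len(data)}")
    return hdr.groupdict(), data

meta, records = check_hdr_trl(["HDR|CRM|20240131", "1,Ada", "2,Bob", "TRL|2"])
# meta carries the extracted header fields; records are the data lines.
```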
2. Fluent Pipeline API
- BeamPipelineBuilder: Provides a clean, chainable interface for building pipelines:
  - `read_csv()` / `read_from_bigquery()`
  - `validate()` (schema-driven)
  - `transform()` (custom business logic)
  - `write_to_bigquery()` / `write_to_gcs()`
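The chainable shape of such a builder can be sketched as follows. This is a mock, not the actual BeamPipelineBuilder; only the method names come from the list above.

```python
class PipelineBuilderSketch:
    """Illustrative fluent builder: each stage method returns self."""

    def __init__(self):
        self.steps = []

    def _add(self, step):
        self.steps.append(step)
        return self  # returning self is what makes the API chainable

    def read_csv(self, path):
        return self._add(("read_csv", path))

    def validate(self, schema):
        return self._add(("validate", schema))

    def transform(self, fn):
        return self._add(("transform", fn))

    def write_to_bigquery(self, table):
        return self._add(("write_to_bigquery", table))

builder = (PipelineBuilderSketch()
           .read_csv("gs://landing/customers.csv")
           .validate("customers_schema")
           .write_to_bigquery("dataset.customers"))
# builder.steps records the three stages in declaration order.
```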
3. Schema Validation & PII Masking
- SchemaValidator: Validates records against EntitySchema definitions from core.
- In-flight Masking: Supports PII masking during the ingestion process, ensuring sensitive data is protected before landing in BigQuery.
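In-flight masking amounts to rewriting sensitive fields before the record is written. The sketch below is an illustration; the field names and the keep-last-four masking policy are assumptions, not the library's actual rules.

```python
import re

def mask_pii(record, pii_fields):
    # Mask each configured PII field, keeping only the last four digits,
    # so the raw value never lands in BigQuery.
    masked = dict(record)
    for field in pii_fields:
        value = masked.get(field)
        if value:
            digits = re.sub(r"\D", "", value)
            masked[field] = "***-**-" + digits[-4:]
    return masked

out = mask_pii({"id": "1", "ssn": "123-45-6789"}, pii_fields=["ssn"])
# out["ssn"] == "***-**-6789"; non-PII fields are left untouched.
```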
4. Split File Handling
- Specialized logic for reassembling and processing split files from source systems.
Governance & Compliance
- Domain Isolation: Depends on core and beam; MUST NOT import airflow.
- Testing: Every transform and pipeline component requires unit tests using gcp-pipeline-tester.
- Reuse: Prefer using BeamPipelineBuilder for consistent pipeline construction.
Usage
```python
from gcp_pipeline_beam.file_management import HDRTRLParser, FileArchiver
from gcp_pipeline_beam.validators import SchemaValidator
from gcp_pipeline_beam.pipelines.base import BasePipeline, PipelineConfig
from gcp_pipeline_beam.pipelines.beam.transforms import RobustCsvParseDoFn, SchemaValidateRecordDoFn
```
Resource Configuration
The library includes automatic resource configuration based on file sizes. This helps optimize Dataflow worker types and Docker resource limits.
Quick Usage
```python
from gcp_pipeline_beam.pipelines.beam import (
    ResourceConfigurator,
    get_optimal_pipeline_options,
    get_docker_config,
    print_resource_recommendations,
)

# Get recommendations for a 500 MB file
print_resource_recommendations(500)

# Get optimized pipeline options for Dataflow
options = get_optimal_pipeline_options(
    file_size_mb=500,
    project_id="my-project",
    region="europe-west2",
)

# Get Docker configuration
docker_config = get_docker_config(500)
print(f"Memory limit: {docker_config.memory_limit}")
print(f"CPU limit: {docker_config.cpu_limit}")
```
File Size Guidelines
| File Size | Category | Dataflow Worker | Docker Memory |
|---|---|---|---|
| < 100 MB | Small | n1-standard-2 | 4G |
| 100 MB - 1 GB | Medium | n1-standard-4 | 8G |
| 1 GB - 10 GB | Large | n1-highmem-8 | 16G |
| 10 GB - 100 GB | XLarge | n1-highmem-16 | 32G |
| > 100 GB | Split Required | Split files first | N/A |
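The guideline table reduces to a simple threshold lookup. The thresholds below mirror the table; the function and dictionary keys are illustrative, not the library's actual API:

```python
# (upper bound in MB, category, Dataflow worker, Docker memory limit)
SIZE_TIERS = [
    (100,     "Small",  "n1-standard-2", "4G"),
    (1_024,   "Medium", "n1-standard-4", "8G"),
    (10_240,  "Large",  "n1-highmem-8",  "16G"),
    (102_400, "XLarge", "n1-highmem-16", "32G"),
]

def tier_for(file_size_mb):
    # Walk the tiers in ascending order and return the first that fits.
    for limit_mb, category, worker, docker_mem in SIZE_TIERS:
        if file_size_mb < limit_mb:
            return {"category": category, "worker": worker, "docker_memory": docker_mem}
    # Beyond 100 GB the file should be split before processing.
    return {"category": "Split Required", "worker": None, "docker_memory": None}

tier = tier_for(500)
# 500 MB falls in the Medium tier: n1-standard-4 with an 8G Docker limit.
```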
Auto-Configure from GCS File
```python
config = ResourceConfigurator(project_id="my-project")

# Auto-detect file size and get optimal options
options = config.get_pipeline_options_for_file("gs://bucket/large-file.csv")

# Get full recommendation summary
summary = config.get_recommendation_summary(5000)  # 5 GB
print(f"Should split: {summary['should_split']}")
print(f"Estimated cost: ${summary['estimates']['cost_usd']}")
```
For complete documentation, see BEAM_FILE_PROCESSING_GUIDE.md.
Tests
```shell
python3.11 -m pytest tests/ -v
# 478 passed
```