Core library for GCP data pipelines
gcp-pipeline-core
Foundation library - audit, monitoring, error handling, job control.
NO Apache Beam or Airflow dependencies.
Architecture
```
                         GCP-PIPELINE-CORE
                         ─────────────────
┌─────────────────────────────────────────────────────────────────┐
│                        FOUNDATION LAYER                         │
│                                                                 │
│     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     │
│     │    Audit    │     │ Monitoring  │     │    Error    │     │
│     │    Trail    │     │   Metrics   │     │  Handling   │     │
│     └──────┬──────┘     └──────┬──────┘     └──────┬──────┘     │
│            │                   │                   │            │
│            ▼                   ▼                   ▼            │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                     Utilities Layer                     │   │
│   │   • Structured Logging (JSON)                           │   │
│   │   • Run ID Generation                                   │   │
│   │   • Configuration Management                            │   │
│   └─────────────────────────────────────────────────────────┘   │
│            │                   │                   │            │
│            ▼                   ▼                   ▼            │
│     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     │
│     │   Clients   │     │ Job Control │     │   Schema    │     │
│     │  GCS/BQ/PS  │     │ Repository  │     │ Definitions │     │
│     └─────────────┘     └─────────────┘     └─────────────┘     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
      Used by: gcp-pipeline-beam, gcp-pipeline-orchestration
```
Modules
| Module | Purpose | Key Classes |
|---|---|---|
| audit/ | Lineage tracking, reconciliation | AuditTrail, ReconciliationEngine |
| monitoring/ | Metrics, OTEL/Dynatrace | MetricsCollector, OTELExporter |
| finops/ | Cost tracking and labeling | BigQueryCostTracker, FinOpsLabels |
| error_handling/ | Error classification, retry | ErrorHandler, RetryPolicy |
| job_control/ | Pipeline status tracking | JobControlRepository, PipelineJob |
| clients/ | GCP service wrappers | GCSClient, BigQueryClient, PubSubClient |
| utilities/ | Logging, run ID | configure_structured_logging, generate_run_id |
| data_quality/ | Quality checks | validate_row_types, check_duplicate_keys |
| schema.py | Entity definitions | EntitySchema, SchemaField |
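To make the `schema.py` and `data_quality/` rows concrete, the sketch below pairs a minimal entity definition with two checks in the spirit of `validate_row_types` and `check_duplicate_keys`. The names `FieldSpec`, `EntitySpec`, `row_type_errors`, and `duplicate_keys` are hypothetical stand-ins, not the library's API; the real classes may model types and keys differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Sequence, Tuple


@dataclass(frozen=True)
class FieldSpec:
    """Hypothetical field definition: name, expected Python type, nullability."""
    name: str
    py_type: type
    required: bool = True


@dataclass(frozen=True)
class EntitySpec:
    """Hypothetical entity definition: fields plus the primary-key columns."""
    name: str
    fields: Tuple[FieldSpec, ...]
    primary_key: Tuple[str, ...] = ()


def row_type_errors(row: Dict[str, Any], entity: EntitySpec) -> List[str]:
    """Return human-readable type errors; an empty list means the row passed."""
    errors = []
    for f in entity.fields:
        value = row.get(f.name)
        if value is None:
            if f.required:
                errors.append(f"{f.name}: missing required value")
        elif not isinstance(value, f.py_type):
            errors.append(f"{f.name}: expected {f.py_type.__name__}, got {type(value).__name__}")
    return errors


def duplicate_keys(rows: Iterable[Dict[str, Any]], key: Sequence[str]) -> List[tuple]:
    """Return each key tuple that occurs more than once, in first-repeat order."""
    seen, reported, dupes = set(), set(), []
    for row in rows:
        k = tuple(row.get(col) for col in key)
        if k in seen and k not in reported:
            reported.add(k)
            dupes.append(k)
        seen.add(k)
    return dupes


customer = EntitySpec(
    name="customer",
    fields=(FieldSpec("id", int), FieldSpec("email", str, required=False)),
    primary_key=("id",),
)
print(row_type_errors({"id": "42"}, customer))  # ['id: expected int, got str']
print(duplicate_keys([{"id": 1}, {"id": 2}, {"id": 1}], customer.primary_key))  # [(1,)]
```

Returning a list of errors rather than raising lets a pipeline route bad rows to an error sink while continuing with the good ones.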
Component Flow
```
Pipeline Start
      │
      ▼
┌────────────┐    ┌────────────────────────────┐    ┌────────────┐
│ generate_  │───►│ AuditTrail                 │───►│ Structured │
│ run_id()   │    │ .record_processing_start() │    │ Logging    │
└────────────┘    └────────────────────────────┘    └────────────┘
      │                         │                         │
      ▼                         ▼                         ▼
┌────────────┐    ┌────────────────────────────┐    ┌────────────┐
│ JobControl │    │ Metrics                    │    │ Error      │
│ .create()  │    │ .record()                  │    │ Handler    │
└────────────┘    └────────────────────────────┘    └────────────┘
      │                         │                         │
      └─────────────────────────┴─────────────────────────┘
                                │
                                ▼
                         ┌─────────────┐
                         │  Reconcile  │
                         │ & Complete  │
                         └─────────────┘
```
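The flow above can be approximated in plain Python as follows. This is a simplified sketch, not the library's implementation: `make_run_id`, `RunAudit`, the run-ID format, and the reconciliation rule (records read = records written + records errored) are all assumptions, and job control and metrics are reduced to simple counters.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Iterable, List


def make_run_id(system_id: str) -> str:
    # Hypothetical format: <system>-<UTC timestamp>-<8 hex chars>.
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{system_id}-{ts}-{uuid.uuid4().hex[:8]}"


@dataclass
class RunAudit:
    """Hypothetical per-run audit record: lifecycle events plus record counts."""
    run_id: str
    events: List[str] = field(default_factory=list)
    counts: Dict[str, int] = field(
        default_factory=lambda: {"read": 0, "written": 0, "errors": 0})

    def record_processing_start(self) -> None:
        self.events.append("started")

    def reconciles(self) -> bool:
        # Assumed rule: every record read was either written or routed to errors.
        return self.counts["read"] == self.counts["written"] + self.counts["errors"]


def run_pipeline(system_id: str, records: Iterable[dict]) -> RunAudit:
    audit = RunAudit(run_id=make_run_id(system_id))  # 1. generate the run ID
    audit.record_processing_start()                  # 2. record the start
    for rec in records:                              # 3. process and count
        audit.counts["read"] += 1
        if rec.get("valid", True):
            audit.counts["written"] += 1  # the actual write is omitted here
        else:
            audit.counts["errors"] += 1   # would be handed to the error handler
    # 4. reconcile and complete
    audit.events.append("completed" if audit.reconciles() else "reconciliation_failed")
    return audit


audit = run_pipeline("crm", [{"id": 1}, {"id": 2, "valid": False}])
# audit.counts == {"read": 2, "written": 1, "errors": 1}
# audit.events == ["started", "completed"]
```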
Key Findings
1. Audit Trail & Lineage
- AuditTrail: Tracks `run_id` across all pipeline stages.
- DuplicateDetector: Provides idempotency by tracking seen records and preventing double-processing.
- Publisher: Supports automated publishing of audit records to BigQuery for centralized monitoring.
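The idempotency idea behind `DuplicateDetector` can be sketched as a set of record fingerprints. `InMemoryDuplicateDetector`, its `key_fields` parameter, and the SHA-256-over-sorted-JSON fingerprint are assumptions for illustration; an in-memory set forgets everything when the process exits, so a detector meant to survive restarts would need persistent storage.

```python
import hashlib
import json
from typing import Any, Dict, Optional, Sequence, Set


class InMemoryDuplicateDetector:
    """Hypothetical sketch: remembers fingerprints of records already processed."""

    def __init__(self, key_fields: Optional[Sequence[str]] = None) -> None:
        # With key_fields=None the whole record is fingerprinted.
        self.key_fields = key_fields
        self._seen: Set[str] = set()

    def _fingerprint(self, record: Dict[str, Any]) -> str:
        subset = record if self.key_fields is None else {k: record.get(k) for k in self.key_fields}
        # sort_keys makes the fingerprint independent of dict insertion order.
        payload = json.dumps(subset, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def is_duplicate(self, record: Dict[str, Any]) -> bool:
        """Return True if seen before; otherwise remember the record and return False."""
        fp = self._fingerprint(record)
        if fp in self._seen:
            return True
        self._seen.add(fp)
        return False


detector = InMemoryDuplicateDetector(key_fields=["order_id"])
batch = [{"order_id": 1, "amt": 10}, {"order_id": 2, "amt": 5}, {"order_id": 1, "amt": 10}]
unique = [r for r in batch if not detector.is_duplicate(r)]
# unique keeps only the first two records
```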
2. Sophisticated Error Handling
- ErrorClassifier: Categorizes exceptions into:
  - Validation: Data errors (no retry).
  - Integration: Connection/API errors (retry with backoff).
  - Resource: Quota/Rate limit errors (exponential backoff).
- RetryPolicy: Configurable backoff multipliers, jitter, and maximum retry attempts.
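A minimal sketch of classify-then-retry, assuming an illustrative mapping from built-in exceptions to the three categories above. `ErrorCategory`, `QuotaExceeded`, `classify_error`, `BackoffPolicy`, and `call_with_retry` are hypothetical names, and for brevity both retryable categories share one exponential policy with optional "full jitter"; the library's `ErrorClassifier` and `RetryPolicy` may distinguish them more finely.

```python
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List, TypeVar

T = TypeVar("T")


class ErrorCategory(Enum):
    VALIDATION = "validation"    # bad data: retrying cannot help
    INTEGRATION = "integration"  # connection/API failure: retry with backoff
    RESOURCE = "resource"        # quota/rate limit: retry with exponential backoff


class QuotaExceeded(Exception):
    """Hypothetical stand-in for a quota or rate-limit error from a GCP client."""


def classify_error(exc: Exception) -> ErrorCategory:
    # Assumed mapping for illustration; anything unrecognized is treated as non-retryable.
    if isinstance(exc, QuotaExceeded):
        return ErrorCategory.RESOURCE
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorCategory.INTEGRATION
    return ErrorCategory.VALIDATION


@dataclass
class BackoffPolicy:
    max_attempts: int = 4      # total attempts, including the first call
    base_delay: float = 1.0    # seconds before the first retry
    multiplier: float = 2.0    # growth factor between retries
    max_delay: float = 30.0    # cap on any single delay
    jitter: bool = True

    def delay(self, retry_number: int) -> float:
        """Delay before retry `retry_number` (1-based), capped at max_delay."""
        d = min(self.base_delay * self.multiplier ** (retry_number - 1), self.max_delay)
        # "Full jitter": a uniform draw in [0, d] de-synchronizes competing clients.
        return random.uniform(0, d) if self.jitter else d


def call_with_retry(fn: Callable[[], T], policy: BackoffPolicy,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    for attempt in range(1, policy.max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Validation errors, and failures on the final attempt, propagate immediately.
            if classify_error(exc) is ErrorCategory.VALIDATION or attempt == policy.max_attempts:
                raise
            sleep(policy.delay(attempt))
    raise ValueError("max_attempts must be at least 1")


attempts: List[int] = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("connection reset")  # Integration error: retried
    return "ok"


result = call_with_retry(flaky, BackoffPolicy(jitter=False), sleep=lambda s: None)
# result == "ok" after 3 attempts
```

Injecting `sleep` keeps the retry loop testable without real waiting.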
3. Job Control
- JobControlRepository: Centralized state management for pipeline executions.
- State Tracking: Granular tracking of failure stages, start/end times, and record counts.
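A job-control record of the kind described above might look like this. `JobStatus`, `JobRecord`, and `InMemoryJobStore` are hypothetical stand-ins for `PipelineJob` and `JobControlRepository`; the status values and field names are assumptions, and a dictionary replaces whatever persistent store the real repository uses.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Optional


class JobStatus(Enum):
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


def _utcnow() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class JobRecord:
    """Hypothetical job-control row: one per pipeline run."""
    run_id: str
    status: JobStatus = JobStatus.RUNNING
    started_at: datetime = field(default_factory=_utcnow)
    ended_at: Optional[datetime] = None
    records_processed: int = 0
    failure_stage: Optional[str] = None
    error_message: Optional[str] = None


class InMemoryJobStore:
    """Hypothetical stand-in; a real repository would persist rows durably."""

    def __init__(self) -> None:
        self._jobs: Dict[str, JobRecord] = {}

    def create(self, run_id: str) -> JobRecord:
        if run_id in self._jobs:
            raise ValueError(f"run {run_id} already exists")  # keep run IDs unique
        self._jobs[run_id] = JobRecord(run_id=run_id)
        return self._jobs[run_id]

    def mark_success(self, run_id: str, records_processed: int) -> None:
        job = self._jobs[run_id]
        job.status = JobStatus.SUCCESS
        job.records_processed = records_processed
        job.ended_at = _utcnow()

    def mark_failed(self, run_id: str, stage: str, message: str) -> None:
        job = self._jobs[run_id]
        job.status = JobStatus.FAILED
        job.failure_stage = stage  # which step broke, useful for targeted reruns
        job.error_message = message
        job.ended_at = _utcnow()

    def get(self, run_id: str) -> JobRecord:
        return self._jobs[run_id]


store = InMemoryJobStore()
store.create("run-001")
store.mark_failed("run-001", stage="load_to_bigquery", message="quota exceeded")
store.create("run-002")
store.mark_success("run-002", records_processed=1500)
```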
4. Structured Logging
- Standardized JSON logging with automated context injection (`run_id`, `system_id`).
- Optimized for Cloud Logging and BigQuery ingestion.
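The standard-library `logging` module is enough to sketch JSON logs with injected context. `JsonFormatter` and `configure_json_logging` are illustrative names, not the library's `configure_structured_logging`; the `severity` and `message` keys are chosen because Cloud Logging recognizes them when it parses JSON lines written to stdout or stderr by services such as Cloud Run.

```python
import io
import json
import logging
from typing import Any, Dict, TextIO


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def __init__(self, context: Dict[str, Any]) -> None:
        super().__init__()
        self.context = context  # e.g. run_id, system_id, injected into every line

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "severity": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            **self.context,
        }
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry, default=str)


def configure_json_logging(stream: TextIO, **context: Any) -> logging.Logger:
    # Hypothetical helper; name and signature are illustrative only.
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter(context))
    logger = logging.getLogger("pipeline")
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate, non-JSON lines from the root logger
    return logger


buf = io.StringIO()
log = configure_json_logging(buf, run_id="crm-20240101-ab12cd34", system_id="crm")
log.info("loaded %d rows", 120)
line = json.loads(buf.getvalue())
# line == {"severity": "INFO", "message": "loaded 120 rows", "logger": "pipeline",
#          "run_id": "crm-20240101-ab12cd34", "system_id": "crm"}
```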
5. FinOps & Cost Tracking
- Cost Estimation: Automated cost estimates for BigQuery (Query/Load), GCS (Storage/Upload), and Pub/Sub (Publishing).
- FinOpsLabels: Standardized GCP resource labeling for precise cost allocation.
- Monitoring Integration: Integrates with `MigrationMetrics` for real-time cost visibility in audit logs.
- Trackers:
  - `BigQueryCostTracker`: Estimates costs from bytes billed and slot usage.
  - `CloudStorageCostTracker`: Estimates storage costs and upload fees.
  - `PubSubCostTracker`: Estimates throughput costs, accounting for the 1 KB minimum billing unit.
- Decorators:
  - `@track_bq_cost` for automated tracking of BigQuery jobs.
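The estimation arithmetic can be sketched as below. All rates are parameters with placeholder values, not quoted GCP prices (check current pricing for your region and edition). The Pub/Sub function interprets the "1 KB minimum" as a 1,000-byte floor per publish request, assuming one message per request; verify that against the Pub/Sub pricing documentation. `track_query_cost` is a hypothetical decorator in the spirit of `@track_bq_cost`, and it relies only on the `total_bytes_billed` attribute that `google.cloud.bigquery` query jobs expose.

```python
import functools
import re
from typing import Any, Callable, Dict, List

TIB = 2 ** 40  # bytes per tebibyte; on-demand rates are quoted per TiB


def estimate_bq_on_demand_cost(bytes_billed: int, usd_per_tib: float) -> float:
    """On-demand query cost: bytes billed converted to TiB, times the per-TiB rate."""
    return bytes_billed / TIB * usd_per_tib


def estimate_bq_slot_cost(slot_ms: int, usd_per_slot_hour: float) -> float:
    """Capacity-based cost: slot-milliseconds converted to slot-hours, times the rate."""
    return slot_ms / 3_600_000 * usd_per_slot_hour


def estimate_pubsub_cost(message_sizes: List[int], usd_per_tib: float,
                         min_bytes_per_request: int = 1000) -> float:
    """Throughput cost, assuming one message per publish request and a per-request byte floor."""
    billable = sum(max(size, min_bytes_per_request) for size in message_sizes)
    return billable / TIB * usd_per_tib


def to_label_value(text: str) -> str:
    """Coerce text toward GCP label rules: lowercase, [a-z0-9_-] only, max 63 chars."""
    return re.sub(r"[^a-z0-9_-]", "_", text.lower())[:63]


COST_LEDGER: List[Dict[str, Any]] = []


def track_query_cost(usd_per_tib: float) -> Callable:
    """Hypothetical decorator: records the estimated cost of the job the wrapped function returns."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            job = fn(*args, **kwargs)
            # BigQuery QueryJob objects expose total_bytes_billed; it may be None
            # until job statistics are available, so treat None as 0.
            billed = getattr(job, "total_bytes_billed", None) or 0
            COST_LEDGER.append({"function": fn.__name__, "bytes_billed": billed,
                                "usd": estimate_bq_on_demand_cost(billed, usd_per_tib)})
            return job
        return wrapper
    return decorator


class FakeJob:
    """Stand-in for a finished query job that billed half a TiB."""
    total_bytes_billed = TIB // 2


@track_query_cost(usd_per_tib=6.0)  # placeholder rate, not a quoted price
def run_daily_query() -> FakeJob:
    return FakeJob()


run_daily_query()
print(COST_LEDGER[-1]["usd"])  # 3.0
```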
Governance & Compliance
- Zero-Bleed Policy: This library MUST NOT import `apache_beam` or `airflow`.
- Portability: Must remain compatible with any Python environment (Cloud Functions, Cloud Run, local scripts, etc.).
- Testing: All new features require unit tests in `tests/unit/`.
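One way to enforce the Zero-Bleed Policy in `tests/unit/` is to parse each module with `ast` and reject imports of the forbidden top-level packages. This is a sketch: the `src/gcp_pipeline_core` path in the commented test is an assumption about the repository layout, and dynamic imports such as `importlib.import_module("airflow")` would not be caught.

```python
import ast
from pathlib import Path
from typing import Iterable, List, Tuple

FORBIDDEN = {"apache_beam", "airflow"}


def forbidden_imports(source: str, forbidden: Iterable[str] = FORBIDDEN) -> List[str]:
    """Return forbidden top-level packages imported anywhere in `source`,
    including imports nested inside functions or try blocks."""
    banned = set(forbidden)
    found: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [node.module]  # absolute "from X.Y import Z"; relative imports skipped
        else:
            continue
        found.extend(n.split(".")[0] for n in names if n.split(".")[0] in banned)
    return found


def scan_package(root: Path) -> List[Tuple[str, List[str]]]:
    """Scan every .py file under `root`; return (path, offending packages) pairs."""
    return [
        (str(path), hits)
        for path in sorted(root.rglob("*.py"))
        if (hits := forbidden_imports(path.read_text(encoding="utf-8")))
    ]


# In tests/unit/, something like this (package path is an assumption):
# def test_zero_bleed():
#     assert scan_package(Path("src/gcp_pipeline_core")) == []
```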
Usage
```python
from gcp_pipeline_core.audit import AuditTrail, ReconciliationEngine
from gcp_pipeline_core.monitoring import MetricsCollector
from gcp_pipeline_core.utilities import configure_structured_logging, generate_run_id
from gcp_pipeline_core.schema import EntitySchema, SchemaField
from gcp_pipeline_core.job_control import JobControlRepository
from gcp_pipeline_core.error_handling import ErrorHandler
from gcp_pipeline_core.finops import BigQueryCostTracker, FinOpsLabels, track_bq_cost
```
Tests
```bash
python3.11 -m pytest tests/ -v
# 256 passed
```
Download files
Source Distribution
gcp_pipeline_core-1.0.23.tar.gz (64.6 kB)
Built Distribution
gcp_pipeline_core-1.0.23-py3-none-any.whl (88.7 kB)
File details
Details for the file gcp_pipeline_core-1.0.23.tar.gz.
File metadata
- Download URL: gcp_pipeline_core-1.0.23.tar.gz
- Upload date:
- Size: 64.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 704a56d66f6ad9447e0a255f5883fb2f29a841b231ed81175455e929f057f005 |
| MD5 | ee8a4e2ced1f36c807547861bd15a175 |
| BLAKE2b-256 | 1a6031c0cd91ad8f89ac3834a7704c5e402cc50b72f85d2e252432529ab06c5a |
File details
Details for the file gcp_pipeline_core-1.0.23-py3-none-any.whl.
File metadata
- Download URL: gcp_pipeline_core-1.0.23-py3-none-any.whl
- Upload date:
- Size: 88.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | e7c6466eac1f85a566b2290b2c10726523aef42c5e752c521894ba80b8fd51aa |
| MD5 | ad6e84cf5ceff9e7915e857f7af15bb0 |
| BLAKE2b-256 | c4020c05f4cbe446a0aa94849c369c72af18f59d0790d8d74eea90f9fd6657f0 |