Core library for GCP data pipelines
Project description
gcp-pipeline-core
Foundation library - audit, monitoring, error handling, job control.
NO Apache Beam or Airflow dependencies.
Architecture
GCP-PIPELINE-CORE
─────────────────
┌─────────────────────────────────────────────────────────────────┐
│ FOUNDATION LAYER │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Audit │ │ Monitoring │ │ Error │ │
│ │ Trail │ │ Metrics │ │ Handling │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Utilities Layer │ │
│ │ • Structured Logging (JSON) │ │
│ │ • Run ID Generation │ │
│ │ • Configuration Management │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Clients │ │ Job Control │ │ Schema │ │
│ │ GCS/BQ/PS │ │ Repository │ │ Definitions │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
│
▼
Used by: gcp-pipeline-beam, gcp-pipeline-orchestration
Modules
| Module | Purpose | Key Classes |
|---|---|---|
audit/ |
Lineage tracking, reconciliation | AuditTrail, ReconciliationEngine |
monitoring/ |
Metrics, OTEL/Dynatrace | MetricsCollector, OTELExporter |
finops/ |
Cost tracking and labeling | BigQueryCostTracker, FinOpsLabels |
error_handling/ |
Error classification, retry | ErrorHandler, RetryPolicy |
job_control/ |
Pipeline status tracking | JobControlRepository, PipelineJob |
clients/ |
GCP service wrappers | GCSClient, BigQueryClient, PubSubClient |
utilities/ |
Logging, run ID | configure_structured_logging, generate_run_id |
data_quality/ |
Quality checks | validate_row_types, check_duplicate_keys |
schema.py |
Entity definitions | EntitySchema, SchemaField |
Component Flow
Pipeline Start
│
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ generate_ │───►│ AuditTrail │───►│ Structured │
│ run_id() │ │ .record_processing_start() │ │ Logging │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ JobControl │ │ Metrics │ │ Error │
│ .create() │ │ .record() │ │ Handler │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└──────────────────┴──────────────────┘
│
▼
┌─────────────┐
│ Reconcile │
│ & Complete │
└─────────────┘
Key Findings
1. Audit Trail & Lineage
- AuditTrail: Implements robust tracking of
run_idacross all pipeline stages. - DuplicateDetector: Provides idempotency by tracking seen records and preventing double-processing.
- Publisher: Supports automated publishing of audit records to BigQuery for centralized monitoring.
2. Sophisticated Error Handling
- ErrorClassifier: Categorizes exceptions into:
- Validation: Data errors (no retry).
- Integration: Connection/API errors (retry with backoff).
- Resource: Quota/Rate limit errors (exponential backoff).
- RetryPolicy: Configurable backoff multipliers, jitter, and maximum retry attempts.
3. Job Control
- JobControlRepository: Centralized state management for pipeline executions.
- State Tracking: Granular tracking of failure stages, start/end times, and record counts.
4. Structured Logging
- Standardized JSON logging with automated context injection (
run_id,system_id). - Optimized for Cloud Logging and BigQuery ingestion.
5. FinOps & Cost Tracking
- Cost Estimation: Automated cost estimation for BigQuery (Query/Load), GCS (Storage/Upload), and Pub/Sub (Publishing).
- FinOpsLabels: Standardized GCP resource labeling for precise cost allocation.
- Monitoring Integration: Seamless integration with
MigrationMetricsfor real-time cost visibility in audit logs. - Trackers:
BigQueryCostTracker: Estimates costs based on bytes billed and slot usage.CloudStorageCostTracker: Estimates storage costs and upload fees.PubSubCostTracker: Estimates throughput costs with 1KB minimum billing awareness.
- Decorators:
@track_bq_costfor automated tracking of BigQuery jobs.
Governance & Compliance
- Zero-Bleed Policy: This library MUST NOT import
apache_beamorairflow. - Portability: Must remain compatible with any Python environment (Cloud Functions, Cloud Run, local scripts, etc.).
- Testing: All new features require unit tests in
tests/unit/.
Usage
from gcp_pipeline_core.audit import AuditTrail, ReconciliationEngine
from gcp_pipeline_core.monitoring import MetricsCollector
from gcp_pipeline_core.utilities import configure_structured_logging, generate_run_id
from gcp_pipeline_core.schema import EntitySchema, SchemaField
from gcp_pipeline_core.job_control import JobControlRepository
from gcp_pipeline_core.error_handling import ErrorHandler
from gcp_pipeline_core.finops import BigQueryCostTracker, FinOpsLabels, track_bq_cost
Tests
python3.11 -m pytest tests/ -v
# 256 passed
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
gcp_pipeline_core-1.0.24.tar.gz
(66.1 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file gcp_pipeline_core-1.0.24.tar.gz.
File metadata
- Download URL: gcp_pipeline_core-1.0.24.tar.gz
- Upload date:
- Size: 66.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
218c7de3570dcd3de4bfc96806d8122f2f6bbd5ca7da6e4cdb602f746757a16d
|
|
| MD5 |
7d7a40df31095982eb43298648be5936
|
|
| BLAKE2b-256 |
87dd0f098c2c9bf7412fd7ac9b1b5e0c0356fb45d62529295f37d43cd3f104b2
|
File details
Details for the file gcp_pipeline_core-1.0.24-py3-none-any.whl.
File metadata
- Download URL: gcp_pipeline_core-1.0.24-py3-none-any.whl
- Upload date:
- Size: 90.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b9d51609b7e0cbef2e5e23309dfadafc826608487c4fa0aa38a47ee17f2b5bce
|
|
| MD5 |
45099f202394e20776007bf399c6b3b4
|
|
| BLAKE2b-256 |
4a5609468bdc118966a0da5baa7dc5eeecc1add31f04a6c11506bcd8e7d15050
|