Core library for GCP data pipelines
gcp-pipeline-core
Foundation library - audit, monitoring, error handling, job control.
NO Apache Beam or Airflow dependencies.
Architecture
GCP-PIPELINE-CORE
─────────────────
┌─────────────────────────────────────────────────────────────────┐
│ FOUNDATION LAYER │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Audit │ │ Monitoring │ │ Error │ │
│ │ Trail │ │ Metrics │ │ Handling │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Utilities Layer │ │
│ │ • Structured Logging (JSON) │ │
│ │ • Run ID Generation │ │
│ │ • Configuration Management │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Clients │ │ Job Control │ │ Schema │ │
│ │ GCS/BQ/PS │ │ Repository │ │ Definitions │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
│
▼
Used by: gcp-pipeline-beam, gcp-pipeline-orchestration
Modules
| Module | Purpose | Key Classes |
|---|---|---|
| `audit/` | Lineage tracking, reconciliation | AuditTrail, ReconciliationEngine |
| `monitoring/` | Metrics, health, alerts, OTEL tracing | MetricsCollector, MigrationMetrics, ObservabilityManager, AlertManager, OTELConfig |
| `finops/` | Cost tracking and labeling | BigQueryCostTracker, FinOpsLabels |
| `error_handling/` | Error classification, retry | ErrorHandler, RetryPolicy |
| `job_control/` | Pipeline status tracking | JobControlRepository, PipelineJob |
| `clients/` | GCP service wrappers | GCSClient, BigQueryClient, PubSubClient |
| `utilities/` | Logging, run ID | configure_structured_logging, generate_run_id |
| `data_quality/` | Quality checks, scoring, anomaly detection | DataQualityChecker, QualityScore, AnomalyDetector |
| `data_deletion/` | Quarantine, safe deletion, recovery | DataDeletionFramework, SafeDataDeletion, RecoveryManager |
| `schema.py` | Entity definitions | EntitySchema, SchemaField |
Component Flow
Pipeline Start
│
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ generate_ │───►│ AuditTrail │───►│ Structured │
│ run_id() │ │ .record_processing_start() │ │ Logging │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ JobControl │ │ Metrics │ │ Error │
│ .create() │ │ .record() │ │ Handler │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└──────────────────┴──────────────────┘
│
▼
┌─────────────┐
│ Reconcile │
│ & Complete │
└─────────────┘
Key Findings
1. Audit Trail & Lineage
- AuditTrail: Implements robust tracking of `run_id` across all pipeline stages.
- DuplicateDetector: Provides idempotency by tracking seen records and preventing double-processing.
- Publisher: Supports automated publishing of audit records to BigQuery for centralized monitoring.
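The idempotency idea behind DuplicateDetector can be illustrated with a small standalone sketch. This is not the library's implementation: the class name `SeenRecordTracker`, the choice of key fields, and the SHA-256 fingerprinting are all assumptions made for illustration.

```python
import hashlib
import json


class SeenRecordTracker:
    """Hypothetical stand-in for a duplicate detector (not the library class)."""

    def __init__(self, key_fields):
        self.key_fields = list(key_fields)
        self._seen = set()

    def _fingerprint(self, record):
        # Hash only the key fields so changes to non-key fields do not hide a replay.
        key = {name: record.get(name) for name in self.key_fields}
        payload = json.dumps(key, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def is_duplicate(self, record):
        """Return True if this record's key was seen before; otherwise remember it."""
        fingerprint = self._fingerprint(record)
        if fingerprint in self._seen:
            return True
        self._seen.add(fingerprint)
        return False


tracker = SeenRecordTracker(key_fields=["customer_id"])
records = [
    {"customer_id": 1, "name": "Ada"},
    {"customer_id": 2, "name": "Grace"},
    {"customer_id": 1, "name": "Ada (replayed)"},
]
unique = [r for r in records if not tracker.is_duplicate(r)]
```

An in-memory set only protects a single process; a real pipeline would presumably need a shared store to stay idempotent across workers and reruns.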
2. Sophisticated Error Handling
- ErrorClassifier: Categorizes exceptions into:
  - Validation: Data errors (no retry).
  - Integration: Connection/API errors (retry with backoff).
  - Resource: Quota/Rate limit errors (exponential backoff).
- RetryPolicy: Configurable backoff multipliers, jitter, and maximum retry attempts.
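The classification and retry behaviour listed above can be sketched without the library. Everything here is hypothetical: the exception-to-category mapping, the function names, and the parameter names (`base`, `multiplier`, `jitter`) are assumptions, and for brevity the sketch applies the same exponential backoff to both retryable categories.

```python
import random
import time
from enum import Enum


class ErrorCategory(Enum):
    VALIDATION = "validation"    # data errors: never retried
    INTEGRATION = "integration"  # connection/API errors: retried
    RESOURCE = "resource"        # quota/rate-limit errors: retried


def classify(exc):
    """Illustrative mapping from exception type to category (an assumption)."""
    if isinstance(exc, ValueError):
        return ErrorCategory.VALIDATION
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorCategory.INTEGRATION
    return ErrorCategory.RESOURCE


def backoff_delay(attempt, base=1.0, multiplier=2.0, max_delay=60.0, jitter=0.1):
    """Delay before retry number `attempt` (1-based), spread by +/- `jitter`."""
    delay = min(base * multiplier ** (attempt - 1), max_delay)
    return delay * (1 + random.uniform(-jitter, jitter))


def call_with_retry(func, max_attempts=3, sleep=time.sleep, **backoff):
    """Call `func`, retrying retryable errors up to `max_attempts` times."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Validation errors and the final attempt propagate to the caller.
            if classify(exc) is ErrorCategory.VALIDATION or attempt == max_attempts:
                raise
            sleep(backoff_delay(attempt, **backoff))
```

Passing `sleep` as a parameter keeps the sketch testable: a test can supply a recording function instead of actually waiting.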
3. Job Control
- JobControlRepository: Centralized state management for pipeline executions.
- State Tracking: Granular tracking of failure stages, start/end times, and record counts.
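To make the tracked state concrete, here is a minimal in-memory sketch of a job record with start/end times, record counts, and a failure stage. The names `SketchJob`, `SketchStatus`, and `InMemoryJobStore` are hypothetical and deliberately differ from the library's own classes, whose fields and methods may look different.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class SketchStatus(Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


@dataclass
class SketchJob:
    run_id: str
    status: SketchStatus = SketchStatus.RUNNING
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    records_read: int = 0
    records_written: int = 0
    failure_stage: Optional[str] = None


class InMemoryJobStore:
    """Dictionary-backed stand-in for a job-control repository."""

    def __init__(self):
        self._jobs = {}

    def start(self, run_id):
        job = SketchJob(run_id=run_id, started_at=datetime.now(timezone.utc))
        self._jobs[run_id] = job
        return job

    def complete(self, run_id, records_read, records_written):
        job = self._jobs[run_id]
        job.status = SketchStatus.SUCCEEDED
        job.records_read = records_read
        job.records_written = records_written
        job.ended_at = datetime.now(timezone.utc)
        return job

    def fail(self, run_id, stage):
        # Record which stage failed so a rerun can be targeted.
        job = self._jobs[run_id]
        job.status = SketchStatus.FAILED
        job.failure_stage = stage
        job.ended_at = datetime.now(timezone.utc)
        return job


store = InMemoryJobStore()
store.start("run_001")
failed = store.fail("run_001", stage="validation")
```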
4. Structured Logging
- Standardized JSON logging with automated context injection (`run_id`, `system_id`).
- Optimized for Cloud Logging and BigQuery ingestion.
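Context injection of this kind can be approximated with the standard library alone. The sketch below is not how `configure_structured_logging` works internally; the formatter class, the logger name, and the JSON key names are assumptions.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record):
        payload = {
            "severity": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            # Context fields are attached by the LoggerAdapter below.
            "run_id": getattr(record, "run_id", None),
            "system_id": getattr(record, "system_id", None),
        }
        return json.dumps(payload)


# Write to an in-memory stream so the output can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

base_logger = logging.getLogger("pipeline_sketch")
base_logger.setLevel(logging.INFO)
base_logger.addHandler(handler)
base_logger.propagate = False

# LoggerAdapter injects the same context into every record it emits.
log = logging.LoggerAdapter(base_logger, {"run_id": "run_001", "system_id": "generic"})
log.info("read %d records", 1000)

entry = json.loads(stream.getvalue())
```

Binding the context once in an adapter means call sites do not have to repeat `run_id` on every log call.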
5. FinOps & Cost Tracking
- Cost Estimation: Automated cost estimation for BigQuery (Query/Load), GCS (Storage/Upload), and Pub/Sub (Publishing).
- FinOpsLabels: Standardized GCP resource labeling for precise cost allocation.
- Monitoring Integration: Seamless integration with `MigrationMetrics` for real-time cost visibility in audit logs.
- Trackers:
  - `BigQueryCostTracker`: Estimates costs based on bytes billed and slot usage.
  - `CloudStorageCostTracker`: Estimates storage costs and upload fees.
  - `PubSubCostTracker`: Estimates throughput costs with 1KB minimum billing awareness.
- Decorators:
  - `@track_bq_cost` for automated tracking of BigQuery jobs.
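The arithmetic behind such estimates is simple enough to sketch. The functions below are hypothetical and independent of the library's trackers. The unit price is a caller-supplied placeholder rather than a real GCP price, and applying the minimum billable size per message is an assumption about how the 1KB rule is used.

```python
TIB = 2 ** 40  # bytes in one tebibyte


def estimate_bq_query_cost(bytes_billed, usd_per_tib):
    """On-demand style estimate: cost scales linearly with bytes billed."""
    return bytes_billed / TIB * usd_per_tib


def pubsub_billable_bytes(message_sizes, minimum_bytes=1000):
    """Round each message up to an assumed minimum billable size."""
    return sum(max(size, minimum_bytes) for size in message_sizes)


# Half a TiB billed at a placeholder price of 5.00 USD per TiB.
query_cost = estimate_bq_query_cost(TIB // 2, usd_per_tib=5.0)

# A 200-byte message is billed as 1000 bytes; a 1500-byte message as-is.
billable = pubsub_billable_bytes([200, 1500])
```

Taking the price as a parameter avoids hard-coding a rate that varies by region and changes over time.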
6. Monitoring & Observability
The monitoring/ module provides a layered observability stack — from low-level metric counters through health checks and alerts to full OpenTelemetry distributed tracing.
Architecture
ObservabilityManager (facade — combines all three)
├── MetricsCollector Thread-safe counters, gauges, histograms, timers
│ └── MigrationMetrics Standardized pipeline metrics + FinOps metrics
├── HealthChecker 5-check health assessment (error rate, queue depth, memory, etc.)
└── AlertManager Multi-backend alerting
├── LoggingAlertBackend Python logging (default)
├── SlackAlertBackend Slack Webhooks (Block Kit)
├── DynatraceAlertBackend Dynatrace Events API v2
├── ServiceNowAlertBackend ServiceNow incident creation
├── CloudMonitoringBackend Google Cloud Monitoring
└── DatadogAlertBackend Datadog (stub)
MigrationMetrics — Pipeline-Level Metrics
High-level metrics API with standardized names for all pipeline stages:
from gcp_pipeline_core.monitoring import MigrationMetrics
metrics = MigrationMetrics(run_id="run_20260323", system_id="generic", entity_type="customers")
# Record processing counts
metrics.record_read(1000)
metrics.record_validated(995)
metrics.record_failed(5, error_type="schema_validation")
metrics.record_written(995)
# FinOps integration
metrics.record_cost(0.42)
metrics.record_bytes_scanned(1_073_741_824)
# Timing
with metrics.start_timer("transform"):
    transform_records()
# Summary (includes validation success rate, FinOps, timing)
summary = metrics.get_summary()
job_record = metrics.to_job_record() # Ready for pipeline_jobs table
HealthChecker — Automated Health Assessment
from gcp_pipeline_core.monitoring import HealthChecker, MetricsCollector
collector = MetricsCollector("my_pipeline", "run_001")
health = HealthChecker(collector)
results = health.run_all_checks()
# Checks: record_processing, error_rate (≤10%), queue_depth (≤1000),
# processing_time (≤3600s), memory_usage (≤1024MB)
if not health.is_healthy():
    # Trigger alert
    ...
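The thresholds listed in the comment above amount to a set of upper-bound comparisons. The following standalone sketch shows that idea only. It does not use HealthChecker, the metric names are assumptions, and the `record_processing` check is left out because its criterion is not stated here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Threshold:
    name: str
    limit: float


# Limits taken from the comment above; the metric names are assumptions.
THRESHOLDS = [
    Threshold("error_rate", 0.10),
    Threshold("queue_depth", 1000),
    Threshold("processing_time_s", 3600),
    Threshold("memory_usage_mb", 1024),
]


def run_checks(observed):
    """Return {check name: passed}; a metric that was not observed counts as passing."""
    return {t.name: observed.get(t.name, 0) <= t.limit for t in THRESHOLDS}


results = run_checks({"error_rate": 0.25, "queue_depth": 40})
healthy = all(results.values())
```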
AlertManager — Multi-Backend Alerting
from gcp_pipeline_core.monitoring import AlertManager, AlertLevel
from gcp_pipeline_core.monitoring.alerts import SlackAlertBackend, DynatraceAlertBackend, ServiceNowAlertBackend
manager = AlertManager(alert_backends=[
    SlackAlertBackend(webhook_url="https://hooks.slack.com/..."),
    DynatraceAlertBackend(environment_url="https://xyz.live.dynatrace.com", api_token="dt0c01..."),
    ServiceNowAlertBackend(instance_url="https://company.service-now.com", username="...", password="...",
                           assignment_group="Data Engineering"),
])
# Sends to all configured backends simultaneously
manager.create_alert(
    level=AlertLevel.CRITICAL,
    title="Pipeline Failure: customers",
    message="Schema validation failed for 500 records",
    source="gcp-pipeline-beam",
    metric_name="records_failed",
    threshold_value=0,
    actual_value=500,
)
# Query recent alerts
critical_alerts = manager.get_recent_alerts(minutes=60, level=AlertLevel.CRITICAL)
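The fan-out pattern behind multi-backend alerting can be sketched as follows. This is not AlertManager's code: `SketchAlert`, `ListBackend`, `BrokenBackend`, and `dispatch` are hypothetical names, and the sketch delivers sequentially rather than simultaneously.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class SketchAlert:
    level: str
    title: str
    message: str


class Backend(Protocol):
    def send(self, alert: SketchAlert) -> None: ...


class ListBackend:
    """Collects alerts in memory; stands in for Slack, ServiceNow, etc."""

    def __init__(self):
        self.received = []

    def send(self, alert):
        self.received.append(alert)


class BrokenBackend:
    """Simulates a backend that is down."""

    def send(self, alert):
        raise RuntimeError("backend unavailable")


def dispatch(alert, backends):
    """Send to every backend; one failing backend must not block the others."""
    failures = []
    for backend in backends:
        try:
            backend.send(alert)
        except Exception as exc:
            failures.append((type(backend).__name__, str(exc)))
    return failures


ok_backend = ListBackend()
alert = SketchAlert("CRITICAL", "Pipeline Failure: customers", "Schema validation failed")
failures = dispatch(alert, [BrokenBackend(), ok_backend])
```

Collecting failures instead of raising keeps one unreachable backend from suppressing alerts to the rest.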
ObservabilityManager — Unified Facade
from gcp_pipeline_core.monitoring import ObservabilityManager
obs = ObservabilityManager("my_pipeline", "run_001", alert_backends=[...])
obs.report_records_processed(1000)
obs.report_records_error(5)
# Auto-triggers WARNING alert if any health check fails
is_healthy = obs.check_health()
# Full summary: metrics + health + recent alerts
print(obs.export_metrics())
OpenTelemetry Integration (Optional)
Full distributed tracing with graceful degradation — if OTEL SDK is not installed, all tracing calls become no-ops (zero overhead).
from gcp_pipeline_core.monitoring import configure_otel, OTELConfig, trace_function, OTELContext
# Configure for GCP Cloud Trace
config = OTELConfig.for_gcp_otlp(service_name="generic-ingestion", project_id="my-project")
configure_otel(config)
# Or Dynatrace
config = OTELConfig.for_dynatrace(
    service_name="generic-ingestion",
    dynatrace_url="https://xyz.live.dynatrace.com/api/v2/otlp",
    dynatrace_token="dt0c01...",
)
# Decorator: auto-creates spans per function call
@trace_function(span_name="validate_records", attributes={"entity": "customers"})
def validate_records(records):
    ...
# Context manager: pipeline-level + child spans
with OTELContext(run_id="run_001", system_id="generic", entity_type="customers") as ctx:
    with ctx.span("parse_csv"):
        records = parse_csv_files()
    with ctx.span("validate"):
        validate_records(records)
# Bridge: export existing MetricsCollector data to OTEL backends
from gcp_pipeline_core.monitoring import OTELMetricsBridge
bridge = OTELMetricsBridge(collector, meter_name="pipeline_metrics")
bridge.increment("records_processed", 100) # Forwards to BOTH collector AND OTEL
Supported OTEL exporters:
| Exporter | Config Factory | Endpoint |
|---|---|---|
| GCP Native OTel | `OTELConfig.for_gcp_otlp()` | telemetry.googleapis.com:443 |
| Google Cloud Trace | `OTELConfig.for_gcp()` | Cloud Trace API |
| Dynatrace | `OTELConfig.for_dynatrace()` | OTLP/HTTP with Api-Token header |
| Any OTLP (Jaeger, etc.) | `OTELConfig(exporter_type=OTLP, otlp_endpoint="...")` | gRPC OTLP |
| Console (debug) | `OTELConfig.for_console()` | stdout |
| Disabled | `OTELConfig.disabled()` | No-op |
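The graceful-degradation behaviour described for the OTEL integration follows a common optional-dependency pattern, sketched below. This is not the library's `trace_function`; the decorator name `traced` and the tracer name are assumptions. The only OpenTelemetry calls used are `trace.get_tracer` and `Tracer.start_as_current_span` from the public API package.

```python
import functools
from contextlib import nullcontext

try:
    from opentelemetry import trace  # optional dependency
    _tracer = trace.get_tracer("pipeline_sketch")
except ImportError:
    _tracer = None  # tracing silently disabled


def traced(span_name):
    """Wrap a function in a span when OpenTelemetry is importable; else do nothing."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            span_cm = _tracer.start_as_current_span(span_name) if _tracer else nullcontext()
            with span_cm:
                return func(*args, **kwargs)
        return wrapper
    return decorator


@traced("validate_records")
def validate_records(records):
    # Keep only records that carry an id.
    return [r for r in records if r.get("id") is not None]


valid = validate_records([{"id": 1}, {"id": None}])
```

The decorated function behaves identically with or without the package installed, so callers never need to check for it.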
7. Data Quality
Six-dimension quality scoring with anomaly detection:
- Completeness (95% threshold), Validity (90%), Accuracy (exact match), Uniqueness (100%), Timeliness (80%)
- `DataQualityChecker`: orchestrates all dimensions with `get_quality_report()`
- `ScoreCalculator`: letter grades (A-F) from weighted dimension scores
- `AnomalyDetector`: IQR-based outlier detection on numeric fields
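Both IQR-based outlier detection and weighted letter grades are easy to illustrate with the standard library. The sketch below is independent of AnomalyDetector and ScoreCalculator. The 1.5 multiplier is the conventional IQR fence, while the grade cutoffs (90/80/70/60) and the weights are assumptions.

```python
import statistics


def iqr_outliers(values, k=1.5):
    """Return the values outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < low or v > high]


def letter_grade(scores, weights):
    """Map the weighted average of dimension scores (0-100) to a grade A-F."""
    total_weight = sum(weights[name] for name in scores)
    weighted = sum(scores[name] * weights[name] for name in scores) / total_weight
    for cutoff, grade in ((90, "A"), (80, "B"), (70, "C"), (60, "D")):
        if weighted >= cutoff:
            return grade
    return "F"


outliers = iqr_outliers([10, 11, 12, 12, 13, 14, 95])
grade = letter_grade(
    {"completeness": 98, "validity": 85},
    {"completeness": 2, "validity": 1},
)
```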
8. Data Deletion & Recovery
Safe, auditable data deletion with approval workflows:
- `MalformationDetector`: detects malformed records with 10 categorized reasons
- `QuarantineManager`: 4-level quarantine (REVIEW, HOLD, DELETE, ARCHIVE)
- `SafeDataDeletion`: approval-gated deletion with configurable batch sizes
- `RecoveryManager` / `GCSRecoveryManager`: checkpoint-based recovery (in-memory or GCS-persisted)
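Approval-gated, batched deletion can be sketched in a few lines. The function below is hypothetical and is not SafeDataDeletion's interface. The `approved` flag and the `delete_batch` callback are assumptions standing in for a real approval workflow and storage client.

```python
def delete_in_batches(record_ids, approved, delete_batch, batch_size=100):
    """Delete `record_ids` in fixed-size batches, refusing to run without approval."""
    if not approved:
        raise PermissionError("deletion requires approval")
    deleted = 0
    for start in range(0, len(record_ids), batch_size):
        batch = record_ids[start:start + batch_size]
        delete_batch(batch)  # caller-supplied side effect, e.g. a storage call
        deleted += len(batch)
    return deleted


# Record the batches instead of deleting anything, to show the slicing.
batches = []
deleted_count = delete_in_batches(list(range(5)), True, batches.append, batch_size=2)
```

Small batches bound the damage of a bad run and give natural points at which a checkpoint could be written.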
Governance & Compliance
- Zero-Bleed Policy: This library MUST NOT import `apache_beam` or `airflow`.
- Portability: Must remain compatible with any Python environment (Cloud Functions, Cloud Run, local scripts, etc.).
- Testing: All new features require unit tests in `tests/unit/`.
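A policy like the import ban above can be enforced mechanically. The sketch below shows one possible check using the standard `ast` module. It is not taken from the project's test suite, and the function name `forbidden_imports` is an assumption. A real test would presumably run it over every `.py` file in the package.

```python
import ast

FORBIDDEN = {"apache_beam", "airflow"}


def forbidden_imports(source):
    """Return the forbidden top-level packages imported by `source`."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [node.module]
        else:
            continue
        # Compare on the top-level package, so "airflow.models" matches "airflow".
        found.update(n.split(".")[0] for n in names if n.split(".")[0] in FORBIDDEN)
    return found


clean = forbidden_imports("import json\nfrom pathlib import Path\n")
dirty = forbidden_imports("from apache_beam.io import ReadFromText\nimport airflow.models\n")
```

Parsing the source rather than importing it means the check works even where neither framework is installed.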
Usage
from gcp_pipeline_core.audit import AuditTrail, ReconciliationEngine
from gcp_pipeline_core.monitoring import MetricsCollector, MigrationMetrics, ObservabilityManager
from gcp_pipeline_core.monitoring import AlertManager, AlertLevel, OTELConfig, configure_otel
from gcp_pipeline_core.monitoring.alerts import SlackAlertBackend, DynatraceAlertBackend
from gcp_pipeline_core.utilities import configure_structured_logging, generate_run_id
from gcp_pipeline_core.schema import EntitySchema, SchemaField
from gcp_pipeline_core.job_control import JobControlRepository, JobStatus, PipelineJob
from gcp_pipeline_core.error_handling import ErrorHandler, RetryPolicy
from gcp_pipeline_core.finops import BigQueryCostTracker, FinOpsLabels, track_bq_cost
from gcp_pipeline_core.data_quality import DataQualityChecker
from gcp_pipeline_core.data_deletion import DataDeletionFramework
Tests
python3.11 -m pytest tests/ -v
# 256 passed
File details
Details for the file gcp_pipeline_core-1.0.29.tar.gz.
File metadata
- Download URL: gcp_pipeline_core-1.0.29.tar.gz
- Upload date:
- Size: 71.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f8e714c2b285633e0a387dc407a62063564e8b3a976633e12733e3b0079a9d93 |
| MD5 | 83ed723436387a9387fca936cea20b53 |
| BLAKE2b-256 | 87cf0c0f56e2f4d057ee0eb85de12ca1f385ff6c420e0967cc52864e7dbd35c7 |
File details
Details for the file gcp_pipeline_core-1.0.29-py3-none-any.whl.
File metadata
- Download URL: gcp_pipeline_core-1.0.29-py3-none-any.whl
- Upload date:
- Size: 92.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 07ed3314570b8264012a1f459b8af76d67e6dba6050e6e078edf36eda7945204 |
| MD5 | 57b05547a26a5485df164f1ccf741339 |
| BLAKE2b-256 | d7687174c42f5300aa4b03e4a0a659da401d3e60efd5b4a737e94884f78eb315 |