Skip to main content

Core library for GCP data pipelines

Project description

gcp-pipeline-core

Foundation library - audit, monitoring, error handling, job control.

NO Apache Beam or Airflow dependencies.


Architecture

                         GCP-PIPELINE-CORE
                         ─────────────────

  ┌─────────────────────────────────────────────────────────────────┐
  │                     FOUNDATION LAYER                             │
  │                                                                  │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Audit     │  │  Monitoring │  │   Error     │              │
  │  │   Trail     │  │   Metrics   │  │  Handling   │              │
  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────────────────────────────────────────────────┐    │
  │  │                  Utilities Layer                         │    │
  │  │  • Structured Logging (JSON)                             │    │
  │  │  • Run ID Generation                                     │    │
  │  │  • Configuration Management                              │    │
  │  └─────────────────────────────────────────────────────────┘    │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Clients   │  │ Job Control │  │   Schema    │              │
  │  │ GCS/BQ/PS   │  │  Repository │  │ Definitions │              │
  │  └─────────────┘  └─────────────┘  └─────────────┘              │
  │                                                                  │
  └─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
              Used by: gcp-pipeline-beam, gcp-pipeline-orchestration

Modules

Module Purpose Key Classes
audit/ Lineage tracking, reconciliation AuditTrail, ReconciliationEngine
monitoring/ Metrics, OTEL/Dynatrace MetricsCollector, OTELExporter
finops/ Cost tracking and labeling BigQueryCostTracker, FinOpsLabels
error_handling/ Error classification, retry ErrorHandler, RetryPolicy
job_control/ Pipeline status tracking JobControlRepository, PipelineJob
clients/ GCP service wrappers GCSClient, BigQueryClient, PubSubClient
utilities/ Logging, run ID configure_structured_logging, generate_run_id
data_quality/ Quality checks validate_row_types, check_duplicate_keys
schema.py Entity definitions EntitySchema, SchemaField

Component Flow

Pipeline Start
      │
      ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ generate_   │───►│ AuditTrail  │───►│ Structured  │
│ run_id()    │    │ .start()    │    │ Logging     │
└─────────────┘    └─────────────┘    └─────────────┘
      │                  │                  │
      ▼                  ▼                  ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ JobControl  │    │ Metrics     │    │ Error       │
│ .create()   │    │ .record()   │    │ Handler     │
└─────────────┘    └─────────────┘    └─────────────┘
      │                  │                  │
      └──────────────────┴──────────────────┘
                         │
                         ▼
                  ┌─────────────┐
                  │ Reconcile   │
                  │ & Complete  │
                  └─────────────┘

Key Findings

1. Audit Trail & Lineage

  • AuditTrail: Implements robust tracking of run_id across all pipeline stages.
  • DuplicateDetector: Provides idempotency by tracking seen records and preventing double-processing.
  • Publisher: Supports automated publishing of audit records to BigQuery for centralized monitoring.

2. Sophisticated Error Handling

  • ErrorClassifier: Categorizes exceptions into:
    • Validation: Data errors (no retry).
    • Integration: Connection/API errors (retry with backoff).
    • Resource: Quota/Rate limit errors (exponential backoff).
  • RetryPolicy: Configurable backoff multipliers, jitter, and maximum retry attempts.

3. Job Control

  • JobControlRepository: Centralized state management for pipeline executions.
  • State Tracking: Granular tracking of failure stages, start/end times, and record counts.

4. Structured Logging

  • Standardized JSON logging with automated context injection (run_id, systapplication1_id).
  • Optimized for Cloud Logging and BigQuery ingestion.

5. FinOps & Cost Tracking

  • Cost Estimation: Automated cost estimation for BigQuery (Query/Load), GCS (Storage/Upload), and Pub/Sub (Publishing).
  • FinOpsLabels: Standardized GCP resource labeling for precise cost allocation.
  • Monitoring Integration: Seamless integration with MigrationMetrics for real-time cost visibility in audit logs.
  • Trackers:
    • BigQueryCostTracker: Estimates costs based on bytes billed and slot usage.
    • CloudStorageCostTracker: Estimates storage costs and upload fees.
    • PubSubCostTracker: Estimates throughput costs with 1KB minimum billing awareness.
  • Decorators: @track_bq_cost for automated tracking of BigQuery jobs.

Governance & Compliance

  • Zero-Bleed Policy: This library MUST NOT import apache_beam or airflow.
  • Portability: Must remain compatible with any Python environment (Cloud Functions, Cloud Run, local scripts, etc.).
  • Testing: All new features require unit tests in tests/unit/.

Usage

from gcp_pipeline_core.audit import AuditTrail, ReconciliationEngine
from gcp_pipeline_core.monitoring import MetricsCollector
from gcp_pipeline_core.utilities import configure_structured_logging, generate_run_id
from gcp_pipeline_core.schema import EntitySchema, SchemaField
from gcp_pipeline_core.job_control import JobControlRepository
from gcp_pipeline_core.error_handling import ErrorHandler
from gcp_pipeline_core.finops import BigQueryCostTracker, FinOpsLabels, track_bq_cost

Tests

PYTHONPATH=src python -m pytest tests/unit/ -v
# 208 passed

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gcp_pipeline_core-1.0.1.tar.gz (57.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

gcp_pipeline_core-1.0.1-py3-none-any.whl (79.2 kB view details)

Uploaded Python 3

File details

Details for the file gcp_pipeline_core-1.0.1.tar.gz.

File metadata

  • Download URL: gcp_pipeline_core-1.0.1.tar.gz
  • Upload date:
  • Size: 57.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for gcp_pipeline_core-1.0.1.tar.gz
Algorithm Hash digest
SHA256 ea23dc988d58a4b9a245c98babc0063e69ffa389427f223af34b2277add9c507
MD5 6a6719f53851531e5df2d7163213da03
BLAKE2b-256 1131d5b0224450e37843a2659fea17687ccd4e109ff1f67f4e64a8d39badde23

See more details on using hashes here.

File details

Details for the file gcp_pipeline_core-1.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for gcp_pipeline_core-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ed9180e310f8c001823c04279a423cd329d442ae099bf47c99656f426efc0814
MD5 b060c2203e6b327beff1dc7512e1200c
BLAKE2b-256 c14a4d1a8d3ccd90946e71c9b2a6c1e188638e9b15cd1fa8129063e7b3c73848

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page