Core library for GCP data pipelines

gcp-pipeline-core

Foundation library - audit, monitoring, error handling, job control.

NO Apache Beam or Airflow dependencies.


Architecture

                         GCP-PIPELINE-CORE
                         ─────────────────

  ┌─────────────────────────────────────────────────────────────────┐
  │                     FOUNDATION LAYER                             │
  │                                                                  │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Audit     │  │  Monitoring │  │   Error     │              │
  │  │   Trail     │  │   Metrics   │  │  Handling   │              │
  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────────────────────────────────────────────────┐    │
  │  │                  Utilities Layer                         │    │
  │  │  • Structured Logging (JSON)                             │    │
  │  │  • Run ID Generation                                     │    │
  │  │  • Configuration Management                              │    │
  │  └─────────────────────────────────────────────────────────┘    │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Clients   │  │ Job Control │  │   Schema    │              │
  │  │ GCS/BQ/PS   │  │  Repository │  │ Definitions │              │
  │  └─────────────┘  └─────────────┘  └─────────────┘              │
  │                                                                  │
  └─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
              Used by: gcp-pipeline-beam, gcp-pipeline-orchestration

Modules

Module            Purpose                            Key Classes and Functions
audit/            Lineage tracking, reconciliation   AuditTrail, ReconciliationEngine
monitoring/       Metrics, OTEL/Dynatrace            MetricsCollector, OTELExporter
finops/           Cost tracking and labeling         BigQueryCostTracker, FinOpsLabels
error_handling/   Error classification, retry        ErrorHandler, RetryPolicy
job_control/      Pipeline status tracking           JobControlRepository, PipelineJob
clients/          GCP service wrappers               GCSClient, BigQueryClient, PubSubClient
utilities/        Logging, run ID                    configure_structured_logging, generate_run_id
data_quality/     Quality checks                     validate_row_types, check_duplicate_keys
schema.py         Entity definitions                 EntitySchema, SchemaField

Component Flow

Pipeline Start
      │
      ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ generate_   │───►│ AuditTrail  │───►│ Structured  │
│ run_id()    │    │ .record_    │    │ Logging     │
│             │    │ processing_ │    │             │
│             │    │ start()     │    │             │
└─────────────┘    └─────────────┘    └─────────────┘
      │                  │                  │
      ▼                  ▼                  ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ JobControl  │    │ Metrics     │    │ Error       │
│ .create()   │    │ .record()   │    │ Handler     │
└─────────────┘    └─────────────┘    └─────────────┘
      │                  │                  │
      └──────────────────┴──────────────────┘
                         │
                         ▼
                  ┌─────────────┐
                  │ Reconcile   │
                  │ & Complete  │
                  └─────────────┘
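
The flow above can be sketched end to end with in-memory stand-ins. Every name below (`new_run_id`, `run_pipeline`, the dictionary layout, the status strings) is hypothetical and only mirrors the boxes in the diagram; it is not the library's API. The sketch assumes that data errors surface as `ValueError` and that a processor returning `None` has silently dropped a record.

```python
import uuid
from datetime import datetime, timezone


def new_run_id(prefix: str = "run") -> str:
    """Hypothetical stand-in for generate_run_id(): UTC timestamp plus random suffix."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{prefix}-{stamp}-{uuid.uuid4().hex[:8]}"


def run_pipeline(records, process):
    """Walk the diagram: start a job, process records, count errors, reconcile."""
    records = list(records)
    job = {"run_id": new_run_id(), "status": "RUNNING", "written": 0, "errors": 0}
    for record in records:
        try:
            result = process(record)
        except ValueError:
            # Assumption: data errors are counted and skipped, not retried.
            job["errors"] += 1
            continue
        if result is not None:
            job["written"] += 1
    # Reconcile: every input record must be either written or counted as an error.
    reconciled = len(records) == job["written"] + job["errors"]
    job["status"] = "COMPLETED" if reconciled else "RECONCILIATION_FAILED"
    return job
```

A record that is neither written nor counted as an error makes the totals disagree, which is the condition the reconcile step exists to catch.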

Key Findings

1. Audit Trail & Lineage

  • AuditTrail: Tracks the run_id across all pipeline stages.
  • DuplicateDetector: Provides idempotency by tracking seen records and preventing double-processing.
  • Publisher: Supports automated publishing of audit records to BigQuery for centralized monitoring.
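
As an illustration of the idempotency idea behind DuplicateDetector, here is a minimal sketch that remembers a fingerprint of each record's key fields. The class name `SeenKeys`, its methods, and the choice of SHA-256 over a JSON-encoded key are assumptions made for this example, not the library's implementation.

```python
import hashlib
import json


class SeenKeys:
    """Hypothetical stand-in for a duplicate detector: remembers record keys."""

    def __init__(self, key_fields):
        self.key_fields = tuple(key_fields)
        self._seen = set()

    def _fingerprint(self, record: dict) -> str:
        # Hash only the key fields so changes to other fields do not hide a duplicate.
        key = [record.get(name) for name in self.key_fields]
        return hashlib.sha256(json.dumps(key, default=str).encode()).hexdigest()

    def is_duplicate(self, record: dict) -> bool:
        """Return True if this key was seen before; otherwise remember it."""
        fingerprint = self._fingerprint(record)
        if fingerprint in self._seen:
            return True
        self._seen.add(fingerprint)
        return False
```

An in-memory set only protects a single process. Idempotency across runs would need the seen keys stored somewhere durable.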

2. Error Handling

  • ErrorClassifier: Categorizes exceptions into:
    • Validation: Data errors (no retry).
    • Integration: Connection/API errors (retry with backoff).
    • Resource: Quota/Rate limit errors (exponential backoff).
  • RetryPolicy: Configurable backoff multipliers, jitter, and maximum retry attempts.
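
The classification and retry rules above can be sketched as follows. The names `Category`, `QuotaExceeded`, `classify`, `backoff_delay`, and `call_with_retry`, and the mapping from exception types to categories, are all assumptions for illustration; the library's ErrorClassifier and RetryPolicy may be shaped differently. For brevity this sketch applies the same exponential backoff to both retryable categories.

```python
import random
import time
from enum import Enum


class Category(Enum):
    VALIDATION = "validation"    # data errors: do not retry
    INTEGRATION = "integration"  # connection/API errors: retry with backoff
    RESOURCE = "resource"        # quota/rate-limit errors: exponential backoff


class QuotaExceeded(Exception):
    """Hypothetical exception standing in for a quota or rate-limit error."""


def classify(exc: Exception) -> Category:
    # Assumed mapping from exception type to category.
    if isinstance(exc, QuotaExceeded):
        return Category.RESOURCE
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return Category.INTEGRATION
    return Category.VALIDATION


def backoff_delay(attempt, base=1.0, multiplier=2.0, max_delay=60.0,
                  jitter=0.1, rng=random):
    """Delay before retry number `attempt` (0-based), varied by +/- `jitter`."""
    delay = min(base * multiplier ** attempt, max_delay)
    return delay * (1 + rng.uniform(-jitter, jitter))


def call_with_retry(func, max_attempts=3, sleep=time.sleep, **backoff):
    """Call `func`, retrying retryable errors up to `max_attempts` times."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            last_attempt = attempt == max_attempts - 1
            if classify(exc) is Category.VALIDATION or last_attempt:
                raise
            sleep(backoff_delay(attempt, **backoff))
```

Passing `sleep` as a parameter keeps the retry loop testable without real waiting.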

3. Job Control

  • JobControlRepository: Centralized state management for pipeline executions.
  • State Tracking: Granular tracking of failure stages, start/end times, and record counts.
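
To make the tracked state concrete, here is a sketch of a job record and a dictionary-backed store. `JobRecord`, `InMemoryJobStore`, the field names, and the status strings are hypothetical; the actual PipelineJob and JobControlRepository may use different fields and a different backing store.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class JobRecord:
    """Hypothetical row layout; the real PipelineJob fields may differ."""
    run_id: str
    status: str = "RUNNING"
    started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    ended_at: Optional[datetime] = None
    failure_stage: Optional[str] = None
    record_count: int = 0


class InMemoryJobStore:
    """Stand-in for a job-control repository, backed by a dict."""

    def __init__(self):
        self._jobs = {}

    def create(self, run_id: str) -> JobRecord:
        if run_id in self._jobs:
            raise ValueError(f"job {run_id} already exists")
        self._jobs[run_id] = JobRecord(run_id)
        return self._jobs[run_id]

    def complete(self, run_id: str, record_count: int) -> JobRecord:
        job = self._jobs[run_id]
        job.status, job.record_count = "SUCCESS", record_count
        job.ended_at = datetime.now(timezone.utc)
        return job

    def fail(self, run_id: str, stage: str) -> JobRecord:
        # Record which stage failed so a rerun can resume or be diagnosed.
        job = self._jobs[run_id]
        job.status, job.failure_stage = "FAILED", stage
        job.ended_at = datetime.now(timezone.utc)
        return job
```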

4. Structured Logging

  • Standardized JSON logging with automated context injection (run_id, system_id).
  • Optimized for Cloud Logging and BigQuery ingestion.
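
A minimal sketch of JSON logging with context injection, using only the standard `logging` module. `JsonFormatter` and `build_logger` are hypothetical names, not the signature of configure_structured_logging. The `severity` and `message` keys are used because Cloud Logging recognizes them in JSON payloads.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def __init__(self, context: dict):
        super().__init__()
        self.context = dict(context)  # fixed fields such as run_id, system_id

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "severity": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            **self.context,
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)


def build_logger(name, stream, **context):
    """Hypothetical helper: attach a JSON handler carrying fixed context fields."""
    logger = logging.getLogger(name)
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter(context))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


# Usage: every entry carries run_id and system_id without repeating them per call.
buffer = io.StringIO()
log = build_logger("demo", buffer, run_id="r1", system_id="crm")
log.info("loaded %d rows", 5)
```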

5. FinOps & Cost Tracking

  • Cost Estimation: Automated cost estimation for BigQuery (Query/Load), GCS (Storage/Upload), and Pub/Sub (Publishing).
  • FinOpsLabels: Standardized GCP resource labeling for precise cost allocation.
  • Monitoring Integration: Integrates with MigrationMetrics to surface cost figures in audit logs in real time.
  • Trackers:
    • BigQueryCostTracker: Estimates costs based on bytes billed and slot usage.
    • CloudStorageCostTracker: Estimates storage costs and upload fees.
    • PubSubCostTracker: Estimates throughput costs with 1KB minimum billing awareness.
  • Decorators: @track_bq_cost for automated tracking of BigQuery jobs.
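
The arithmetic behind these trackers can be sketched as below. The function names, the `track_cost` decorator, and its `sink` parameter are hypothetical and are not the signature of @track_bq_cost. Prices are passed in by the caller rather than hard-coded. The 1,000-byte default for the per-message minimum is an assumption based on the 1KB figure above.

```python
import functools

TIB = 1024 ** 4


def estimate_query_cost(bytes_billed: int, price_per_tib: float) -> float:
    """Bytes billed times a caller-supplied price per TiB."""
    return bytes_billed / TIB * price_per_tib


def pubsub_billable_bytes(message_sizes, minimum: int = 1000) -> int:
    """Apply a per-message minimum (assumed 1,000 bytes) before summing."""
    return sum(max(size, minimum) for size in message_sizes)


def track_cost(price_per_tib: float, sink: list):
    """Hypothetical decorator in the spirit of @track_bq_cost.

    Assumes the wrapped function returns an object exposing
    `total_bytes_billed`, as BigQuery query jobs do.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            job = func(*args, **kwargs)
            billed = getattr(job, "total_bytes_billed", None) or 0
            sink.append({
                "function": func.__name__,
                "bytes_billed": billed,
                "estimated_cost": estimate_query_cost(billed, price_per_tib),
            })
            return job
        return wrapper
    return decorator
```

Writing estimates to a caller-owned `sink` keeps the decorator free of any dependency on a particular metrics or audit backend.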

Governance & Compliance

  • Zero-Bleed Policy: This library MUST NOT import apache_beam or airflow.
  • Portability: Must remain compatible with any Python environment (Cloud Functions, Cloud Run, local scripts, etc.).
  • Testing: All new features require unit tests in tests/unit/.
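
One way to enforce the Zero-Bleed Policy is a unit test that parses each source file and reports forbidden imports. The sketch below uses the standard `ast` module; the helper names `forbidden_imports` and `scan_package` are hypothetical and not part of the library.

```python
import ast
from pathlib import Path

FORBIDDEN = {"apache_beam", "airflow"}


def forbidden_imports(source: str) -> set:
    """Return the forbidden top-level packages imported by `source`."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # level == 0 skips relative imports, which cannot reach external packages.
            names = [node.module]
        else:
            continue
        found.update(n.split(".")[0] for n in names if n.split(".")[0] in FORBIDDEN)
    return found


def scan_package(root) -> dict:
    """Map each offending file under `root` to the forbidden packages it imports."""
    offenders = {}
    for path in Path(root).rglob("*.py"):
        hits = forbidden_imports(path.read_text(encoding="utf-8"))
        if hits:
            offenders[str(path)] = hits
    return offenders
```

A test in tests/unit/ could then assert that `scan_package` returns an empty dict for the package directory. Because the check walks the whole syntax tree, it also catches imports nested inside functions or conditionals.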

Usage

from gcp_pipeline_core.audit import AuditTrail, ReconciliationEngine
from gcp_pipeline_core.monitoring import MetricsCollector
from gcp_pipeline_core.utilities import configure_structured_logging, generate_run_id
from gcp_pipeline_core.schema import EntitySchema, SchemaField
from gcp_pipeline_core.job_control import JobControlRepository
from gcp_pipeline_core.error_handling import ErrorHandler
from gcp_pipeline_core.finops import BigQueryCostTracker, FinOpsLabels, track_bq_cost

Tests

python3.11 -m pytest tests/ -v
# 256 passed

Download files

Source Distribution: gcp_pipeline_core-1.0.19.tar.gz (64.6 kB)

Built Distribution: gcp_pipeline_core-1.0.19-py3-none-any.whl (88.7 kB, Python 3)
