Skip to main content

Core library for GCP data pipelines

Project description

gcp-pipeline-core

Foundation library - audit, monitoring, error handling, job control.

NO Apache Beam or Airflow dependencies.


Architecture

                         GCP-PIPELINE-CORE
                         ─────────────────

  ┌─────────────────────────────────────────────────────────────────┐
  │                     FOUNDATION LAYER                             │
  │                                                                  │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Audit     │  │  Monitoring │  │   Error     │              │
  │  │   Trail     │  │   Metrics   │  │  Handling   │              │
  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────────────────────────────────────────────────┐    │
  │  │                  Utilities Layer                         │    │
  │  │  • Structured Logging (JSON)                             │    │
  │  │  • Run ID Generation                                     │    │
  │  │  • Configuration Management                              │    │
  │  └─────────────────────────────────────────────────────────┘    │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Clients   │  │ Job Control │  │   Schema    │              │
  │  │ GCS/BQ/PS   │  │  Repository │  │ Definitions │              │
  │  └─────────────┘  └─────────────┘  └─────────────┘              │
  │                                                                  │
  └─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
              Used by: gcp-pipeline-beam, gcp-pipeline-orchestration

Modules

Module Purpose Key Classes
audit/ Lineage tracking, reconciliation AuditTrail, ReconciliationEngine
monitoring/ Metrics, OTEL/Dynatrace MetricsCollector, OTELExporter
finops/ Cost tracking and labeling BigQueryCostTracker, FinOpsLabels
error_handling/ Error classification, retry ErrorHandler, RetryPolicy
job_control/ Pipeline status tracking JobControlRepository, PipelineJob
clients/ GCP service wrappers GCSClient, BigQueryClient, PubSubClient
utilities/ Logging, run ID configure_structured_logging, generate_run_id
data_quality/ Quality checks validate_row_types, check_duplicate_keys
schema.py Entity definitions EntitySchema, SchemaField

Component Flow

Pipeline Start
      │
      ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ generate_   │───►│ AuditTrail  │───►│ Structured  │
│ run_id()    │    │ .start()    │    │ Logging     │
└─────────────┘    └─────────────┘    └─────────────┘
      │                  │                  │
      ▼                  ▼                  ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ JobControl  │    │ Metrics     │    │ Error       │
│ .create()   │    │ .record()   │    │ Handler     │
└─────────────┘    └─────────────┘    └─────────────┘
      │                  │                  │
      └──────────────────┴──────────────────┘
                         │
                         ▼
                  ┌─────────────┐
                  │ Reconcile   │
                  │ & Complete  │
                  └─────────────┘

Key Findings

1. Audit Trail & Lineage

  • AuditTrail: Implements robust tracking of run_id across all pipeline stages.
  • DuplicateDetector: Provides idempotency by tracking seen records and preventing double-processing.
  • Publisher: Supports automated publishing of audit records to BigQuery for centralized monitoring.

2. Sophisticated Error Handling

  • ErrorClassifier: Categorizes exceptions into:
    • Validation: Data errors (no retry).
    • Integration: Connection/API errors (retry with backoff).
    • Resource: Quota/Rate limit errors (exponential backoff).
  • RetryPolicy: Configurable backoff multipliers, jitter, and maximum retry attempts.

3. Job Control

  • JobControlRepository: Centralized state management for pipeline executions.
  • State Tracking: Granular tracking of failure stages, start/end times, and record counts.

4. Structured Logging

  • Standardized JSON logging with automated context injection (run_id, systapplication1_id).
  • Optimized for Cloud Logging and BigQuery ingestion.

5. FinOps & Cost Tracking

  • Cost Estimation: Automated cost estimation for BigQuery (Query/Load), GCS (Storage/Upload), and Pub/Sub (Publishing).
  • FinOpsLabels: Standardized GCP resource labeling for precise cost allocation.
  • Monitoring Integration: Seamless integration with MigrationMetrics for real-time cost visibility in audit logs.
  • Trackers:
    • BigQueryCostTracker: Estimates costs based on bytes billed and slot usage.
    • CloudStorageCostTracker: Estimates storage costs and upload fees.
    • PubSubCostTracker: Estimates throughput costs with 1KB minimum billing awareness.
  • Decorators: @track_bq_cost for automated tracking of BigQuery jobs.

Governance & Compliance

  • Zero-Bleed Policy: This library MUST NOT import apache_beam or airflow.
  • Portability: Must remain compatible with any Python environment (Cloud Functions, Cloud Run, local scripts, etc.).
  • Testing: All new features require unit tests in tests/unit/.

Usage

from gcp_pipeline_core.audit import AuditTrail, ReconciliationEngine
from gcp_pipeline_core.monitoring import MetricsCollector
from gcp_pipeline_core.utilities import configure_structured_logging, generate_run_id
from gcp_pipeline_core.schema import EntitySchema, SchemaField
from gcp_pipeline_core.job_control import JobControlRepository
from gcp_pipeline_core.error_handling import ErrorHandler
from gcp_pipeline_core.finops import BigQueryCostTracker, FinOpsLabels, track_bq_cost

Tests

PYTHONPATH=src python -m pytest tests/unit/ -v
# 208 passed

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gcp_pipeline_core-1.0.2.tar.gz (60.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

gcp_pipeline_core-1.0.2-py3-none-any.whl (84.0 kB view details)

Uploaded Python 3

File details

Details for the file gcp_pipeline_core-1.0.2.tar.gz.

File metadata

  • Download URL: gcp_pipeline_core-1.0.2.tar.gz
  • Upload date:
  • Size: 60.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for gcp_pipeline_core-1.0.2.tar.gz
Algorithm Hash digest
SHA256 0794518a91f9e54e0d9d16d687a8cb5718c4feac81ebdcc94abd7221ed07b2e6
MD5 5b867b88eed65aa93ad294f0a550b985
BLAKE2b-256 a493604a8ba07fdb5e3b268da963719d98cd6dacced2ef1dabd9ba21c599509a

See more details on using hashes here.

File details

Details for the file gcp_pipeline_core-1.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for gcp_pipeline_core-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 46ac7f90e8005b8eab0e7049def2ed1621596dc1bf00e40e32c518fed9437867
MD5 ff827f6d49eb7360aa385b9a06790998
BLAKE2b-256 2ec93b8c5364d1d773f472b6893850120406a28e3d8f429e180de2926a0c075e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page