Core library for GCP data pipelines

gcp-pipeline-core

Foundation library - audit, monitoring, error handling, job control.

NO Apache Beam or Airflow dependencies.


Architecture

                         GCP-PIPELINE-CORE
                         ─────────────────

  ┌─────────────────────────────────────────────────────────────────┐
  │                     FOUNDATION LAYER                             │
  │                                                                  │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Audit     │  │  Monitoring │  │   Error     │              │
  │  │   Trail     │  │   Metrics   │  │  Handling   │              │
  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────────────────────────────────────────────────┐    │
  │  │                  Utilities Layer                         │    │
  │  │  • Structured Logging (JSON)                             │    │
  │  │  • Run ID Generation                                     │    │
  │  │  • Configuration Management                              │    │
  │  └─────────────────────────────────────────────────────────┘    │
  │         │                │                │                      │
  │         ▼                ▼                ▼                      │
  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
  │  │   Clients   │  │ Job Control │  │   Schema    │              │
  │  │ GCS/BQ/PS   │  │  Repository │  │ Definitions │              │
  │  └─────────────┘  └─────────────┘  └─────────────┘              │
  │                                                                  │
  └─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
              Used by: gcp-pipeline-beam, gcp-pipeline-orchestration

Modules

Module           Purpose                           Key Classes
audit/           Lineage tracking, reconciliation  AuditTrail, ReconciliationEngine
monitoring/      Metrics, OTEL/Dynatrace           MetricsCollector, OTELExporter
finops/          Cost tracking and labeling        BigQueryCostTracker, FinOpsLabels
error_handling/  Error classification, retry       ErrorHandler, RetryPolicy
job_control/     Pipeline status tracking          JobControlRepository, PipelineJob
clients/         GCP service wrappers              GCSClient, BigQueryClient, PubSubClient
utilities/       Logging, run ID                   configure_structured_logging, generate_run_id
data_quality/    Quality checks                    validate_row_types, check_duplicate_keys
schema.py        Entity definitions                EntitySchema, SchemaField

Component Flow

Pipeline Start
      │
      ▼
┌─────────────┐    ┌────────────────────────────┐    ┌─────────────┐
│ generate_   │───►│ AuditTrail                 │───►│ Structured  │
│ run_id()    │    │ .record_processing_start() │    │ Logging     │
└─────────────┘    └────────────────────────────┘    └─────────────┘
      │                          │                         │
      ▼                          ▼                         ▼
┌─────────────┐    ┌────────────────────────────┐    ┌─────────────┐
│ JobControl  │    │ Metrics                    │    │ Error       │
│ .create()   │    │ .record()                  │    │ Handler     │
└─────────────┘    └────────────────────────────┘    └─────────────┘
      │                          │                         │
      └──────────────────────────┴─────────────────────────┘
                                 │
                                 ▼
                          ┌─────────────┐
                          │ Reconcile   │
                          │ & Complete  │
                          └─────────────┘

Key Findings

1. Audit Trail & Lineage

  • AuditTrail: Tracks run_id across all pipeline stages.
  • DuplicateDetector: Provides idempotency by tracking seen records and preventing double-processing.
  • Publisher: Supports automated publishing of audit records to BigQuery for centralized monitoring.
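
The idempotency idea behind DuplicateDetector can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the library's implementation: SeenKeyDetector, deduplicate, and the key_fields parameter are hypothetical names, and the seen set lives in memory only.

```python
import hashlib
import json


class SeenKeyDetector:
    """Hypothetical stand-in for a duplicate detector; not the library's class."""

    def __init__(self, key_fields):
        self.key_fields = tuple(key_fields)
        self._seen = set()

    def _fingerprint(self, record):
        # Hash only the key fields so changes to other fields do not hide a duplicate.
        key = [record.get(name) for name in self.key_fields]
        payload = json.dumps(key, default=str)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def is_duplicate(self, record):
        """Return True if a record with the same key was already seen."""
        fingerprint = self._fingerprint(record)
        if fingerprint in self._seen:
            return True
        self._seen.add(fingerprint)
        return False


def deduplicate(records, key_fields):
    """Keep the first record for each key and drop later repeats."""
    detector = SeenKeyDetector(key_fields)
    return [r for r in records if not detector.is_duplicate(r)]
```

A detector that must survive restarts would need a persistent store for the fingerprints; the in-memory set here only covers a single run.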

2. Sophisticated Error Handling

  • ErrorClassifier: Categorizes exceptions into:
    • Validation: Data errors (no retry).
    • Integration: Connection/API errors (retry with backoff).
    • Resource: Quota/Rate limit errors (exponential backoff).
  • RetryPolicy: Configurable backoff multipliers, jitter, and maximum retry attempts.
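
The classify-then-retry behaviour described above can be sketched as follows. Everything here is an assumption made for illustration: the exception classes, classify, BackoffPolicy, and call_with_retry are hypothetical and do not reflect the actual ErrorClassifier or RetryPolicy signatures, which this README does not show.

```python
import random
import time
from dataclasses import dataclass


class ValidationError(Exception):
    """Hypothetical data error: retrying cannot fix it."""


class ResourceError(Exception):
    """Hypothetical quota or rate-limit error."""


def classify(exc):
    # Illustrative mapping only; the library's real rules are not documented here.
    if isinstance(exc, (ValidationError, ValueError)):
        return "validation"
    if isinstance(exc, ResourceError):
        return "resource"
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return "integration"
    return "unknown"


@dataclass
class BackoffPolicy:
    max_attempts: int = 4
    base_delay: float = 0.5   # seconds before the first retry
    multiplier: float = 2.0   # growth factor per attempt
    max_delay: float = 30.0   # cap on the un-jittered delay
    jitter: float = 0.1       # random fraction of the delay added on top

    def delay(self, attempt):
        """Seconds to wait after failed attempt number `attempt` (1-based)."""
        raw = min(self.base_delay * self.multiplier ** (attempt - 1), self.max_delay)
        return raw + raw * self.jitter * random.random()


def call_with_retry(func, policy, sleep=time.sleep):
    """Call func, retrying only errors classified as integration or resource."""
    for attempt in range(1, policy.max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            retryable = classify(exc) in ("integration", "resource")
            if not retryable or attempt == policy.max_attempts:
                raise
            sleep(policy.delay(attempt))
```

Passing sleep as a parameter keeps the sketch testable without real waiting; validation errors propagate on the first failure because repeating the call would not change the data.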

3. Job Control

  • JobControlRepository: Centralized state management for pipeline executions.
  • State Tracking: Granular tracking of failure stages, start/end times, and record counts.
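
As a rough picture of the state a job control repository tracks, the sketch below keeps jobs in memory. It is an assumption-laden illustration: JobRecord, InMemoryJobStore, the status strings, and the method names are hypothetical and are not the JobControlRepository or PipelineJob API.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Optional


def _now():
    return datetime.now(timezone.utc)


@dataclass
class JobRecord:
    """Hypothetical job row; field names are illustrative."""
    run_id: str
    status: str = "RUNNING"
    started_at: datetime = field(default_factory=_now)
    ended_at: Optional[datetime] = None
    failure_stage: Optional[str] = None
    record_count: int = 0


class InMemoryJobStore:
    """Stand-in for a repository; a real one would persist to a database."""

    def __init__(self):
        self._jobs: Dict[str, JobRecord] = {}

    def create(self, run_id):
        if run_id in self._jobs:
            raise ValueError(f"job {run_id} already exists")
        self._jobs[run_id] = JobRecord(run_id=run_id)
        return self._jobs[run_id]

    def complete(self, run_id, record_count):
        job = self._jobs[run_id]
        job.status = "SUCCEEDED"
        job.record_count = record_count
        job.ended_at = _now()
        return job

    def fail(self, run_id, stage):
        # Recording the stage makes it possible to resume or triage from that point.
        job = self._jobs[run_id]
        job.status = "FAILED"
        job.failure_stage = stage
        job.ended_at = _now()
        return job
```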

4. Structured Logging

  • Standardized JSON logging with automated context injection (run_id, system_id).
  • Optimized for Cloud Logging and BigQuery ingestion.
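
Context injection of this kind can be approximated with the standard logging module alone. The sketch below is not configure_structured_logging: JsonFormatter and build_logger are hypothetical names, and the choice of output fields is an assumption.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record):
        payload = {
            "severity": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            # Context fields arrive as record attributes via the adapter below.
            "run_id": getattr(record, "run_id", None),
            "system_id": getattr(record, "system_id", None),
        }
        return json.dumps(payload)


def build_logger(name, run_id, system_id, stream=None):
    """Return a logger whose every entry carries run_id and system_id."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)  # defaults to stderr when stream is None
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    # LoggerAdapter passes its dict as `extra` on each call, which is the injection step.
    return logging.LoggerAdapter(logger, {"run_id": run_id, "system_id": system_id})
```

Calling build_logger("pipeline", "run-123", "sys-a").info("loaded %d rows", 5) emits a single JSON line with the message and both context fields, so callers never pass run_id by hand.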

5. FinOps & Cost Tracking

  • Cost Estimation: Automated cost estimation for BigQuery (Query/Load), GCS (Storage/Upload), and Pub/Sub (Publishing).
  • FinOpsLabels: Standardized GCP resource labeling for precise cost allocation.
  • Monitoring Integration: Integrates with MigrationMetrics so that cost figures appear in audit logs in real time.
  • Trackers:
    • BigQueryCostTracker: Estimates costs based on bytes billed and slot usage.
    • CloudStorageCostTracker: Estimates storage costs and upload fees.
    • PubSubCostTracker: Estimates throughput costs with 1KB minimum billing awareness.
  • Decorators: @track_bq_cost for automated tracking of BigQuery jobs.
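
The arithmetic behind these trackers can be sketched as below. This is an illustration only: the rate is a placeholder rather than a quoted price, the 1000-byte minimum is applied per message as an assumption, and estimate_query_cost, billable_pubsub_bytes, and track_cost are hypothetical names that do not reproduce BigQueryCostTracker, PubSubCostTracker, or @track_bq_cost.

```python
import functools

TIB = 1024 ** 4
# Placeholder rate for illustration; check current GCP pricing for your region.
ASSUMED_USD_PER_TIB = 6.25


def estimate_query_cost(bytes_billed, usd_per_tib=ASSUMED_USD_PER_TIB):
    """Estimate on-demand query cost from bytes billed."""
    return bytes_billed / TIB * usd_per_tib


def billable_pubsub_bytes(message_sizes, minimum=1000):
    """Round each message up to a minimum billable size before summing."""
    return sum(max(size, minimum) for size in message_sizes)


def track_cost(sink):
    """Hypothetical decorator: record an estimate for any result exposing total_bytes_billed."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            job = func(*args, **kwargs)
            billed = getattr(job, "total_bytes_billed", None) or 0
            sink.append({
                "function": func.__name__,
                "bytes_billed": billed,
                "estimated_usd": estimate_query_cost(billed),
            })
            return job
        return wrapper
    return decorator
```

The decorator leaves the wrapped function's return value untouched, so cost tracking can be added or removed without changing call sites.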

Governance & Compliance

  • Zero-Bleed Policy: This library MUST NOT import apache_beam or airflow.
  • Portability: Must remain compatible with any Python environment (Cloud Functions, Cloud Run, local scripts, etc.).
  • Testing: All new features require unit tests in tests/unit/.
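
One way the Zero-Bleed Policy could be enforced is a unit test that parses every source file and fails on a forbidden import. The sketch below assumes nothing about how the project actually checks this; forbidden_imports and scan_package are hypothetical helpers.

```python
import ast
from pathlib import Path

FORBIDDEN = {"apache_beam", "airflow"}


def forbidden_imports(source, forbidden=FORBIDDEN):
    """Return the sorted forbidden top-level packages imported by `source`."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # level == 0 skips relative imports, which cannot reach external packages.
            names = [node.module]
        else:
            continue
        for name in names:
            top_level = name.split(".")[0]
            if top_level in forbidden:
                found.add(top_level)
    return sorted(found)


def scan_package(root):
    """Map each offending .py file under `root` to the forbidden packages it imports."""
    offenders = {}
    for path in Path(root).rglob("*.py"):
        hits = forbidden_imports(path.read_text(encoding="utf-8"))
        if hits:
            offenders[str(path)] = hits
    return offenders
```

A test in tests/unit/ could then assert that scan_package returns an empty dict for the package directory; the exact path to that directory depends on the repository layout and is not stated in this README.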

Usage

from gcp_pipeline_core.audit import AuditTrail, ReconciliationEngine
from gcp_pipeline_core.monitoring import MetricsCollector
from gcp_pipeline_core.utilities import configure_structured_logging, generate_run_id
from gcp_pipeline_core.schema import EntitySchema, SchemaField
from gcp_pipeline_core.job_control import JobControlRepository
from gcp_pipeline_core.error_handling import ErrorHandler
from gcp_pipeline_core.finops import BigQueryCostTracker, FinOpsLabels, track_bq_cost

Tests

python3.11 -m pytest tests/ -v
# 256 passed

Download files

Source Distribution

gcp_pipeline_core-1.0.20.tar.gz (64.6 kB)

Built Distribution

gcp_pipeline_core-1.0.20-py3-none-any.whl (88.7 kB, Python 3)
