Testing framework for GCP data pipelines with mocks, fixtures, and assertions

GCP Pipeline Tester

A comprehensive testing framework for GCP data pipelines, providing mocks, fixtures, builders, and assertions for testing BigQuery, GCS, Pub/Sub, and Dataflow pipelines.

📖 Part of the GCP Pipeline Framework. This library provides testing utilities for pipelines built with gcp-pipeline-core and gcp-pipeline-beam.


Features

  • Base Test Classes: BasePipelineTest, BaseBeamTest, BaseValidationTest - foundational test classes with common assertions
  • Mocks: Mock implementations for GCS, BigQuery, Pub/Sub - test without real GCP connectivity
  • Fixtures: Ready-to-use pytest fixtures for sample data and GCP services
  • Builders: Fluent builders for constructing test records and configurations
  • Assertions: Domain-specific assertion functions for pipeline testing
  • Comparison: Dual-run comparison utilities for migration validation
  • BDD: Behavior-driven development support with Gherkin step definitions

Installation

# Install from source
cd gcp-pipeline-libraries/gcp-pipeline-tester
pip install -e .

# Or with dev dependencies
pip install -e ".[dev]"

Quick Start

Basic Test with Base Class

from gcp_pipeline_tester import BasePipelineTest

class TestMyPipeline(BasePipelineTest):
    def test_record_has_required_fields(self):
        record = {"id": "1", "name": "John", "email": "john@example.com"}
        
        self.assertFieldExists(record, "id")
        self.assertFieldValue(record, "name", "John")
        self.assertRecordStructure(record, ["id", "name", "email"])

Validation Testing

from gcp_pipeline_tester import BaseValidationTest

class TestMyValidator(BaseValidationTest):
    def test_validation_passes(self):
        errors = my_validator.validate({"id": "1", "name": "John"})
        self.assertValidationPassed(errors)
    
    def test_validation_fails_for_missing_field(self):
        errors = my_validator.validate({})
        self.assertValidationFailed(errors)
        self.assertValidationError(errors, "id", "required")

Using Mocks

from gcp_pipeline_tester.mocks import GCSClientMock, BigQueryClientMock

class TestWithMocks(BasePipelineTest):
    def test_gcs_operations(self):
        gcs_mock = GCSClientMock()
        gcs_mock.write_file("gs://bucket/input.csv", "id,name\n1,John")

        files = gcs_mock.get_written_files()
        self.assertIn("gs://bucket/input.csv", files)

    def test_bigquery_operations(self):
        bq_mock = BigQueryClientMock()

        errors = bq_mock.insert_rows_json(
            "project.dataset.table",
            [{"id": "1", "name": "John"}]
        )

        self.assertEqual(errors, [])
        self.assertEqual(len(bq_mock.get_inserted_rows()), 1)

Base Test Classes

BasePipelineTest

Root test class with record assertions.

from gcp_pipeline_tester import BasePipelineTest

class TestMyModule(BasePipelineTest):
    def test_field_exists(self):
        record = {'id': '123', 'name': 'John'}
        self.assertFieldExists(record, 'id')
        self.assertFieldValue(record, 'name', 'John')
        self.assertRecordStructure(record, ['id', 'name'])

Available Assertions:

  • assertFieldExists(record, field) - Assert field exists in record
  • assertFieldValue(record, field, expected) - Assert field has expected value
  • assertRecordStructure(record, expected_fields) - Assert record has all expected fields

BaseValidationTest

Test class for validation logic.

from gcp_pipeline_tester import BaseValidationTest

class TestValidators(BaseValidationTest):
    def test_valid_ssn(self):
        errors = validate_ssn("123-45-6789")
        self.assertValidationPassed(errors)
    
    def test_invalid_ssn(self):
        errors = validate_ssn("invalid")
        self.assertValidationFailed(errors)
        self.assertValidationError(errors, "ssn")

Available Assertions:

  • assertValidationPassed(errors) - Assert no validation errors
  • assertValidationFailed(errors) - Assert validation failed
  • assertValidationError(errors, field, message=None) - Assert specific validation error

BaseBeamTest

Test class for Apache Beam pipelines.

from gcp_pipeline_tester import BaseBeamTest
import apache_beam as beam

class TestBeamTransforms(BaseBeamTest):
    def test_parse_csv(self):
        pipeline = self.create_test_pipeline()
        
        result = (pipeline
            | beam.Create(['"id","name"\n"1","John"'])
            | beam.ParDo(RobustCsvParseDoFn(['id', 'name']))
        )
        
        self.assert_pcollection_contains(result, {'id': '1', 'name': 'John'})

Mocks

GCSClientMock

Mock GCS client for file operations without real GCP connectivity.

from gcp_pipeline_tester.mocks import GCSClientMock

def test_gcs_operations():
    mock = GCSClientMock()

    # Write file
    mock.write_file("gs://bucket/file.csv", "id,name\n1,John")

    # Get all written files
    files = mock.get_written_files()
    assert "gs://bucket/file.csv" in files
    assert files["gs://bucket/file.csv"] == "id,name\n1,John"

BigQueryClientMock

Mock BigQuery client for table operations.

from gcp_pipeline_tester.mocks import BigQueryClientMock

def test_bigquery_operations():
    mock = BigQueryClientMock()
    
    # Insert rows
    errors = mock.insert_rows_json(
        "project.dataset.table",
        [{"id": "1", "name": "John"}, {"id": "2", "name": "Jane"}]
    )
    assert errors == []
    
    # Get inserted rows
    rows = mock.get_inserted_rows()
    assert len(rows) == 2
    
    # Query (returns inserted rows)
    results = mock.query("SELECT * FROM dataset.table")

PubSubClientMock

Mock Pub/Sub client for messaging.

from gcp_pipeline_tester.mocks import PubSubClientMock

def test_pubsub_operations():
    mock = PubSubClientMock()
    
    # Publish message
    mock.publish("projects/proj/topics/topic", {"event": "file_ready"})
    
    # Get published messages
    messages = mock.get_published_messages()
    assert len(messages) == 1
    
    # Pull messages
    pulled = mock.pull("projects/proj/subscriptions/sub")

Builders

RecordBuilder

Fluent builder for test records.

from gcp_pipeline_tester.builders import RecordBuilder

record = (RecordBuilder()
    .with_field("id", "123")
    .with_field("name", "John")
    .with_field("email", "john@example.com")
    .with_field("amount", 45.67)
    .build())

assert record == {"id": "123", "name": "John", "email": "john@example.com", "amount": 45.67}

CSVRecordBuilder

Builder for CSV records (all values converted to strings).

from gcp_pipeline_tester.builders import CSVRecordBuilder

record = (CSVRecordBuilder(["id", "name", "amount"])
    .with_field("id", 123)
    .with_field("name", "John")
    .with_field("amount", 45.67)
    .build())

# All values are strings for CSV
assert record == {"id": "123", "name": "John", "amount": "45.67"}

PipelineConfigBuilder

Builder for pipeline configurations.

from gcp_pipeline_tester.builders import PipelineConfigBuilder

config = (PipelineConfigBuilder()
    .with_pipeline_name("test_pipeline")
    .with_run_id("run_001")
    .with_source_file("gs://bucket/input.csv")
    .build())

Fixtures

Ready-to-use pytest fixtures for common testing scenarios.

import pytest
from gcp_pipeline_tester.fixtures import sample_records, sample_config_dict

def test_with_sample_records(sample_records):
    """Use pre-generated sample records."""
    assert len(sample_records) > 0
    assert 'id' in sample_records[0]

def test_with_config(sample_config_dict):
    """Use sample pipeline config dictionary."""
    assert sample_config_dict['pipeline_name'] == 'test_pipeline'

Available Fixtures

  • sample_records - List of sample record dictionaries
  • sample_config_dict - Sample PipelineConfig dictionary
  • gcs_client_mock - GCSClientMock instance
  • bq_client_mock - BigQueryClientMock instance
  • test_pipeline - Beam test pipeline

Assertions

Domain-specific assertion functions for various testing levels.

Record Assertions

Location: gcp_pipeline_tester.assertions.record_assertions

Assertions for validating individual record dictionaries.

  • assert_field_exists(record, field) - Ensure field is present
  • assert_field_value(record, field, value) - Check specific field value
  • assert_record_valid(record, schema) - Validate record against schema
  • assert_field_type(record, field, expected_type) - Check field value type
  • assert_field_not_empty(record, field) - Check field is not empty
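The behavior of these assertions can be sketched with minimal stand-in implementations (illustrative only; the real functions live in gcp_pipeline_tester.assertions.record_assertions and may differ in details such as error messages):

```python
# Illustrative stand-ins for the record assertions listed above.
def assert_field_exists(record, field):
    assert field in record, f"Missing field: {field}"

def assert_field_value(record, field, value):
    assert_field_exists(record, field)
    assert record[field] == value, f"{field}={record[field]!r}, expected {value!r}"

def assert_field_type(record, field, expected_type):
    assert_field_exists(record, field)
    assert isinstance(record[field], expected_type), f"{field} is not {expected_type}"

def assert_field_not_empty(record, field):
    assert_field_exists(record, field)
    assert record[field] not in (None, "", [], {}), f"Field is empty: {field}"

record = {"id": "123", "name": "John", "amount": 45.67}
assert_field_value(record, "id", "123")
assert_field_type(record, "amount", float)
assert_field_not_empty(record, "name")
```

Each assertion fails with a descriptive message rather than a bare AssertionError, which keeps pipeline test failures readable.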

Beam Assertions

Location: gcp_pipeline_tester.assertions.beam_assertions

Assertions for validating Apache Beam PCollection contents.

from gcp_pipeline_tester.assertions import assert_pcollection_equal, assert_record_structure

# Verify PCollection matches expected elements
assert_pcollection_equal(pcollection, expected_elements)

# Verify record has expected structure
assert_record_structure(record, expected_fields=["id", "name", "amount"])

Pipeline Assertions

Location: gcp_pipeline_tester.assertions.pipeline_assertions

Assertions for high-level pipeline execution status.

  • assert_pipeline_success(audit_record) - Verify success flag in audit
  • assert_no_errors(error_handler) - Ensure no errors were recorded
  • assert_metrics_recorded(metrics) - Verify metrics were emitted
  • assert_audit_trail_complete(audit) - Check for required audit fields
  • assert_pipeline_error_count(errors, count) - Verify expected number of errors
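A minimal sketch of how three of these assertions might behave, using stand-in implementations (the "success" key on the audit record is an assumption for illustration; the real functions live in gcp_pipeline_tester.assertions.pipeline_assertions):

```python
# Illustrative stand-ins for the pipeline-level assertions above.
def assert_pipeline_success(audit_record):
    # Assumes the audit record carries a boolean "success" flag.
    assert audit_record.get("success") is True, f"Pipeline failed: {audit_record}"

def assert_no_errors(errors):
    assert not errors, f"Unexpected errors: {errors}"

def assert_pipeline_error_count(errors, count):
    assert len(errors) == count, f"Expected {count} errors, got {len(errors)}"

audit = {"success": True, "rows_processed": 100}
assert_pipeline_success(audit)
assert_no_errors([])
assert_pipeline_error_count(["bad row"], 1)
```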

Dual-Run Comparison

Compare source (mainframe CSV) with target (BigQuery) for migration validation.

from gcp_pipeline_tester.comparison import DualRunComparison

comparison = DualRunComparison(
    project_id="my-project",
    source_file="mainframe_output.csv",
    target_table="project:dataset.table",
    job_name="customer_migration",
    tolerance_percent=1.0,  # Allow 1% difference
)

# Run comparison
report = comparison.compare()

# Check results
print(report.summary())
assert report.overall_status == "PASS"

Comparison Checks

  • row_count - Compare record counts
  • column_match - Verify all columns present
  • data_hash - Compare data checksums
  • sample_validation - Validate sample records
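How the row_count check interacts with tolerance_percent can be sketched as follows (stand-in logic, not the DualRunComparison internals):

```python
# Illustrative sketch of a row-count check with a percentage tolerance,
# mirroring the tolerance_percent parameter shown in the example above.
def row_count_check(source_rows: int, target_rows: int, tolerance_percent: float) -> bool:
    """PASS when the row-count difference is within tolerance_percent of the source."""
    if source_rows == 0:
        return target_rows == 0
    diff_percent = abs(source_rows - target_rows) / source_rows * 100
    return diff_percent <= tolerance_percent

assert row_count_check(1000, 995, tolerance_percent=1.0)       # 0.5% diff: PASS
assert not row_count_check(1000, 950, tolerance_percent=1.0)   # 5% diff: FAIL
```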

BDD Testing

Behavior-driven development support with Gherkin step definitions.

Data Quality Steps

Location: gcp_pipeline_tester.bdd.steps.dq_steps

Pre-built steps for testing data quality rules.

Given a record with ssn value "123-45-6789"
When I run the data quality validation
Then the record should be marked as valid

Pipeline Steps

Location: gcp_pipeline_tester.bdd.steps.pipeline_steps

Steps for testing end-to-end pipeline execution.

Given a pipeline for entity "customers"
And a source file "gs://bucket/customers.csv"
When I execute the pipeline
Then the pipeline should complete successfully
And the target table should contain 100 records

Writing Feature Files

# features/validation.feature
Feature: SSN Validation
  
  Scenario: Valid SSN passes validation
    Given I have a record with SSN "123-45-6789"
    When I validate the SSN
    Then validation should pass
  
  Scenario: Invalid SSN fails validation
    Given I have a record with SSN "invalid"
    When I validate the SSN
    Then validation should fail with error "Invalid SSN format"

Implementing Steps

from gcp_pipeline_tester.bdd import PipelineScenarioTest
from pytest_bdd import given, when, then, scenario

class TestSSNValidation(PipelineScenarioTest):
    
    @scenario('features/validation.feature', 'Valid SSN passes validation')
    def test_valid_ssn(self):
        pass
    
    @given('I have a record with SSN "<ssn>"')
    def given_ssn(self, ssn):
        self.context['ssn'] = ssn
    
    @when('I validate the SSN')
    def when_validate(self):
        self.context['errors'] = validate_ssn(self.context['ssn'])
    
    @then('validation should pass')
    def then_pass(self):
        assert len(self.context['errors']) == 0

Running Tests

# Run all tests
./run_tests.sh

# Or manually
PYTHONPATH=src pytest tests/unit -v

# With coverage
PYTHONPATH=src pytest tests/unit -v --cov=src/gcp_pipeline_tester --cov-report=html

Key Findings

1. Robust Mocking Infrastructure

  • Comprehensive Mocks: Provides high-fidelity mock implementations for GCS, BigQuery, and Pub/Sub.
  • Stateless Testing: Ensures that unit tests across the entire monorepo can run in isolated CI environments without requiring live GCP credentials or connectivity.

2. Standardized Base Classes

  • Foundational Support: Includes BasePipelineTest, BaseBeamTest, and BaseValidationTest to enforce consistent testing patterns and provide common assertions.

3. BDD-Style Integration Testing

  • Complex Scenarios: Supports Behavior-Driven Development (BDD) using Gherkin-style steps.
  • End-to-End Validation: Facilitates testing of multi-stage pipelines (e.g., Discovery -> Ingestion -> Transformation) in a single scenario.

4. Dual-Run Comparison

  • Migration Verification: Specialized utilities to compare outputs from legacy systems against GCP-processed results, ensuring parity.

Governance & Compliance

  • Unified Strategy: Versioned under the monorepo's unified release and tagging strategy (libs-1.0.x).
  • Standardized Mocking: Developers MUST use the tester mocks instead of custom unittest.mock.Mock objects for GCP services to ensure consistency.
  • BDD Expansion: BDD scenarios are encouraged for any new multi-stage orchestration or processing logic.
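The mocking rule exists because a bare Mock silently accepts any call, while a stateful mock records state and rejects typos. A sketch of the difference, using a local stub that mirrors the documented GCSClientMock API (write_file / get_written_files); it is a stand-in for illustration, not the library class itself:

```python
from unittest.mock import Mock

# A bare Mock "succeeds" on any attribute, so a typo'd method name
# never fails the test:
bad_gcs = Mock()
bad_gcs.wrte_file("gs://bucket/f.csv", "data")  # typo goes unnoticed

# A stateful mock records writes and raises AttributeError on unknown
# methods, catching the same typo immediately:
class GCSClientStub:
    def __init__(self):
        self._files = {}

    def write_file(self, path, content):
        self._files[path] = content

    def get_written_files(self):
        return dict(self._files)

gcs = GCSClientStub()
gcs.write_file("gs://bucket/f.csv", "data")
assert gcs.get_written_files() == {"gs://bucket/f.csv": "data"}
```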

Project Structure

gcp-pipeline-tester/
├── README.md                    # This file
├── pyproject.toml               # Package configuration
├── pytest.ini                   # Test configuration
├── run_tests.sh                 # Test runner script
├── src/
│   └── gcp_pipeline_tester/
│       ├── __init__.py          # Main exports
│       ├── base/                # Base test classes
│       │   ├── pipeline_test.py # BasePipelineTest
│       │   ├── beam_test.py     # BaseBeamTest
│       │   ├── validation_test.py
│       │   └── result.py        # TestResult dataclass
│       ├── mocks/               # GCP service mocks
│       │   ├── gcs_mock.py
│       │   ├── bigquery_mock.py
│       │   └── pubsub_mock.py
│       ├── fixtures/            # Pytest fixtures
│       │   ├── common.py
│       │   ├── gcs.py
│       │   ├── bigquery.py
│       │   └── beam.py
│       ├── builders/            # Test data builders
│       │   ├── record_builder.py
│       │   ├── csv_builder.py
│       │   └── config_builder.py
│       ├── comparison/          # Dual-run comparison
│       │   └── dual_run.py
│       ├── assertions/          # Custom assertions
│       └── bdd/                 # BDD step definitions
└── tests/
    └── unit/                    # Unit tests

API Reference

Base Classes

  • BasePipelineTest - Root test class with record assertions
  • BaseValidationTest - Test class for validation logic
  • BaseBeamTest - Test class for Apache Beam pipelines
  • TestResult - Standardized test result dataclass

Mocks

  • GCSClientMock - Mock GCS client for file operations
  • GCSBucketMock - Mock GCS bucket
  • BigQueryClientMock - Mock BigQuery client
  • BigQueryTableMock - Mock BigQuery table
  • PubSubClientMock - Mock Pub/Sub client

Builders

  • RecordBuilder - Fluent builder for test records
  • CSVRecordBuilder - Builder for CSV records (converts values to strings)
  • PipelineConfigBuilder - Builder for pipeline configurations

Comparison

  • DualRunComparison - Compare source vs target for migration validation
  • ComparisonResult - Single comparison check result
  • ComparisonReport - Complete comparison report

Dependencies

  • Python 3.9+
  • pytest >= 7.0.0
  • google-cloud-bigquery >= 3.0.0
  • google-cloud-storage >= 2.0.0
  • google-cloud-pubsub >= 2.0.0

Related Libraries

  • gcp-pipeline-core
  • gcp-pipeline-beam

License

This library is part of the Legacy Migration Reference project.
