GCP Pipeline Tester
A comprehensive testing framework for GCP data pipelines, providing mocks, fixtures, builders, and assertions for testing BigQuery, GCS, Pub/Sub, and Dataflow pipelines.
Part of the GCP Pipeline Framework: this library provides testing utilities for pipelines built with gcp-pipeline-core and gcp-pipeline-beam.
Table of Contents
- Features
- Installation
- Quick Start
- Base Test Classes
- Mocks
- Builders
- Fixtures
- Assertions
- Dual-Run Comparison
- BDD Testing
- Running Tests
- Project Structure
- API Reference
Features
- Base Test Classes: BasePipelineTest, BaseBeamTest, BaseValidationTest - foundational test classes with common assertions
- Mocks: Mock implementations for GCS, BigQuery, and Pub/Sub - test without real GCP connectivity
- Fixtures: Ready-to-use pytest fixtures for sample data and GCP services
- Builders: Fluent builders for constructing test records and configurations
- Assertions: Domain-specific assertion functions for pipeline testing
- Comparison: Dual-run comparison utilities for migration validation
- BDD: Behavior-driven development support with Gherkin step definitions
Installation
# Install from source
cd gcp-pipeline-libraries/gcp-pipeline-tester
pip install -e .
# Or with dev dependencies
pip install -e ".[dev]"
Quick Start
Basic Test with Base Class
from gcp_pipeline_tester import BasePipelineTest

class TestMyPipeline(BasePipelineTest):
    def test_record_has_required_fields(self):
        record = {"id": "1", "name": "John", "email": "john@example.com"}
        self.assertFieldExists(record, "id")
        self.assertFieldValue(record, "name", "John")
        self.assertRecordStructure(record, ["id", "name", "email"])
Validation Testing
from gcp_pipeline_tester import BaseValidationTest

class TestMyValidator(BaseValidationTest):
    def test_validation_passes(self):
        errors = my_validator.validate({"id": "1", "name": "John"})
        self.assertValidationPassed(errors)

    def test_validation_fails_for_missing_field(self):
        errors = my_validator.validate({})
        self.assertValidationFailed(errors)
        self.assertValidationError(errors, "id", "required")
Using Mocks
from gcp_pipeline_tester import BasePipelineTest
from gcp_pipeline_tester.mocks import GCSClientMock, BigQueryClientMock

class TestWithMocks(BasePipelineTest):
    def test_gcs_operations(self):
        gcs_mock = GCSClientMock()
        gcs_mock.write_file("gs://bucket/input.csv", "id,name\n1,John")
        files = gcs_mock.get_written_files()
        self.assertIn("gs://bucket/input.csv", files)

    def test_bigquery_operations(self):
        bq_mock = BigQueryClientMock()
        errors = bq_mock.insert_rows_json(
            "project.dataset.table",
            [{"id": "1", "name": "John"}]
        )
        self.assertEqual(errors, [])
        self.assertEqual(len(bq_mock.get_inserted_rows()), 1)
Base Test Classes
BasePipelineTest
Root test class with record assertions.
from gcp_pipeline_tester import BasePipelineTest

class TestMyModule(BasePipelineTest):
    def test_field_exists(self):
        record = {'id': '123', 'name': 'John'}
        self.assertFieldExists(record, 'id')
        self.assertFieldValue(record, 'name', 'John')
        self.assertRecordStructure(record, ['id', 'name'])
Available Assertions:
- assertFieldExists(record, field) - Assert field exists in record
- assertFieldValue(record, field, expected) - Assert field has expected value
- assertRecordStructure(record, expected_fields) - Assert record has all expected fields
BaseValidationTest
Test class for validation logic.
from gcp_pipeline_tester import BaseValidationTest

class TestValidators(BaseValidationTest):
    def test_valid_ssn(self):
        errors = validate_ssn("123-45-6789")
        self.assertValidationPassed(errors)

    def test_invalid_ssn(self):
        errors = validate_ssn("invalid")
        self.assertValidationFailed(errors)
        self.assertValidationError(errors, "ssn")
Available Assertions:
- assertValidationPassed(errors) - Assert no validation errors
- assertValidationFailed(errors) - Assert validation failed
- assertValidationError(errors, field, message=None) - Assert specific validation error
BaseBeamTest
Test class for Apache Beam pipelines.
from gcp_pipeline_tester import BaseBeamTest
import apache_beam as beam

class TestBeamTransforms(BaseBeamTest):
    def test_parse_csv(self):
        # RobustCsvParseDoFn is the transform under test, imported from your pipeline code
        pipeline = self.create_test_pipeline()
        result = (pipeline
                  | beam.Create(['"id","name"\n"1","John"'])
                  | beam.ParDo(RobustCsvParseDoFn(['id', 'name'])))
        self.assert_pcollection_contains(result, {'id': '1', 'name': 'John'})
Mocks
GCSClientMock
Mock GCS client for file operations without real GCP connectivity.
from gcp_pipeline_tester.mocks import GCSClientMock

def test_gcs_operations():
    mock = GCSClientMock()

    # Write file
    mock.write_file("gs://bucket/file.csv", "id,name\n1,John")

    # Get all written files
    files = mock.get_written_files()
    assert "gs://bucket/file.csv" in files
    assert files["gs://bucket/file.csv"] == "id,name\n1,John"
BigQueryClientMock
Mock BigQuery client for table operations.
from gcp_pipeline_tester.mocks import BigQueryClientMock

def test_bigquery_operations():
    mock = BigQueryClientMock()

    # Insert rows
    errors = mock.insert_rows_json(
        "project.dataset.table",
        [{"id": "1", "name": "John"}, {"id": "2", "name": "Jane"}]
    )
    assert errors == []

    # Get inserted rows
    rows = mock.get_inserted_rows()
    assert len(rows) == 2

    # Query (returns inserted rows)
    results = mock.query("SELECT * FROM dataset.table")
PubSubClientMock
Mock Pub/Sub client for messaging.
from gcp_pipeline_tester.mocks import PubSubClientMock

def test_pubsub_operations():
    mock = PubSubClientMock()

    # Publish message
    mock.publish("projects/proj/topics/topic", {"event": "file_ready"})

    # Get published messages
    messages = mock.get_published_messages()
    assert len(messages) == 1

    # Pull messages
    pulled = mock.pull("projects/proj/subscriptions/sub")
Builders
RecordBuilder
Fluent builder for test records.
from gcp_pipeline_tester.builders import RecordBuilder

record = (RecordBuilder()
          .with_field("id", "123")
          .with_field("name", "John")
          .with_field("email", "john@example.com")
          .with_field("amount", 45.67)
          .build())

assert record == {"id": "123", "name": "John", "email": "john@example.com", "amount": 45.67}
CSVRecordBuilder
Builder for CSV records (all values converted to strings).
from gcp_pipeline_tester.builders import CSVRecordBuilder

record = (CSVRecordBuilder(["id", "name", "amount"])
          .with_field("id", 123)
          .with_field("name", "John")
          .with_field("amount", 45.67)
          .build())

# All values are strings for CSV
assert record == {"id": "123", "name": "John", "amount": "45.67"}
PipelineConfigBuilder
Builder for pipeline configurations.
from gcp_pipeline_tester.builders import PipelineConfigBuilder

config = (PipelineConfigBuilder()
          .with_pipeline_name("test_pipeline")
          .with_run_id("run_001")
          .with_source_file("gs://bucket/input.csv")
          .build())
Fixtures
Ready-to-use pytest fixtures for common testing scenarios.
import pytest
from gcp_pipeline_tester.fixtures import sample_records, sample_config_dict

def test_with_sample_records(sample_records):
    """Use pre-generated sample records."""
    assert len(sample_records) > 0
    assert 'id' in sample_records[0]

def test_with_config(sample_config_dict):
    """Use sample pipeline config dictionary."""
    assert sample_config_dict['pipeline_name'] == 'test_pipeline'
Available Fixtures
| Fixture | Description |
|---|---|
| sample_records | List of sample record dictionaries |
| sample_config_dict | Sample PipelineConfig dictionary |
| gcs_client_mock | GCSClientMock instance |
| bq_client_mock | BigQueryClientMock instance |
| test_pipeline | Beam test pipeline |
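The mock fixtures are injected like any other pytest fixture. A minimal sketch, assuming gcs_client_mock and bq_client_mock yield the mock instances described above:

def test_write_with_fixture(gcs_client_mock):
    # Fixture provides a ready GCSClientMock instance
    gcs_client_mock.write_file("gs://bucket/out.csv", "id,name\n1,John")
    assert "gs://bucket/out.csv" in gcs_client_mock.get_written_files()

def test_insert_with_fixture(bq_client_mock):
    # Fixture provides a ready BigQueryClientMock instance
    errors = bq_client_mock.insert_rows_json("project.dataset.table", [{"id": "1"}])
    assert errors == []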
Assertions
Domain-specific assertion functions for various testing levels.
Record Assertions
Location: gcp_pipeline_tester.assertions.record_assertions
Assertions for validating individual record dictionaries.
| Assertion | Description |
|---|---|
| assert_field_exists(record, field) | Ensure field is present |
| assert_field_value(record, field, value) | Check specific field value |
| assert_record_valid(record, schema) | Validate record against schema |
| assert_field_type(record, field, expected_type) | Check field value type |
| assert_field_not_empty(record, field) | Check field is not empty |
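A short usage sketch built from the signatures above; the import path follows the stated location, and the record values are illustrative:

from gcp_pipeline_tester.assertions.record_assertions import (
    assert_field_exists,
    assert_field_value,
    assert_field_type,
    assert_field_not_empty,
)

record = {"id": "1", "name": "John", "amount": 45.67}
assert_field_exists(record, "id")
assert_field_value(record, "name", "John")
assert_field_type(record, "amount", float)
assert_field_not_empty(record, "name")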
Beam Assertions
Location: gcp_pipeline_tester.assertions.beam_assertions
Assertions for validating Apache Beam PCollection contents.
from gcp_pipeline_tester.assertions import assert_pcollection_equal, assert_record_structure
# Verify PCollection matches expected elements
assert_pcollection_equal(pcollection, expected_elements)
# Verify record has expected structure
assert_record_structure(record, expected_fields=["id", "name", "amount"])
Pipeline Assertions
Location: gcp_pipeline_tester.assertions.pipeline_assertions
Assertions for high-level pipeline execution status.
| Assertion | Description |
|---|---|
| assert_pipeline_success(audit_record) | Verify success flag in audit |
| assert_no_errors(error_handler) | Ensure no errors were recorded |
| assert_metrics_recorded(metrics) | Verify metrics were emitted |
| assert_audit_trail_complete(audit) | Check for required audit fields |
| assert_pipeline_error_count(errors, count) | Verify expected number of errors |
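A sketch of combining these after a run; the shapes of audit_record and errors below are illustrative assumptions, not confirmed structures:

from gcp_pipeline_tester.assertions.pipeline_assertions import (
    assert_pipeline_success,
    assert_pipeline_error_count,
)

# Illustrative shapes: a dict carrying a success flag, and a list of recorded errors
audit_record = {"success": True, "rows_processed": 100}
errors = []

assert_pipeline_success(audit_record)
assert_pipeline_error_count(errors, 0)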
Dual-Run Comparison
Compare source (mainframe CSV) with target (BigQuery) for migration validation.
from gcp_pipeline_tester.comparison import DualRunComparison
comparison = DualRunComparison(
project_id="my-project",
source_file="mainframe_output.csv",
target_table="project:dataset.table",
job_name="customer_migration",
tolerance_percent=1.0, # Allow 1% difference
)
# Run comparison
report = comparison.compare()
# Check results
print(report.summary())
assert report.overall_status == "PASS"
Comparison Checks
| Check | Description |
|---|---|
| row_count | Compare record counts |
| column_match | Verify all columns present |
| data_hash | Compare data checksums |
| sample_validation | Validate sample records |
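For finer-grained gating than overall_status, individual checks could be inspected. This is only a sketch: the results attribute and the ComparisonResult fields used here are hypothetical, not confirmed API:

report = comparison.compare()
for result in report.results:  # hypothetical attribute holding ComparisonResult objects
    if not result.passed:      # hypothetical field
        print(f"Check failed: {result.check_name}")  # hypothetical field
assert report.overall_status == "PASS"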
BDD Testing
Behavior-driven development support with Gherkin step definitions.
Data Quality Steps
Location: gcp_pipeline_tester.bdd.steps.dq_steps
Pre-built steps for testing data quality rules.
Given a record with ssn value "123-45-6789"
When I run the data quality validation
Then the record should be marked as valid
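To make the pre-built steps available to your own scenarios, the usual pytest-bdd pattern is to re-export them from a conftest.py; a sketch, assuming the module exposes its step functions at the top level:

# conftest.py
# Make the pre-built data quality steps visible to pytest-bdd in this package
from gcp_pipeline_tester.bdd.steps.dq_steps import *  # noqa: F401,F403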
Pipeline Steps
Location: gcp_pipeline_tester.bdd.steps.pipeline_steps
Steps for testing end-to-end pipeline execution.
Given a pipeline for entity "customers"
And a source file "gs://bucket/customers.csv"
When I execute the pipeline
Then the pipeline should complete successfully
And the target table should contain 100 records
Writing Feature Files
# features/validation.feature
Feature: SSN Validation

  Scenario: Valid SSN passes validation
    Given I have a record with SSN "123-45-6789"
    When I validate the SSN
    Then validation should pass

  Scenario: Invalid SSN fails validation
    Given I have a record with SSN "invalid"
    When I validate the SSN
    Then validation should fail with error "Invalid SSN format"
Implementing Steps
from gcp_pipeline_tester.bdd import PipelineScenarioTest
from pytest_bdd import given, when, then, scenario

class TestSSNValidation(PipelineScenarioTest):
    @scenario('features/validation.feature', 'Valid SSN passes validation')
    def test_valid_ssn(self):
        pass

    @given('I have a record with SSN "<ssn>"')
    def given_ssn(self, ssn):
        self.context['ssn'] = ssn

    @when('I validate the SSN')
    def when_validate(self):
        self.context['errors'] = validate_ssn(self.context['ssn'])

    @then('validation should pass')
    def then_pass(self):
        assert len(self.context['errors']) == 0
Running Tests
# Run all tests
./run_tests.sh
# Or manually
PYTHONPATH=src pytest tests/unit -v
# With coverage
PYTHONPATH=src pytest tests/unit -v --cov=src/gcp_pipeline_tester --cov-report=html
Key Findings
1. Robust Mocking Infrastructure
- Comprehensive Mocks: Provides high-fidelity mock implementations for GCS, BigQuery, and Pub/Sub.
- Stateless Testing: Ensures that unit tests across the entire monorepo can run in isolated CI environments without requiring live GCP credentials or connectivity.
2. Standardized Base Classes
- Foundational Support: Includes BasePipelineTest, BaseBeamTest, and BaseValidationTest to enforce consistent testing patterns and provide common assertions.
3. BDD-Style Integration Testing
- Complex Scenarios: Supports Behavior-Driven Development (BDD) using Gherkin-style steps.
- End-to-End Validation: Facilitates testing of multi-stage pipelines (e.g., Discovery -> Ingestion -> Transformation) in a single scenario.
4. Dual-Run Comparison
- Migration Verification: Specialized utilities to compare outputs from legacy systems against GCP-processed results, ensuring parity.
Governance & Compliance
- Unified Strategy: Integrated for a unified release and tagging strategy (libs-1.0.x).
- Standardized Mocking: Developers MUST use tester mocks instead of custom unittest.mock.Mock objects for GCP services to ensure consistency (see the sketch after this list).
- BDD Expansion: BDD scenarios are encouraged for any new multi-stage orchestration or processing logic.
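A minimal sketch of the preferred mocking pattern, assuming the code under test accepts an injected client (upload_report is a hypothetical function for illustration):

from gcp_pipeline_tester.mocks import GCSClientMock

def upload_report(gcs_client, path, content):
    # Hypothetical code under test: works with any client exposing write_file()
    gcs_client.write_file(path, content)

def test_upload_report_uses_tester_mock():
    mock = GCSClientMock()  # preferred over unittest.mock.Mock()
    upload_report(mock, "gs://bucket/report.csv", "id,name\n1,John")
    assert "gs://bucket/report.csv" in mock.get_written_files()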
Project Structure
gcp-pipeline-tester/
├── README.md                        # This file
├── pyproject.toml                   # Package configuration
├── pytest.ini                       # Test configuration
├── run_tests.sh                     # Test runner script
├── src/
│   └── gcp_pipeline_tester/
│       ├── __init__.py              # Main exports
│       ├── base/                    # Base test classes
│       │   ├── pipeline_test.py     # BasePipelineTest
│       │   ├── beam_test.py         # BaseBeamTest
│       │   ├── validation_test.py
│       │   └── result.py            # TestResult dataclass
│       ├── mocks/                   # GCP service mocks
│       │   ├── gcs_mock.py
│       │   ├── bigquery_mock.py
│       │   └── pubsub_mock.py
│       ├── fixtures/                # Pytest fixtures
│       │   ├── common.py
│       │   ├── gcs.py
│       │   ├── bigquery.py
│       │   └── beam.py
│       ├── builders/                # Test data builders
│       │   ├── record_builder.py
│       │   ├── csv_builder.py
│       │   └── config_builder.py
│       ├── comparison/              # Dual-run comparison
│       │   └── dual_run.py
│       ├── assertions/              # Custom assertions
│       └── bdd/                     # BDD step definitions
└── tests/
    └── unit/                        # Unit tests
API Reference
Base Classes
| Class | Description |
|---|---|
| BasePipelineTest | Root test class with record assertions |
| BaseValidationTest | Test class for validation logic |
| BaseBeamTest | Test class for Apache Beam pipelines |
| TestResult | Standardized test result dataclass |
Mocks
| Mock | Description |
|---|---|
| GCSClientMock | Mock GCS client for file operations |
| GCSBucketMock | Mock GCS bucket |
| BigQueryClientMock | Mock BigQuery client |
| BigQueryTableMock | Mock BigQuery table |
| PubSubClientMock | Mock Pub/Sub client |
Builders
| Builder | Description |
|---|---|
| RecordBuilder | Fluent builder for test records |
| CSVRecordBuilder | Builder for CSV records (converts values to strings) |
| PipelineConfigBuilder | Builder for pipeline configurations |
Comparison
| Class | Description |
|---|---|
| DualRunComparison | Compare source vs target for migration validation |
| ComparisonResult | Single comparison check result |
| ComparisonReport | Complete comparison report |
Dependencies
- Python 3.9+
- pytest >= 7.0.0
- google-cloud-bigquery >= 3.0.0
- google-cloud-storage >= 2.0.0
- google-cloud-pubsub >= 2.0.0
Related Libraries
- gcp-pipeline-core and gcp-pipeline-beam: Core pipeline building libraries
License
This library is part of the Legacy Migration Reference project.