Skip to main content

A lightweight, automated testing system for data pipelines and tools

Project description

Locaria Integrated Testing Framework

A lightweight, automated testing system for data pipelines and tools. Focuses on business-logic validation, data quality checks, and operational sanity tests rather than UI or cosmetic testing.

Features

  • Business Logic Validation - Test time splits sum to 100%, financial ratios are within bounds, etc.
  • Data Quality Checks - Schema validation, null checks, row count sanity, data freshness
  • Configurable Thresholds - Firestore-based configuration for easy threshold updates
  • Integrated Logging - BigQuery batch logging with consolidated test results (one row per test, grouping multiple issues)
  • Email Alerts - Real-time failure notifications via existing email manager API
  • Pipeline-Specific Tests - Custom business logic validation for different data domains

Quick Start

Basic Usage

All generic test methods support two optional permission fields:

  • issue_owner – who should receive the alert email (string or list, defaults to "analytics_hub.data_team_ack").
  • issue_ack_access – who should be able to view/acknowledge the issue in the FIN Profitability acknowledgment manager (defaults to issue_owner, but can differ).
from locaria_integrated_testing import DataQualityTests, FreshnessTests, RowCountTests, DuplicateTests, create_testkit

# Initialize TestKit directly (works with any configstore or no configstore)
testkit = create_testkit(
    repository_name="your_repository",
    pipeline_name="your_pipeline",
    bigquery_client=None,  # Required: BigQuery client instance for test result logging
    firestore_project_id=None,  # Optional: defaults to "locaria-dev-config-store"
    fail_on_error=False  # Optional: if True, pipeline stops on test failures
)

# Initialize test classes
data_quality_tests = DataQualityTests(testkit, caller_script="your_pipeline.py")
freshness_tests = FreshnessTests(testkit, caller_script="your_pipeline.py")
row_count_tests = RowCountTests(testkit, caller_script="your_pipeline.py")
duplicate_tests = DuplicateTests(testkit, caller_script="your_pipeline.py")

try:
    # Your data pipeline code
    df = extract_data()
    
    # Stage 1: Data quality tests with explicit permissions
    data_quality_tests.check_data_completeness(
        df, 
        completeness_threshold=0.95,
        issue_owner="analytics_hub.data_team_ack",
        issue_ack_access="analytics_hub.ops_ack"
    )
    duplicate_tests.check_duplicate_records(
        df, 
        ["employee_id", "date"],
        issue_owner="analytics_hub.data_team_ack",
        issue_ack_access="analytics_hub.ops_ack"
    )
    
    # Stage 2: Transform and load
    df_transformed = transform_data(df)
    row_count_tests.check_row_count_change(
        df_transformed, 
        "table_name", 
        "append",
        issue_owner="analytics_hub.data_team_ack",
        issue_ack_access="analytics_hub.ops_ack"
    )
    
    # Stage 3: Freshness checks
    load_to_bq(df_transformed, table="finance.time_splits")
    freshness_tests.check_data_freshness(
        df_transformed, 
        "timestamp",
        issue_owner="analytics_hub.data_team_ack",
        issue_ack_access="analytics_hub.ops_ack"
    )
    
finally:
    # Always finalize the test run
    testkit.finalize_run()

For complete working examples, see the plunet_employee_table pipeline or update_global_content_table pipeline in the locate_2_pulls repository.

Environment Setup

The framework is configstore-agnostic and accepts arguments directly. You can use it with any configstore implementation or without one entirely. TestKit requires a BigQuery client for logging test results to BigQuery.

Required Parameters

  • bigquery_client: A Google Cloud BigQuery client instance. This is required for logging test results to the locaria-dev-config-store.cross_project_logging.integrated_test_logs table.

Optional Environment Variables

For configuration when not using a configstore:

# Email API configuration (fallback)
export EMAIL_API_URL="https://your-app.appspot.com/api/tools/send_email_direct"

Default Configuration

The framework uses sensible defaults when parameters are not provided:

  • Firestore Project: locaria-dev-config-store (default, can be overridden via firestore_project_id)
  • BigQuery Logging: Test results are written to locaria-dev-config-store.cross_project_logging.integrated_test_logs during finalize_run(). Results are consolidated by test name: when a test generates multiple issues (e.g., 40 duplicate warnings), they are grouped into a single summary row with a succinct message showing the count and first few identifiers, with full details stored in the metrics JSON field.
  • Email API: Can be configured via environment variable EMAIL_API_URL

If you're using a configstore (like the one in locate_2_pulls), you can extract values from it and pass them to TestKit. The framework doesn't depend on any specific configstore implementation.

Test Classes

SchemaTests

Schema validation tests for data quality assurance:

  • check_required_columns() - Validate required columns exist
  • check_data_types() - Validate column data types
  • check_null_constraints() - Check for nulls in critical fields
  • check_unique_constraints() - Validate unique key constraints
  • check_column_values() - Check values within expected ranges or sets
  • check_schema_completeness() - Comprehensive schema validation

DataQualityTests

Data quality tests for common validation scenarios. All methods support optional issue_owner (email recipients) and issue_ack_access (acknowledgment UI access) parameters for permission management:

  • check_numeric_ranges() - Values within expected ranges
  • check_data_completeness() - Data completeness above threshold
  • check_column_completeness() - Column-level completeness validation
  • check_date_ranges() - Date values within reasonable bounds

FreshnessTests

Data freshness tests for ensuring data is up-to-date. All methods support optional issue_owner / issue_ack_access parameters:

  • check_data_freshness() - Verify data is up-to-date
  • check_timestamp_progression() - Timestamps moving forward
  • check_data_consistency() - Data frequency and gap validation
  • check_partition_freshness() - BigQuery partition freshness validation
  • check_data_age_distribution() - Data age distribution analysis

Configuration

Configuration is stored in Firestore in the locaria-dev-config-store project (default, can be overridden via firestore_project_id parameter) under the integrated_testing_config collection.

Default Configuration

{
  "thresholds": {
    "row_count_change": {
      "warn_percentage": 20,
      "fail_percentage": 50
    },
    "out_of_office_percentage": {
      "warn_threshold": 25,
      "fail_threshold": 35
    },
    "time_split_tolerance": {
      "precision": 0.01
    },
    "data_freshness": {
      "max_age_hours": 24,
      "warn_age_hours": 12
    }
  },
  "test_switches": {
    "enable_schema_validation": true,
    "enable_business_logic_checks": true,
    "enable_freshness_checks": true,
    "enable_row_count_validation": true
  },
  "email_alerts": {
    "failure_recipients": ["data_team@locaria.com"],
    "warning_recipients": ["data_team@locaria.com"],
    "digest_frequency": "daily"
  }
}

Managing Configuration

from modules.integrated_tests import ConfigManager

# Initialize config manager
config_manager = ConfigManager()

# Create default configuration for a repository
config_manager.create_default_config_for_repository("your_repository")

# Update thresholds
config_manager.update_thresholds(
    "your_repository",
    "row_count_change",
    {"warn_percentage": 15, "fail_percentage": 40}
)

# Update test switches
config_manager.update_test_switches(
    "your_repository",
    {"enable_schema_validation": False}
)

Test Severity Levels

  • FAIL - Stops pipeline execution, logs error, sends immediate email alert
  • WARN - Continues pipeline execution, logs warning, sends digest email
  • PASS - Test passed, logs success

Email Templates

The framework uses pre-configured email templates in the email manager:

  • Test Failure Alert - Immediate notification for FAIL results
  • Test Warning Digest - Grouped notification for WARN results

Permission-Aware Recipients

When you include an issue_owner permission (string or list) in the metrics of log_warn/log_fail, TestKit now looks up the Access Controller Firestore (locaria-prod-authenticator) to determine which users have that permission. Their email addresses are automatically appended to the send_email_direct request (using append_recipients) so the correct acknowledge owners are copied without manual configuration.

Override the lookup project with TESTKIT_ACCESS_CONTROL_PROJECT=<project_id> if you maintain a different Access Controller instance. If the resolver cannot connect (e.g., missing credentials), the framework gracefully falls back to the static recipients configured in Firestore.

Email Routing Grants

In addition to issue_owner permissions set in pipeline code, TestKit also queries email routing grants from the Acknowledgment Manager Access Control system. Grants with email_routing: true can add additional email recipients based on repository, pipeline, and test matching. This allows admins to configure email routing through a web interface without modifying pipeline code.

When finalize_run() is called, TestKit:

  1. Collects permissions from issue_owner and issue_ack_access fields (as before)
  2. Queries Firestore for grants with email_routing: true
  3. Matches grants against each issue's repository, pipeline, and test_name
  4. Adds permissions from matching grants to the email recipient list
  5. Resolves all permissions to email addresses via PermissionResolver

Email routing grants use OR logic - they add recipients but don't replace issue_owner recipients. This ensures backward compatibility while enabling flexible email routing configuration.

Acknowledgment System

The acknowledgment system prevents email spam by allowing users to acknowledge known issues, which mutes them for a configurable period. Both warnings and failures can be acknowledged and stored in Firestore. The system supports permission-based access control through two fields:

  • issue_owner: determines which permission group(s) receive the alert email. This field also serves as the default acknowledgment permission when issue_ack_access is not provided.
  • issue_ack_access: (optional) determines who can see and acknowledge the issue in the Analytics Hub web interface. Provide this when you need a broader (or narrower) acknowledgment audience than the email recipients.

When you log an issue with log_warn or log_fail, include one or both fields in the metrics dictionary. If you provide a list for either field, users with any of those permissions can acknowledge the issue. Users with admin permissions can see and acknowledge all issues regardless of the settings.

The system works by detecting issues during test execution and storing them in Firestore during finalize_run(). The email system checks acknowledgment status before sending notifications, preventing spam for known issues. Users can acknowledge issues through the web interface, which mutes them for a configurable period (default 7 days). When mute periods expire, the framework automatically archives the old issue and creates a new unacknowledged issue, ensuring issues reappear for review after their mute period ends. Archived issues are preserved in the archives subcollection for historical tracking.

Firestore Structure

Collection: pipeline_acknowledgments
└── Document: {repo}%{pipeline}%{test_name}
    └── Subcollection: issues
        └── Document: {issue_key_simple}
            - acknowledged: bool
            - muted_until: timestamp (UTC)
            - status: "WARN" or "FAIL"
            - identifier: str
            - details: str
            - issue_first_occurrence: timestamp (UTC)
            - issue_last_occurrence: timestamp (UTC)
            - issue_owner: str or List[str]
            - issue_ack_access: Optional[str or List[str]]
            - acknowledged_by / acknowledged_at / acknowledgment_reason
    └── Subcollection: archives
        └── Document: {issue_key_simple}
            - Archived issues (expired mutes or manually deleted)

Web Interface

The acknowledgment system includes a modern web interface accessible at /tools/acknowledgment-manager in the Analytics Hub. The interface automatically filters issues based on your user permissions, showing only issues where you have the required issue_ack_access permission (or issue_owner when no explicit access list is provided).

The web interface provides real-time filtering by repository, pipeline, test type, and issue status. You can acknowledge or unacknowledge individual issues with configurable mute periods, view comprehensive issue information including first and last occurrence timestamps and ownership details, and perform bulk operations to handle multiple issues at once. When mute periods expire, the framework automatically archives the old issue and creates a new unacknowledged issue, ensuring issues reappear for review. Manual deletions are also archived for historical tracking.

For implementation details, see the acknowledgment manager routes and acknowledgment manager module in the FIN_profitability_gcp repository.

Documentation

Comprehensive documentation is available in the documentation/ directory:

Examples

See the locate_2_pulls repository for complete pipeline implementations:

Architecture

integrated_tests/
├── __init__.py                 # Main module exports
├── main/
│   └── testkit.py             # Core framework and orchestration
├── utils/
│   └── config_manager.py      # Firestore configuration management
├── generic_tests/
│   ├── __init__.py
│   ├── schema_tests.py        # Schema validation tests
│   ├── data_quality_tests.py  # Data quality tests
│   └── freshness_tests.py     # Data freshness tests
├── pipeline_specific_tests/   # Business logic tests per domain
│   └── __init__.py
├── examples/
│   └── sample_pipeline.py     # Usage examples
└── README.md

Best Practices

Test Design

Focus on business logic and data quality rather than just schema validation. Use descriptive test names that clearly explain the business rule being validated. Test at multiple stages of your pipeline: during data intake, after transformation, after loading to BigQuery, and in post-load validation. Include both positive and negative test cases to ensure comprehensive coverage.

Permission Management

Always explicitly specify the issue_owner parameter when calling test methods, even though it has a default value. This makes permission assignment clear and consistent across your codebase. Use a single permission string for issues that should be handled by one team, or a list of permissions when multiple teams need access. When the people who should acknowledge an issue differ from the people who should be emailed, add an issue_ack_access list so the right teams can mute issues without spamming the wider distribution. The default "analytics_hub.data_team_ack" (defined in config_manager.DEFAULT_ISSUE_OWNER_PERMISSION) remains appropriate for most data quality issues unless you need to route alerts elsewhere.

Error Handling

Always use try/finally blocks to ensure testkit.finalize_run() executes even when exceptions occur. This guarantees that test results are logged and emails are sent. Handle missing data gracefully by checking for None or empty DataFrames before running tests. Provide meaningful error messages that include context about what data was being tested and why the test failed. Log sufficient context in the metrics dictionary to enable effective debugging later.

Performance

Batch test operations when possible to reduce overhead. Use efficient pandas operations like vectorized comparisons rather than iterating over rows. Avoid unnecessary data copies by working with views or using in-place operations when appropriate. Cache configuration values when they're accessed frequently during a pipeline run.

Configuration

Use Firestore for dynamic configuration that can be updated without code changes. Provide sensible defaults for all thresholds and settings. Document all thresholds and switches clearly so other team members understand what each setting controls. Version control configuration changes by tracking them in your repository's changelog or configuration management system.

Troubleshooting

Common Issues

  1. BigQuery Logging Not Working

    • Verify BigQuery client is properly initialized
    • Check BigQuery permissions for the service account
    • Ensure cross_project_logging dataset exists in locaria-dev-config-store
    • Verify integrated_test_logs table exists or can be created
  2. Email Alerts Not Sending

    • Check EMAIL_API_URL environment variable
    • Verify email templates are configured in email manager
    • Check network connectivity
  3. Firestore Configuration Issues

    • Verify locaria-dev-config-store project access
    • Check collection and document permissions
    • Ensure configuration document exists
  4. Test Failures

    • Check test thresholds in Firestore
    • Verify data quality and schema
    • Review test logic and business rules

Debug Mode

Enable debug logging by setting the log level in configuration:

config_manager.update_repository_config(
    "your_repository",
    {"logging": {"log_level": "DEBUG"}}
)

Contributing

When adding new tests:

  1. Follow the existing naming conventions
  2. Include comprehensive error handling
  3. Add configuration options for thresholds
  4. Update documentation
  5. Add examples for new functionality

Additional Resources

Support

For questions or issues, contact the Data Team at data_team@locaria.com.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

locaria_integrated_testing-1.2.1.tar.gz (75.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

locaria_integrated_testing-1.2.1-py3-none-any.whl (77.4 kB view details)

Uploaded Python 3

File details

Details for the file locaria_integrated_testing-1.2.1.tar.gz.

File metadata

File hashes

Hashes for locaria_integrated_testing-1.2.1.tar.gz
Algorithm Hash digest
SHA256 825180838a914809a00df78e42d98a3efb098086e191932b95551ad9214f66d3
MD5 4a1fb97ba741159ece139f37ce9df434
BLAKE2b-256 f8cc48325f3fe24aae5b6f33349d0979271090f700c35b6035b8a0199a33cfd3

See more details on using hashes here.

File details

Details for the file locaria_integrated_testing-1.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for locaria_integrated_testing-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 38687687408c602f9a5c3c20addcb4e4ab67081919e251aca6ba246d1deece62
MD5 9c71010bfb063f5bbcec92653c83ecc9
BLAKE2b-256 378d8cae4b8cd8e07650c27bca8178bdaa4a0d5adcdd5e68e157c9672ecd9b71

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page