A Python SDK for performing data quality validations on streaming data records and DataFrames
Project description
IBM watsonx.data intelligence SDK Version 0.5.3
A comprehensive Python SDK for performing data quality validations on streaming data records (arrays), Pandas DataFrames, and PySpark DataFrames with complete REST API integration for IBM Cloud Pak for Data.
Features
Core Validation
- Array-based Records: Optimized for streaming data where records are arrays of values
- Metadata-driven: Define table structure and column mappings once
- Fluent API: Chainable method calls for intuitive rule definition
- Score-based Results: Each validation returns detailed scores and pass rates
- Data Quality Dimensions: Track validation checks by 8 standard DQ dimensions (Accuracy, Completeness, Conformity, Consistency, Coverage, Timeliness, Uniqueness, Validity)
- Nine Validation Checks: Comprehensive validation coverage
- LengthCheck: Validates length of any value (converts to string)
- ValidValuesCheck: Validates against allowed list with case-insensitive option
- ComparisonCheck: Compares values using operators, supports all types
- CaseCheck: Validates character case (upper, lower, name, sentence)
- CompletenessCheck: Validates presence (non-null) of values
- RangeCheck: Validates values within min/max range
- RegexCheck: Validates values match regular expression patterns
- FormatCheck: Validates value formats using intelligent format detection
- DataTypeCheck: Validates data types with intelligent type inference
- Type Safety: Full type hints throughout
- Extensible: Easy to add new checks via BaseCheck
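Custom checks are added by subclassing BaseCheck (defined in base.py). The exact abstract interface is not reproduced in this README, so the sketch below is a hypothetical illustration only: the StartsWithCheck name and the validate method signature are assumptions, not the SDK's actual contract.
from wxdi.dq_validator.base import BaseCheck

class StartsWithCheck(BaseCheck):
    """Hypothetical custom check: value must start with a given prefix."""
    def __init__(self, prefix: str):
        super().__init__()
        self.prefix = prefix

    def validate(self, value):  # assumed hook name; see base.py for the real interface
        if value is None or not str(value).startswith(self.prefix):
            return f"Value {value!r} does not start with {self.prefix!r}"
        return None  # returning None signals a passing check in this sketch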
DataFrame Integration
- Pandas Support: Memory-efficient chunked processing for large DataFrames
- PySpark Support: Distributed validation using Spark UDFs
- Consistent API: Same interface for both Pandas and PySpark
- Struct Column Output: Single validation result column containing all metrics
- Scalable: Handles DataFrames from thousands to billions of rows
REST API Integration
- GlossaryProvider: Fetch glossary terms and data quality constraints from IBM Cloud Pak for Data
- CamsProvider: Fetch data assets from CAMS (Catalog Asset Management System)
- IssuesProvider: Manage data quality issues (occurrences, tested records, ignored status)
- DQSearchProvider: Search for DQ checks and assets by native ID
- Thread-Safe: Concurrent access support with thread-local sessions
Authentication
- Multi-Environment Support: IBM Cloud, AWS Cloud, Government Cloud, and On-Premises
- Automatic Protocol Handling: Environment-specific authentication methods
- Type-Safe Configuration: Full type hints and validation
- SSL Control: Configurable SSL verification for on-premises
Installation
From Source
git clone https://github.com/IBM/data-intelligence-sdk.git
cd data-intelligence-sdk
pip install -e .
With DataFrame Support
# Install with Pandas support
pip install -e ".[pandas]"
# Install with PySpark support
pip install -e ".[spark]"
# Install with both Pandas and PySpark
pip install -e ".[dataframes]"
# Install everything (including dev dependencies)
pip install -e ".[all]"
For Development
pip install -e ".[dev]"
Quick Start
Array-based Validation
from wxdi.dq_validator import (
    AssetMetadata, ColumnMetadata, DataType,
    Validator, ValidationRule,
    ComparisonCheck, ComparisonOperator, ValidValuesCheck, LengthCheck
)
# 1. Define asset metadata
metadata = AssetMetadata(
    table_name='employee_data',
    columns=[
        ColumnMetadata('emp_id', DataType.INTEGER),
        ColumnMetadata('name', DataType.STRING, length=100),
        ColumnMetadata('age', DataType.INTEGER),
        ColumnMetadata('department', DataType.STRING, length=50),
        ColumnMetadata('salary', DataType.DECIMAL, precision=10, scale=2),
    ]
)
# 2. Create validator with rules
validator = Validator(metadata)
# Add validation rules
validator.add_rule(
    ValidationRule('name')
    .add_check(LengthCheck(min_length=2, max_length=100))
)
validator.add_rule(
    ValidationRule('department')
    .add_check(ValidValuesCheck(
        ['Engineering', 'Sales', 'HR', 'Finance'],
        case_sensitive=False  # Default is False
    ))
)
validator.add_rule(
    ValidationRule('age')
    .add_check(ComparisonCheck(
        operator=ComparisonOperator.GREATER_THAN_OR_EQUAL,
        target_value=18
    ))
)
# 3. Validate records (arrays)
records = [
    [1001, 'John Doe', 30, 'Engineering', 75000.00],
    [1002, 'J', 25, 'SALES', 65000.00],       # Will fail: name too short
    [1003, 'Bob Smith', 17, 'HR', 50000.00],  # Will fail: age < 18
]
results = validator.validate_batch(records)
# 4. Check results
for idx, result in enumerate(results):
    if result.is_valid:
        print(f"Record {idx}: ✓ PASS (Score: {result.score})")
    else:
        print(f"Record {idx}: ✗ FAIL (Score: {result.score})")
        for error in result.errors:
            print(f"  - {error.column_name}: {error.message}")
Pandas DataFrame Validation
import pandas as pd
from wxdi.dq_validator import AssetMetadata, ColumnMetadata, DataType, Validator, ValidationRule
from wxdi.dq_validator.checks import LengthCheck, ValidValuesCheck
from wxdi.dq_validator.integrations import PandasValidator
# Define metadata and validator (same as array-based validation)
metadata = AssetMetadata(
    table_name='employees',
    columns=[
        ColumnMetadata('emp_id', DataType.INTEGER),
        ColumnMetadata('name', DataType.STRING, length=100),
        ColumnMetadata('department', DataType.STRING, length=50),
    ]
)
validator = Validator(metadata)
validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
validator.add_rule(ValidationRule('department').add_check(
    ValidValuesCheck(['Engineering', 'Sales', 'HR'], case_sensitive=False)
))
# Create DataFrame
df = pd.DataFrame({
    'emp_id': [1001, 1002, 1003],
    'name': ['John Doe', 'J', 'Alice'],
    'department': ['Engineering', 'SALES', 'Marketing']
})
# Create Pandas validator
pandas_validator = PandasValidator(validator, chunk_size=10000)
# Get summary statistics (memory efficient)
summary = pandas_validator.get_summary_statistics(df)
print(f"Pass Rate: {summary['pass_rate']:.2f}%")
# Add validation column (returns DataFrame with struct column)
df_validated = pandas_validator.add_validation_column(df)
print(df_validated['dq_validation_result'])
# Get invalid rows
invalid_df = pandas_validator.get_invalid_rows(df)
print(f"Found {len(invalid_df)} invalid rows")
# Expand validation column into separate columns
df_expanded = pandas_validator.expand_validation_column(df_validated)
print(df_expanded[['name', 'dq_is_valid', 'dq_score', 'dq_pass_rate']])
PySpark DataFrame Validation
from pyspark.sql import SparkSession
from wxdi.dq_validator import AssetMetadata, ColumnMetadata, DataType, Validator, ValidationRule
from wxdi.dq_validator.checks import LengthCheck, ValidValuesCheck
from wxdi.dq_validator.integrations import SparkValidator
# Initialize Spark
spark = SparkSession.builder.appName("DataQuality").getOrCreate()
# Define metadata and validator (same as above)
metadata = AssetMetadata(
    table_name='employees',
    columns=[
        ColumnMetadata('emp_id', DataType.INTEGER),
        ColumnMetadata('name', DataType.STRING, length=100),
        ColumnMetadata('department', DataType.STRING, length=50),
    ]
)
validator = Validator(metadata)
validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
validator.add_rule(ValidationRule('department').add_check(
    ValidValuesCheck(['Engineering', 'Sales', 'HR'], case_sensitive=False)
))
# Create DataFrame
df = spark.createDataFrame([
    (1001, 'John Doe', 'Engineering'),
    (1002, 'J', 'SALES'),
    (1003, 'Alice', 'Marketing')
], ['emp_id', 'name', 'department'])
# Create Spark validator
spark_validator = SparkValidator(validator)
# Get summary statistics (distributed aggregation)
summary = spark_validator.get_summary_statistics(df)
print(f"Pass Rate: {summary['pass_rate']:.2f}%")
# Add validation column (returns DataFrame with struct column)
df_validated = spark_validator.add_validation_column(df)
df_validated.select('name', 'dq_validation_result').show()
# Get invalid rows (distributed filtering)
invalid_df = spark_validator.get_invalid_rows(df)
print(f"Found {invalid_df.count()} invalid rows")
# Expand validation column
df_expanded = spark_validator.expand_validation_column(df_validated)
df_expanded.select('name', 'dq_is_valid', 'dq_score', 'dq_pass_rate').show()
# Write validation report
spark_validator.write_validation_report(df, output_path='validation_report', format='parquet')
Core Concepts
AssetMetadata
Defines the structure of your data asset (table) with column information:
metadata = AssetMetadata(
    table_name='my_table',
    columns=[
        ColumnMetadata('id', DataType.INTEGER),
        ColumnMetadata('name', DataType.STRING, length=100),
        ColumnMetadata('amount', DataType.DECIMAL, precision=10, scale=2),
    ]
)
ValidationRule
Defines validation rules for a specific column:
rule = ValidationRule('column_name')
rule.add_check(LengthCheck(min_length=5, max_length=50))
rule.add_check(ValidValuesCheck(['value1', 'value2']))
Validator
Orchestrates validation across all rules:
validator = Validator(metadata)
validator.add_rule(rule1)
validator.add_rule(rule2)
result = validator.validate(record) # Single record
results = validator.validate_batch(records) # Multiple records
Data Quality Dimensions
Each validation check is associated with a Data Quality Dimension that categorizes the type of quality issue it addresses. The SDK supports 8 standard data quality dimensions:
from wxdi.dq_validator.data_quality_dimension import DataQualityDimension
# Available dimensions:
DataQualityDimension.ACCURACY # Data correctly represents real-world values
DataQualityDimension.COMPLETENESS # All required data is present
DataQualityDimension.CONFORMITY # Data conforms to specified formats
DataQualityDimension.CONSISTENCY # Data is consistent across systems
DataQualityDimension.COVERAGE # Data covers the required scope
DataQualityDimension.TIMELINESS # Data is available when needed
DataQualityDimension.UNIQUENESS # No duplicate records exist
DataQualityDimension.VALIDITY # Data values are valid and reasonable
Default Dimensions by Check:
- LengthCheck → VALIDITY
- ValidValuesCheck → VALIDITY
- ComparisonCheck → VALIDITY
- CaseCheck → CONSISTENCY
- CompletenessCheck → COMPLETENESS
- RangeCheck → VALIDITY
- RegexCheck → VALIDITY
- FormatCheck → VALIDITY
- DataTypeCheck → VALIDITY
Getting and Setting Dimensions:
from wxdi.dq_validator.checks import LengthCheck
from wxdi.dq_validator.data_quality_dimension import DataQualityDimension
# Create a check (uses default dimension)
check = LengthCheck(min_length=5, max_length=50)
# Get the current dimension
dimension = check.get_dimension()
print(dimension) # DataQualityDimension.VALIDITY
# Change the dimension
check.set_dimension(DataQualityDimension.CONFORMITY)
# Verify the change
print(check.get_dimension()) # DataQualityDimension.CONFORMITY
Use Cases:
- Categorize validation failures by dimension for better reporting
- Track dimension-specific metrics (e.g., completeness rate, validity rate)
- Prioritize remediation efforts based on dimension criticality
- Align with data governance frameworks that use dimension-based quality metrics
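As a rough sketch of dimension-based reporting (reusing the validator, checks, and records from the Quick Start above), failed checks can be tallied per dimension via get_dimension(). Note this sketch assumes error.check_name matches the check's class name, which is an illustration rather than a documented guarantee.
from collections import Counter

# Illustrative only: map check class names to their dimensions, then count
# validation errors per dimension (assumes check_name is the class name).
checks_in_use = [
    LengthCheck(min_length=2, max_length=100),
    ValidValuesCheck(['Engineering', 'Sales', 'HR', 'Finance']),
]
check_to_dimension = {type(c).__name__: c.get_dimension() for c in checks_in_use}

dimension_counts = Counter()
for result in validator.validate_batch(records):
    for error in result.errors:
        dimension_counts[check_to_dimension.get(error.check_name)] += 1

for dimension, count in dimension_counts.items():
    print(f"{dimension}: {count} failed check(s)")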
Validation Checks
1. LengthCheck
Validates the length of any value (converted to string).
# String length
LengthCheck(min_length=3, max_length=20)
# Works with any type (converts to string)
LengthCheck(min_length=5) # Integer 12345 → "12345" (length=5)
Parameters:
- min_length (int, optional): Minimum allowed length (inclusive)
- max_length (int, optional): Maximum allowed length (inclusive)
Edge Cases:
- None values: Returns error
- Any type: Converts to string using str(value)
- At least one of min_length or max_length must be specified
2. ValidValuesCheck
Validates that a value is in a predefined list of allowed values.
# Case-insensitive (default)
ValidValuesCheck(['active', 'inactive', 'pending'], case_sensitive=False)
# Case-sensitive
ValidValuesCheck(['Active', 'Inactive'], case_sensitive=True)
Parameters:
- valid_values (list): List of allowed values
- case_sensitive (bool, default=False): If False, string comparisons are case-insensitive
Edge Cases:
- None values: Returns error
- Case-insensitive: 'ACTIVE' matches 'active' when case_sensitive=False
- Non-string types: Always exact match (case_sensitive ignored)
3. ComparisonCheck
Validates that a value satisfies a comparison operation.
# Column vs constant
ComparisonCheck(
    operator=ComparisonOperator.GREATER_THAN,
    target_value=18
)
# Column vs column
ComparisonCheck(
    operator=ComparisonOperator.GREATER_THAN,
    target_column='min_salary'
)
# Using string operator
ComparisonCheck(operator='>=', target_value=0)
Operators:
- ComparisonOperator.GREATER_THAN or '>'
- ComparisonOperator.LESS_THAN or '<'
- ComparisonOperator.GREATER_THAN_OR_EQUAL or '>='
- ComparisonOperator.LESS_THAN_OR_EQUAL or '<='
- ComparisonOperator.EQUAL or '=='
- ComparisonOperator.NOT_EQUAL or '!='
Parameters:
- operator (ComparisonOperator or str): Comparison operator
- target_column (str, optional): Column name to compare against
- target_value (any, optional): Constant value to compare against
Supported Types:
- Numbers (int, float, Decimal)
- Strings (lexicographic comparison)
- Dates and datetimes
- Booleans
- Any comparable type
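For instance, a column-to-column comparison attaches to a rule exactly like a constant comparison; the salary and min_salary column names below are illustrative.
from wxdi.dq_validator import ValidationRule, ComparisonCheck, ComparisonOperator

# Illustrative rule: each record's 'salary' must be >= its 'min_salary' value.
salary_rule = ValidationRule('salary').add_check(
    ComparisonCheck(
        operator=ComparisonOperator.GREATER_THAN_OR_EQUAL,
        target_column='min_salary'
    )
)
validator.add_rule(salary_rule)  # assuming both columns exist in the asset metadata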
4. CaseCheck
Validates the character case of string values.
from wxdi.dq_validator import CaseCheck, ColumnCaseEnum
# Upper case
CaseCheck(case_type=ColumnCaseEnum.UPPER_CASE)
# Lower case
CaseCheck(case_type=ColumnCaseEnum.LOWER_CASE)
# Name case (Title Case)
CaseCheck(case_type=ColumnCaseEnum.NAME_CASE)
# Sentence case
CaseCheck(case_type=ColumnCaseEnum.SENTENCE_CASE)
Parameters:
- case_type (ColumnCaseEnum): Type of case validation
Case Types:
- ANY_CASE: Any case is valid
- UPPER_CASE: All uppercase (ABC)
- LOWER_CASE: All lowercase (abc)
- NAME_CASE: Title case (John Doe)
- SENTENCE_CASE: First letter uppercase (Hello world)
5. CompletenessCheck
Validates presence (non-null) of values.
# Require non-null values
CompletenessCheck(missing_values_allowed=False)
# Allow null values
CompletenessCheck(missing_values_allowed=True)
Parameters:
- missing_values_allowed (bool): Whether None/null values are allowed
6. RangeCheck
Validates values within min/max range.
# Numeric range
RangeCheck(min_value=0, max_value=100)
# Date range
from datetime import date
RangeCheck(min_value=date(2020, 1, 1), max_value=date(2025, 12, 31))
# String range (lexicographic)
RangeCheck(min_value='A', max_value='Z')
Parameters:
- min_value (any, optional): Minimum allowed value (inclusive)
- max_value (any, optional): Maximum allowed value (inclusive)
Supported Types:
- Numeric types (int, float, Decimal)
- Date and datetime
- Strings (lexicographic comparison)
7. RegexCheck
Validates values match regular expression patterns.
# Phone number pattern
RegexCheck(pattern=r'^\d{3}-\d{3}-\d{4}$')
# Email pattern (case-insensitive)
RegexCheck(pattern=r'^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$', case_sensitive=False)
Parameters:
- pattern (str): Regular expression pattern
- case_sensitive (bool, default=True): Whether pattern matching is case-sensitive
8. FormatCheck
Validates value formats using intelligent format detection.
from wxdi.dq_validator import FormatCheck, FormatConstraintType
# Valid formats
FormatCheck(
    constraint_type=FormatConstraintType.ValidFormats,
    formats={'%Y-%m-%d', '%d/%m/%Y', '%m-%d-%Y'}
)
# Invalid formats
FormatCheck(
    constraint_type=FormatConstraintType.InvalidFormats,
    formats={'%Y%m%d'}  # Reject this format
)
Parameters:
- constraint_type (FormatConstraintType): ValidFormats or InvalidFormats
- formats (set): Set of format strings
Features:
- Intelligent format detection using InferredTypeEngine
- Supports date, time, and timestamp formats
- UTF-16 compatible format matching
9. DataTypeCheck
Validates data types with intelligent type inference.
from wxdi.dq_validator import DataTypeCheck, DataType
# Integer type
DataTypeCheck(expected_type=DataType.INTEGER)
# Date type
DataTypeCheck(expected_type=DataType.DATE)
# Decimal type
DataTypeCheck(expected_type=DataType.DECIMAL)
Parameters:
- expected_type (DataType): Expected data type
Supported Types:
- INTEGER, FLOAT, DECIMAL
- STRING, BOOLEAN
- DATE, TIME, DATETIME, TIMESTAMP
Features:
- Intelligent type inference
- Handles numeric formats (US and DE)
- Date/time format detection
DataFrame Integration
Features
- Pandas Support: Memory-efficient chunked processing for large DataFrames
- PySpark Support: Distributed validation using Spark UDFs
- Consistent API: Same interface for both Pandas and PySpark
- Struct Column Output: Single validation result column containing all metrics
- Column Prefix: Configurable dq_ prefix to prevent column name conflicts
- Summary Statistics: Aggregated validation metrics without collecting data
- Invalid Row Filtering: Extract rows that failed validation
- Column Expansion: Expand struct column into individual columns
PandasValidator
PandasValidator(validator: Validator, chunk_size: int = 10000, column_prefix: str = "dq_")
Methods:
- get_summary_statistics(df: pd.DataFrame) -> Dict[str, Any]: Get aggregated validation metrics
- add_validation_column(df: pd.DataFrame) -> pd.DataFrame: Add struct column with validation results
- get_invalid_rows(df: pd.DataFrame) -> pd.DataFrame: Filter rows that failed validation
- get_valid_rows(df: pd.DataFrame) -> pd.DataFrame: Filter rows that passed validation
- expand_validation_column(df: pd.DataFrame) -> pd.DataFrame: Expand struct into separate columns
Memory Efficiency:
- Processes data in chunks (default: 10,000 rows)
- O(chunk_size) memory complexity
- Suitable for DataFrames larger than available RAM
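For example (values illustrative), raising chunk_size trades memory for less per-chunk overhead; summary statistics are still aggregated without materializing per-row results for the entire DataFrame.
# Illustrative: validate a large DataFrame in bigger chunks (more memory, less overhead).
pandas_validator = PandasValidator(validator, chunk_size=100_000)
summary = pandas_validator.get_summary_statistics(large_df)  # large_df: your big DataFrame
print(f"Pass Rate: {summary['pass_rate']:.2f}%")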
SparkValidator
SparkValidator(validator: Validator, column_prefix: str = "dq_")
Methods:
- get_summary_statistics(df: DataFrame) -> Dict[str, Any]: Distributed aggregation of validation metrics
- add_validation_column(df: DataFrame) -> DataFrame: Add struct column using UDF
- get_invalid_rows(df: DataFrame) -> DataFrame: Distributed filtering of invalid rows
- get_valid_rows(df: DataFrame) -> DataFrame: Distributed filtering of valid rows
- expand_validation_column(df: DataFrame) -> DataFrame: Expand struct into separate columns
- write_validation_report(df: DataFrame, output_path: str, format: str = 'parquet', mode: str = 'overwrite'): Write validation results
- get_error_sample(df: DataFrame, limit: int = 100) -> List[Dict]: Collect sample of errors
Distributed Processing:
- All operations use Spark's distributed computing
- O(1) driver memory for aggregations
- Scales to billions of rows
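For instance, get_error_sample brings only a bounded sample of failures back to the driver (reusing df and spark_validator from the Quick Start above).
# Collect at most 10 error entries to the driver for inspection.
for entry in spark_validator.get_error_sample(df, limit=10):
    print(entry)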
Validation Result Structure
The dq_validation_result struct column contains:
{
    'is_valid': bool,         # True if all checks passed
    'score': str,             # "5/5" format (passed/total)
    'pass_rate': float,       # Percentage (0-100)
    'total_checks': int,      # Total number of checks
    'passed_checks': int,     # Number of passed checks
    'failed_checks': int,     # Number of failed checks
    'error_count': int,       # Number of errors
    'errors': List[str]       # List of error messages
}
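In Pandas, for example, each cell of dq_validation_result can be read as a dict-like record with the fields above. This is a sketch assuming the df_validated DataFrame from the Quick Start and a dict per row.
# Inspect the validation result of the first row (field names as documented above).
first = df_validated['dq_validation_result'].iloc[0]
print(first['is_valid'], first['score'], first['pass_rate'])
for message in first['errors']:
    print(f"  - {message}")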
REST API Integration
Provider Configuration
ProviderConfig supports two authentication methods:
Option 1: Static Auth Token
from wxdi.dq_validator.provider import ProviderConfig
config = ProviderConfig(
    url="https://your-instance.cloud.ibm.com",
    auth_token="Bearer your-token",
    project_id="your-project-id"  # or catalog_id
)
Option 2: AuthConfig (Recommended for automatic token management)
from wxdi.dq_validator.provider import ProviderConfig
from wxdi.common.auth import AuthConfig, EnvironmentType
# Create AuthConfig for your environment
auth_config = AuthConfig(
    environment_type=EnvironmentType.IBM_CLOUD,
    api_key="your-api-key"
)
# Pass AuthConfig to ProviderConfig
config = ProviderConfig(
    url="https://your-instance.cloud.ibm.com",
    auth_config=auth_config,
    project_id="your-project-id"
)
# Token is automatically retrieved when needed
token = config.auth_token # Calls AuthProvider.get_token() internally
The auth_config parameter enables automatic token management across all providers. When both auth_token and auth_config are provided, auth_config takes precedence for token retrieval.
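A minimal sketch of that precedence rule (placeholder values), with both fields supplied:
# Illustrative: when both are set, token retrieval goes through auth_config,
# not the static auth_token string.
config = ProviderConfig(
    url="https://your-instance.cloud.ibm.com",
    auth_token="Bearer static-token",   # ignored for retrieval when auth_config is present
    auth_config=auth_config,
    project_id="your-project-id"
)
token = config.auth_token  # resolved via AuthProvider.get_token()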
GlossaryProvider
Fetch glossary terms and data quality constraints from IBM Cloud Pak for Data.
from wxdi.dq_validator.provider import GlossaryProvider
glossary = GlossaryProvider(config)
# Get published artifact by ID
term = glossary.get_published_artifact_by_id("term-id")
# Get term by version ID
term = glossary.get_term_by_version_id("version-id")
CamsProvider
Fetch data assets from CAMS (Catalog Asset Management System).
from wxdi.dq_validator.provider import CamsProvider
cams = CamsProvider(config)
# Get asset by ID
asset = cams.get_asset_by_id(
    asset_id="asset-id",
    options={"hide_deprecated_response_fields": "false"}
)
# Access column information
for column in asset.column_info:
    print(f"Column: {column.name}, Type: {column.data_type}")
IssuesProvider
Manage data quality issues (occurrences, tested records, ignored status).
from wxdi.dq_validator.provider import IssuesProvider
issues = IssuesProvider(config)
# Update issue occurrences
issues.update_issue_occurrences(issue_id="issue-123", occurrences=767)
# Update tested records
issues.update_tested_records(issue_id="issue-123", tested_records=1000)
# Set ignored status
issues.set_issue_ignored(issue_id="issue-123", ignored=True)
# Update multiple metrics at once
issues.update_issue_metrics(
    issue_id="issue-123",
    occurrences=767,
    tested_records=1000
)
DQSearchProvider
Search for DQ checks and assets by native ID.
from wxdi.dq_validator.provider import DQSearchProvider
dq_search = DQSearchProvider(config)
# Search DQ check
check = dq_search.search_dq_check(
    native_id="asset-id/check-id",
    check_type="format",
    project_id="project-id"
)
# Search DQ asset
asset = dq_search.search_dq_asset(
    native_id="asset-id/column-name",
    asset_type="column",
    project_id="project-id"
)
DQAssetsProvider
Retrieve data assets from CAMS with filtering and pagination support.
from wxdi.dq_validator.provider import DQAssetsProvider
assets = DQAssetsProvider(config)
# Get assets by project ID
assets_list = assets.get_assets(
    project_id="project-id",
    include_children=True,
    asset_type="table"
)
# Get assets by catalog ID
assets_list = assets.get_assets(
    catalog_id="catalog-id",
    limit=100,
    start_token="next-page-token"
)
ChecksProvider
Create and manage data quality checks in CAMS.
from wxdi.dq_validator.provider import ChecksProvider
checks = ChecksProvider(config)
# Create a new check
check_id = checks.create_check(
    native_id="asset-id/column-name",
    check_type="format",
    dimension_id="dimension-id",
    project_id="project-id"
)
# Get existing checks
checks_list = checks.get_checks(
    native_id="asset-id/column-name",
    project_id="project-id",
    include_children=True
)
DimensionsProvider
Search for data quality dimensions by name.
from wxdi.dq_validator.provider import DimensionsProvider
dimensions = DimensionsProvider(config)
# Search for a dimension by name (case-insensitive)
dimension_id = dimensions.search_dimension("Completeness")
Authentication Module
The SDK includes a comprehensive authentication module for generating Bearer tokens across different IBM Cloud environments and on-premises installations.
Supported Environments
| Environment | Enum Value | Auth Method | Required Credentials |
|---|---|---|---|
| IBM Cloud Standard | EnvironmentType.IBM_CLOUD | POST (form-encoded) | API Key |
| AWS Cloud (MCSP) | EnvironmentType.AWS_CLOUD | POST (header-based) | API Key |
| IBM Government Cloud | EnvironmentType.GOV_CLOUD | POST (JSON) | API Key |
| On-Premises | EnvironmentType.ON_PREM | GET (headers) | User ID + Password |
Quick Start - Authentication
from wxdi.dq_validator import EnvironmentType, AuthConfig, TokenGenerator
# IBM Cloud Standard (Production)
config = AuthConfig(
    url="https://iam.cloud.ibm.com/identity/token",
    environment=EnvironmentType.IBM_CLOUD,
    api_key="your-api-key-here"
)
generator = TokenGenerator(config)
token = generator.generate_token()
print(token) # Bearer eyJhbGc...
Authentication Examples
1. IBM Cloud Standard
config = AuthConfig(
    url="https://iam.cloud.ibm.com/identity/token",
    environment=EnvironmentType.IBM_CLOUD,
    api_key="your-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()
# Returns: "Bearer {access_token}"
2. AWS Cloud (MCSP)
config = AuthConfig(
    url="https://account-iam.platform.test.saas.ibm.com/api/2.0/accounts/your-account-id/apikeys/token",
    environment=EnvironmentType.AWS_CLOUD,
    api_key="your-aws-cloud-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()
3. IBM Government Cloud
config = AuthConfig(
    url="https://dai.ibmforusgov.com/api/rest/mcsp/apikeys/token",
    environment=EnvironmentType.GOV_CLOUD,
    api_key="your-gov-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()
4. On-Premises Installation
config = AuthConfig(
    url="https://localhost:8443/v1/preauth/validateAuth",
    environment=EnvironmentType.ON_PREM,
    user_id="admin",
    password="your-password"
)
generator = TokenGenerator(config)
token = generator.generate_token()
Using Generated Tokens
import requests
# Generate token
config = AuthConfig(
    url="https://iam.cloud.ibm.com/identity/token",
    environment=EnvironmentType.IBM_CLOUD,
    api_key="your-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()
# Use token in API requests
headers = {
    'Authorization': token,  # Already in "Bearer {token}" format
    'Content-Type': 'application/json'
}
response = requests.get('https://api.example.com/endpoint', headers=headers)
ValidationResult
Each validation returns a ValidationResult object:
result = validator.validate(record)
# Properties
result.is_valid # bool: True if no errors
result.score # str: "5/5" (passed/total)
result.pass_rate # float: 100.0 (percentage)
result.total_checks # int: Total number of checks
result.passed_checks # int: Number of passed checks
result.failed_checks # int: Number of failed checks
result.errors # List[ValidationError]: List of errors
# Convert to dictionary
result_dict = result.to_dict()
ValidationError
Each error contains detailed information:
error = result.errors[0]
error.column_name # str: Name of the column
error.check_name # str: Type of check that failed
error.message # str: Human-readable error message
error.value # any: The value that failed
error.expected # any: Expected value/constraint
# Convert to dictionary
error_dict = error.to_dict()
Examples
See complete working examples in the examples/ directory:
- basic_usage.py - Array-based validation example
- pandas_dataframe_usage.py - Pandas DataFrame validation example
- spark_dataframe_usage.py - PySpark DataFrame validation example
- consolidation_usage.py - Consolidated statistics and dimension-based reporting
- auth_usage.py - Authentication examples (296 lines)
- assets_usage.py - DQAssetsProvider usage examples (210 lines)
- glossary_usage.py - GlossaryProvider usage examples (250 lines)
- checks_usage.py - ChecksProvider usage examples (272 lines)
- dimensions_usage.py - DimensionsProvider usage examples (146 lines)
- issues_usage.py - IssuesProvider usage examples (124 lines)
- dq_workflow_usage.py - Complete DQ workflow (317 lines)
Project Structure
data-intelligence-sdk/
├── src/
│ └── wxdi/
│ ├── __init__.py
│ ├── common/
│ │ ├── __init__.py
│ │ └── auth/
│ │ ├── __init__.py
│ │ ├── auth_config.py
│ │ ├── auth_provider.py
│ │ ├── gov_cloud_authenticator.py
│ │ └── gov_cloud_token_manager.py
│ └── dq_validator/
│ ├── __init__.py
│ ├── metadata.py # DataType, ColumnMetadata, AssetMetadata
│ ├── datatypes.py # DataType enum
│ ├── data_quality_dimension.py # DataQualityDimension enum
│ ├── base.py # BaseCheck, ValidationError
│ ├── result.py # ValidationResult
│ ├── rule.py # ValidationRule
│ ├── validator.py # Validator
│ ├── rule_loader.py # RuleLoader for external providers
│ ├── inferred_engine.py # InferredTypeEngine
│ ├── format_engine.py # FormatEngine
│ ├── issue_reporting.py # Issue reporter utility
│ ├── datetime_formats.py # Date/time format definitions
│ ├── utils.py # Utility functions
│ ├── version.py # Version information
│ ├── result_consolidator.py # Result consolidation
│ ├── checks/
│ │ ├── __init__.py
│ │ ├── length_check.py
│ │ ├── valid_values_check.py
│ │ ├── comparison_check.py
│ │ ├── case_check.py
│ │ ├── completeness_check.py
│ │ ├── range_check.py
│ │ ├── regex_check.py
│ │ ├── format_check.py
│ │ └── datatype_check.py
│ ├── integrations/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── pandas_validator.py
│ │ └── spark_validator.py
│ └── provider/
│ ├── __init__.py
│ ├── base_provider.py
│ ├── config.py
│ ├── glossary.py
│ ├── cams.py
│ ├── assets.py
│ ├── checks.py
│ ├── dimensions.py
│ ├── issues.py
│ ├── dq_search.py
│ ├── constraint_model.py
│ ├── data_asset_model.py
│ └── response_model.py
├── examples/
│ ├── basic_usage.py
│ ├── pandas_dataframe_usage.py
│ ├── spark_dataframe_usage.py
│ ├── auth_usage.py
│ ├── assets_usage.py
│ ├── checks_usage.py
│ ├── dimensions_usage.py
│ ├── issues_usage.py
│ └── dq_workflow_usage.py
├── setup.py
├── requirements.txt
├── pyproject.toml
└── README.md
Contributing
Contributions are welcome! Please follow these guidelines:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
License
Apache License 2.0 - see LICENSE file for details
Support
For issues, questions, or contributions, please open an issue on GitHub.
Documentation
- README.md: This file - comprehensive user guide
Key Features Summary
✅ 9 Validation Checks - Comprehensive validation coverage
✅ DataFrame Support - Pandas and PySpark integration
✅ REST API Integration - Complete provider module
✅ Multi-Environment Auth - 4 cloud environments supported
✅ Memory Efficient - Chunked processing for Pandas
✅ Distributed Processing - Spark UDF-based validation
✅ Thread-Safe - Concurrent access support
✅ Type-Safe - Full type hints throughout
✅ Extensible - Easy to add new checks
✅ Production Ready - 400+ tests, fully documented
Python Support
- Python 3.8
- Python 3.9
- Python 3.10
- Python 3.11
- Python 3.12
Dependencies
Core:
- pydantic >= 2.12.0
- requests >= 2.28.0
- regex >= 2023.0.0
Optional:
- pandas >= 1.3.0 (for Pandas support)
- pyspark >= 3.0.0 (for PySpark support)
Development:
- pytest >= 7.0.0
- pytest-cov >= 4.0.0
- pytest-mock >= 3.7.0
- black >= 23.0.0
- mypy >= 1.0.0
- flake8 >= 6.0.0
Project details
Download files
File details
Details for the file data_intelligence_sdk-0.5.3.tar.gz.
File metadata
- Download URL: data_intelligence_sdk-0.5.3.tar.gz
- Upload date:
- Size: 90.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f3c4f77765d087d684c876c43556d5084268ff61a14b6b8d29e88f4b83e14fc2 |
| MD5 | fae6043e674dac589d315c117b82fbde |
| BLAKE2b-256 | 958061bc2fd68b96b820cbd487aa2a477739887a18738e45d9518e062ef178c6 |
File details
Details for the file data_intelligence_sdk-0.5.3-py3-none-any.whl.
File metadata
- Download URL: data_intelligence_sdk-0.5.3-py3-none-any.whl
- Upload date:
- Size: 112.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | bc3c5ef54fb4adb9698bf53b75fe2034602955a97b7fdfc26386a27bd38b404a |
| MD5 | f13dc4dda5ecb2a8be9eb52d4ff31524 |
| BLAKE2b-256 | a2b9a6b4a1ca4a0f89f06e46564f2b6e6a6e6a880762819c285aa94781c38e10 |