Data exchange agent for migrations and validation

Snowflake Data Exchange Agent

A REST API service for database migrations and data validation. It supports multiple databases, including Snowflake, PostgreSQL, and SQL Server, and processes tasks through a queue.

Quick Start

# Install
pip install snowflake-data-exchange-agent

# Run
data-exchange-agent --port 8080

# Test
curl http://localhost:8080/health

Installation

From PyPI (Production)

pip install snowflake-data-exchange-agent

Requirements & Dependencies

Python Version: 3.10, 3.11, or 3.12 (3.13 not yet supported)

Available dependency groups:

  • development: Testing and development tools (pytest, ruff, etc.)
  • all: Includes all development dependencies

Core dependencies include:

  • Snowflake Connector for Python
  • PySpark for data processing
  • Flask + Waitress for REST API
  • PostgreSQL support (psycopg2-binary)
  • AWS SDK (boto3)

Configuration

Create src/data_exchange_agent/configuration.toml:

selected_task_source = "api"

[application]
workers = 4
task_fetch_interval = 120
debug_mode = false

[task_source.api]
key = "api-key"

[connections.source.<sqlserver|postgresql|teradata>]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = <1433|5432|1025>

[connections.target.snowflake_connection_name]
connection_name = "connection_name"

[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"

[connections.target.blob]
connection_string = "DefaultEndpointsProtocol=https;AccountName=account_name;AccountKey=account_key;EndpointSuffix=core.windows.net"
container_name = "container_name"
# Optional: set account_name and use_default_credential instead of connection_string
account_name = "storage_account_name"
use_default_credential = <true|false>

For Snowflake, create ~/.snowflake/config.toml:

[connections.default]
account = "your_account.region"
user = "your_username"
password = "your_password"
warehouse = "COMPUTE_WH"
database = "PRODUCTION_DB"
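
Before starting the agent, it can help to confirm that the file parses and that the selected task source has a matching section. The sketch below is illustrative only: `check_config` and its two checks are hypothetical and are not the agent's own validation rules. It uses the standard-library `tomllib` (Python 3.11+) and falls back to the third-party `tomli` backport on Python 3.10.

```python
try:
    import tomllib  # standard library from Python 3.11
except ModuleNotFoundError:  # Python 3.10
    import tomli as tomllib  # third-party backport with the same API

# A concrete version of the template above, with PostgreSQL chosen as the source.
EXAMPLE_TOML = """
selected_task_source = "api"

[application]
workers = 4
task_fetch_interval = 120
debug_mode = false

[task_source.api]
key = "api-key"

[connections.source.postgresql]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 5432
"""


def check_config(text: str) -> list[str]:
    """Return a list of problems found in the configuration text.

    Hypothetical helper: the checks are assumptions made for illustration.
    """
    config = tomllib.loads(text)
    problems = []
    selected = config.get("selected_task_source")
    if selected not in config.get("task_source", {}):
        problems.append(f"no [task_source.{selected}] section")
    if not config.get("connections", {}).get("source"):
        problems.append("no [connections.source.*] section")
    return problems
```

Calling `check_config(EXAMPLE_TOML)` returns an empty list when both sections are present.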

API Usage

Command Line

# Basic usage
data-exchange-agent

# Production settings
data-exchange-agent --workers 8 --port 8080

# Debug mode
data-exchange-agent --debug --port 5001

Health Check

GET /health
{
  "status": "healthy",
  "version": "0.0.18",
  "database_connections": {
    "snowflake": "connected"
  }
}
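
A monitoring script could interpret this response as follows. This is a minimal sketch using only the standard library; the function names are hypothetical, and treating "connected" as the only healthy connection state is an assumption based on the sample response above.

```python
import json
from urllib.request import urlopen


def is_healthy(payload: dict) -> bool:
    """Return True when the status is healthy and every listed connection is connected.

    Assumption: "connected" is the only healthy connection state.
    """
    connections = payload.get("database_connections", {})
    return payload.get("status") == "healthy" and all(
        state == "connected" for state in connections.values()
    )


def fetch_health(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> dict:
    """Call GET /health on a running agent and decode the JSON body."""
    with urlopen(f"{base_url}/health", timeout=timeout) as response:
        return json.load(response)
```

With an agent running locally, `is_healthy(fetch_health())` combines the two steps.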

Task Management

# Start processing
GET /handle_tasks

# Stop processing
GET /stop

# Get status
GET /get_handling_tasks_status

# Task count
GET /get_tasks_count
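
From Python, the four endpoints above can be wrapped in a small helper. This is a sketch: the action names (`start`, `stop`, `status`, `count`) and the function name are made up for illustration, and the response formats are not documented here, so the helper only builds the request.

```python
from urllib.request import Request

# Hypothetical short names mapped to the documented paths.
TASK_ENDPOINTS = {
    "start": "/handle_tasks",
    "stop": "/stop",
    "status": "/get_handling_tasks_status",
    "count": "/get_tasks_count",
}


def build_task_request(action: str, base_url: str = "http://localhost:8080") -> Request:
    """Build a GET request for one of the task management endpoints."""
    return Request(base_url.rstrip("/") + TASK_ENDPOINTS[action], method="GET")
```

Pass the result to `urllib.request.urlopen` to send it to a running agent.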

Add Task

POST /tasks
Content-Type: application/json
{
  "task_type": "data_extraction",
  "source_config": {
    "database": "postgresql",
    "query": "SELECT * FROM users"
  },
  "destination_config": {
    "type": "snowflake_stage",
    "stage": "@data_stage/users/"
  }
}
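
The same request can be assembled in Python with the standard library. The sketch below mirrors the JSON body shown above; the function name and its parameters are hypothetical, and which other fields the endpoint accepts is not covered here.

```python
import json
from urllib.request import Request


def build_add_task_request(
    query: str,
    stage: str,
    base_url: str = "http://localhost:8080",
) -> Request:
    """Build a POST /tasks request that extracts from PostgreSQL into a Snowflake stage."""
    payload = {
        "task_type": "data_extraction",
        "source_config": {"database": "postgresql", "query": query},
        "destination_config": {"type": "snowflake_stage", "stage": stage},
    }
    return Request(
        f"{base_url}/tasks",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

For example, `build_add_task_request("SELECT * FROM users", "@data_stage/users/")` reproduces the body above and can be sent with `urllib.request.urlopen`.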

Development

Setup

git clone https://github.com/snowflakedb/migrations-data-validation.git
cd migrations-data-validation/data-exchange-agent
pip install -e ".[development]"

Testing

# Run all tests
pytest

# With coverage
pytest --cov=src/data_exchange_agent

# Run specific test types
pytest tests/unit/           # Unit tests only
pytest -m "not integration" # Non-integration tests

Code Quality

# Format code
ruff format .

# Lint code
ruff check .

# Auto-fix linting issues
ruff check --fix .

🔌 Extending the System

Adding a New Bulk Utility

Bulk utilities are command-line tools used to efficiently export data from databases (e.g., BCP for SQL Server). Follow these steps to add a new bulk utility:

1. Define the Bulk Utility Type

Add your new utility to the BulkUtilityType enum in src/data_exchange_agent/data_sources/bulk_utility_types.py:

class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"  # Add this line
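
Because the enum mixes in `str`, its members compare equal to their plain string values, which is convenient when names arrive as strings from a configuration file. The self-contained sketch below redefines the enum locally to show this; `parse_bulk_utility` is a hypothetical helper, not part of the package.

```python
from enum import Enum


class BulkUtilityType(str, Enum):
    """Local copy of the enum, for illustration only."""

    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"


def parse_bulk_utility(name: str) -> BulkUtilityType:
    """Convert a configuration string to an enum member, listing valid names on failure."""
    try:
        return BulkUtilityType(name)
    except ValueError:
        valid = ", ".join(member.value for member in BulkUtilityType)
        raise ValueError(f"unknown bulk utility {name!r}; expected one of: {valid}") from None
```

`parse_bulk_utility("your_utility_name")` returns `BulkUtilityType.YOUR_UTILITY`, and that member compares equal to the string `"your_utility_name"`.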

2. Create Configuration Class

Create a new configuration class in src/data_exchange_agent/config/sections/bulk_utilities/your_utility.py:

from data_exchange_agent.config.sections.bulk_utilities.base import BaseBulkUtilityConfig

class YourUtilityConfig(BaseBulkUtilityConfig):
    """Configuration class for YourUtility bulk utility settings."""

    def __init__(
        self,
        # Add utility-specific parameters
        utility_specific_parameter: str = "default_param",
    ) -> None:
        """Initialize YourUtility configuration."""
        self.utility_specific_parameter = utility_specific_parameter

    def _custom_validation(self) -> str | None:
        """Validate configuration parameters."""
        if not self.utility_specific_parameter:
            return "Utility specific parameter cannot be empty."
        return None

    def __repr__(self) -> str:
        """Return string representation."""
        return f"YourUtilityConfig(utility_specific_parameter='{self.utility_specific_parameter}')"

3. Register the Bulk Utility

Register your utility in src/data_exchange_agent/config/sections/bulk_utilities/__init__.py:

from data_exchange_agent.config.sections.bulk_utilities.your_utility import YourUtilityConfig
from data_exchange_agent.constants.connection_types import ConnectionType

# Add to registry
BulkUtilityRegistry.register(ConnectionType.YOUR_UTILITY, YourUtilityConfig) # Add this

# Add to __all__
__all__ = [
    "BaseBulkUtilityConfig",
    "BulkUtilityRegistry",
    "BCPBulkUtilityConfig",
    "YourUtilityConfig",  # Add this
]
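
To illustrate what registration achieves, here is a minimal sketch of the registry pattern. It is not the package's `BulkUtilityRegistry`: only the `register(type, config_class)` call appears above, so the `create` method, the duplicate check, and the internal dictionary are assumptions.

```python
class SketchRegistry:
    """Hypothetical registry mapping a utility name to its configuration class."""

    _registry: dict[str, type] = {}

    @classmethod
    def register(cls, utility_type: str, config_class: type) -> None:
        """Associate a utility name with a configuration class."""
        if utility_type in cls._registry:
            raise ValueError(f"{utility_type!r} is already registered")
        cls._registry[utility_type] = config_class

    @classmethod
    def create(cls, utility_type: str, **settings):
        """Instantiate the configuration class registered for a utility name."""
        try:
            config_class = cls._registry[utility_type]
        except KeyError:
            raise KeyError(f"no configuration registered for {utility_type!r}") from None
        return config_class(**settings)


class DemoConfig:
    """Placeholder configuration class used only in this example."""

    def __init__(self, utility_specific_parameter: str = "default_param") -> None:
        self.utility_specific_parameter = utility_specific_parameter


SketchRegistry.register("your_utility_name", DemoConfig)
```

A lookup such as `SketchRegistry.create("your_utility_name", utility_specific_parameter="param")` then builds the right configuration object from settings keyed by name.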

4. Update ConnectionType Enum

Add your utility type to src/data_exchange_agent/constants/connection_types.py:

class ConnectionType(str, Enum):
    # Bulk utilities
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value  # Add this

5. Create Data Source Implementation

Create the data source class in src/data_exchange_agent/data_sources/your_utility_data_source.py:

from data_exchange_agent.data_sources.base import BaseDataSource
from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType

class YourUtilityDataSource(BaseDataSource):
    """Data source implementation for YourUtility."""

    @inject
    def __init__(
        self,
        engine: str,
        statement: str,
        results_folder_path: str | None = None,
        base_file_name: str = "result",
        logger: SFLogger = Provide[container_keys.SF_LOGGER],
        program_config: ConfigManager = Provide[container_keys.PROGRAM_CONFIG],
    ) -> None:
        """Initialize YourUtilityDataSource."""
        self.logger = logger
        self._statement = statement

        # Get configuration
        bulk_utility_config = program_config[config_keys.BULK_UTILITY]
        utility_config = bulk_utility_config.get(BulkUtilityType.YOUR_UTILITY, None)

        # Use config values or defaults
        self.utility_specific_parameter = utility_config.utility_specific_parameter if utility_config else "default_param"

    def export_data(self) -> bool:
        """Export data using your utility command."""
        # Implement the export logic
        raise NotImplementedError
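
One possible shape for the export logic is to run the utility as a subprocess and report success through the return value. The helper below is a sketch under that assumption; `run_export` is a hypothetical name, and the actual command line depends entirely on the utility being wrapped.

```python
import subprocess


def run_export(command: list[str], timeout_seconds: float = 60.0) -> bool:
    """Run an export command and return True only if it exits with status 0.

    Hypothetical helper: a missing executable or a timeout counts as failure.
    """
    try:
        completed = subprocess.run(
            command,
            capture_output=True,  # keep the utility's output out of the agent's console
            text=True,
            timeout=timeout_seconds,
            check=False,  # inspect the return code instead of raising
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return completed.returncode == 0
```

An `export_data` implementation could build the command from `self._statement` and the configured parameters, then return the result of a helper like this one.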

6. Add Configuration to TOML

Users can now configure your bulk utility in configuration.toml:

[bulk_utility.your_utility_name]
# Add your custom parameters
utility_specific_parameter = "param"

7. Write Tests

Create tests in tests/data_sources/test_your_utility_data_source.py to verify functionality.
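
Since a bulk utility is an external program, tests usually should not invoke it. The sketch below shows one way to do that with `unittest.mock`, patching `subprocess.run`. The `export_data` function is a hypothetical stand-in for `YourUtilityDataSource.export_data`, and the command arguments are invented for the example.

```python
import subprocess
from unittest import mock


def export_data(command: list[str]) -> bool:
    """Hypothetical stand-in for the data source's export method."""
    return subprocess.run(command, check=False).returncode == 0


def test_export_data_reports_success():
    fake_result = mock.Mock(returncode=0)
    with mock.patch("subprocess.run", return_value=fake_result) as fake_run:
        assert export_data(["your_utility", "--out", "result.csv"]) is True
    # The utility must have been invoked exactly once with the expected command.
    fake_run.assert_called_once_with(["your_utility", "--out", "result.csv"], check=False)


def test_export_data_reports_failure():
    fake_result = mock.Mock(returncode=1)
    with mock.patch("subprocess.run", return_value=fake_result):
        assert export_data(["your_utility"]) is False
```

pytest collects both functions by their `test_` prefix, and neither requires the real utility to be installed.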

Example: BCP Implementation

See the existing BCP implementation for reference:

  • Config: src/data_exchange_agent/config/sections/bulk_utilities/bcp.py
  • Data Source: src/data_exchange_agent/data_sources/bcp_data_source.py
  • Configuration example in configuration_example.toml

🤝 Contributing

We welcome contributions! See our Contributing Guide for details on how to collaborate, set up your development environment, and submit PRs.


📄 License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.

Developed with ❄️ by Snowflake
