Data exchange agent for migrations and validation

Snowflake Data Exchange Agent

A REST API service for database migrations and data validation. Supports Snowflake, SQL Server, and Amazon Redshift with queue-based task processing. The architecture is extensible to support additional database types in the future.

Quick Start

# Install
pip install snowflake-data-exchange-agent

# Run
data-exchange-agent --port 8080

# Test
curl http://localhost:8080/health

Installation

From PyPI (Production)

pip install snowflake-data-exchange-agent

Requirements & Dependencies

Python Version: 3.10, 3.11, or 3.12 (3.13 not yet supported)

Available dependency groups:

  • development: Testing and development tools (pytest, ruff, etc.)
  • all: Includes all development dependencies

Core dependencies include:

  • Snowflake Connector for Python
  • PySpark for data processing
  • Flask + Waitress for REST API
  • ODBC support (pyodbc) for SQL Server
  • AWS SDK (boto3)

Configuration

Create src/data_exchange_agent/configuration.toml:

selected_task_source = "api"

[application]
workers = 4
task_fetch_interval = 5
debug_mode = false

[task_source.api]
key = "api-key"

# SQL Server connection (standard authentication)
[connections.source.sqlserver]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 1433

# Amazon Redshift connection (IAM authentication for provisioned cluster)
[connections.source.redshift]
username = "demo-user"
database = "snowconvert_demo"
auth_method = "iam-provisioned-cluster"
cluster_id = "migrations-aws"
region = "us-west-2"
access_key_id = "your-access-key-id"
secret_access_key = "your-secret-access-key"

# Amazon Redshift connection (standard authentication)
# [connections.source.redshift]
# username = "myuser"
# password = "mypassword"
# database = "mydatabase"
# host = "my-cluster.abcdef123456.us-west-2.redshift.amazonaws.com"
# port = 5439
# auth_method = "standard"

[connections.target.snowflake_connection_name]
connection_name = "connection_name"

[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"

[connections.target.blob]
connection_string = "DefaultEndpointsProtocol=https;AccountName=account_name;AccountKey=account_key;EndpointSuffix=core.windows.net"
container_name = "container_name"
# Optional alternative to connection_string: set account_name and use_default_credential (true or false)
# account_name = "storage_account_name"
# use_default_credential = true

For Snowflake, create ~/.snowflake/config.toml:

[connections.default]
account = "your_account.region"
user = "your_username"
password = "your_password"
warehouse = "COMPUTE_WH"
database = "PRODUCTION_DB"

API Usage

Command Line

# Basic usage
data-exchange-agent

# Production settings
data-exchange-agent --workers 8 --port 8080

# Debug mode
data-exchange-agent --debug --port 5001

Health Check

GET /health
{
  "status": "healthy",
  "version": "0.0.18",
  "database_connections": {
    "snowflake": "connected"
  }
}
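A deployment script may want to poll this endpoint before sending work to the agent. The sketch below uses only the standard library and the response shape shown above. The helper names are hypothetical, and treating any connection state other than "connected" as unhealthy is an assumption of this example, not documented behavior.

```python
import json
import urllib.request


def is_healthy(payload: dict) -> bool:
    """True when the payload reports a healthy agent and every listed connection is connected."""
    connections = payload.get("database_connections", {})
    return payload.get("status") == "healthy" and all(
        state == "connected" for state in connections.values()
    )


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """GET /health and decode the JSON body (requires a running agent)."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=5) as response:
        return json.load(response)


# Offline example using the sample response from this section.
sample = {
    "status": "healthy",
    "version": "0.0.18",
    "database_connections": {"snowflake": "connected"},
}
print(is_healthy(sample))  # prints True
```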

Task Management

# Start processing
GET /handle_tasks

# Stop processing
GET /stop

# Get status
GET /get_handling_tasks_status

# Task count
GET /get_tasks_count
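These endpoints can be wrapped in a small client. The following is a sketch only: AgentClient and its method names are hypothetical, and because the response format is not described here, each method returns the raw response body as text. The opener argument exists so the class can be exercised without a running agent.

```python
import io
import urllib.request


class AgentClient:
    """Thin wrapper around the task-management endpoints (hypothetical helper)."""

    def __init__(self, base_url="http://localhost:8080", opener=urllib.request.urlopen):
        self.base_url = base_url.rstrip("/")
        self._open = opener  # injectable so the client can run without a server

    def _get(self, path: str) -> str:
        # The response format is not documented here, so return the raw body.
        with self._open(f"{self.base_url}{path}", timeout=10) as response:
            return response.read().decode("utf-8")

    def start(self) -> str:
        return self._get("/handle_tasks")

    def stop(self) -> str:
        return self._get("/stop")

    def status(self) -> str:
        return self._get("/get_handling_tasks_status")

    def task_count(self) -> str:
        return self._get("/get_tasks_count")


def echo_opener(url, timeout=None):
    """Fake opener for a dry run: echoes the requested URL instead of using the network."""
    return io.BytesIO(url.encode("utf-8"))


client = AgentClient("http://localhost:8080/", opener=echo_opener)
print(client.status())  # prints http://localhost:8080/get_handling_tasks_status
```

Against a real agent, construct the client without the opener argument so that urllib.request.urlopen is used.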

Add Task

POST /tasks
Content-Type: application/json
{
  "task_type": "data_extraction",
  "source_config": {
    "database": "sqlserver",
    "query": "SELECT * FROM users"
  },
  "destination_config": {
    "type": "snowflake_stage",
    "stage": "@data_stage/users/"
  }
}
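The same request can be assembled from Python. This sketch builds the request object without sending it, so it runs without an agent. The function name build_task_request is hypothetical, and the body mirrors the example above rather than a documented schema.

```python
import json
import urllib.request


def build_task_request(base_url: str, query: str, stage: str) -> urllib.request.Request:
    """Build (but do not send) a POST /tasks request with the body shown above."""
    body = {
        "task_type": "data_extraction",
        "source_config": {"database": "sqlserver", "query": query},
        "destination_config": {"type": "snowflake_stage", "stage": stage},
    }
    return urllib.request.Request(
        url=f"{base_url}/tasks",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_task_request(
    "http://localhost:8080", "SELECT * FROM users", "@data_stage/users/"
)
print(request.get_method(), request.full_url)  # prints POST http://localhost:8080/tasks

# To send it to a running agent:
#   urllib.request.urlopen(request, timeout=10)
```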

Development

Setup

git clone https://github.com/snowflakedb/migrations-data-validation.git
cd migrations-data-validation/data-exchange-agent
pip install -e ".[development]"

Testing

# Run all tests
pytest

# With coverage
pytest --cov=src/data_exchange_agent

# Run specific test types
pytest tests/unit/           # Unit tests only
pytest -m "not integration" # Non-integration tests

Code Quality

# Format code
ruff format .

# Lint code
ruff check .

# Auto-fix linting issues
ruff check --fix .

🐳 Docker

The Data Exchange Agent can be run in a Docker container with configuration injected via environment variables at runtime.

Building the Image

cd data-exchange-agent
docker build -t data-exchange-agent .

How It Works

The Dockerfile uses configuration templates that are processed at container startup:

  1. docker-artifacts/configuration.template.toml - Agent configuration template
  2. docker-artifacts/snowflake.config.template.toml - Snowflake connection template
  3. docker-artifacts/docker-entrypoint.sh - Entrypoint script that uses envsubst to substitute environment variables into the templates before starting the agent

This approach ensures that sensitive credentials such as passwords are never baked into the Docker image; they are injected only at runtime.
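To make the substitution step concrete, here is a rough Python analogue of what envsubst does to a template. It is an illustration only: the real entrypoint is a shell script, the template fragment below is hypothetical, and the function name render_template is an assumption. As with envsubst, references to unset variables are replaced with an empty string.

```python
import re

# Matches $NAME and ${NAME}, where NAME is a shell-style identifier.
_VARIABLE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}|\$([A-Za-z_][A-Za-z0-9_]*)")


def render_template(template: str, env: dict[str, str]) -> str:
    """Replace variable references with values from env; unset names become empty."""

    def replace(match: re.Match) -> str:
        name = match.group(1) or match.group(2)
        return env.get(name, "")

    return _VARIABLE.sub(replace, template)


# Hypothetical template fragment; the real templates live in docker-artifacts/.
template = 'host = "${DATA_SOURCE_HOST}"\npassword = "${DATA_SOURCE_PASSWORD}"\n'
rendered = render_template(
    template,
    {"DATA_SOURCE_HOST": "db.example.com", "DATA_SOURCE_PASSWORD": "s3cret"},
)
print(rendered)
```

Because the rendered file exists only inside the running container, the image itself contains placeholders rather than secrets.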

Environment Variables

Data Source Configuration (Required for database connections)

Variable | Description | Default
DATA_SOURCE_USERNAME | Username for the source database | -
DATA_SOURCE_PASSWORD | Password for the source database | -
DATA_SOURCE_HOST | Hostname of the source database | -
DATA_SOURCE_PORT | Port of the source database | 1433
DATA_SOURCE_DATABASE | Database name on the source | -

Snowflake Connection Configuration (Required for Snowflake integration)

Variable | Description | Default
SNOWFLAKE_ACCOUNT | Snowflake account identifier (e.g., myaccount.us-west-2.aws) | -
SNOWFLAKE_USER | Snowflake username | -
SNOWFLAKE_PASSWORD | Snowflake password | -
SNOWFLAKE_WAREHOUSE | Snowflake warehouse name | -
SNOWFLAKE_ROLE | Snowflake role | -
SNOWFLAKE_DATABASE | Default Snowflake database | -
SNOWFLAKE_SCHEMA | Default Snowflake schema | -

Application Configuration

Variable | Description | Default
AGENT_AFFINITY | Agent affinity label for task routing (required) | -
WORKER_COUNT | Number of worker threads | 1
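A preflight check can catch missing variables before the container starts. The sketch below is not part of the agent. The variable names and defaults come from the tables above, but treating every variable without a default as required is an assumption of this example, and resolve_environment is a hypothetical helper.

```python
import os

# Names taken from the tables above. Treating every variable that has no
# default as required is an assumption of this sketch.
REQUIRED = [
    "DATA_SOURCE_USERNAME", "DATA_SOURCE_PASSWORD", "DATA_SOURCE_HOST",
    "DATA_SOURCE_DATABASE", "SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD", "SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_ROLE",
    "SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA", "AGENT_AFFINITY",
]
DEFAULTS = {"DATA_SOURCE_PORT": "1433", "WORKER_COUNT": "1"}


def resolve_environment(env: dict[str, str]) -> tuple[dict[str, str], list[str]]:
    """Return (resolved values with defaults applied, names of missing variables)."""
    known = set(REQUIRED) | set(DEFAULTS)
    resolved = {**DEFAULTS, **{k: v for k, v in env.items() if k in known}}
    missing = [name for name in REQUIRED if not env.get(name)]
    return resolved, missing


# Offline example; in practice pass dict(os.environ).
resolved, missing = resolve_environment({"AGENT_AFFINITY": "blue", "WORKER_COUNT": "8"})
print(resolved["DATA_SOURCE_PORT"], resolved["WORKER_COUNT"])  # prints 1433 8
print("AGENT_AFFINITY" in missing)  # prints False
```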

Running the Container

docker run -p 5000:5000 \
  -e DATA_SOURCE_USERNAME="db_user" \
  -e DATA_SOURCE_PASSWORD="db_password" \
  -e DATA_SOURCE_HOST="db.example.com" \
  -e DATA_SOURCE_PORT="1433" \
  -e DATA_SOURCE_DATABASE="mydb" \
  -e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
  -e SNOWFLAKE_USER="snowflake_user" \
  -e SNOWFLAKE_PASSWORD="snowflake_password" \
  -e SNOWFLAKE_WAREHOUSE="COMPUTE_WH" \
  -e SNOWFLAKE_ROLE="DATA_ENGINEER" \
  -e SNOWFLAKE_DATABASE="PROD_DB" \
  -e SNOWFLAKE_SCHEMA="PUBLIC" \
  -e AGENT_AFFINITY="blue" \
  -e WORKER_COUNT="8" \
  data-exchange-agent

You can also pass additional arguments to the agent:

# Add the remaining -e flags from the previous example as needed
docker run -p 8080:8080 \
  -e DATA_SOURCE_PASSWORD="secret" \
  -e SNOWFLAKE_PASSWORD="secret" \
  data-exchange-agent --port 8080 --debug

Running in Snowpark Container Services (SPCS)

When deploying the Data Exchange Agent in Snowpark Container Services, you can use the special @SPCS_CONNECTION connection name to automatically use Snowflake-provided credentials.

How It Works

When running in SPCS, Snowflake automatically provides:

  • An OAuth token at /snowflake/session/token
  • Environment variables: SNOWFLAKE_HOST, SNOWFLAKE_ACCOUNT, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA

The @SPCS_CONNECTION feature reads these credentials automatically, so you don't need to configure Snowflake passwords or account details manually.

Note: SNOWFLAKE_WAREHOUSE is not provided by SPCS. You can set it manually via environment variable or use the QUERY_WAREHOUSE parameter when creating the service.
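For orientation, the credentials listed above map onto Snowflake Connector for Python parameters roughly as follows. This is a sketch of the general SPCS pattern (OAuth authenticator plus the session token), not the agent's actual implementation of @SPCS_CONNECTION. The function name spcs_connection_params is hypothetical.

```python
import os
from pathlib import Path

TOKEN_PATH = Path("/snowflake/session/token")


def spcs_connection_params(env=None, token_path=TOKEN_PATH) -> dict:
    """Build connector keyword arguments from SPCS-provided values (hypothetical helper)."""
    env = os.environ if env is None else env
    params = {
        "host": env["SNOWFLAKE_HOST"],
        "account": env["SNOWFLAKE_ACCOUNT"],
        "authenticator": "oauth",
        "token": Path(token_path).read_text().strip(),
        "database": env.get("SNOWFLAKE_DATABASE"),
        "schema": env.get("SNOWFLAKE_SCHEMA"),
    }
    # SPCS does not provide a warehouse, so include it only when set manually.
    warehouse = env.get("SNOWFLAKE_WAREHOUSE")
    if warehouse:
        params["warehouse"] = warehouse
    return params


# Inside an SPCS container, with snowflake-connector-python installed:
#   import snowflake.connector
#   connection = snowflake.connector.connect(**spcs_connection_params())
```

Reading the token on each call, rather than caching it, avoids holding a stale value if Snowflake refreshes the file while the service is running.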

Configuration

By default, the Docker image uses @SPCS_CONNECTION. No additional Snowflake configuration is needed:

[task_source.snowflake_stored_procedure]
connection_name = "@SPCS_CONNECTION"

[connections.target.snowflake_connection_name]
connection_name = "@SPCS_CONNECTION"

Environment Variables for SPCS

Variable | Description | Default
SNOWFLAKE_CONNECTION_NAME | Connection mode: @SPCS_CONNECTION for SPCS credentials, or a named connection from ~/.snowflake/config.toml | @SPCS_CONNECTION
SNOWFLAKE_WAREHOUSE | Warehouse for queries (not provided by SPCS, must be set manually) | -

Switching to Manual Credentials

If you need to use traditional Snowflake credentials instead of SPCS-provided ones (e.g., for testing outside SPCS), set the SNOWFLAKE_CONNECTION_NAME environment variable:

# Use a named connection from ~/.snowflake/config.toml
# (add the remaining -e flags for your environment)
docker run \
  -e SNOWFLAKE_CONNECTION_NAME="MY_SNOWFLAKE_CONNECTION" \
  -e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
  -e SNOWFLAKE_USER="user" \
  -e SNOWFLAKE_PASSWORD="password" \
  data-exchange-agent

Example SPCS Service Definition

CREATE SERVICE data_exchange_agent
  IN COMPUTE POOL my_compute_pool
  QUERY_WAREHOUSE = MY_WAREHOUSE
  FROM SPECIFICATION $$
  spec:
    containers:
    - name: agent
      image: /my_db/my_schema/my_repo/data-exchange-agent:latest
      env:
        DATA_SOURCE_HOST: "source-db.example.com"
        DATA_SOURCE_PORT: "1433"
        DATA_SOURCE_DATABASE: "mydb"
        DATA_SOURCE_USERNAME: "user"
        AGENT_AFFINITY: "spcs-agent"
        WORKER_COUNT: "4"
      secrets:
      - snowflakeSecret: my_db_password_secret
        secretKeyRef: password
        envVarName: DATA_SOURCE_PASSWORD
  $$;

For more details, see the Snowflake SPCS documentation.

🔌 Extending the System

Adding a New Bulk Utility

Bulk utilities are command-line tools used to efficiently export data from databases (e.g., BCP for SQL Server). Follow these steps to add a new bulk utility:

1. Define the Bulk Utility Type

Add your new utility to the BulkUtilityType enum in src/data_exchange_agent/data_sources/bulk_utility_types.py:

from enum import Enum

class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"  # Add this line

2. Create Configuration Class

Create a new configuration class in src/data_exchange_agent/config/sections/bulk_utilities/your_utility.py:

from data_exchange_agent.config.sections.bulk_utilities.base import BaseBulkUtilityConfig

class YourUtilityConfig(BaseBulkUtilityConfig):
    """Configuration class for YourUtility bulk utility settings."""

    def __init__(
        self,
        # Add utility-specific parameters
        utility_specific_parameter: str = "default_param",
    ) -> None:
        """Initialize YourUtility configuration."""
        self.utility_specific_parameter = utility_specific_parameter

    def _custom_validation(self) -> str | None:
        """Validate configuration parameters."""
        if not self.utility_specific_parameter:
            return "Utility specific parameter cannot be empty."
        return None

    def __repr__(self) -> str:
        """Return string representation."""
        return f"YourUtilityConfig(utility_specific_parameter='{self.utility_specific_parameter}')"

3. Register the Bulk Utility

Register your utility in src/data_exchange_agent/config/sections/bulk_utilities/__init__.py:

from data_exchange_agent.config.sections.bulk_utilities.your_utility import YourUtilityConfig
from data_exchange_agent.constants.connection_types import ConnectionType

# Add to registry
BulkUtilityRegistry.register(ConnectionType.YOUR_UTILITY, YourUtilityConfig) # Add this

# Add to __all__
__all__ = [
    "BaseBulkUtilityConfig",
    "BulkUtilityRegistry",
    "BCPBulkUtilityConfig",
    "YourUtilityConfig",  # Add this
]

4. Update ConnectionType Enum

Add your utility type to src/data_exchange_agent/constants/connection_types.py:

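from enum import Enum

from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType

class ConnectionType(str, Enum):
    # Bulk utilities
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value  # Add this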
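Steps 1 through 4 fit together roughly as sketched below. Every class here is a simplified stand-in so that the snippet runs by itself. In particular, this BulkUtilityRegistry and its create method are assumptions for illustration and may differ from the project's real registry.

```python
from enum import Enum


class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"


class ConnectionType(str, Enum):
    # Reuses the string values so both enums agree on the TOML section name.
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value


class YourUtilityConfig:
    """Stand-in for the configuration class from step 2."""

    def __init__(self, utility_specific_parameter: str = "default_param") -> None:
        self.utility_specific_parameter = utility_specific_parameter


class BulkUtilityRegistry:
    """Stand-in registry: maps a connection type to its configuration class."""

    _registry: dict = {}

    @classmethod
    def register(cls, connection_type: ConnectionType, config_class: type) -> None:
        cls._registry[connection_type] = config_class

    @classmethod
    def create(cls, connection_type: ConnectionType, **kwargs):
        # Hypothetical factory method; raises KeyError for unregistered types.
        return cls._registry[connection_type](**kwargs)


BulkUtilityRegistry.register(ConnectionType.YOUR_UTILITY, YourUtilityConfig)

# Look the type up by its TOML section name, then build the configuration.
section_name = "your_utility_name"
config = BulkUtilityRegistry.create(
    ConnectionType(section_name), utility_specific_parameter="param"
)
print(type(config).__name__, config.utility_specific_parameter)  # prints YourUtilityConfig param
```

Sharing one string value between the two enums is what lets a section name from configuration.toml resolve to the right configuration class.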

5. Create Data Source Implementation

Create the data source class in src/data_exchange_agent/data_sources/your_utility_data_source.py:

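# Imports for inject, Provide, SFLogger, ConfigManager, container_keys, and config_keys
# are omitted here; the existing bcp_data_source.py is the reference for them.
from data_exchange_agent.data_sources.base import BaseDataSource
from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType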

class YourUtilityDataSource(BaseDataSource):
    """Data source implementation for YourUtility."""

    @inject
    def __init__(
        self,
        engine: str,
        statement: str,
        results_folder_path: str | None = None,
        base_file_name: str = "result",
        logger: SFLogger = Provide[container_keys.SF_LOGGER],
        program_config: ConfigManager = Provide[container_keys.PROGRAM_CONFIG],
    ) -> None:
        """Initialize YourUtilityDataSource."""
        self.logger = logger
        self._statement = statement

        # Get configuration
        bulk_utility_config = program_config[config_keys.BULK_UTILITY]
        utility_config = bulk_utility_config.get(BulkUtilityType.YOUR_UTILITY, None)

        # Use config values or defaults
        self.utility_specific_parameter = utility_config.utility_specific_parameter if utility_config else "default_param"

    def export_data(self) -> bool:
        """Export data using your utility command."""
        # Implement the export logic
        pass

6. Add Configuration to TOML

Users can now configure your bulk utility in configuration.toml:

[bulk_utility.your_utility_name]
# Add your custom parameters
utility_specific_parameter = "param"

7. Write Tests

Create tests in tests/data_sources/test_your_utility_data_source.py to verify functionality.

Example: BCP Implementation

See the existing BCP implementation for reference:

  • Config: src/data_exchange_agent/config/sections/bulk_utilities/bcp.py
  • Data Source: src/data_exchange_agent/data_sources/bcp_data_source.py
  • Configuration example in configuration_example.toml

Using BCP for Data Extraction

By default, the agent uses JDBC for data extraction. To use BCP (Bulk Copy Program) instead, simply add the [bulk_utility.bcp] section to your configuration.toml:

[bulk_utility.bcp]
delimiter = ";"
row_terminator = "\\n"
encoding = "UTF8"
trusted_connection = false
encrypt = true

How it works: When the agent detects a [bulk_utility.bcp] configuration section, it automatically uses BCP for data extraction instead of JDBC. No additional configuration is needed: the presence of the section alone enables BCP mode.

Requirements:

  • The BCP utility must be installed and available in the system PATH
  • SQL Server source connection must be configured in [connections.source.sqlserver]

BCP Configuration Options:

Option | Description | Default
delimiter | Field delimiter character(s) | ,
row_terminator | Row terminator character(s) | \n
encoding | Character encoding (e.g., UTF8, ACP) | UTF8
trusted_connection | Use Windows authentication | false
encrypt | Encrypt the connection | true

Note: To switch back to JDBC, simply remove or comment out the [bulk_utility.bcp] section from your configuration.
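The presence-based switch described in this section can be pictured with a few lines of Python. This is a sketch of the rule as stated here, not the agent's code: the function name extraction_method is hypothetical. It uses tomllib (Python 3.11 and later), falling back to the tomli backport on 3.10.

```python
try:
    import tomllib  # standard library on Python 3.11+
except ModuleNotFoundError:  # Python 3.10
    import tomli as tomllib  # assumption: the tomli backport is installed


def extraction_method(toml_text: str) -> str:
    """Return "bcp" when a [bulk_utility.bcp] section exists, otherwise "jdbc"."""
    config = tomllib.loads(toml_text)
    return "bcp" if "bcp" in config.get("bulk_utility", {}) else "jdbc"


with_bcp = '[bulk_utility.bcp]\ndelimiter = ";"\n'
commented_out = '# [bulk_utility.bcp]\n# delimiter = ";"\n'

print(extraction_method(with_bcp))       # prints bcp
print(extraction_method(commented_out))  # prints jdbc
```

Note that the check is on the section itself, so an empty [bulk_utility.bcp] table still selects BCP in this sketch.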


🤝 Contributing

We welcome contributions! See our Contributing Guide for details on how to collaborate, set up your development environment, and submit PRs.


📄 License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.


Developed with ❄️ by Snowflake
