
Data exchange agent for migrations and validation

Snowflake Data Exchange Agent

License Apache-2.0 Python

A REST API service for database migrations and data validation. It supports multiple databases, including Snowflake, PostgreSQL, and SQL Server, and processes tasks through a queue.

Quick Start

# Install
pip install snowflake-data-exchange-agent

# Run
data-exchange-agent --port 8080

# Test
curl http://localhost:8080/health

Installation

From PyPI (Production)

pip install snowflake-data-exchange-agent

Requirements & Dependencies

Python Version: 3.10, 3.11, or 3.12 (3.13 not yet supported)

Available dependency groups:

  • development: Testing and development tools (pytest, ruff, etc.)
  • all: Includes all development dependencies

Core dependencies include:

  • Snowflake Connector for Python
  • PySpark for data processing
  • Flask + Waitress for REST API
  • PostgreSQL support (psycopg2-binary)
  • AWS SDK (boto3)

Configuration

Create src/data_exchange_agent/configuration.toml:

selected_task_source = "api"

[application]
workers = 4
task_fetch_interval = 5
debug_mode = false

[task_source.api]
key = "api-key"

[connections.source.<sqlserver|postgresql|teradata>]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = <1433|5432|1025>

[connections.target.snowflake_connection_name]
connection_name = "connection_name"

[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"

[connections.target.blob]
connection_string = "DefaultEndpointsProtocol=https;AccountName=account_name;AccountKey=account_key;EndpointSuffix=core.windows.net"
container_name = "container_name"
# Optional: set account_name and use_default_credential instead of connection_string
account_name = "storage_account_name"
use_default_credential = <true|false>

For Snowflake, create ~/.snowflake/config.toml:

[connections.default]
account = "your_account.region"
user = "your_username"
password = "your_password"
warehouse = "COMPUTE_WH"
database = "PRODUCTION_DB"

API Usage

Command Line

# Basic usage
data-exchange-agent

# Production settings
data-exchange-agent --workers 8 --port 8080

# Debug mode
data-exchange-agent --debug --port 5001

Health Check

GET /health
{
  "status": "healthy",
  "version": "0.0.18",
  "database_connections": {
    "snowflake": "connected"
  }
}
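
The health endpoint can also be polled from a script, for example in a deployment check. This is a minimal sketch using only the standard library; the base URL, the timeout, and the helper names fetch_health and is_healthy are assumptions, and only the "status" field shown above is inspected.

```python
import json
import urllib.request


def is_healthy(payload: dict) -> bool:
    """True when the response body reports a healthy status."""
    return payload.get("status") == "healthy"


def fetch_health(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> dict:
    """Send GET /health and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the agent running, is_healthy(fetch_health()) gives a single boolean that a wrapper script can act on.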

Task Management

# Start processing
GET /handle_tasks

# Stop processing
GET /stop

# Get status
GET /get_handling_tasks_status

# Task count
GET /get_tasks_count
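
The four task-management endpoints above are plain GET requests, so a thin wrapper is enough to drive them from Python. The sketch below is illustrative only: the action names ("start", "stop", and so on) and the helper functions are made up for this example, and because the response format is not documented here, the body is returned as raw text.

```python
import urllib.request

# Endpoint paths as listed above; the dictionary keys are hypothetical labels.
ENDPOINTS = {
    "start": "/handle_tasks",
    "stop": "/stop",
    "status": "/get_handling_tasks_status",
    "count": "/get_tasks_count",
}


def endpoint_url(action: str, base_url: str = "http://localhost:8080") -> str:
    """Build the full URL for one of the task-management actions."""
    return base_url.rstrip("/") + ENDPOINTS[action]


def call(action: str, base_url: str = "http://localhost:8080", timeout: float = 10.0) -> str:
    """Send the GET request and return the raw response body as text."""
    with urllib.request.urlopen(endpoint_url(action, base_url), timeout=timeout) as response:
        return response.read().decode("utf-8")
```

For example, call("start") followed by call("status") starts processing and then reads back the handling status.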

Add Task

POST /tasks
Content-Type: application/json
{
  "task_type": "data_extraction",
  "source_config": {
    "database": "postgresql",
    "query": "SELECT * FROM users"
  },
  "destination_config": {
    "type": "snowflake_stage",
    "stage": "@data_stage/users/"
  }
}
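
The same request can be prepared from Python with the standard library. This sketch only builds the request object, so it can be inspected before anything is sent; the base URL and the helper name build_task_request are assumptions.

```python
import json
import urllib.request

# Payload copied from the example above.
TASK = {
    "task_type": "data_extraction",
    "source_config": {"database": "postgresql", "query": "SELECT * FROM users"},
    "destination_config": {"type": "snowflake_stage", "stage": "@data_stage/users/"},
}


def build_task_request(task: dict, base_url: str = "http://localhost:8080") -> urllib.request.Request:
    """Prepare (but do not send) the POST /tasks request."""
    return urllib.request.Request(
        url=f"{base_url}/tasks",
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

To submit the task, pass the result to urllib.request.urlopen while the agent is running.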

Development

Setup

git clone https://github.com/snowflakedb/migrations-data-validation.git
cd migrations-data-validation/data-exchange-agent
pip install -e ".[development]"

Testing

# Run all tests
pytest

# With coverage
pytest --cov=src/data_exchange_agent

# Run specific test types
pytest tests/unit/           # Unit tests only
pytest -m "not integration" # Non-integration tests

Code Quality

# Format code
ruff format .

# Lint code
ruff check .

# Auto-fix linting issues
ruff check --fix .

🐳 Docker

The Data Exchange Agent can be run in a Docker container with configuration injected via environment variables at runtime.

Building the Image

cd data-exchange-agent
docker build -t data-exchange-agent .

How It Works

The Dockerfile uses configuration templates that are processed at container startup:

  1. docker-artifacts/configuration.template.toml - Agent configuration template
  2. docker-artifacts/snowflake.config.template.toml - Snowflake connection template
  3. docker-artifacts/docker-entrypoint.sh - Entrypoint script that uses envsubst to substitute environment variables into the templates before starting the agent

This approach ensures that sensitive credentials (passwords) are never baked into the Docker image; they are injected only at runtime.
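
To make the substitution step concrete, here is a rough Python analogue of what envsubst does to a template. It is not the entrypoint script itself, and the template fragment is hypothetical; the real templates are the files in docker-artifacts/. Like envsubst, the sketch replaces unset variables with an empty string, although string.Template differs in small ways (for instance, it turns "$$" into "$").

```python
from collections import defaultdict
from string import Template

# Hypothetical template fragment, for illustration only.
TEMPLATE = 'username = "${DATA_SOURCE_USERNAME}"\npassword = "${DATA_SOURCE_PASSWORD}"\n'


def render(template: str, env: dict[str, str]) -> str:
    """Replace ${VAR} placeholders; unset variables become empty strings."""
    # defaultdict(str, env) returns "" for any variable missing from env.
    return Template(template).safe_substitute(defaultdict(str, env))


if __name__ == "__main__":
    print(render(TEMPLATE, {"DATA_SOURCE_USERNAME": "db_user", "DATA_SOURCE_PASSWORD": "db_password"}))
```

Because rendering happens when the container starts, the image on disk only ever contains the placeholders.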

Environment Variables

Data Source Configuration (Required for database connections)

Variable             | Description                      | Default
DATA_SOURCE_USERNAME | Username for the source database | -
DATA_SOURCE_PASSWORD | Password for the source database | -
DATA_SOURCE_HOST     | Hostname of the source database  | -
DATA_SOURCE_PORT     | Port of the source database      | 1433
DATA_SOURCE_DATABASE | Database name on the source      | -

Snowflake Connection Configuration (Required for Snowflake integration)

Variable            | Description                                                    | Default
SNOWFLAKE_ACCOUNT   | Snowflake account identifier (e.g., myaccount.us-west-2.aws)   | -
SNOWFLAKE_USER      | Snowflake username                                             | -
SNOWFLAKE_PASSWORD  | Snowflake password                                             | -
SNOWFLAKE_WAREHOUSE | Snowflake warehouse name                                       | -
SNOWFLAKE_ROLE      | Snowflake role                                                 | -
SNOWFLAKE_DATABASE  | Default Snowflake database                                     | -
SNOWFLAKE_SCHEMA    | Default Snowflake schema                                       | -

Application Configuration

Variable       | Description                                      | Default
AGENT_AFFINITY | Agent affinity label for task routing (required) | -
WORKER_COUNT   | Number of worker threads                         | 1
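
Before launching the container, a short script can confirm that the environment is complete. The sketch below is an assumption-laden example: the REQUIRED tuple is one reading of the tables above (the data source variables without a default, plus AGENT_AFFINITY), and both helper functions are hypothetical. Adjust the tuple if your deployment also needs the Snowflake password variables.

```python
import os
from collections.abc import Mapping

# Assumption: these must be set; they have no default in the tables above.
REQUIRED = (
    "DATA_SOURCE_USERNAME",
    "DATA_SOURCE_PASSWORD",
    "DATA_SOURCE_HOST",
    "DATA_SOURCE_DATABASE",
    "AGENT_AFFINITY",
)
# Documented defaults from the tables above.
DEFAULTS = {"DATA_SOURCE_PORT": "1433", "WORKER_COUNT": "1"}


def missing_variables(env: Mapping[str, str]) -> list[str]:
    """Names from REQUIRED that are unset or empty in env."""
    return [name for name in REQUIRED if not env.get(name)]


def effective_settings(env: Mapping[str, str]) -> dict[str, str]:
    """Values for the optional variables, falling back to the documented defaults."""
    return {name: env.get(name, default) for name, default in DEFAULTS.items()}


if __name__ == "__main__":
    missing = missing_variables(os.environ)
    print("missing: " + ", ".join(missing) if missing else "all required variables are set")
```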

Running the Container

docker run -p 5000:5000 \
  -e DATA_SOURCE_USERNAME="db_user" \
  -e DATA_SOURCE_PASSWORD="db_password" \
  -e DATA_SOURCE_HOST="db.example.com" \
  -e DATA_SOURCE_PORT="1433" \
  -e DATA_SOURCE_DATABASE="mydb" \
  -e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
  -e SNOWFLAKE_USER="snowflake_user" \
  -e SNOWFLAKE_PASSWORD="snowflake_password" \
  -e SNOWFLAKE_WAREHOUSE="COMPUTE_WH" \
  -e SNOWFLAKE_ROLE="DATA_ENGINEER" \
  -e SNOWFLAKE_DATABASE="PROD_DB" \
  -e SNOWFLAKE_SCHEMA="PUBLIC" \
  -e AGENT_AFFINITY="blue" \
  -e WORKER_COUNT="8" \
  data-exchange-agent

You can also pass additional arguments to the agent:

# Add the remaining -e variables from the full example above
docker run -p 8080:8080 \
  -e DATA_SOURCE_PASSWORD="secret" \
  -e SNOWFLAKE_PASSWORD="secret" \
  data-exchange-agent --port 8080 --debug

Running in Snowpark Container Services (SPCS)

When deploying the Data Exchange Agent in Snowpark Container Services, you can use the special @SPCS_CONNECTION connection name to automatically use Snowflake-provided credentials.

How It Works

When running in SPCS, Snowflake automatically provides:

  • An OAuth token at /snowflake/session/token
  • Environment variables: SNOWFLAKE_HOST, SNOWFLAKE_ACCOUNT, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA

The @SPCS_CONNECTION feature reads these credentials automatically, so you don't need to configure Snowflake passwords or account details manually.

Note: SNOWFLAKE_WAREHOUSE is not provided by SPCS. You can set it manually via environment variable or use the QUERY_WAREHOUSE parameter when creating the service.
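
For orientation, the sketch below shows how the SPCS-provided token and environment variables could be gathered into connection parameters for an OAuth login. It illustrates the general SPCS pattern and is not the agent's actual @SPCS_CONNECTION implementation; the function name and the choice to pass optional values through only when set are assumptions.

```python
from collections.abc import Mapping
from pathlib import Path

TOKEN_PATH = "/snowflake/session/token"


def spcs_connection_parameters(env: Mapping[str, str], token_path: str = TOKEN_PATH) -> dict[str, str]:
    """Collect SPCS-provided values into keyword arguments for an OAuth login."""
    params = {
        "host": env["SNOWFLAKE_HOST"],
        "account": env["SNOWFLAKE_ACCOUNT"],
        "authenticator": "oauth",
        "token": Path(token_path).read_text().strip(),
    }
    # Database and schema come from SPCS; the warehouse is only present if you set it.
    for key in ("database", "schema", "warehouse"):
        value = env.get(f"SNOWFLAKE_{key.upper()}")
        if value:
            params[key] = value
    return params
```

Inside a service container, the resulting dictionary can be passed to snowflake.connector.connect(**params) with os.environ as the env argument.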

Configuration

By default, the Docker image uses @SPCS_CONNECTION. No additional Snowflake configuration is needed:

[task_source.snowflake_stored_procedure]
connection_name = "@SPCS_CONNECTION"

[connections.target.snowflake_connection_name]
connection_name = "@SPCS_CONNECTION"

Environment Variables for SPCS

Variable                  | Description                                                                                                 | Default
SNOWFLAKE_CONNECTION_NAME | Connection mode: @SPCS_CONNECTION for SPCS credentials, or a named connection from ~/.snowflake/config.toml | @SPCS_CONNECTION
SNOWFLAKE_WAREHOUSE       | Warehouse for queries (not provided by SPCS, must be set manually)                                          | -

Switching to Manual Credentials

If you need to use traditional Snowflake credentials instead of SPCS-provided ones (e.g., for testing outside SPCS), set the SNOWFLAKE_CONNECTION_NAME environment variable:

# Use a named connection from ~/.snowflake/config.toml
# (add the remaining -e variables your deployment needs)
docker run \
  -e SNOWFLAKE_CONNECTION_NAME="MY_SNOWFLAKE_CONNECTION" \
  -e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
  -e SNOWFLAKE_USER="user" \
  -e SNOWFLAKE_PASSWORD="password" \
  data-exchange-agent

Example SPCS Service Definition

CREATE SERVICE data_exchange_agent
  IN COMPUTE POOL my_compute_pool
  QUERY_WAREHOUSE = MY_WAREHOUSE
  FROM SPECIFICATION $$
  spec:
    containers:
    - name: agent
      image: /my_db/my_schema/my_repo/data-exchange-agent:latest
      env:
        DATA_SOURCE_HOST: "source-db.example.com"
        DATA_SOURCE_PORT: "1433"
        DATA_SOURCE_DATABASE: "mydb"
        DATA_SOURCE_USERNAME: "user"
        AGENT_AFFINITY: "spcs-agent"
        WORKER_COUNT: "4"
      secrets:
      - snowflakeSecret: my_db_password_secret
        secretKeyRef: password
        envVarName: DATA_SOURCE_PASSWORD
  $$;

For more details, see the Snowflake SPCS documentation.

🔌 Extending the System

Adding a New Bulk Utility

Bulk utilities are command-line tools used to efficiently export data from databases (e.g., BCP for SQL Server). Follow these steps to add a new bulk utility:

1. Define the Bulk Utility Type

Add your new utility to the BulkUtilityType enum in src/data_exchange_agent/data_sources/bulk_utility_types.py:

from enum import Enum

class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"  # Add this line

2. Create Configuration Class

Create a new configuration class in src/data_exchange_agent/config/sections/bulk_utilities/your_utility.py:

from data_exchange_agent.config.sections.bulk_utilities.base import BaseBulkUtilityConfig

class YourUtilityConfig(BaseBulkUtilityConfig):
    """Configuration class for YourUtility bulk utility settings."""

    def __init__(
        self,
        # Add utility-specific parameters
        utility_specific_parameter: str = "default_param",
    ) -> None:
        """Initialize YourUtility configuration."""
        self.utility_specific_parameter = utility_specific_parameter

    def _custom_validation(self) -> str | None:
        """Validate configuration parameters."""
        if not self.utility_specific_parameter:
            return "Utility specific parameter cannot be empty."
        return None

    def __repr__(self) -> str:
        """Return string representation."""
        return f"YourUtilityConfig(utility_specific_parameter='{self.utility_specific_parameter}')"

3. Register the Bulk Utility

Register your utility in src/data_exchange_agent/config/sections/bulk_utilities/__init__.py:

from data_exchange_agent.config.sections.bulk_utilities.your_utility import YourUtilityConfig
from data_exchange_agent.constants.connection_types import ConnectionType

# Add to registry
BulkUtilityRegistry.register(ConnectionType.YOUR_UTILITY, YourUtilityConfig) # Add this

# Add to __all__
__all__ = [
    "BaseBulkUtilityConfig",
    "BulkUtilityRegistry",
    "BCPBulkUtilityConfig",
    "YourUtilityConfig",  # Add this
]
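
If the registry pattern is unfamiliar, the following stand-alone sketch shows the general idea: a mapping from a type name to a configuration class, filled in by register calls. It is a hypothetical illustration and not the project's BulkUtilityRegistry; only the register call is shown in this guide, so the create method and both class names here are invented for the example.

```python
class SketchRegistry:
    """Hypothetical stand-in showing the register/lookup pattern."""

    _configs: dict[str, type] = {}

    @classmethod
    def register(cls, name: str, config_class: type) -> None:
        """Associate a utility name with its configuration class."""
        if name in cls._configs:
            raise ValueError(f"{name!r} is already registered")
        cls._configs[name] = config_class

    @classmethod
    def create(cls, name: str, **settings):
        """Instantiate the configuration class registered under name."""
        return cls._configs[name](**settings)


class ExampleConfig:
    """Minimal configuration class used only by this sketch."""

    def __init__(self, utility_specific_parameter: str = "default_param") -> None:
        self.utility_specific_parameter = utility_specific_parameter


SketchRegistry.register("your_utility_name", ExampleConfig)
```

The benefit of this design is that code reading the TOML file can build the right configuration object from the section name alone, without a chain of if statements per utility.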

4. Update ConnectionType Enum

Add your utility type to src/data_exchange_agent/constants/connection_types.py:

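from enum import Enum

from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType

class ConnectionType(str, Enum):
    # Bulk utilities
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value  # Add this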

5. Create Data Source Implementation

Create the data source class in src/data_exchange_agent/data_sources/your_utility_data_source.py:

# inject, Provide, SFLogger, ConfigManager, container_keys, and config_keys must
# also be imported; their import paths are not shown in this guide.
from data_exchange_agent.data_sources.base import BaseDataSource
from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType

class YourUtilityDataSource(BaseDataSource):
    """Data source implementation for YourUtility."""

    @inject
    def __init__(
        self,
        engine: str,
        statement: str,
        results_folder_path: str | None = None,
        base_file_name: str = "result",
        logger: SFLogger = Provide[container_keys.SF_LOGGER],
        program_config: ConfigManager = Provide[container_keys.PROGRAM_CONFIG],
    ) -> None:
        """Initialize YourUtilityDataSource."""
        self.logger = logger
        self._statement = statement

        # Get configuration
        bulk_utility_config = program_config[config_keys.BULK_UTILITY]
        utility_config = bulk_utility_config.get(BulkUtilityType.YOUR_UTILITY, None)

        # Use config values or defaults
        self.utility_specific_parameter = (
            utility_config.utility_specific_parameter if utility_config else "default_param"
        )

    def export_data(self) -> bool:
        """Export data using your utility command."""
        # Implement the export logic
        raise NotImplementedError
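
Since bulk utilities are command-line tools, the body of export_data will usually come down to running an external command and reporting whether it succeeded. The helper below is a minimal sketch under that assumption; run_export, its timeout, and the decision to treat a missing executable as a failed export are all illustrative choices, and the existing BCP data source remains the reference for how the project actually does it.

```python
import subprocess
import sys


def run_export(command: list[str], timeout: float = 3600.0) -> bool:
    """Run an export command and report success as a boolean."""
    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, timeout=timeout, check=False
        )
    except (OSError, subprocess.TimeoutExpired):
        # A missing executable or a hung process both count as a failed export.
        return False
    return completed.returncode == 0


if __name__ == "__main__":
    # Stand-in command: the Python interpreter plays the role of the bulk utility.
    print(run_export([sys.executable, "-c", "print('exported')"]))
```

Passing the command as a list rather than a single shell string avoids quoting problems when the statement or file paths contain spaces.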

6. Add Configuration to TOML

Users can now configure your bulk utility in configuration.toml:

[bulk_utility.your_utility_name]
# Add your custom parameters
utility_specific_parameter = "param"

7. Write Tests

Create tests in tests/data_sources/test_your_utility_data_source.py to verify functionality.

Example: BCP Implementation

See the existing BCP implementation for reference:

  • Config: src/data_exchange_agent/config/sections/bulk_utilities/bcp.py
  • Data Source: src/data_exchange_agent/data_sources/bcp_data_source.py
  • Configuration example in configuration_example.toml

🤝 Contributing

We welcome contributions! See our Contributing Guide for details on how to collaborate, set up your development environment, and submit PRs.


📄 License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.


Developed with ❄️ by Snowflake
