Data exchange agent for migrations and validation
Snowflake Data Exchange Agent
A REST API service for database migrations and data validation. Supports multiple databases including Snowflake, PostgreSQL, and SQL Server with queue-based task processing.
Quick Start
# Install
pip install snowflake-data-exchange-agent
# Run
data-exchange-agent --port 8080
# Test
curl http://localhost:8080/health
Installation
From PyPI (Production)
pip install snowflake-data-exchange-agent
Requirements & Dependencies
Python Version: 3.10, 3.11, or 3.12 (3.13 not yet supported)
Available dependency groups:
- development: Testing and development tools (pytest, ruff, etc.)
- all: Includes all development dependencies
Core dependencies include:
- Snowflake Connector for Python
- PySpark for data processing
- Flask + Waitress for REST API
- PostgreSQL support (psycopg2-binary)
- AWS SDK (boto3)
Configuration
Create src/data_exchange_agent/configuration.toml:
selected_task_source = "api"
[application]
workers = 4
task_fetch_interval = 5
debug_mode = false
[task_source.api]
key = "api-key"
[connections.source.<sqlserver|postgresql|teradata>]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = <1433|5432|1025>
[connections.target.snowflake_connection_name]
connection_name = "connection_name"
[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"
[connections.target.blob]
connection_string = "DefaultEndpointsProtocol=https;AccountName=account_name;AccountKey=account_key;EndpointSuffix=core.windows.net"
container_name = "container_name"
# Optional: if you are not using connection_string, set account_name and use_default_credential instead
account_name = "storage_account_name"
use_default_credential = <true|false>
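Before starting the agent, a quick sanity check can parse the file with Python's standard TOML reader. This is a minimal sketch, not the agent's own loader: the helper name summarize_config is hypothetical, and the placeholders are filled with PostgreSQL and S3 values purely for illustration. It confirms that the selected task source has a matching [task_source.<name>] table and lists the configured source and target connections.

```python
try:
    import tomllib  # standard library on Python 3.11+
except ModuleNotFoundError:  # Python 3.10: pip install tomli (same loads() API)
    import tomli as tomllib

# Example configuration with the <...> placeholders filled in (illustrative values).
EXAMPLE_CONFIG = """
selected_task_source = "api"

[application]
workers = 4
task_fetch_interval = 5
debug_mode = false

[task_source.api]
key = "api-key"

[connections.source.postgresql]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 5432

[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"
"""


def summarize_config(text: str) -> dict:
    """Parse configuration.toml text and report which sections are defined."""
    config = tomllib.loads(text)
    selected = config["selected_task_source"]
    # The selected task source should have a matching [task_source.<name>] table.
    if selected not in config.get("task_source", {}):
        raise ValueError(f"missing [task_source.{selected}] section")
    connections = config.get("connections", {})
    return {
        "task_source": selected,
        "workers": config["application"]["workers"],
        "sources": sorted(connections.get("source", {})),
        "targets": sorted(connections.get("target", {})),
    }


print(summarize_config(EXAMPLE_CONFIG))
```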
For Snowflake, create ~/.snowflake/config.toml:
[connections.default]
account = "your_account.region"
user = "your_username"
password = "your_password"
warehouse = "COMPUTE_WH"
database = "PRODUCTION_DB"
API Usage
Command Line
# Basic usage
data-exchange-agent
# Production settings
data-exchange-agent --workers 8 --port 8080
# Debug mode
data-exchange-agent --debug --port 5001
Health Check
GET /health
{
"status": "healthy",
"version": "0.0.18",
"database_connections": {
"snowflake": "connected"
}
}
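A client can poll this endpoint to decide whether the agent is ready. The following is a minimal standard-library sketch; fetch_health and is_healthy are hypothetical helper names, and treating any connection state other than "connected" as unhealthy is an assumption, since only that value appears in the example response.

```python
import json
import urllib.request


def fetch_health(base_url: str, timeout: float = 5.0) -> dict:
    """GET <base_url>/health and decode the JSON body (needs a running agent)."""
    url = f"{base_url.rstrip('/')}/health"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


def is_healthy(payload: dict) -> bool:
    """True if status is "healthy" and every listed connection reports "connected"."""
    connections = payload.get("database_connections", {})
    return payload.get("status") == "healthy" and all(
        state == "connected" for state in connections.values()
    )


# Example response copied from this README.
SAMPLE_RESPONSE = {
    "status": "healthy",
    "version": "0.0.18",
    "database_connections": {"snowflake": "connected"},
}
print(is_healthy(SAMPLE_RESPONSE))
```

Against a running agent, the two combine as is_healthy(fetch_health("http://localhost:8080")).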
Task Management
# Start processing
GET /handle_tasks
# Stop processing
GET /stop
# Get status
GET /get_handling_tasks_status
# Task count
GET /get_tasks_count
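Because all four endpoints are plain GET requests, a small wrapper is enough to drive them from a script. This is a sketch, not an official client: the class name TaskManagementClient and its injectable fetch parameter are hypothetical, and response bodies are returned as raw text because their format is not documented here.

```python
import urllib.request
from typing import Callable

# Task-management endpoints as listed in this README (all GET requests).
ENDPOINTS = {
    "start": "/handle_tasks",
    "stop": "/stop",
    "status": "/get_handling_tasks_status",
    "count": "/get_tasks_count",
}


def http_get(url: str) -> str:
    """Default fetcher: GET the URL and return the body as text."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read().decode("utf-8")


class TaskManagementClient:
    """Hypothetical convenience wrapper around the task-management endpoints."""

    def __init__(self, base_url: str, fetch: Callable[[str], str] = http_get) -> None:
        self.base_url = base_url.rstrip("/")
        # Injectable so the client can be exercised without a running agent.
        self.fetch = fetch

    def call(self, action: str) -> str:
        """Call one of "start", "stop", "status" or "count"; return the raw body."""
        if action not in ENDPOINTS:
            raise ValueError(f"unknown action: {action!r}")
        return self.fetch(self.base_url + ENDPOINTS[action])
```

For example, TaskManagementClient("http://localhost:8080").call("status") queries the processing status.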
Add Task
POST /tasks
Content-Type: application/json
{
"task_type": "data_extraction",
"source_config": {
"database": "postgresql",
"query": "SELECT * FROM users"
},
"destination_config": {
"type": "snowflake_stage",
"stage": "@data_stage/users/"
}
}
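The same request can be built from Python with urllib.request. This sketch assumes the agent listens on http://localhost:8080 (the Quick Start port); build_add_task_request is a hypothetical helper, and the payload reuses the example above.

```python
import json
import urllib.request


def build_add_task_request(base_url: str, task: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /tasks request with a JSON body."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/tasks",
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


task = {
    "task_type": "data_extraction",
    "source_config": {"database": "postgresql", "query": "SELECT * FROM users"},
    "destination_config": {"type": "snowflake_stage", "stage": "@data_stage/users/"},
}
request = build_add_task_request("http://localhost:8080", task)
# With the agent running, send it with:
#   with urllib.request.urlopen(request) as response:
#       print(response.status, response.read().decode("utf-8"))
```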
Development
Setup
git clone https://github.com/snowflakedb/migrations-data-validation.git
cd migrations-data-validation/data-exchange-agent
pip install -e ".[development]"
Testing
# Run all tests
pytest
# With coverage
pytest --cov=src/data_exchange_agent
# Run specific test types
pytest tests/unit/ # Unit tests only
pytest -m "not integration" # Non-integration tests
Code Quality
# Format code
ruff format .
# Lint code
ruff check .
# Auto-fix linting issues
ruff check --fix .
🐳 Docker
The Data Exchange Agent can be run in a Docker container with configuration injected via environment variables at runtime.
Building the Image
cd data-exchange-agent
docker build -t data-exchange-agent .
How It Works
The Dockerfile uses configuration templates that are processed at container startup:
- docker-artifacts/configuration.template.toml: Agent configuration template
- docker-artifacts/snowflake.config.template.toml: Snowflake connection template
- docker-artifacts/docker-entrypoint.sh: Entrypoint script that uses envsubst to substitute environment variables into the templates before starting the agent
This approach ensures that sensitive credentials, such as passwords, are never baked into the Docker image; they are injected only at runtime.
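To preview what a rendered template will look like, the envsubst step can be approximated in Python. This is an illustration only: the template fragment below is hypothetical (the real docker-artifacts/*.template.toml files may use different keys), and the helper mirrors envsubst's behavior of replacing $NAME and ${NAME} and turning unset variables into empty strings.

```python
import re

# envsubst substitutes the $NAME and ${NAME} forms.
_VARIABLE = re.compile(r"\$(?:\{([A-Za-z_][A-Za-z0-9_]*)\}|([A-Za-z_][A-Za-z0-9_]*))")


def envsubst(template: str, env: dict[str, str]) -> str:
    """Replace $NAME / ${NAME} with values from env; unset names become empty strings."""
    return _VARIABLE.sub(lambda m: env.get(m.group(1) or m.group(2), ""), template)


# Hypothetical template fragment; the real template files may differ.
TEMPLATE = """[connections.source.sqlserver]
username = "${DATA_SOURCE_USERNAME}"
password = "${DATA_SOURCE_PASSWORD}"
host = "${DATA_SOURCE_HOST}"
port = ${DATA_SOURCE_PORT}
"""

print(envsubst(TEMPLATE, {
    "DATA_SOURCE_USERNAME": "db_user",
    "DATA_SOURCE_PASSWORD": "db_password",
    "DATA_SOURCE_HOST": "db.example.com",
    "DATA_SOURCE_PORT": "1433",
}))
```

Because unset variables silently become empty strings, a missing -e flag shows up as an empty value in the rendered file rather than as an error at substitution time.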
Environment Variables
Data Source Configuration (Required for database connections)
| Variable | Description | Default |
|---|---|---|
| DATA_SOURCE_USERNAME | Username for the source database | - |
| DATA_SOURCE_PASSWORD | Password for the source database | - |
| DATA_SOURCE_HOST | Hostname of the source database | - |
| DATA_SOURCE_PORT | Port of the source database | 1433 |
| DATA_SOURCE_DATABASE | Database name on the source | - |
Snowflake Connection Configuration (Required for Snowflake integration)
| Variable | Description | Default |
|---|---|---|
| SNOWFLAKE_ACCOUNT | Snowflake account identifier (e.g., myaccount.us-west-2.aws) | - |
| SNOWFLAKE_USER | Snowflake username | - |
| SNOWFLAKE_PASSWORD | Snowflake password | - |
| SNOWFLAKE_WAREHOUSE | Snowflake warehouse name | - |
| SNOWFLAKE_ROLE | Snowflake role | - |
| SNOWFLAKE_DATABASE | Default Snowflake database | - |
| SNOWFLAKE_SCHEMA | Default Snowflake schema | - |
Application Configuration
| Variable | Description | Default |
|---|---|---|
| AGENT_AFFINITY | Agent affinity label for task routing (required) | - |
| WORKER_COUNT | Number of worker threads | 1 |
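Before launching the container, you may want to confirm that the variables above resolve as expected. The sketch below applies the documented defaults (DATA_SOURCE_PORT=1433, WORKER_COUNT=1) and rejects a missing AGENT_AFFINITY; the function name resolve_agent_env and the positive-worker-count check are assumptions for illustration, not behavior of the image itself.

```python
from collections.abc import Mapping

# Defaults and the required variable, taken from the tables above.
DEFAULTS = {"DATA_SOURCE_PORT": "1433", "WORKER_COUNT": "1"}
REQUIRED = ("AGENT_AFFINITY",)
PREFIXES = ("DATA_SOURCE_", "SNOWFLAKE_")
EXTRA_NAMES = {"AGENT_AFFINITY", "WORKER_COUNT"}


def resolve_agent_env(env: Mapping[str, str]) -> dict[str, str]:
    """Return the agent-related variables with defaults applied; raise on problems."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError(f"missing required variables: {', '.join(missing)}")
    resolved = dict(DEFAULTS)
    # Keep only the documented variables; ignore unrelated ones such as PATH.
    resolved.update(
        {k: v for k, v in env.items() if k.startswith(PREFIXES) or k in EXTRA_NAMES}
    )
    # Assumption: the worker count must be a positive integer.
    if int(resolved["WORKER_COUNT"]) < 1:
        raise ValueError("WORKER_COUNT must be a positive integer")
    return resolved


print(resolve_agent_env({"AGENT_AFFINITY": "blue", "DATA_SOURCE_HOST": "db.example.com"}))
```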
Running the Container
docker run -p 5000:5000 \
-e DATA_SOURCE_USERNAME="db_user" \
-e DATA_SOURCE_PASSWORD="db_password" \
-e DATA_SOURCE_HOST="db.example.com" \
-e DATA_SOURCE_PORT="1433" \
-e DATA_SOURCE_DATABASE="mydb" \
-e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
-e SNOWFLAKE_USER="snowflake_user" \
-e SNOWFLAKE_PASSWORD="snowflake_password" \
-e SNOWFLAKE_WAREHOUSE="COMPUTE_WH" \
-e SNOWFLAKE_ROLE="DATA_ENGINEER" \
-e SNOWFLAKE_DATABASE="PROD_DB" \
-e SNOWFLAKE_SCHEMA="PUBLIC" \
-e AGENT_AFFINITY="blue" \
-e WORKER_COUNT="8" \
data-exchange-agent
You can also pass additional arguments to the agent by appending them after the image name:
# Add the other -e flags from the tables above as needed
docker run -p 8080:8080 \
-e DATA_SOURCE_PASSWORD="secret" \
-e SNOWFLAKE_PASSWORD="secret" \
data-exchange-agent --port 8080 --debug
Running in Snowpark Container Services (SPCS)
When deploying the Data Exchange Agent in Snowpark Container Services, you can use the special @SPCS_CONNECTION connection name to automatically use Snowflake-provided credentials.
How It Works
When running in SPCS, Snowflake automatically provides:
- An OAuth token at /snowflake/session/token
- Environment variables: SNOWFLAKE_HOST, SNOWFLAKE_ACCOUNT, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA
The @SPCS_CONNECTION feature reads these credentials automatically, so you don't need to configure Snowflake passwords or account details manually.
Note: SNOWFLAKE_WAREHOUSE is not provided by SPCS. You can set it manually via an environment variable or use the QUERY_WAREHOUSE parameter when creating the service.
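For reference, the pattern Snowflake documents for connecting from inside an SPCS container combines the token file with the injected variables and OAuth authentication. The sketch below only builds the keyword arguments for snowflake.connector.connect; it is not necessarily how @SPCS_CONNECTION is implemented, and the helper name spcs_connection_params is hypothetical. The token is read on every call because Snowflake refreshes the file periodically.

```python
import os
from collections.abc import Mapping

SPCS_TOKEN_PATH = "/snowflake/session/token"


def spcs_connection_params(
    env: Mapping[str, str] = os.environ,
    token_path: str = SPCS_TOKEN_PATH,
) -> dict[str, str]:
    """Build snowflake.connector.connect() keyword arguments from SPCS-provided values."""
    # Read the token on every call rather than caching it.
    with open(token_path, encoding="utf-8") as token_file:
        token = token_file.read().strip()
    params = {
        "host": env["SNOWFLAKE_HOST"],
        "account": env["SNOWFLAKE_ACCOUNT"],
        "database": env["SNOWFLAKE_DATABASE"],
        "schema": env["SNOWFLAKE_SCHEMA"],
        "authenticator": "oauth",
        "token": token,
    }
    # SNOWFLAKE_WAREHOUSE is not injected by SPCS; include it only if set manually.
    if env.get("SNOWFLAKE_WAREHOUSE"):
        params["warehouse"] = env["SNOWFLAKE_WAREHOUSE"]
    return params


# Inside an SPCS container (requires snowflake-connector-python):
#   import snowflake.connector
#   conn = snowflake.connector.connect(**spcs_connection_params())
```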
Configuration
By default, the Docker image uses @SPCS_CONNECTION. No additional Snowflake configuration is needed:
[task_source.snowflake_stored_procedure]
connection_name = "@SPCS_CONNECTION"
[connections.target.snowflake_connection_name]
connection_name = "@SPCS_CONNECTION"
Environment Variables for SPCS
| Variable | Description | Default |
|---|---|---|
| SNOWFLAKE_CONNECTION_NAME | Connection mode: @SPCS_CONNECTION for SPCS credentials, or a named connection from ~/.snowflake/config.toml | @SPCS_CONNECTION |
| SNOWFLAKE_WAREHOUSE | Warehouse for queries (not provided by SPCS, must be set manually) | - |
Switching to Manual Credentials
If you need to use traditional Snowflake credentials instead of SPCS-provided ones (e.g., for testing outside SPCS), set the SNOWFLAKE_CONNECTION_NAME environment variable:
# Use a named connection from ~/.snowflake/config.toml
# (add the other -e flags your setup needs)
docker run \
-e SNOWFLAKE_CONNECTION_NAME="MY_SNOWFLAKE_CONNECTION" \
-e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
-e SNOWFLAKE_USER="user" \
-e SNOWFLAKE_PASSWORD="password" \
data-exchange-agent
Example SPCS Service Definition
CREATE SERVICE data_exchange_agent
  IN COMPUTE POOL my_compute_pool
  FROM SPECIFICATION $$
    spec:
      containers:
      - name: agent
        image: /my_db/my_schema/my_repo/data-exchange-agent:latest
        env:
          DATA_SOURCE_HOST: "source-db.example.com"
          DATA_SOURCE_PORT: "1433"
          DATA_SOURCE_DATABASE: "mydb"
          DATA_SOURCE_USERNAME: "user"
          AGENT_AFFINITY: "spcs-agent"
          WORKER_COUNT: "4"
        secrets:
        - snowflakeSecret: my_db_password_secret
          secretKeyRef: password
          envVarName: DATA_SOURCE_PASSWORD
  $$
  QUERY_WAREHOUSE = MY_WAREHOUSE;
For more details, see the Snowflake SPCS documentation.
🔌 Extending the System
Adding a New Bulk Utility
Bulk utilities are command-line tools used to efficiently export data from databases (e.g., BCP for SQL Server). Follow these steps to add a new bulk utility:
1. Define the Bulk Utility Type
Add your new utility to the BulkUtilityType enum in src/data_exchange_agent/data_sources/bulk_utility_types.py:
from enum import Enum


class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"  # Add this line
2. Create Configuration Class
Create a new configuration class in src/data_exchange_agent/config/sections/bulk_utilities/your_utility.py:
from data_exchange_agent.config.sections.bulk_utilities.base import BaseBulkUtilityConfig


class YourUtilityConfig(BaseBulkUtilityConfig):
    """Configuration class for YourUtility bulk utility settings."""

    def __init__(
        self,
        # Add utility-specific parameters
        utility_specific_parameter: str = "default_param",
    ) -> None:
        """Initialize YourUtility configuration."""
        self.utility_specific_parameter = utility_specific_parameter

    def _custom_validation(self) -> str | None:
        """Validate configuration parameters."""
        if not self.utility_specific_parameter:
            return "Utility specific parameter cannot be empty."
        return None

    def __repr__(self) -> str:
        """Return string representation."""
        return f"YourUtilityConfig(utility_specific_parameter='{self.utility_specific_parameter}')"
3. Register the Bulk Utility
Register your utility in src/data_exchange_agent/config/sections/bulk_utilities/__init__.py:
from data_exchange_agent.config.sections.bulk_utilities.your_utility import YourUtilityConfig
from data_exchange_agent.constants.connection_types import ConnectionType

# Add to registry
BulkUtilityRegistry.register(ConnectionType.YOUR_UTILITY, YourUtilityConfig)  # Add this

# Add to __all__
__all__ = [
    "BaseBulkUtilityConfig",
    "BulkUtilityRegistry",
    "BCPBulkUtilityConfig",
    "YourUtilityConfig",  # Add this
]
4. Update ConnectionType Enum
Add your utility type to src/data_exchange_agent/constants/connection_types.py:
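from enum import Enum

from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType


class ConnectionType(str, Enum):
    # Bulk utilities
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value  # Add this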
5. Create Data Source Implementation
Create the data source class in src/data_exchange_agent/data_sources/your_utility_data_source.py:
# Assumes the @inject / Provide pattern from the dependency-injector package
from dependency_injector.wiring import Provide, inject

from data_exchange_agent.data_sources.base import BaseDataSource
from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType

# SFLogger, ConfigManager, container_keys and config_keys are the agent's own
# helpers; import them the same way bcp_data_source.py does.


class YourUtilityDataSource(BaseDataSource):
    """Data source implementation for YourUtility."""

    @inject
    def __init__(
        self,
        engine: str,
        statement: str,
        results_folder_path: str | None = None,
        base_file_name: str = "result",
        logger: SFLogger = Provide[container_keys.SF_LOGGER],
        program_config: ConfigManager = Provide[container_keys.PROGRAM_CONFIG],
    ) -> None:
        """Initialize YourUtilityDataSource."""
        self.logger = logger
        self._statement = statement
        # Get the bulk utility configuration loaded from configuration.toml
        bulk_utility_config = program_config[config_keys.BULK_UTILITY]
        utility_config = bulk_utility_config.get(BulkUtilityType.YOUR_UTILITY, None)
        # Use config values when present, otherwise fall back to defaults
        self.utility_specific_parameter = (
            utility_config.utility_specific_parameter if utility_config else "default_param"
        )

    def export_data(self) -> bool:
        """Export data using your utility command; return True on success."""
        # Implement the export logic here
        raise NotImplementedError
6. Add Configuration to TOML
Users can now configure your bulk utility in configuration.toml:
[bulk_utility.your_utility_name]
# Add your custom parameters
utility_specific_parameter = "param"
7. Write Tests
Create tests in tests/data_sources/test_your_utility_data_source.py to verify functionality.
Example: BCP Implementation
See the existing BCP implementation for reference:
- Config: src/data_exchange_agent/config/sections/bulk_utilities/bcp.py
- Data Source: src/data_exchange_agent/data_sources/bcp_data_source.py
- Configuration example in configuration_example.toml
🤝 Contributing
We welcome contributions! See our Contributing Guide for details on how to collaborate, set up your development environment, and submit PRs.
📄 License
This project is licensed under the Apache License 2.0. See the LICENSE file for details.
🆘 Support
- Documentation: Full documentation
- Issues: GitHub Issues
Developed with ❄️ by Snowflake
Download files
File details
Details for the file snowflake_data_exchange_agent-1.2.0.tar.gz.
File metadata
- Download URL: snowflake_data_exchange_agent-1.2.0.tar.gz
- Upload date:
- Size: 139.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7a76214673c23ebfc3f4593e5a691e8d0b9758c35d1992056ca9fdfeb5722f5f |
| MD5 | eb7022cbc09440ac9933b45d5a0f97a3 |
| BLAKE2b-256 | aa5cc634c49690bb9a5739598b184271f35b297e82d30ae73f7e7121f66c40ed |
File details
Details for the file snowflake_data_exchange_agent-1.2.0-py3-none-any.whl.
File metadata
- Download URL: snowflake_data_exchange_agent-1.2.0-py3-none-any.whl
- Upload date:
- Size: 104.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 17d32fc40df5b1c087b975e8bf92a70e32e4bc8643db01454d8ff4378f75fcd9 |
| MD5 | 31222d57015ef5010bbc84de0b156b44 |
| BLAKE2b-256 | af493865024a38ef81c3738fb47efe630d166881d3ece55a482c06c71a70701b |