Data exchange agent for migrations and validation
Snowflake Data Exchange Agent
The Data Exchange Agent is the Worker component of the Cloud Data Migration solution. It connects to source databases (SQL Server, Amazon Redshift, Teradata), extracts data, and uploads it to Snowflake stages for ingestion by the Data Migration Orchestrator (snowflake-data-migration-orchestrator).
The same worker process also executes Cloud Data Validation tasks (data_validation) when the orchestrator schedules them. That path relies on the optional snowflake-data-validation package being installed in the worker environment (see the orchestrator documentation for creating validation workflows and JSON configuration).
Installation
pip install snowflake-data-exchange-agent
Python Version: 3.11 or higher
Usage
The agent provides two subcommands: run (default) and test.
# Start with a configuration file
data-exchange-agent run -c <configuration-file-path>
# Start with default configuration.toml in current directory
data-exchange-agent run
# Omitting the subcommand defaults to 'run' (backward compatible)
data-exchange-agent -c <configuration-file-path>
# Custom parallelism and port
data-exchange-agent run --max-parallel-tasks 8 --port 8080
# Task-handling only, without the HTTP server (for multi-worker setups)
data-exchange-agent run --no-server
# Custom base directory for exported files (overrides config)
data-exchange-agent run --local-results-directory /mnt/dea-exports
# Debug mode
data-exchange-agent run --debug --port 5001
# Test all configured connections (executes SELECT 1)
data-exchange-agent test -c <configuration-file-path>
Run Command Options
| Flag | Short | Default | Description |
|---|---|---|---|
| --config | -c | configuration.toml | Path to the TOML configuration file. |
| --max-parallel-tasks | -w | from config | Maximum number of parallel tasks. |
| --interval | -i | from config | Interval (seconds) between task fetch attempts. |
| --host | | 0.0.0.0 | Host to bind the HTTP server to. |
| --port | -p | 5001 | Port to bind the HTTP server to. |
| --no-server | | off | Run task handling only, without starting the HTTP server. |
| --local-results-directory | | from config | Base directory for exported files before upload. |
| --debug | -d | off | Enable debug mode. |
Worker Configuration
The Worker configuration file uses TOML format.
| Section | Property | Type | Description |
|---|---|---|---|
| Top Level | selected_task_source | String | Currently should always be set to "snowflake_stored_procedure". |
| [application] | max_parallel_tasks | Integer | Maximum number of tasks the worker will process in parallel (using threads). |
| [application] | task_fetch_interval | Integer | Interval (in seconds) between attempts to fetch new tasks from the Orchestrator. |
| [application] | lease_refresh_interval | Integer | Optional. Interval (in seconds) between task lease renewals. Default 120. |
| [application] | snowflake_database_for_metadata | String | Optional. Database where the orchestrator deployed the task queue (default SNOWCONVERT_AI). Must match the orchestrator's CUSTOM_SNOWFLAKE_DATABASE_FOR_METADATA if you override it there. |
| [application] | snowflake_schema_for_data_migration_metadata | String | Optional. Schema for PULL_TASKS / COMPLETE_TASK / FAIL_TASK (default DATA_MIGRATION). Must match the orchestrator's CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_MIGRATION_METADATA if overridden. |
| [application] | local_results_directory | String | Optional. Base directory where each task's exported Parquet or CSV files are written before upload. Each run uses a subfolder task_<id>/<timestamp>. After a successful upload, that timestamp folder and the task_<id> parent (if empty) are removed so stale empty directories do not accumulate. When unset, files go under ~/.data_exchange_agent/result_data. Tilde (~) and relative paths are expanded at load time. |
| [connections.source.*] | | Object | Configuration for source system connections. The Worker typically requires an ODBC driver. See examples below. |
| [connections.target.snowflake_connection_name] | connection_name | String | The name of the connection entry in the ~/.snowflake/config.toml file to use. |
When selected_task_source is snowflake_stored_procedure, the worker issues CALL statements against the task queue in the database and schema named by application.snowflake_database_for_metadata and application.snowflake_schema_for_data_migration_metadata. These settings are independent of the session defaults in the Snowflake connection profile (SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA).
Example: SQL Server (Standard Authentication)
[connections.source.sqlserver]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 1433
Example: Amazon Redshift (IAM Authentication)
[connections.source.redshift]
username = "demo-user"
database = "demo_db"
auth_method = "iam-provisioned-cluster"
cluster_id = "my-aws-cluster"
region = "us-west-2"
access_key_id = "your-access-key-id"
secret_access_key = "your-secret-access-key"
Example: Amazon Redshift (Standard Authentication)
[connections.source.redshift]
username = "myuser"
password = "mypassword"
database = "mydatabase"
host = "my-cluster.abcdef123456.us-west-2.redshift.amazonaws.com"
port = 5439
auth_method = "standard"
Example: Teradata
The agent supports two Teradata drivers and automatically selects the best one available:
- teradatasql (preferred) -- Pure Python driver. No OS-level ODBC installation required. Install with pip install teradatasql.
- ODBC fallback -- If teradatasql is not installed, the agent falls back to pyodbc with the Teradata ODBC driver. Set driver_name to the exact name returned by pyodbc.drivers().
When teradatasql is available, driver_name is ignored and no ODBC driver needs to be installed on the host. Use dbc_name when your Teradata COP / TDPID alias differs from host.
[connections.source.teradata]
host = "your-teradata-host.example.com"
port = 1025
database = "tpcds"
username = "your_username"
password = "your_password"
# driver_name = "Teradata Database ODBC Driver 17.20" # only needed for ODBC fallback
# dbc_name = "TDPID_ALIAS" # optional; defaults to host
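The driver selection described above can be sketched as a simple availability check. This is a minimal illustration under stated assumptions, not the agent's implementation: the function names are hypothetical, and the is_installed parameter exists only so the logic can be exercised without either package present.

```python
import importlib.util


def _module_available(name: str) -> bool:
    """Return True when a top-level module can be imported in this environment."""
    return importlib.util.find_spec(name) is not None


def choose_teradata_driver(is_installed=_module_available) -> str:
    """Prefer the pure-Python teradatasql driver, then fall back to pyodbc."""
    if is_installed("teradatasql"):
        return "teradatasql"  # driver_name is ignored on this path
    if is_installed("pyodbc"):
        return "pyodbc"  # requires driver_name and the Teradata ODBC driver
    raise RuntimeError(
        "Install teradatasql, or pyodbc together with the Teradata ODBC driver."
    )


def effective_dbc_name(cfg: dict) -> str:
    """dbc_name is optional and defaults to host, as noted in the example config."""
    return cfg.get("dbc_name") or cfg["host"]


# Simulate a host where only pyodbc is installed.
print(choose_teradata_driver(lambda name: name == "pyodbc"))
```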
Note: Only one source connection is needed. The Snowflake target connection should point to a valid entry in your ~/.snowflake/config.toml.
ODBC Driver Auto-Detection
The agent automatically detects the best available ODBC driver for SQL Server connections. If odbc_driver is not specified in the configuration, the agent prefers the newest available driver (ODBC Driver 18 > 17 > 13 > 11). If a specific driver is requested but not found, the agent falls back to the best available driver and emits a warning.
To manually specify a driver:
[connections.source.sqlserver]
odbc_driver = "ODBC Driver 17 for SQL Server"
ODBC Encryption (SQL Server)
The encrypt and trust_server_certificate parameters are optional. By default, they are omitted from the connection string, allowing the ODBC driver to use its default behavior:
- ODBC Driver 17 and below: Encryption is disabled by default.
- ODBC Driver 18 and above: Encryption is mandatory by default.
[connections.source.sqlserver]
username = "sa"
password = "mypassword"
database = "mydb"
host = "my-server.example.com"
port = 1433
encrypt = true
trust_server_certificate = false
For development environments or SQL Servers without encryption support, either omit the encryption parameters or set encrypt = false.
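To make the "omitted by default" behavior concrete, the sketch below builds an ODBC connection string and adds the Encrypt and TrustServerCertificate keywords only when the corresponding configuration keys are present. It is an assumption about how such a string could be assembled, not the agent's actual code; the function name is hypothetical, and values containing special characters such as ; or } would need escaping that is not shown here.

```python
def build_sqlserver_connection_string(cfg: dict, driver: str) -> str:
    """Assemble an ODBC connection string from a [connections.source.sqlserver] dict."""
    parts = [
        f"DRIVER={{{driver}}}",  # renders as DRIVER={<driver name>}
        f"SERVER={cfg['host']},{cfg.get('port', 1433)}",
        f"DATABASE={cfg['database']}",
        f"UID={cfg['username']}",
        f"PWD={cfg['password']}",
    ]
    # Only emit the encryption keywords when the user set them, so the
    # driver's own default applies otherwise.
    if "encrypt" in cfg:
        parts.append(f"Encrypt={'yes' if cfg['encrypt'] else 'no'}")
    if "trust_server_certificate" in cfg:
        flag = "yes" if cfg["trust_server_certificate"] else "no"
        parts.append(f"TrustServerCertificate={flag}")
    return ";".join(parts)


base_cfg = {
    "username": "sa",
    "password": "mypassword",
    "database": "mydb",
    "host": "my-server.example.com",
    "port": 1433,
}
default_string = build_sqlserver_connection_string(base_cfg, "ODBC Driver 18 for SQL Server")
explicit_string = build_sqlserver_connection_string(
    {**base_cfg, "encrypt": True, "trust_server_certificate": False},
    "ODBC Driver 18 for SQL Server",
)
print(default_string)
print(explicit_string)
```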
Query Tagging
The Worker automatically sets Snowflake's QUERY_TAG session parameter on every query it submits. Tags are compact JSON strings containing identifiers such as the workflow ID, task ID, and worker version. You can use these tags to filter and attribute Worker queries in QUERY_HISTORY:
SELECT query_text, query_tag, start_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE TRY_PARSE_JSON(query_tag):DMVF_WORKFLOW_ID IS NOT NULL
ORDER BY start_time DESC;
| Tag key | Present on | Description |
|---|---|---|
| DMVF_VERSION | Infrastructure queries | Worker package version. |
| DMVF_WORKFLOW_ID | Task-processing queries | Workflow that originated the task. |
| DMVF_TASK_ID | Task-processing queries | Individual task identifier. |
| DMVF_WORKER_VERSION | Task-processing queries | Worker package version. |
Changelog
v1.11.0
Improvements
- Cast value columns to Utf8 before unpivot and corrected IS_VALID evaluation.
- Vertical partitioning for cell validation on wide tables.
Bug fixes
- Fixed timestamp copy handling for SQL Server BCP loads.
- Fixed duplicate tasks created when evaluating L1 results under race conditions.
- Fixed decimal partition coercion and parallelized L3 validation fixes.
v1.10.0
New features
- Added hybrid row validation mode: two-phase MD5 + cell drilldown.
- Added DEFAULT normalization templates for various data types.
Improvements
- Improved result set snapshots validation.
- Improved Data Validation performance.
- Included thread name and ID in log output for easier troubleshooting.
- Improved the task queue to support a higher number of parallel workers.
Bug fixes
- Fixed SQL compilation memory exhaustion by batching L2 metrics queries for wide tables.
- Fixed an issue with the incremental sync watermark on Redshift.
- Fixed usage of the vectorized scanner.
v1.9.2
Improvements
- Log installed dependency versions and the Python runtime version at startup.
v1.9.1
Improvements
- Cloud data validation tasks read query results in batches instead of loading full result sets into memory.