
Data exchange agent for migrations and validation

Snowflake Data Exchange Agent

The Data Exchange Agent is the Worker component of the Cloud Data Migration solution. It connects to source databases (SQL Server, Amazon Redshift, Teradata, Oracle, PostgreSQL, and Azure Synapse), extracts data, and uploads it to Snowflake stages for ingestion by the Data Migration Orchestrator (snowflake-data-migration-orchestrator).

The same worker process also executes Cloud Data Validation tasks (data_validation) when the orchestrator schedules them. That path relies on the optional snowflake-data-validation package being installed in the worker environment (see the orchestrator documentation for creating validation workflows and JSON configuration).

Installation

pip install snowflake-data-exchange-agent

For Teradata sources using the native teradatasql driver (recommended over ODBC when you can use it), install the optional extra:

# Quote the requirement so shells such as zsh do not expand the brackets
pip install "snowflake-data-exchange-agent[teradata]"

For Oracle sources using python-oracledb in thin mode (no Instant Client or ODBC required for typical EZ Connect), install the optional extra:

# Quote the requirement so shells such as zsh do not expand the brackets
pip install "snowflake-data-exchange-agent[oracle]"

If oracledb is not installed, the agent falls back to Oracle ODBC + pyodbc as before (odbc_driver must match pyodbc.drivers()).
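
The fallback described above can be pictured as an import probe. The following is a minimal sketch, assuming the agent's real selection logic resembles it; `pick_driver` is a hypothetical helper for illustration and is not part of the package.

```python
import importlib.util


def pick_driver(preferred: str, fallback: str = "pyodbc") -> str:
    """Return the preferred module name if it is importable, else the fallback."""
    # find_spec returns None when a top-level module is not installed.
    if importlib.util.find_spec(preferred) is not None:
        return preferred
    return fallback


# Which driver would be chosen in the current environment?
print("Oracle driver:", pick_driver("oracledb"))
print("Teradata driver:", pick_driver("teradatasql"))
```

Running this in the worker's virtual environment is a quick way to confirm which path a given installation is likely to take before starting the agent.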

Python Version: 3.11 or higher

Usage

The agent provides two main subcommands: run (default) and test. Release v1.20.0 also added a doctor subcommand (see the Changelog).

# Start with a configuration file
data-exchange-agent run -c <configuration-file-path>

# Start with default configuration.toml in current directory
data-exchange-agent run

# Omitting the subcommand defaults to 'run' (backward compatible)
data-exchange-agent -c <configuration-file-path>

# Custom parallelism and port
data-exchange-agent run --max-parallel-tasks 8 --port 8080

# Task-handling only, without the HTTP server (for multi-worker setups)
data-exchange-agent run --no-server

# Custom base directory for exported files (overrides config)
data-exchange-agent run --local-results-directory /mnt/dea-exports

# Debug mode
data-exchange-agent run --debug --port 5001

# Test all configured connections (executes SELECT 1)
data-exchange-agent test -c <configuration-file-path>

Run Command Options

| Flag | Short | Default | Description |
| --- | --- | --- | --- |
| --config | -c | configuration.toml | Path to the TOML configuration file. |
| --max-parallel-tasks | -w | from config | Maximum number of parallel tasks. |
| --interval | -i | from config | Interval (seconds) between task fetch attempts. |
| --host | | 0.0.0.0 | Host to bind the HTTP server to. |
| --port | -p | 5001 | Port to bind the HTTP server to. |
| --no-server | | off | Run task handling only, without starting the HTTP server. |
| --local-results-directory | | from config | Base directory for exported files before upload. |
| --debug | -d | off | Enable debug mode. |
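
To make the flag table and the "omitted subcommand defaults to run" rule concrete, here is a minimal argparse sketch. It mirrors the documented flags and defaults only; `normalize_argv` and `build_parser` are hypothetical names, and the agent's actual CLI code may be organized differently.

```python
import argparse

SUBCOMMANDS = {"run", "test"}


def normalize_argv(argv):
    """Prepend 'run' when the first argument is not a known subcommand."""
    if not argv or argv[0] not in SUBCOMMANDS:
        return ["run", *argv]
    return list(argv)


def build_parser():
    parser = argparse.ArgumentParser(prog="data-exchange-agent")
    sub = parser.add_subparsers(dest="command", required=True)

    run = sub.add_parser("run")
    run.add_argument("-c", "--config", default="configuration.toml")
    # None stands for "from config": the TOML value is used when the flag is absent.
    run.add_argument("-w", "--max-parallel-tasks", type=int, default=None)
    run.add_argument("-i", "--interval", type=int, default=None)
    run.add_argument("--host", default="0.0.0.0")
    run.add_argument("-p", "--port", type=int, default=5001)
    run.add_argument("--no-server", action="store_true")
    run.add_argument("--local-results-directory", default=None)
    run.add_argument("-d", "--debug", action="store_true")

    test = sub.add_parser("test")
    test.add_argument("-c", "--config", default="configuration.toml")
    return parser


# Equivalent to: data-exchange-agent -c prod.toml --port 8080
args = build_parser().parse_args(normalize_argv(["-c", "prod.toml", "--port", "8080"]))
print(args.command, args.config, args.port)
```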

Worker Configuration

The Worker configuration file uses TOML format.

| Section | Property | Type | Description |
| --- | --- | --- | --- |
| Top level | selected_task_source | String | Currently should always be set to "snowflake_stored_procedure". |
| [application] | max_parallel_tasks | Integer | Maximum number of tasks the worker will process in parallel (using threads). |
| [application] | task_fetch_interval | Integer | Interval (in seconds) between attempts to fetch new tasks from the Orchestrator. |
| [application] | lease_refresh_interval | Integer | Optional. Interval (in seconds) between task lease renewals. Default 120. |
| [application] | snowflake_database_for_metadata | String | Optional. Database where the orchestrator deployed the task queue (default SNOWCONVERT_AI). Must match the orchestrator's CUSTOM_SNOWFLAKE_DATABASE_FOR_METADATA if you override it there. |
| [application] | snowflake_schema_for_data_migration_metadata | String | Optional. Schema for PULL_TASKS / COMPLETE_TASK / FAIL_TASK (default DATA_MIGRATION). Must match the orchestrator's CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_MIGRATION_METADATA if overridden. |
| [application] | local_results_directory | String | Optional. Base directory where each task's exported Parquet or CSV files are written before upload. Each run uses a subfolder task_<id>/<timestamp>. After a successful upload, that timestamp folder and the task_<id> parent (if empty) are removed so stale empty directories do not accumulate. When unset, files go under ~/.data_exchange_agent/result_data. Tilde (~) and relative paths are expanded at load time. |
| [connections.source.*] | | Object | Configuration for source system connections. The Worker typically requires an ODBC driver. See examples below. |
| [connections.target.snowflake_connection_name] | connection_name | String | The name of the connection entry in the ~/.snowflake/config.toml file to use. |
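
The local_results_directory behavior in the table (per-run subfolder, cleanup after upload) can be sketched as follows. This is an illustration under stated assumptions: the helper names are hypothetical, and the timestamp format is a guess because the source only specifies task_<id>/<timestamp>.

```python
import shutil
import tempfile
from datetime import datetime, timezone
from pathlib import Path

DEFAULT_BASE = "~/.data_exchange_agent/result_data"


def resolve_base(configured=None) -> Path:
    """Expand ~ and relative paths, as the table says happens at load time."""
    return Path(configured or DEFAULT_BASE).expanduser().resolve()


def make_run_dir(base: Path, task_id: str) -> Path:
    # Assumed timestamp format; the real agent may use a different one.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")
    run_dir = base / f"task_{task_id}" / stamp
    run_dir.mkdir(parents=True)
    return run_dir


def cleanup_after_upload(run_dir: Path) -> None:
    """Remove the timestamp folder, then task_<id> if nothing else is left in it."""
    shutil.rmtree(run_dir)
    parent = run_dir.parent
    if not any(parent.iterdir()):
        parent.rmdir()


# Demonstrate against a throwaway directory instead of the real home folder.
with tempfile.TemporaryDirectory() as tmp:
    base = resolve_base(tmp)
    run_dir = make_run_dir(base, "42")
    (run_dir / "part-0.parquet").write_bytes(b"")
    cleanup_after_upload(run_dir)
    leftover = list(base.iterdir())

print(leftover)  # no stale task_42 directory remains
```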

When selected_task_source is snowflake_stored_procedure, the worker issues CALL statements against the task-queue using application.snowflake_database_for_metadata and application.snowflake_schema_for_data_migration_metadata. These settings are independent of Snowflake connection session defaults (SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA in the connection profile).

Example: SQL Server (Standard Authentication)

[connections.source.sqlserver]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 1433

Example: Amazon Redshift (IAM Authentication)

[connections.source.redshift]
username = "demo-user"
database = "demo_db"
auth_method = "iam-provisioned-cluster"
cluster_id = "my-aws-cluster"
region = "us-west-2"
access_key_id = "your-access-key-id"
secret_access_key = "your-secret-access-key"

Example: Amazon Redshift (Standard Authentication)

[connections.source.redshift]
username = "myuser"
password = "mypassword"
database = "mydatabase"
host = "my-cluster.abcdef123456.us-west-2.redshift.amazonaws.com"
port = 5439
auth_method = "standard"

Example: PostgreSQL (ODBC)

Use this block when the orchestrator schedules Cloud Data Validation (or migration) against a PostgreSQL source. Install a PostgreSQL ODBC driver on the worker host; see pyodbc.drivers() for the exact odbc_driver string if you need to pin one.

[connections.source.postgresql]
username = "my_user"
password = "my_password"
database = "my_database"
host = "postgres.example.com"
port = 5432
# odbc_driver = "PostgreSQL Unicode"  # optional

Example: Teradata

The agent supports two Teradata drivers and automatically selects the best one available:

  1. teradatasql (preferred) -- Pure Python driver. No OS-level ODBC installation required. Install with pip install teradatasql.
  2. ODBC fallback -- If teradatasql is not installed, the agent falls back to pyodbc with the Teradata ODBC driver. Set odbc_driver to the exact name returned by pyodbc.drivers().

When teradatasql is available, odbc_driver is ignored and no ODBC driver needs to be installed on the host. Use dbc_name when your Teradata COP / TDPID alias differs from host.

Older configs used driver_name for the ODBC driver label; that key still works but is deprecated in favor of odbc_driver.
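
One way to picture the deprecated-key handling is a small resolver that prefers odbc_driver and accepts driver_name with a warning. This is a sketch only; `resolve_odbc_driver` is a hypothetical helper, and the agent's actual warning mechanism and message are not documented here.

```python
import warnings


def resolve_odbc_driver(connection: dict):
    """Prefer odbc_driver; accept the deprecated driver_name with a warning."""
    if "odbc_driver" in connection:
        return connection["odbc_driver"]
    if "driver_name" in connection:
        warnings.warn(
            "driver_name is deprecated; use odbc_driver instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return connection["driver_name"]
    # No ODBC label configured, which is fine when teradatasql is available.
    return None


legacy = {"host": "td.example.com", "driver_name": "Teradata Database ODBC Driver 17.20"}
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    driver = resolve_odbc_driver(legacy)

print(driver, len(caught))
```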

[connections.source.teradata]
host = "your-teradata-host.example.com"
port = 1025
database = "tpcds"
username = "your_username"
password = "your_password"
# odbc_driver = "Teradata Database ODBC Driver 17.20"  # only needed for ODBC fallback
# dbc_name = "TDPID_ALIAS"  # optional; defaults to host
# authentication = "LDAP"  # optional ODBC AuthMech when using pyodbc (e.g. TD2, LDAP, KRB5)

# Optional: WRITE_NOS configuration for direct export to cloud object storage
# (S3, Azure Blob, or GCS). The orchestrator's `extraction.strategy` must be
# set to `"write_nos"` for the worker to use these settings.
#
# `write_nos_location_scheme` is one of `/s3/`, `/az/`, or `/gs/`.
# Provide credentials in exactly one of the three modes below.
#
# write_nos_location_scheme    = "/az/"
# write_nos_location_host      = "myaccount.blob.core.windows.net"
# Optional path segment after host (omit when Snowflake stage is bucket root):
# write_nos_location_container = "td-nos-exports"
#
# Mode 1 - Function mapping (recommended; credentials live on Teradata):
# write_nos_function_mapping = "nos_util.WRITE_NOS_FM"
#
# Mode 2 - Named AUTHORIZATION object (credentials live on Teradata):
# write_nos_authorization_name = "nos_util.DefAuth_Write"
#
# Mode 3 - Inline credentials (embedded in SQL; redacted in logs):
# write_nos_access_id  = "your_account_or_access_key_id"
# write_nos_access_key = "your_secret"
#
# Optional output overrides (defaults shown):
# write_nos_stored_as       = "PARQUET"
# write_nos_compression     = "SNAPPY"
# write_nos_max_object_size = "16MB"
# write_nos_overwrite       = "TRUE"
#
# Optional: TPT (Teradata Parallel Transporter). Use when the orchestrator sets
# `extraction.strategy` to `"tpt"`. Requires TTU (`tbuild`) on PATH or set
# `TPT_TBUILD_EXECUTABLE` to its absolute path. Intermediate delimited files and
# job scripts use `tpt_output_directory` / `tpt_log_directory` on the worker.
#
# tpt_output_directory = "/var/dea/tpt"
# tpt_log_directory    = "/var/log/dea/tpt"
# tpt_delimiter        = "|"
# tpt_max_sessions     = 4
# tpt_charset          = "UTF8"
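
The WRITE_NOS comments above require a location scheme of /s3/, /az/, or /gs/ and credentials in exactly one of three modes. A pre-flight check along those lines might look like the sketch below. `write_nos_credential_mode` is a hypothetical helper, and the agent's own validation (if any) may report problems differently.

```python
VALID_SCHEMES = ("/s3/", "/az/", "/gs/")


def write_nos_credential_mode(connection: dict) -> str:
    """Return the single configured credential mode or raise ValueError."""
    scheme = connection.get("write_nos_location_scheme")
    if scheme not in VALID_SCHEMES:
        raise ValueError(f"write_nos_location_scheme must be one of {VALID_SCHEMES}")

    has_id = bool(connection.get("write_nos_access_id"))
    has_key = bool(connection.get("write_nos_access_key"))
    if has_id != has_key:
        raise ValueError("write_nos_access_id and write_nos_access_key must be set together")

    modes = {
        "function_mapping": bool(connection.get("write_nos_function_mapping")),
        "authorization": bool(connection.get("write_nos_authorization_name")),
        "inline": has_id and has_key,
    }
    selected = [name for name, enabled in modes.items() if enabled]
    if len(selected) != 1:
        raise ValueError(f"expected exactly one credential mode, found {selected}")
    return selected[0]


teradata = {
    "write_nos_location_scheme": "/az/",
    "write_nos_location_host": "myaccount.blob.core.windows.net",
    "write_nos_function_mapping": "nos_util.WRITE_NOS_FM",
}
print(write_nos_credential_mode(teradata))
```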

Example: PostgreSQL (bulk extract with psql \copy)

The agent supports PostgreSQL via ODBC. For bulk data_movement tasks, set use_copy = true to use a psql subprocess with \copy (requires psql installed on the worker host and on PATH).

[connections.source.postgresql]
username = "my_user"
password = "my_password"
database = "my_database"
host = "postgres.example.com"
port = 5432
# use_copy = true   # bulk extracts via psql \copy (requires psql on PATH)
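
Because use_copy depends on psql being on PATH, a quick pre-flight check can save a failed task. The sketch below uses shutil.which; `can_use_copy` is a hypothetical helper, and how the agent itself reacts when psql is missing is not documented here.

```python
import shutil


def can_use_copy(connection: dict, which=shutil.which) -> bool:
    """True when use_copy is enabled and a psql executable is found on PATH."""
    if not connection.get("use_copy", False):
        return False
    return which("psql") is not None


postgresql = {"host": "postgres.example.com", "port": 5432, "use_copy": True}
print("psql \\copy available:", can_use_copy(postgresql))
```

The `which` parameter exists only so the lookup can be replaced in tests; in normal use the default is enough.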

Example: Oracle (ODBC)

Install an Oracle Instant Client and the matching ODBC driver on the worker host. Use pyodbc.drivers() to find the exact odbc_driver string if the default label below does not match your installation.

Basic mode uses EZ Connect: service name in database, host/port for the listener.

[connections.source.oracle]
oracle_connection_mode = "basic"
username = "my_user"
password = "my_password"
database = "ORCLPDB1"
host = "oracle.example.com"
port = 1521
odbc_driver = "Oracle in instantclient_21_1"
auto_detect_driver = false

TNS alias mode sets DBQ to a name from tnsnames.ora. When tns_admin is set, TNS_ADMIN is applied only for the duration of pyodbc.connect so the driver can resolve the alias.

[connections.source.oracle]
oracle_connection_mode = "tns_alias"
tns_name = "MY_ORACLE_SERVICE"
username = "my_user"
password = "my_password"
odbc_driver = "Oracle in instantclient_21_1"
auto_detect_driver = false
tns_admin = "/path/to/tns"
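
Applying TNS_ADMIN "only for the duration of pyodbc.connect" is a scoped environment override. A minimal sketch of that pattern, assuming a context manager is used (the name `temporary_env` is hypothetical and the agent's implementation may differ):

```python
import os
from contextlib import contextmanager

_MISSING = object()


@contextmanager
def temporary_env(name: str, value: str):
    """Set an environment variable for the duration of the with-block only."""
    previous = os.environ.get(name, _MISSING)
    os.environ[name] = value
    try:
        yield
    finally:
        # Restore the earlier value, or remove the variable if it was unset.
        if previous is _MISSING:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous


before = os.environ.get("TNS_ADMIN")
with temporary_env("TNS_ADMIN", "/path/to/tns"):
    # In the real agent, the ODBC connection would be opened here.
    inside = os.environ.get("TNS_ADMIN")
after = os.environ.get("TNS_ADMIN")

print(inside, after == before)
```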

Connect descriptor mode passes a full net descriptor as DBQ (for advanced routing, LDAP alternatives, or paste-in from tnsnames.ora).

[connections.source.oracle]
oracle_connection_mode = "connect_descriptor"
connect_descriptor = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=oracle.example.com)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=ORCLPDB1)))"
username = "my_user"
password = "my_password"
odbc_driver = "Oracle in instantclient_21_1"
auto_detect_driver = false
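
The three connection modes differ mainly in what ends up in the ODBC DBQ attribute. The sketch below shows one plausible mapping. The TNS alias and connect descriptor cases follow the text above; the EZ Connect string format for basic mode and the default of "basic" when the key is absent are assumptions, and `oracle_dbq` is a hypothetical helper.

```python
def oracle_dbq(connection: dict) -> str:
    """Return the DBQ value for an Oracle source connection (illustrative only)."""
    mode = connection.get("oracle_connection_mode", "basic")  # assumed default
    if mode == "basic":
        # Assumed EZ Connect form: host:port/service_name
        return f"{connection['host']}:{connection['port']}/{connection['database']}"
    if mode == "tns_alias":
        return connection["tns_name"]
    if mode == "connect_descriptor":
        return connection["connect_descriptor"]
    raise ValueError(f"unknown oracle_connection_mode: {mode!r}")


basic = {
    "oracle_connection_mode": "basic",
    "database": "ORCLPDB1",
    "host": "oracle.example.com",
    "port": 1521,
}
alias = {"oracle_connection_mode": "tns_alias", "tns_name": "MY_ORACLE_SERVICE"}

print(oracle_dbq(basic))
print(oracle_dbq(alias))
```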

Optional wallet_directory / wallet_password append common Oracle ODBC wallet attributes. TLS naming and LDAP directory options vary by driver; add driver-specific keys via extra options or extend the configuration as needed.

DBMS_CLOUD (S3 export inside Oracle): set dbms_cloud_credential_name and dbms_cloud_file_uri_prefix on the Oracle source connection. For local development, use the ghcr.io/oracle/adb-free image (not Oracle Database 23ai Free XE). The wallet and DataGrip steps, the S3 credential SQL, and troubleshooting are covered in docs/oracle-dbms-cloud-local-setup.md. Validate IAM first with scripts/test_s3_bucket_access.py.

Note: Only one source connection is needed. The Snowflake target connection should point to a valid entry in your ~/.snowflake/config.toml.

Idle worker shutdown

Idle shutdown applies to task sources whose adapter sets supports_idle_shutdown (today, only the Snowflake stored-procedure source). The worker terminates after a sustained period of healthy idle: PULL_TASKS returns nothing, TASK_QUEUE holds no leases for this agent_id, and neither check produced a Snowflake error. On shutdown, the worker:

  1. Inserts WORKER_SHUTDOWN into SYSTEM_EVENT (orchestrator migration 0029).
  2. Emits Snowhouse stop telemetry.
  3. Drains workers (lease refresher first, then bounded worker joins).
  4. Flushes logs and closes Snowflake sessions.
  5. Exits via shared.runtime_forced_exit.forced_idle_process_exit (os._exit(0)).

Other task sources ignore idle shutdown settings.
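
The "sustained healthy idle" condition can be modeled as a timer that restarts whenever a check is not healthy-idle. The sketch below is an interpretation, not the agent's code: `IdleTracker` is a hypothetical class, and it assumes that any pulled task, open lease, or failed check resets the idle window.

```python
class IdleTracker:
    """Track how long the worker has been continuously healthy and idle."""

    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._idle_since = None

    def observe(self, now: float, queue_empty: bool, open_leases: int, check_failed: bool) -> bool:
        """Record one poll; return True once the idle window has fully elapsed."""
        healthy_idle = queue_empty and open_leases == 0 and not check_failed
        if not healthy_idle:
            self._idle_since = None  # assumed: any activity or error resets the window
            return False
        if self._idle_since is None:
            self._idle_since = now
        return now - self._idle_since >= self.window_seconds


tracker = IdleTracker(window_seconds=60 * 60)
results = [
    tracker.observe(0, queue_empty=True, open_leases=0, check_failed=False),
    tracker.observe(1800, queue_empty=True, open_leases=1, check_failed=False),
    tracker.observe(1900, queue_empty=True, open_leases=0, check_failed=False),
    tracker.observe(5500, queue_empty=True, open_leases=0, check_failed=False),
]
print(results)
```

Passing the clock value in explicitly keeps the sketch deterministic; a real loop would typically use time.monotonic().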

Canonical (worker TOML):

[idle_shutdown]
minutes = 60   # always on; use 10080 (one week) for long-lived local dev

Idle shutdown is always enabled (secure by default). ConfigManager resolves the idle window in the order CLI, then TOML, then defaults. If DM_IDLE_SHUTDOWN_MINUTES is present in os.environ, it then overrides the resolved value.

| Setting | TOML key | Env override | Default |
| --- | --- | --- | --- |
| Idle window (minutes) | idle_shutdown.minutes (must be >= 1) | DM_IDLE_SHUTDOWN_MINUTES | 60 |

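An invalid or non-positive DM_IDLE_SHUTDOWN_MINUTES value is clamped to 60 and logged at WARNING. The TOML minutes value must be at least 1. An INFO message is logged when the environment value differs from the base value resolved from CLI, TOML, or defaults.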
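
The resolution and clamping rules can be summarized in a few lines. This is a sketch of the documented behavior only; `resolve_idle_minutes` and the logger name are hypothetical, and the actual log messages are not specified in the source.

```python
import logging

logger = logging.getLogger("idle_shutdown_sketch")

DEFAULT_MINUTES = 60
ENV_NAME = "DM_IDLE_SHUTDOWN_MINUTES"


def resolve_idle_minutes(base_minutes: int, environ) -> int:
    """Apply the optional env override on top of the CLI/TOML/default base value."""
    raw = environ.get(ENV_NAME)
    if raw is None:
        return base_minutes
    try:
        minutes = int(raw)
    except ValueError:
        minutes = 0  # treat unparsable values like non-positive ones
    if minutes <= 0:
        logger.warning("%s=%r is invalid; using %d", ENV_NAME, raw, DEFAULT_MINUTES)
        return DEFAULT_MINUTES
    if minutes != base_minutes:
        logger.info("%s overrides base value %d with %d", ENV_NAME, base_minutes, minutes)
    return minutes


print(resolve_idle_minutes(60, {}))
print(resolve_idle_minutes(60, {ENV_NAME: "10080"}))
print(resolve_idle_minutes(30, {ENV_NAME: "0"}))
```

In real use, os.environ would be passed as the second argument.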

Apply migration 0029 (SYSTEM_EVENT) before running DEA with idle shutdown; otherwise the audit insert fails. With the HTTP server enabled (the default), the server stops when idle shutdown runs; use --no-server for worker-only smoke tests. Idle shutdown rows include PAYLOAD.host from shared.runtime_host.system_event_host() (spcs:… when SNOWFLAKE_SERVICE_* is set, otherwise host: + OS hostname). See ExplicitIdleShutdownSafetySpec.md §5.4.

In SPCS, set DM_IDLE_SHUTDOWN_MINUTES on both orchestrator and DEA service specs to the same value when overriding.

ODBC Driver Auto-Detection

The agent automatically detects the best available ODBC driver for SQL Server connections. If no odbc_driver is specified in the configuration, it will prefer the newest available driver (ODBC Driver 18 > 17 > 13 > 11). If a specific driver is requested but not found, it falls back to the best available driver with a warning.

To manually specify a driver:

[connections.source.sqlserver]
odbc_driver = "ODBC Driver 17 for SQL Server"
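
The preference order and fallback rule for SQL Server drivers can be sketched as a pure function over the installed driver names (for example, the list returned by pyodbc.drivers()). `select_sqlserver_driver` is a hypothetical helper, and raising an error when no supported driver exists is an assumption about behavior the source does not describe.

```python
import warnings

# Newest first, matching the documented preference 18 > 17 > 13 > 11.
PREFERRED = [
    "ODBC Driver 18 for SQL Server",
    "ODBC Driver 17 for SQL Server",
    "ODBC Driver 13 for SQL Server",
    "ODBC Driver 11 for SQL Server",
]


def select_sqlserver_driver(installed, requested=None) -> str:
    """Pick the requested driver if installed, else the newest available one."""
    if requested and requested in installed:
        return requested
    best = next((name for name in PREFERRED if name in installed), None)
    if best is None:
        raise RuntimeError("no supported SQL Server ODBC driver found")  # assumed behavior
    if requested:
        warnings.warn(f"{requested!r} not found; falling back to {best!r}")
    return best


installed = ["SQL Server", "ODBC Driver 17 for SQL Server", "ODBC Driver 13 for SQL Server"]
print(select_sqlserver_driver(installed))

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    fallback = select_sqlserver_driver(installed, requested="ODBC Driver 18 for SQL Server")
print(fallback, len(caught))
```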

ODBC Encryption (SQL Server)

The encrypt and trust_server_certificate parameters are optional. By default, they are omitted from the connection string, allowing the ODBC driver to use its default behavior:

  • ODBC Driver 17 and below: Encryption is disabled by default.
  • ODBC Driver 18 and above: Encryption is mandatory by default.

To set encryption explicitly:

[connections.source.sqlserver]
username = "sa"
password = "mypassword"
database = "mydb"
host = "my-server.example.com"
port = 1433
encrypt = true
trust_server_certificate = false

For development environments or SQL Servers without encryption support, either omit the encryption parameters or set encrypt = false.
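
"Omitted from the connection string" means the Encrypt and TrustServerCertificate keywords are only emitted when the corresponding TOML keys are present. A minimal sketch of that rule, using the standard ODBC keyword names; `encryption_attributes` is a hypothetical helper and the agent's actual string assembly may differ.

```python
def _yes_no(flag: bool) -> str:
    return "yes" if flag else "no"


def encryption_attributes(connection: dict) -> str:
    """Return only the encryption keywords that were explicitly configured."""
    parts = []
    if "encrypt" in connection:
        parts.append("Encrypt=" + _yes_no(connection["encrypt"]))
    if "trust_server_certificate" in connection:
        parts.append("TrustServerCertificate=" + _yes_no(connection["trust_server_certificate"]))
    return ";".join(parts)


explicit = {"host": "my-server.example.com", "encrypt": True, "trust_server_certificate": False}
implicit = {"host": "my-server.example.com"}

print(repr(encryption_attributes(explicit)))
print(repr(encryption_attributes(implicit)))  # empty: the driver default applies
```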

Query Tagging

The Worker automatically sets Snowflake's QUERY_TAG session parameter on every query it submits. Tags are compact JSON strings containing identifiers such as the workflow ID, task ID, and worker version. You can use these tags to filter and attribute Worker queries in QUERY_HISTORY:

SELECT query_text, query_tag, start_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE TRY_PARSE_JSON(query_tag):DMVF_WORKFLOW_ID IS NOT NULL
ORDER BY start_time DESC;

| Tag key | Present on | Description |
| --- | --- | --- |
| DMVF_VERSION | Infrastructure queries | Worker package version. |
| DMVF_WORKFLOW_ID | Task-processing queries | Workflow that originated the task. |
| DMVF_TASK_ID | Task-processing queries | Individual task identifier. |
| DMVF_WORKER_VERSION | Task-processing queries | Worker package version. |
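
If QUERY_HISTORY rows are exported for offline analysis, the tags can be parsed client-side as well. The sketch below builds a compact JSON tag with the documented keys and classifies tags as task-processing or not. The helper names and the sample identifier values are hypothetical; only the key names come from the table above.

```python
import json


def build_task_tag(workflow_id: str, task_id: str, worker_version: str) -> str:
    """Serialize a task-processing tag as compact JSON."""
    tag = {
        "DMVF_WORKFLOW_ID": workflow_id,
        "DMVF_TASK_ID": task_id,
        "DMVF_WORKER_VERSION": worker_version,
    }
    return json.dumps(tag, separators=(",", ":"))


def is_task_query(query_tag) -> bool:
    """True when the tag is JSON and carries a workflow ID."""
    try:
        tag = json.loads(query_tag)
    except (TypeError, ValueError):
        return False
    return isinstance(tag, dict) and "DMVF_WORKFLOW_ID" in tag


sample = build_task_tag("wf-001", "task-042", "1.20.2")  # made-up identifiers
print(sample)
print(is_task_query(sample), is_task_query('{"DMVF_VERSION":"1.20.2"}'), is_task_query(""))
```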

Changelog

v1.20.2

New features

  • Added ODBC data migration support for Azure Synapse serverless SQL pool.

Improvements

  • Improved L3 row validation with a safeguard for mismatched target columns.

Bug fixes

  • Fixed identifier quoting: uppercase identifiers remain unquoted while lowercase or mixed-case identifiers are auto-quoted.

v1.20.1

New features

  • Added Azure Synapse Analytics connection configuration and ODBC output converters for Synapse-specific types.

Bug fixes

  • Fixed intermediate partition threshold calculation to use the next partition's initial index as the exclusive boundary, preventing unmoved rows on partition edges.

v1.20.0

New features

  • Added doctor subcommand for the data exchange agent to support scai data doctor.
  • Added doctor subcommand for the data migration orchestrator to support scai data doctor.
  • Added support for Oracle quoted and mixed-case identifiers in catalog and object-type queries.
  • Added unified Teradata Docker support with teradatasql, tbuild fallback, and a configurable image registry database.

Improvements

  • Improved L3 row validation by skipping per-partition information_schema queries.
  • Updated WRITE_NOS execution to use teradatasql as the primary driver, with ODBC as a fallback.

Bug fixes

  • Fixed TeradataConnectionConfig ODBC driver check when teradatasql is installed.

v1.13.2

Bug fixes

  • Fixed L3 row validation to honor column-selection and column-mapping config so source and target tables with different column counts no longer crash.

v1.13.1

New features

  • Added DBMS_CLOUD extraction so Oracle can unload directly to an external S3 stage without an agent upload step.
  • Added external-stage JSON load and stage file detection for DBMS_CLOUD.

Improvements

  • Improved Oracle wallet handling: honored tns_admin when wallet_directory is unset, and set both config_dir and wallet_location for oracledb ATP connections.

v1.13.0

New features

  • Added Oracle ODBC extraction with staged Parquet metadata and improved ODBC robustness.
  • Replaced standalone Oracle connector with oracledb-backed connection and added an Oracle Docker image.
  • Added Azure Synapse connection support (Phase 1).
  • Added Azure Synapse CLI and agent integration (Phase 5).
  • Added PostgreSQL bulk extract via psql \copy with PG_CSV_FILE_FORMAT support.
  • Added PostgreSQL support to the unified data exchange agent.
  • Added Teradata use_tpt_for_bulk flag and TPT typed Parquet coercion.

Improvements

  • Enhanced task telemetry by integrating MigrationTracker and current-step tracking.
  • Refactored Teradata extraction strategies to unify REGULAR and TPT handling.
  • Made write_nos_location_container optional in WRITE_NOS configuration and removed the external stage URL and storage integration requirement.
  • Improved Oracle fully-qualified-name and view validation handling.
  • Updated VARCHAR limits in validation templates and constants.
  • Improved row-hashing performance.

Bug fixes

  • Fixed custom-template model imports.

v1.12.0

New features

  • Added view validation support to the Cloud Data Validation pipeline.
  • Added Oracle ODBC source support.
  • Added Teradata type mappings and Snowflake target fully-qualified-name helpers.
  • Added Teradata object_type query in the shared dispatcher.
  • Added Teradata to the data-migration-orchestrator workflow.
  • Added Teradata ODBC source support.
  • Added early-stopping support for L3 row-hashing validation.
  • Added support for custom metrics and templates in Cloud Data Validation.
  • Added Oracle Data Validation foundation with L1 schema validation.
  • Added L3 row and cell MD5 validation for Oracle.
  • Added Oracle wiring and factory registration in the SDV core.
  • Added the teradata optional install extra (pip install snowflake-data-exchange-agent[teradata]).
  • Added support for early stopping, hybrid L3, and Snowpipe (breaking change).
  • Added Snowflake schema utilities and type-mapping updates for the orchestrator.
  • Added TPT and WRITE_NOS data sources for Teradata extraction.
  • Added TPT and WRITE_NOS integration in Teradata workflow tasks.
  • Added a metrics skill and PostgreSQL metrics templates.
  • Added PostgreSQL connector with L0 and L1 validation.
  • Added Oracle as a supported Data Validation source across the orchestrator and agent.
  • Added L2 and L3 row and cell validation for PostgreSQL.
  • Added PostgreSQL support to Cloud Data Validation.

Improvements

  • Optimized L3 row-hashing queries.
  • Treated wrapped (200010) PULL_TASKS lock-wait error as transient so the worker retries instead of failing.
  • Extended Teradata ODBC connection configuration.

Bug fixes

  • Made BCP stdout/stderr pump threads daemon=True to prevent worker hangs on shutdown.
  • Prevented out-of-memory errors in cell-by-cell and row-hashing comparisons in workers.
  • Fixed L3 row-hashing producing false positives.
  • Prevented unnecessary shared-cache eviction when the loaded copy already matches the workspace.
  • Fixed row-hashing algorithm errors and now surfaces duplicates and missing rows distinctly (breaking change).

v1.11.1

Improvements

  • Improved column metrics query performance by consolidating per-column CTEs into a single wide-row query.

Bug fixes

  • Fixed aggregate overflow on STDDEV and VARIANCE during data validation by casting SUM/AVG/STDDEV inputs to FLOAT; removed the VARIANCE metric.

v1.11.0

Improvements

  • Cast value columns to Utf8 before unpivot and corrected IS_VALID evaluation.
  • Added vertical partitioning for cell validation on wide tables.

Bug fixes

  • Fixed timestamp copy handling for SQL Server BCP loads.
  • Fixed duplicate tasks created when evaluating L1 results under race conditions.
  • Fixed decimal partition coercion and parallelized L3 validation fixes.

v1.10.0

New features

  • Added hybrid row validation mode (two-phase MD5 plus cell drilldown).
  • Added DEFAULT normalization templates for various data types.

Improvements

  • Improved result set snapshots validation.
  • Improved Data Validation performance.
  • Included thread name and ID in log output for easier troubleshooting.
  • Improved the task queue to support a higher number of parallel workers.

Bug fixes

  • Fixed SQL compilation memory exhaustion by batching L2 metrics queries for wide tables.
  • Fixed an issue with the incremental sync watermark on Redshift.
  • Fixed usage of the vectorized scanner.

v1.9.2

Improvements

  • Log installed dependency versions and the Python runtime version at startup.

v1.9.1

Improvements

  • Cloud data validation tasks read query results in batches instead of loading full result sets into memory.
