Data exchange agent for migrations and validation
Snowflake Data Exchange Agent
A REST API service for database migrations and data validation. It supports multiple databases, including Snowflake, PostgreSQL, and SQL Server, and processes tasks through a queue.
Quick Start
# Install
pip install snowflake-data-exchange-agent
# Run
data-exchange-agent --port 8080
# Test
curl http://localhost:8080/health
Installation
From PyPI (Production)
pip install snowflake-data-exchange-agent
Requirements & Dependencies
Python Version: 3.10, 3.11, or 3.12 (3.13 not yet supported)
Available dependency groups:
- development: Testing and development tools (pytest, ruff, etc.)
- all: Includes all development dependencies
Core dependencies include:
- Snowflake Connector for Python
- PySpark for data processing
- Flask + Waitress for REST API
- PostgreSQL support (psycopg2-binary)
- AWS SDK (boto3)
Configuration
Create src/data_exchange_agent/configuration.toml:
selected_task_source = "api"
[application]
workers = 4
task_fetch_interval = 120
debug_mode = false
[task_source.api]
key = "api-key"
[connections.source.<sqlserver|postgresql|teradata>]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = <1433|5432|1025>
[connections.target.snowflake_connection_name]
connection_name = "connection_name"
[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"
[connections.target.blob]
connection_string = "DefaultEndpointsProtocol=https;AccountName=account_name;AccountKey=account_key;EndpointSuffix=core.windows.net"
container_name = "container_name"
# Optional: set account_name and use_default_credential instead of a connection string
account_name = "storage_account_name"
use_default_credential = <true|false>
For Snowflake, create ~/.snowflake/config.toml:
[connections.default]
account = "your_account.region"
user = "your_username"
password = "your_password"
warehouse = "COMPUTE_WH"
database = "PRODUCTION_DB"
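Before starting the agent, it can help to parse the configuration file and check that the expected sections are present. The following is a minimal sketch and not part of the package: the helper name `summarize_config` and the concrete sample values are assumptions made for illustration. `tomllib` is in the standard library from Python 3.11; on 3.10 the third-party `tomli` package provides the same interface.

```python
try:
    import tomllib  # standard library on Python 3.11+
except ModuleNotFoundError:  # Python 3.10
    import tomli as tomllib  # third-party backport with the same API

# Sample configuration with concrete values substituted for the placeholders.
SAMPLE_CONFIG = """
selected_task_source = "api"

[application]
workers = 4
task_fetch_interval = 120
debug_mode = false

[task_source.api]
key = "api-key"

[connections.source.postgresql]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 5432

[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"
"""


def summarize_config(text: str) -> dict:
    """Parse TOML text and report the task source, worker count, and connection names."""
    config = tomllib.loads(text)
    connections = config.get("connections", {})
    return {
        "task_source": config["selected_task_source"],
        "workers": config["application"]["workers"],
        "sources": sorted(connections.get("source", {})),
        "targets": sorted(connections.get("target", {})),
    }


summary = summarize_config(SAMPLE_CONFIG)
print(summary)
```

A malformed file raises `tomllib.TOMLDecodeError`, so a check like this surfaces syntax problems before the agent is launched.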
API Usage
Command Line
# Basic usage
data-exchange-agent
# Production settings
data-exchange-agent --workers 8 --port 8080
# Debug mode
data-exchange-agent --debug --port 5001
Health Check
GET /health
{
  "status": "healthy",
  "version": "0.0.18",
  "database_connections": {
    "snowflake": "connected"
  }
}
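The health response can also be checked from a script. This is a rough sketch using only the standard library; the function names `is_healthy` and `fetch_health`, the default base URL, and the rule that every listed connection must report "connected" are assumptions made for illustration, not documented behavior of the agent.

```python
import json
import urllib.request


def is_healthy(payload: dict) -> bool:
    """Return True when status is "healthy" and every listed connection is "connected"."""
    connections = payload.get("database_connections", {})
    return payload.get("status") == "healthy" and all(
        state == "connected" for state in connections.values()
    )


def fetch_health(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> dict:
    """GET /health and decode the JSON body (requires a running agent)."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as response:
        return json.load(response)


# Offline check against the sample response from this section.
sample = json.loads(
    '{"status": "healthy", "version": "0.0.18", '
    '"database_connections": {"snowflake": "connected"}}'
)
print(is_healthy(sample))
```

With an agent running locally, `is_healthy(fetch_health())` performs the same check against the live endpoint.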
Task Management
# Start processing
GET /handle_tasks
# Stop processing
GET /stop
# Get status
GET /get_handling_tasks_status
# Task count
GET /get_tasks_count
Add Task
POST /tasks
Content-Type: application/json
{
  "task_type": "data_extraction",
  "source_config": {
    "database": "postgresql",
    "query": "SELECT * FROM users"
  },
  "destination_config": {
    "type": "snowflake_stage",
    "stage": "@data_stage/users/"
  }
}
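The same request can be assembled in Python. The sketch below only builds the request object so it can be inspected without a running agent; the helper name `build_add_task_request` and the base URL are assumptions, and the payload simply mirrors the example body from this section.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumed host and port


def build_add_task_request(task: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) the POST /tasks request with a JSON body."""
    return urllib.request.Request(
        url=f"{base_url}/tasks",
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


task = {
    "task_type": "data_extraction",
    "source_config": {"database": "postgresql", "query": "SELECT * FROM users"},
    "destination_config": {"type": "snowflake_stage", "stage": "@data_stage/users/"},
}
request = build_add_task_request(task)
print(request.get_method(), request.full_url)

# To send it against a running agent:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status)
```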
Development
Setup
git clone https://github.com/snowflakedb/migrations-data-validation.git
cd migrations-data-validation/data-exchange-agent
pip install -e ".[development]"
Testing
# Run all tests
pytest
# With coverage
pytest --cov=src/data_exchange_agent
# Run specific test types
pytest tests/unit/ # Unit tests only
pytest -m "not integration" # Non-integration tests
Code Quality
# Format code
ruff format .
# Lint code
ruff check .
# Auto-fix linting issues
ruff check --fix .
🔌 Extending the System
Adding a New Bulk Utility
Bulk utilities are command-line tools used to efficiently export data from databases (e.g., BCP for SQL Server). Follow these steps to add a new bulk utility:
1. Define the Bulk Utility Type
Add your new utility to the BulkUtilityType enum in src/data_exchange_agent/data_sources/bulk_utility_types.py:
from enum import Enum


class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"  # Add this line
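Because the enum mixes in `str`, a member can be looked up from the raw string that appears in configuration and compared directly against that string. A small self-contained illustration follows; the member name `YOUR_UTILITY` is the placeholder from this step, not a real utility.

```python
from enum import Enum


class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"  # placeholder member from this step


# Lookup by value, as would happen when a name is read from configuration.
utility = BulkUtilityType("your_utility_name")
print(utility is BulkUtilityType.YOUR_UTILITY)

# Because of the str mixin, members compare equal to their raw string values.
print(BulkUtilityType.BCP == "bcp")
```

Looking up a value that has no member, for example `BulkUtilityType("unknown")`, raises `ValueError`.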
2. Create Configuration Class
Create a new configuration class in src/data_exchange_agent/config/sections/bulk_utilities/your_utility.py:
from data_exchange_agent.config.sections.bulk_utilities.base import BaseBulkUtilityConfig


class YourUtilityConfig(BaseBulkUtilityConfig):
    """Configuration class for YourUtility bulk utility settings."""

    def __init__(
        self,
        # Add utility-specific parameters
        utility_specific_parameter: str = "default_param",
    ) -> None:
        """Initialize YourUtility configuration."""
        self.utility_specific_parameter = utility_specific_parameter

    def _custom_validation(self) -> str | None:
        """Validate configuration parameters."""
        if not self.utility_specific_parameter:
            return "Utility specific parameter cannot be empty."
        return None

    def __repr__(self) -> str:
        """Return string representation."""
        return f"YourUtilityConfig(utility_specific_parameter='{self.utility_specific_parameter}')"
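The `_custom_validation` hook returns an error message on failure and `None` on success. The sketch below shows how that contract behaves, using a stand-in base class so the example runs on its own. The stand-in `BaseBulkUtilityConfig` and its `validate` method are assumptions made for illustration; the real base class in the package may call the hook differently.

```python
from __future__ import annotations


class BaseBulkUtilityConfig:
    """Stand-in for the package's base class (assumed behavior, illustration only)."""

    def validate(self) -> str | None:
        # Assumption: the real base class consults _custom_validation in a similar way.
        return self._custom_validation()

    def _custom_validation(self) -> str | None:
        return None


class YourUtilityConfig(BaseBulkUtilityConfig):
    """Same shape as the configuration class in this step."""

    def __init__(self, utility_specific_parameter: str = "default_param") -> None:
        self.utility_specific_parameter = utility_specific_parameter

    def _custom_validation(self) -> str | None:
        if not self.utility_specific_parameter:
            return "Utility specific parameter cannot be empty."
        return None


valid_result = YourUtilityConfig().validate()
invalid_result = YourUtilityConfig(utility_specific_parameter="").validate()
print(valid_result)
print(invalid_result)
```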
3. Register the Bulk Utility
Register your utility in src/data_exchange_agent/config/sections/bulk_utilities/__init__.py:
from data_exchange_agent.config.sections.bulk_utilities.your_utility import YourUtilityConfig
from data_exchange_agent.constants.connection_types import ConnectionType
# Add to registry
BulkUtilityRegistry.register(ConnectionType.YOUR_UTILITY, YourUtilityConfig) # Add this
# Add to __all__
__all__ = [
"BaseBulkUtilityConfig",
"BulkUtilityRegistry",
"BCPBulkUtilityConfig",
"YourUtilityConfig", # Add this
]
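For readers unfamiliar with the registry pattern, registration amounts to mapping a key to a configuration class so the class can be instantiated later by name. The following is a simplified, hypothetical stand-in written for this explanation; it is not the package's `BulkUtilityRegistry`, whose methods other than `register` are not described here, and it uses plain string keys instead of `ConnectionType` members.

```python
class SimpleRegistry:
    """Hypothetical stand-in: maps a utility key to its configuration class."""

    _registry: dict[str, type] = {}

    @classmethod
    def register(cls, key: str, config_class: type) -> None:
        """Store config_class under key, refusing duplicate registrations."""
        if key in cls._registry:
            raise ValueError(f"{key!r} is already registered")
        cls._registry[key] = config_class

    @classmethod
    def create(cls, key: str, **kwargs):
        """Instantiate the class registered under key with the given settings."""
        try:
            config_class = cls._registry[key]
        except KeyError:
            raise KeyError(f"No bulk utility registered for {key!r}") from None
        return config_class(**kwargs)


class DemoConfig:
    """Placeholder configuration class used only for this example."""

    def __init__(self, utility_specific_parameter: str = "default_param") -> None:
        self.utility_specific_parameter = utility_specific_parameter


SimpleRegistry.register("your_utility_name", DemoConfig)
config = SimpleRegistry.create("your_utility_name", utility_specific_parameter="param")
print(type(config).__name__, config.utility_specific_parameter)
```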
4. Update ConnectionType Enum
Add your utility type to src/data_exchange_agent/constants/connection_types.py:
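from enum import Enum
from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType


class ConnectionType(str, Enum):
    # Bulk utilities
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value  # Add this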
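Since the two enums have to stay in sync, a quick consistency check can catch a member that was added to one but not the other. The sketch below redefines both enums locally so that it runs on its own; `YOUR_UTILITY` is the placeholder from these steps, and the check itself is a suggestion rather than something the package is known to ship.

```python
from enum import Enum


class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"


class ConnectionType(str, Enum):
    # Values are derived from BulkUtilityType so each string is defined once.
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value


# Every bulk utility should have a ConnectionType member with the same name and value.
missing = [m.name for m in BulkUtilityType if m.name not in ConnectionType.__members__]
same_values = all(
    ConnectionType[m.name].value == m.value
    for m in BulkUtilityType
    if m.name in ConnectionType.__members__
)
print(missing, same_values)
```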
5. Create Data Source Implementation
Create the data source class in src/data_exchange_agent/data_sources/your_utility_data_source.py:
from data_exchange_agent.data_sources.base import BaseDataSource
from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType

# inject, Provide, SFLogger, ConfigManager, container_keys, and config_keys must also
# be imported; see the existing BCP data source for the import paths.


class YourUtilityDataSource(BaseDataSource):
    """Data source implementation for YourUtility."""

    @inject
    def __init__(
        self,
        engine: str,
        statement: str,
        results_folder_path: str | None = None,
        base_file_name: str = "result",
        logger: SFLogger = Provide[container_keys.SF_LOGGER],
        program_config: ConfigManager = Provide[container_keys.PROGRAM_CONFIG],
    ) -> None:
        """Initialize YourUtilityDataSource."""
        self.logger = logger
        self._statement = statement

        # Get configuration
        bulk_utility_config = program_config[config_keys.BULK_UTILITY]
        utility_config = bulk_utility_config.get(BulkUtilityType.YOUR_UTILITY, None)

        # Use config values or defaults
        self.utility_specific_parameter = (
            utility_config.utility_specific_parameter if utility_config else "default_param"
        )

    def export_data(self) -> bool:
        """Export data using your utility command."""
        # Implement the export logic
        pass
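The export logic for a command-line utility typically reduces to running an external process and reporting whether it succeeded. Here is one possible shape for that step, using only the standard library. The helper name `run_export_command` and the timeout are assumptions, and the Python interpreter stands in for a real bulk utility so the example can run anywhere; the actual command and arguments depend on the tool being wrapped.

```python
import subprocess
import sys


def run_export_command(command: list[str], timeout: float = 60.0) -> bool:
    """Run an external command and return True only if it exits with status 0."""
    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, timeout=timeout, check=False
        )
    except (OSError, subprocess.TimeoutExpired):
        # Missing executable, permission problem, or the command ran too long.
        return False
    return completed.returncode == 0


# Stand-in for a real bulk utility: the Python interpreter itself.
ok = run_export_command([sys.executable, "-c", "print('exported')"])
failed = run_export_command([sys.executable, "-c", "raise SystemExit(3)"])
print(ok, failed)
```

Passing the command as a list rather than a single shell string avoids shell quoting issues when the statement or file paths contain spaces.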
6. Add Configuration to TOML
Users can now configure your bulk utility in configuration.toml:
[bulk_utility.your_utility_name]
# Add your custom parameters
utility_specific_parameter = "param"
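To illustrate how such a section maps onto constructor arguments, the sketch below parses the TOML fragment and extracts the settings for one utility. The helper name `bulk_utility_settings` is hypothetical, and how the agent itself loads this section is not shown here. `tomllib` needs Python 3.11 or newer; `tomli` is the equivalent third-party package on 3.10.

```python
try:
    import tomllib  # standard library on Python 3.11+
except ModuleNotFoundError:  # Python 3.10
    import tomli as tomllib  # third-party backport with the same API

SAMPLE = """
[bulk_utility.your_utility_name]
utility_specific_parameter = "param"
"""


def bulk_utility_settings(text: str, utility_name: str) -> dict:
    """Return the settings table for one bulk utility, or an empty dict if absent."""
    config = tomllib.loads(text)
    return config.get("bulk_utility", {}).get(utility_name, {})


settings = bulk_utility_settings(SAMPLE, "your_utility_name")
absent = bulk_utility_settings(SAMPLE, "bcp")
print(settings, absent)
```

The resulting dictionary can be unpacked into a configuration class, for example `YourUtilityConfig(**settings)`, and an empty dictionary falls back to the class defaults.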
7. Write Tests
Create tests in tests/data_sources/test_your_utility_data_source.py to verify functionality.
Example: BCP Implementation
See the existing BCP implementation for reference:
- Config: src/data_exchange_agent/config/sections/bulk_utilities/bcp.py
- Data Source: src/data_exchange_agent/data_sources/bcp_data_source.py
- Configuration example in configuration_example.toml
🤝 Contributing
We welcome contributions! See our Contributing Guide for details on how to collaborate, set up your development environment, and submit PRs.
📄 License
This project is licensed under the Apache License 2.0. See the LICENSE file for details.
🆘 Support
- Documentation: Full documentation
- Issues: GitHub Issues
Developed with ❄️ by Snowflake