Lakehouse Tools for Snowflake and Salesforce
Project description
Lake House Tools (LHT) - Salesforce & Snowflake Integration
Bring Salesforce into the fold of your data cloud. LHT is a robust Python library that makes it easy to extract data from Salesforce and write your updates and transformations back into Salesforce (reverse ETL).
🚀 Features
Intelligent Synchronization
- Automatic Method Selection: Chooses the best sync method based on data volume
- Incremental Sync: Smart detection of changed records since last sync
- Bulk API 2.0 Integration: Efficient handling of large datasets
- Snowflake Stage Support: Optimized for Snowflake Notebook environments
Core Capabilities
- Salesforce Bulk API 2.0: Full support for bulk operations
- Snowflake Integration: Native Snowpark support
- Data Type Mapping: Automatic Salesforce-to-Snowflake type conversion (see the illustrative mapping after this list)
- Error Handling: Comprehensive error management and recovery
- Performance Optimization: Stage-based processing for large datasets
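To make the type mapping concrete, the sketch below shows how common Salesforce field types might translate to Snowflake column types. The table is an assumption for illustration, not LHT's actual internal mapping:

```python
# Illustrative Salesforce -> Snowflake type mapping (an assumption for this
# README, not LHT's actual internal table).
SALESFORCE_TO_SNOWFLAKE = {
    "id": "VARCHAR(18)",         # Salesforce record IDs are 15/18 characters
    "reference": "VARCHAR(18)",  # lookup fields hold another record's ID
    "string": "VARCHAR",
    "textarea": "VARCHAR",
    "picklist": "VARCHAR",
    "boolean": "BOOLEAN",
    "int": "NUMBER(38,0)",
    "double": "FLOAT",
    "currency": "NUMBER(18,2)",
    "date": "DATE",
    "datetime": "TIMESTAMP_NTZ",
}

def to_snowflake_type(sf_type: str) -> str:
    """Fall back to VARCHAR for any field type not listed above."""
    return SALESFORCE_TO_SNOWFLAKE.get(sf_type, "VARCHAR")
```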
📦 Installation
pip install lht
🎯 Quick Start
Prerequisites
1. Salesforce Setup
Option A: Developer Org (Recommended for Testing)
- Sign up for a Salesforce Developer Org (free)
- Important: Developer Pro or above is preferred for testing LHT
- Do NOT use your production Salesforce instance or production data
Option B: Trial Org
- Sign up for a Salesforce Trial (free)
- Choose a trial that includes the features you want to test
Option C: Sandbox
- If you have a Developer Pro+ license, create a sandbox from your production org
- Never test LHT in production
⚠️ Critical Requirements:
- Administrative access to the Salesforce instance
🔧 OAuth2.0 Setup Required:
- Configure a Connected App for the OAuth2.0 Client Credentials Flow
- Detailed Connected App Setup Instructions
- Callback URL: Enter `https://localhost/callback` (not used in this flow, but required)
- Scopes: Add "Full" scopes for testing (modify for production use)
- Retrieve Credentials: Once configured, get the Client ID and Client Secret and store them securely
- Get Your Domain: From Setup, search for "My Domain" and copy the subdomain (everything before '.my.salesforce.com')
- Note: Sandbox instances will include 'sandbox' in the subdomain
- Store Securely: Keep Client ID, Client Secret, and subdomain together - you'll need them for LHT configuration
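For reference, the Client Credentials token exchange itself looks like the sketch below. This is a minimal example using `requests`; the `get_access_info` helper and the returned dictionary shape are assumptions for illustration, so consult the LHT documentation for the configuration object the library actually expects:

```python
import requests

# Hypothetical helper: exchange the Client ID/Secret from your Connected App
# for an access token via the OAuth2.0 Client Credentials Flow.
def get_access_info(subdomain: str, client_id: str, client_secret: str) -> dict:
    token_url = f"https://{subdomain}.my.salesforce.com/services/oauth2/token"
    response = requests.post(token_url, data={
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    })
    response.raise_for_status()
    payload = response.json()
    # Assumed shape; check the LHT docs for the fields it actually expects.
    return {
        "access_token": payload["access_token"],
        "instance_url": payload["instance_url"],
    }
```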
🚨 Salesforce Limitations: Salesforce was never architected to move large volumes of data, so you will run into platform limits:
- API rate limits: vary by edition, e.g. 15,000 API calls per 24-hour period (Developer Edition) and 100,000 or more (Enterprise/Unlimited)
- Bulk API limits: 10,000 records per batch, 10 concurrent jobs
- Query limits: SOQL queries limited to 50,000 records
- Storage limits: Varies by org type and edition
This is why we introduced LHT - to work around these limitations and provide robust data integration capabilities.
2. Snowflake Setup
Free Trial Registration:
- Sign up for a Snowflake free trial (free)
- Choose a cloud provider (AWS, Azure, or GCP)
- Select a region close to your Salesforce org
⚠️ Critical Requirements:
- Account Admin privileges (required for initial setup)
- Security Admin privileges (required for user and role management)
- Database creation permissions
- Warehouse creation permissions
- Stage creation permissions
🔑 Minimum Snowflake Roles Needed:
- Account Admin (automatically granted)
- Security Admin
- Database Admin
- Warehouse Admin
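A minimal bootstrap of the Snowflake side might look like the following sketch. It opens a Snowpark session and creates the warehouse, database, schema, and stage that the examples below assume exist; all names and credentials are placeholders:

```python
from snowflake.snowpark import Session

# Placeholder credentials; in practice, load these from a secrets manager.
session = Session.builder.configs({
    "account": "<your_account_identifier>",
    "user": "<your_user>",
    "password": "<your_password>",
    "role": "ACCOUNTADMIN",
}).create()

# Create the objects the Quick Start examples assume exist.
for stmt in [
    "CREATE WAREHOUSE IF NOT EXISTS LARGE_WH WAREHOUSE_SIZE = 'LARGE'",
    "CREATE DATABASE IF NOT EXISTS SALESFORCE_DB",
    "CREATE SCHEMA IF NOT EXISTS SALESFORCE_DB.RAW",
    "CREATE STAGE IF NOT EXISTS SALESFORCE_DB.RAW.SALESFORCE_STAGE",
]:
    session.sql(stmt).collect()
```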
Basic Intelligent Sync
```python
from lht.salesforce.intelligent_sync import sync_sobject_intelligent

# Sync the Account object intelligently
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Account",
    schema="RAW",
    table="ACCOUNTS",
    match_field="ID"
)

print(f"Synced {result['actual_records']} records using {result['sync_method']}")
```
Advanced Sync with Stage
```python
# For large datasets in Snowflake Notebooks
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Contact",
    schema="WAREHOUSE",
    table="CONTACTS",
    match_field="ID",
    use_stage=True,
    stage_name="@SALESFORCE_STAGE"
)
```
🔧 How It Works
Decision Matrix
The system automatically selects the optimal sync method:
| Scenario | Records | Method | Description |
|---|---|---|---|
| First-time sync | < 1,000 | regular_api_full | Use regular Salesforce API |
| First-time sync | 1,000 - 49,999 | bulk_api_full | Use Bulk API 2.0 |
| First-time sync | ≥ 50,000 | bulk_api_stage_full | Use Bulk API 2.0 with Snowflake stage |
| Incremental sync | < 1,000 | regular_api_incremental | Use regular API with merge logic |
| Incremental sync | 1,000 - 49,999 | bulk_api_incremental | Use Bulk API 2.0 |
| Incremental sync | ≥ 50,000 | bulk_api_stage_incremental | Use Bulk API 2.0 with stage |
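The selection logic can be pictured as a small pure function. The sketch below mirrors the matrix above using the documented default thresholds; it is illustrative, not LHT's internal implementation:

```python
BULK_API_THRESHOLD = 1_000   # documented default
STAGE_THRESHOLD = 50_000     # documented default

def choose_sync_method(record_count: int, table_exists: bool) -> str:
    """Map a record-count estimate to one of the six documented sync methods."""
    suffix = "incremental" if table_exists else "full"
    if record_count >= STAGE_THRESHOLD:
        return f"bulk_api_stage_{suffix}"
    if record_count >= BULK_API_THRESHOLD:
        return f"bulk_api_{suffix}"
    return f"regular_api_{suffix}"
```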
Incremental Sync Logic
- Check Table Existence: Determines whether the target table exists
- Get Last Modified Date: Queries MAX(LASTMODIFIEDDATE) from the existing table
- Estimate Record Count: Counts Salesforce records modified since the last sync (see the sketch after this list)
- Choose Method: Selects the appropriate sync method based on that count
- Execute Sync: Runs the chosen method
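In practice, steps 2 and 3 come down to one Snowflake query and one SOQL count. The sketch below is illustrative; `run_soql_count` is a placeholder for however you issue the REST query, not an LHT function:

```python
def estimate_changed_records(session, sobject: str, schema: str, table: str) -> int:
    # Step 2: read the high-water mark from the existing Snowflake table.
    row = session.sql(
        f"SELECT MAX(LASTMODIFIEDDATE) AS HWM FROM {schema}.{table}"
    ).collect()[0]
    if row["HWM"] is None:
        return -1  # empty table: treat as a first-time sync

    # Step 3: count Salesforce records modified since then. SOQL datetime
    # literals are unquoted ISO-8601 values, e.g. 2024-01-15T10:30:00Z.
    hwm = row["HWM"].strftime("%Y-%m-%dT%H:%M:%SZ")
    soql = f"SELECT COUNT() FROM {sobject} WHERE LastModifiedDate > {hwm}"
    return run_soql_count(soql)  # placeholder, not an LHT function
```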
📚 Documentation
- Intelligent Sync Guide: Comprehensive guide to the intelligent sync system
- Snowflake Stage Integration: Stage-based processing documentation
- Examples: Complete working examples
🔄 Sync Methods
1. Regular API Methods
- Use cases: Small datasets (< 1,000 records)
- Advantages: Fast for small datasets, real-time processing
- Disadvantages: API rate limits, memory intensive
2. Bulk API 2.0 Methods
- Use cases: Medium to large datasets (1,000+ records)
- Advantages: Handles large datasets efficiently, built-in retry logic
- Disadvantages: Requires job management, asynchronous processing
3. Stage-Based Methods
- Use cases: Very large datasets (50,000+ records) in Snowflake Notebooks
- Advantages: Handles massive datasets, better memory management
- Disadvantages: Requires stage setup, Snowflake-specific
🛠️ Configuration
Custom Thresholds
```python
from lht.salesforce.intelligent_sync import IntelligentSync

sync_system = IntelligentSync(session, access_info)
sync_system.BULK_API_THRESHOLD = 5000   # Use Bulk API for 5K+ records
sync_system.STAGE_THRESHOLD = 25000     # Use stage for 25K+ records
```
Environment Setup
```python
# Create a stage for large datasets.
# Note: no '@' prefix when creating a stage; '@' is only used when
# referencing it (e.g., stage_name="@SALESFORCE_STAGE").
session.sql("CREATE OR REPLACE STAGE SALESFORCE_STAGE").collect()

# Set an appropriate warehouse size
session.sql("USE WAREHOUSE LARGE_WH").collect()
```
📊 Return Values
Sync functions return detailed information:
```python
{
    'sobject': 'Account',
    'target_table': 'RAW.ACCOUNTS',
    'sync_method': 'bulk_api_incremental',
    'estimated_records': 1500,
    'actual_records': 1487,
    'sync_duration_seconds': 45.23,
    'last_modified_date': Timestamp('2024-01-15 10:30:00'),
    'sync_timestamp': Timestamp('2024-01-16 14:20:00'),
    'success': True,
    'error': None
}
```
🚨 Error Handling
The system includes comprehensive error handling for:
- Authentication errors
- Network issues
- Job failures
- Data errors
Errors are captured in the return value:
```python
{
    'success': False,
    'error': 'Bulk API job failed with state: Failed',
    'records_processed': 0
}
```
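Because failures surface in the return value rather than as raised exceptions, callers typically branch on `success`. The retry pattern below is an illustrative convention, not behavior built into the sync call:

```python
import time

from lht.salesforce.intelligent_sync import sync_sobject_intelligent

def sync_with_retry(session, access_info, retries: int = 2, **kwargs) -> dict:
    """Re-run a failed sync a few times with a short fixed backoff."""
    for attempt in range(retries + 1):
        result = sync_sobject_intelligent(
            session=session, access_info=access_info, **kwargs
        )
        if result["success"]:
            return result
        print(f"Attempt {attempt + 1} failed: {result['error']}")
        if attempt < retries:
            time.sleep(30)  # tune the backoff for your workload
    return result

result = sync_with_retry(
    session, access_info,
    sobject="Account", schema="RAW", table="ACCOUNTS", match_field="ID",
)
```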
🔧 Advanced Usage
Multiple Object Sync
```python
objects_to_sync = [
    {"sobject": "Account", "table": "ACCOUNTS"},
    {"sobject": "Contact", "table": "CONTACTS"},
    {"sobject": "Opportunity", "table": "OPPORTUNITIES"}
]

results = []
for obj in objects_to_sync:
    result = sync_sobject_intelligent(
        session=session,
        access_info=access_info,
        sobject=obj['sobject'],
        schema="RAW",
        table=obj['table'],
        match_field="ID"
    )
    results.append(result)
```
Force Full Sync
```python
# Useful for a data refresh or after schema changes
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Account",
    schema="RAW",
    table="ACCOUNTS",
    match_field="ID",
    force_full_sync=True  # Overwrites the entire table
)
```
📈 Performance Considerations
Memory Usage
- Regular API: Loads all data in memory
- Bulk API: Processes in batches
- Stage-based: Minimal memory usage
Processing Time
- Small datasets (< 1K): Regular API fastest
- Medium datasets (1K-50K): Bulk API optimal
- Large datasets (> 50K): Stage-based best
🤝 Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
File details
Details for the file lht-0.1.240.tar.gz.
File metadata
- Download URL: lht-0.1.240.tar.gz
- Upload date:
- Size: 55.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.10.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0757fc34779fbaf3fb75c649aed51c2362955afef5775bfd24118c9cff2ce8be |
| MD5 | 5d657003efeced0fed764b0d056778f8 |
| BLAKE2b-256 | a4b254b02094afba9b04b3f3b1b591982ecba5c4e8f7dcb7021ef36bc93476e2 |
File details
Details for the file lht-0.1.240-py3-none-any.whl.
File metadata
- Download URL: lht-0.1.240-py3-none-any.whl
- Upload date:
- Size: 60.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.10.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b28dd427a8158f57bacdf55f10e834e8e3f061db09158257850380f962a7a84d |
| MD5 | baf462061a79bd8aada6b0c18fe02e0d |
| BLAKE2b-256 | 78c72005c21f0c3e2f4236302b291cfd7d14d5306ab32c8c848bb13f80c6782b |