Skip to main content

Lakehouse Tools for Snowflake and Salesforce

Project description

Lake House Tools (LHT) - Salesforce & Snowflake Integration

Bring Salesforce into the fold of your data cloud. LHT is a robust Python library that makes it really easy to extract data from Salesforce and reverse-extract your updates and transformations back into Salesforce. LHT optimizes the integration of Salesforce into your data cloud by automatically selecting and using the appropriate Salesforce API (Regular API or Bulk API 2.0) based on the data volume and sync requirements.

🚀 Features

Intelligent Synchronization

  • Automatic Method Selection: Choose the best sync method based on data volume
  • Incremental Sync: Smart detection of changed records since last sync
  • Bulk API 2.0 Integration: Efficient handling of large datasets

Core Capabilities

  • Salesforce Bulk API 2.0: Full support for bulk operations
  • Snowflake Integration: Native Snowpark support
  • Data Type Mapping: Automatic Salesforce to Snowflake type conversion
  • Error Handling: Comprehensive error management and recovery
  • Performance Optimization: Efficient processing for large datasets

📦 Installation

pip install lht

🎯 Quick Start

Prerequisites

1. Salesforce Setup

Option A: Developer Org (Recommended for Testing)

  • Sign up for a Salesforce Developer Org (free)
  • Important: Developer Pro or above is preferred for testing LHT
  • Do NOT use your production Salesforce instance or production data

Option B: Trial Org

  • Sign up for a Salesforce Trial (free)
  • Choose a trial that includes the features you want to test

Option C: Sandbox

  • If you have a Developer Pro+ license, create a sandbox from your production org
  • Never test LHT in production

⚠️ Critical Requirements:

  • Administrative access to the Salesforce instance

🔧 OAuth2.0 Setup Required:

  1. Configure a Connected App for the OAuth2.0 Client Credentials Flow
  2. Retrieve Credentials: Once configured, get the Client ID and Client Secret and store them securely
  3. Get Your Domain: From Setup, search for "My Domain" and copy the subdomain (everything before '.my.salesforce.com')
    • Note: Sandbox instances will include 'sandbox' in the subdomain
  4. Store Securely: Keep Client ID, Client Secret, and subdomain together - you'll need them for LHT configuration

🚨 Salesforce Limitations: Since Salesforce was never really architected to deal with any meaningful amount of data, there will be limitations on what you can do:

  • API rate limits: 15,000 API calls per 24-hour period (Enterprise), 100,000 (Unlimited)
  • Bulk API limits: 10,000 records per batch, 10 concurrent jobs
  • Query limits: SOQL queries limited to 50,000 records
  • Storage limits: Varies by org type and edition

This is why we introduced LHT - to bridge these limitations and provide robust data integration capabilities.

2. Snowflake Setup

Free Trial Registration:

  • Sign up for a Snowflake free trial (free)
  • Choose a cloud provider (AWS, Azure, or GCP)
  • Select a region close to your Salesforce org

⚠️ Critical Requirements:

  • Account Admin privileges (required for initial setup)
  • Security Admin privileges (required for user and role management)
  • Database creation permissions
  • Warehouse creation permissions

🔑 Minimum Snowflake Roles Needed:

-- Account Admin (automatically granted)
-- Security Admin
-- Database Admin
-- Warehouse Admin

Basic Intelligent Sync

from lht.salesforce.intelligent_sync import sync_sobject_intelligent

# Sync Account object intelligently
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Account",
    schema="RAW",
    table="ACCOUNTS",
    match_field="ID"
)

print(f"Synced {result['actual_records']} records using {result['sync_method']}")

🔧 How It Works

Decision Matrix

The system automatically selects the optimal sync method:

Scenario Records Method Description
First-time sync < 1,000 regular_api_full Use regular Salesforce API
First-time sync 1,000+ bulk_api_full Use Bulk API 2.0
Incremental sync < 1,000 regular_api_incremental Use regular API with merge logic
Incremental sync 1,000+ bulk_api_incremental Use Bulk API 2.0 with merge logic

Incremental Sync Logic

  1. Check Table Existence: Determines if target table exists
  2. Get Last Modified Date: Queries MAX(LASTMODIFIEDDATE) from existing table
  3. Estimate Record Count: Counts records modified since last sync
  4. Choose Method: Selects appropriate sync method based on count
  5. Execute Sync: Runs the chosen method

📚 Documentation

🔄 Sync Methods

1. Regular API Methods

  • Use cases: Small datasets (< 1,000 records)
  • Advantages: Fast for small datasets, real-time processing
  • Disadvantages: API rate limits, memory intensive

2. Bulk API 2.0 Methods

  • Use cases: Medium to large datasets (1,000+ records)
  • Advantages: Handles large datasets efficiently, built-in retry logic
  • Disadvantages: Requires job management, asynchronous processing

🛠️ Configuration

Custom Thresholds

from lht.salesforce.intelligent_sync import IntelligentSync

sync_system = IntelligentSync(session, access_info)
sync_system.BULK_API_THRESHOLD = 5000    # Use Bulk API for 5K+ records

Environment Setup

# Set appropriate warehouse size
session.sql("USE WAREHOUSE LARGE_WH").collect()

📊 Return Values

Sync functions return detailed information:

{
    'sobject': 'Account',
    'target_table': 'RAW.ACCOUNTS',
    'sync_method': 'bulk_api_incremental',
    'estimated_records': 1500,
    'actual_records': 1487,
    'sync_duration_seconds': 45.23,
    'last_modified_date': Timestamp('2024-01-15 10:30:00'),
    'sync_timestamp': Timestamp('2024-01-16 14:20:00'),
    'success': True,
    'error': None
}

🚨 Error Handling

The system includes comprehensive error handling for:

  • Authentication errors
  • Network issues
  • Job failures
  • Data errors

Errors are captured in the return value:

{
    'success': False,
    'error': 'Bulk API job failed with state: Failed',
    'records_processed': 0
}

🔧 Advanced Usage

Multiple Object Sync

objects_to_sync = [
    {"sobject": "Account", "table": "ACCOUNTS"},
    {"sobject": "Contact", "table": "CONTACTS"},
    {"sobject": "Opportunity", "table": "OPPORTUNITIES"}
]

results = []
for obj in objects_to_sync:
    result = sync_sobject_intelligent(
        session=session,
        access_info=access_info,
        sobject=obj['sobject'],
        schema="RAW",
        table=obj['table'],
        match_field="ID"
    )
    results.append(result)

Force Full Sync

# Useful for data refresh or after schema changes
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Account",
    schema="RAW",
    table="ACCOUNTS",
    match_field="ID",
    force_full_sync=True  # Overwrites entire table
)

📈 Performance Considerations

Memory Usage

  • Regular API: Loads all data in memory
  • Bulk API: Processes in batches

Processing Time

  • Small datasets (< 1K): Regular API fastest
  • Medium to large datasets (1K+): Bulk API optimal

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lht-2.0.0.tar.gz (58.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lht-2.0.0-cp313-cp313-macosx_14_0_arm64.whl (893.4 kB view details)

Uploaded CPython 3.13macOS 14.0+ ARM64

File details

Details for the file lht-2.0.0.tar.gz.

File metadata

  • Download URL: lht-2.0.0.tar.gz
  • Upload date:
  • Size: 58.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for lht-2.0.0.tar.gz
Algorithm Hash digest
SHA256 da7680d938b600265d521c6d134051235b4e0cc30dc192a894515b0e2525fe59
MD5 67511c7063a5ec6461a73c5678d5b7e2
BLAKE2b-256 ea8dca11d59ee1ff0ef88f97baa43badf0c081b9bd925bcf057f6e1a55da110a

See more details on using hashes here.

File details

Details for the file lht-2.0.0-cp313-cp313-macosx_14_0_arm64.whl.

File metadata

  • Download URL: lht-2.0.0-cp313-cp313-macosx_14_0_arm64.whl
  • Upload date:
  • Size: 893.4 kB
  • Tags: CPython 3.13, macOS 14.0+ ARM64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for lht-2.0.0-cp313-cp313-macosx_14_0_arm64.whl
Algorithm Hash digest
SHA256 40edefdd31856cf1b1bad3849ce962886ba266a3a5e2a8caefc23b477ee01fcb
MD5 3cb726ba3f03e85525701ad48ac825e7
BLAKE2b-256 0d39c804a97005b97dbd36bebbbe169c791404c2e06dd8bca9982ffe5df21395

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page