Skip to main content

Lakehouse Tools for Snowflake and Salesforce

Project description

Lake House Tools (LHT) - Salesforce & Snowflake Integration

A comprehensive Python library for intelligent data synchronization between Salesforce and Snowflake, featuring automated method selection based on data volume and previous sync status.

🚀 Features

Intelligent Synchronization

  • Automatic Method Selection: Choose the best sync method based on data volume
  • Incremental Sync: Smart detection of changed records since last sync
  • Bulk API 2.0 Integration: Efficient handling of large datasets
  • Snowflake Stage Support: Optimized for Snowflake Notebook environments

Core Capabilities

  • Salesforce Bulk API 2.0: Full support for bulk operations
  • Snowflake Integration: Native Snowpark support
  • Data Type Mapping: Automatic Salesforce to Snowflake type conversion
  • Error Handling: Comprehensive error management and recovery
  • Performance Optimization: Stage-based processing for large datasets

📦 Installation

pip install lht

🎯 Quick Start

Basic Intelligent Sync

from lht.salesforce.intelligent_sync import sync_sobject_intelligent

# Sync Account object intelligently
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Account",
    schema="RAW",
    table="ACCOUNTS",
    match_field="ID"
)

print(f"Synced {result['actual_records']} records using {result['sync_method']}")

Advanced Sync with Stage

# For large datasets in Snowflake Notebooks
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Contact",
    schema="RAW",
    table="CONTACTS",
    match_field="ID",
    use_stage=True,
    stage_name="@SALESFORCE_STAGE"
)

🔧 How It Works

Decision Matrix

The system automatically selects the optimal sync method:

Scenario Records Method Description
First-time sync < 1,000 regular_api_full Use regular Salesforce API
First-time sync 1,000 - 49,999 bulk_api_full Use Bulk API 2.0
First-time sync ≥ 50,000 bulk_api_stage_full Use Bulk API 2.0 with Snowflake stage
Incremental sync < 1,000 regular_api_incremental Use regular API with merge logic
Incremental sync 1,000 - 49,999 bulk_api_incremental Use Bulk API 2.0
Incremental sync ≥ 50,000 bulk_api_stage_incremental Use Bulk API 2.0 with stage

Incremental Sync Logic

  1. Check Table Existence: Determines if target table exists
  2. Get Last Modified Date: Queries MAX(LASTMODIFIEDDATE) from existing table
  3. Estimate Record Count: Counts records modified since last sync
  4. Choose Method: Selects appropriate sync method based on count
  5. Execute Sync: Runs the chosen method

📚 Documentation

🔄 Sync Methods

1. Regular API Methods

  • Use cases: Small datasets (< 1,000 records)
  • Advantages: Fast for small datasets, real-time processing
  • Disadvantages: API rate limits, memory intensive

2. Bulk API 2.0 Methods

  • Use cases: Medium to large datasets (1,000+ records)
  • Advantages: Handles large datasets efficiently, built-in retry logic
  • Disadvantages: Requires job management, asynchronous processing

3. Stage-Based Methods

  • Use cases: Very large datasets (50,000+ records) in Snowflake Notebooks
  • Advantages: Handles massive datasets, better memory management
  • Disadvantages: Requires stage setup, Snowflake-specific

🛠️ Configuration

Custom Thresholds

from lht.salesforce.intelligent_sync import IntelligentSync

sync_system = IntelligentSync(session, access_info)
sync_system.BULK_API_THRESHOLD = 5000    # Use Bulk API for 5K+ records
sync_system.STAGE_THRESHOLD = 25000      # Use stage for 25K+ records

Environment Setup

# Create stage for large datasets
session.sql("CREATE OR REPLACE STAGE @SALESFORCE_STAGE").collect()

# Set appropriate warehouse size
session.sql("USE WAREHOUSE LARGE_WH").collect()

📊 Return Values

Sync functions return detailed information:

{
    'sobject': 'Account',
    'target_table': 'RAW.ACCOUNTS',
    'sync_method': 'bulk_api_incremental',
    'estimated_records': 1500,
    'actual_records': 1487,
    'sync_duration_seconds': 45.23,
    'last_modified_date': Timestamp('2024-01-15 10:30:00'),
    'sync_timestamp': Timestamp('2024-01-16 14:20:00'),
    'success': True,
    'error': None
}

🚨 Error Handling

The system includes comprehensive error handling for:

  • Authentication errors
  • Network issues
  • Job failures
  • Data errors

Errors are captured in the return value:

{
    'success': False,
    'error': 'Bulk API job failed with state: Failed',
    'records_processed': 0
}

🔧 Advanced Usage

Multiple Object Sync

objects_to_sync = [
    {"sobject": "Account", "table": "ACCOUNTS"},
    {"sobject": "Contact", "table": "CONTACTS"},
    {"sobject": "Opportunity", "table": "OPPORTUNITIES"}
]

results = []
for obj in objects_to_sync:
    result = sync_sobject_intelligent(
        session=session,
        access_info=access_info,
        sobject=obj['sobject'],
        schema="RAW",
        table=obj['table'],
        match_field="ID"
    )
    results.append(result)

Force Full Sync

# Useful for data refresh or after schema changes
result = sync_sobject_intelligent(
    session=session,
    access_info=access_info,
    sobject="Account",
    schema="RAW",
    table="ACCOUNTS",
    match_field="ID",
    force_full_sync=True  # Overwrites entire table
)

📈 Performance Considerations

Memory Usage

  • Regular API: Loads all data in memory
  • Bulk API: Processes in batches
  • Stage-based: Minimal memory usage

Processing Time

  • Small datasets (< 1K): Regular API fastest
  • Medium datasets (1K-50K): Bulk API optimal
  • Large datasets (> 50K): Stage-based best

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🔗 Related Documentation

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lht-0.1.12.tar.gz (41.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lht-0.1.12-py3-none-any.whl (46.0 kB view details)

Uploaded Python 3

File details

Details for the file lht-0.1.12.tar.gz.

File metadata

  • Download URL: lht-0.1.12.tar.gz
  • Upload date:
  • Size: 41.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.9

File hashes

Hashes for lht-0.1.12.tar.gz
Algorithm Hash digest
SHA256 1f3be29b0be0b9cdd5b4dbfc8c6347d19a37e0d30471427e71ab78281c79f624
MD5 365eaf2a1da9d3b5e1d74c421abb14e5
BLAKE2b-256 a619909cdd1f1fdd4cbe64b8c6c17f1ccecffcac71de97b6f8fb5c43bba589fe

See more details on using hashes here.

File details

Details for the file lht-0.1.12-py3-none-any.whl.

File metadata

  • Download URL: lht-0.1.12-py3-none-any.whl
  • Upload date:
  • Size: 46.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.9

File hashes

Hashes for lht-0.1.12-py3-none-any.whl
Algorithm Hash digest
SHA256 e5b87c8d04db6126cfb65642c722ef5476abbde504bff7b38a3a595169300f4e
MD5 d0c315dd76ebd1ebe6a45a9dae530627
BLAKE2b-256 41c197700777bc76a8e25a1854c2b1499a9beeaaf2b20942469799f9bd03dfa6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page