A Python package supporting data platform migration from on-prem to cloud


🚀 Snowforge - Enterprise Data Integration Platform

Snowforge is a comprehensive Python package designed to streamline data integration and orchestration across cloud and on-premise systems. It provides a unified interface for managing data pipelines between AWS, Snowflake, Microsoft Fabric, Power BI, and various on-premise database systems.

🎯 Overview

Snowforge addresses the complexity of modern data engineering by providing:

  • Unified Configuration Management: Single TOML-based configuration for all integrations
  • Multi-Cloud Support: Seamless integration with AWS S3, Secrets Manager, and Snowflake
  • BI Report Distribution: Automated export and delivery of Power BI and Fabric reports
  • Extensible Architecture: Strategy pattern-based extractors for any database system
  • Production-Ready Logging: Console and Snowflake-backed execution tracking
  • Parallel Processing: Built-in support for concurrent data operations

๐Ÿ—๏ธ Architecture

Snowforge is organized into specialized modules:

Snowforge/
├── AWSIntegration.py          # AWS S3 and Secrets Manager operations
├── SnowflakeIntegration.py    # Snowflake connections and data loading
├── SnowflakeLogging.py        # Task execution logging to Snowflake
├── Config.py                  # Configuration management
├── Logging.py                 # Centralized console logging
├── DataMover/                 # Data extraction and movement engine
│   ├── DataMover.py
│   └── Extractors/            # Database-specific extractors
├── Broadcaster/               # BI report export and distribution
│   ├── SnowforgeBroadcaster.py
│   ├── BroadcastConfig.py
│   ├── Email.py
│   └── adapters/              # Platform-specific exporters
└── FabricIntegration/         # Microsoft Fabric and Power BI API
    ├── FabricIntegration.py
    └── PowerAPI.py

✨ Key Features

Data Integration

  • Multi-Source Extraction: Extract data from Netezza, Oracle, PostgreSQL, and more
  • Parallel Processing: Multi-threaded and multi-process data operations
  • Smart Chunking: Automatic file segmentation for large datasets
  • S3 Integration: Efficient uploads with progress tracking and multipart transfer

Snowflake Operations

  • Flexible Authentication: Key-pair or browser-based authentication
  • Connection Pooling: Automatic connection reuse and management
  • Snowpipe Streaming: Real-time data ingestion support
  • Data Loading: Direct COPY INTO operations from S3 stages

BI and Reporting

  • Report Broadcasting: Automated export and delivery of paginated reports
  • Multi-Channel Delivery: Email and Teams (extensible)
  • Template-Based Configuration: Jinja2 templating for dynamic parameters
  • Export Polling: Automatic status monitoring and download

Configuration & Security

  • Centralized Configuration: Single TOML file for all credentials and profiles
  • AWS Secrets Manager: Secure token and credential storage
  • Multiple Profiles: Support for dev, staging, and production environments
  • Environment Variable Override: Flexible configuration paths

Logging & Monitoring

  • Colored Console Output: Easy-to-read log levels with color coding
  • Snowflake Task Tracking: Persistent execution history in Snowflake
  • Verbose Mode: Detailed debugging information on demand
  • Execution Metadata: Start time, end time, status, and next execution tracking

📥 Installation

From PyPI

pip install snowforge-package

From Source

git clone https://github.com/Norsk-Tipping/snowforge-package.git
cd snowforge-package
pip install -e .
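
To confirm the install worked, a minimal check using the package's console logger (described in the Logging section below):

from Snowforge import Debug

# Prints a green SUCCESS line if the package imports correctly
Debug.log("Snowforge installed successfully", 'SUCCESS')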

Requirements

  • Python >= 3.12
  • Dependencies (automatically installed):
    • boto3
    • snowflake-connector-python
    • snowpipe-streaming
    • colored, coloredlogs
    • tqdm
    • toml
    • requests
    • jinja2

โš™๏ธ Configuration

Snowforge uses a snowforge_config.toml file for managing credentials and profiles. The file is searched in the following order:

  1. Path specified in the SNOWFORGE_CONFIG_PATH environment variable
  2. Current working directory: ./snowforge_config.toml
  3. User config directory: ~/.config/snowforge_config.toml
  4. Package installation directory

Configuration File Structure

# AWS Configuration
[AWS.default]
AWS_ACCESS_KEY = "your-access-key-id"
AWS_SECRET_KEY = "your-secret-access-key"
REGION = "us-east-1"

[AWS.production]
AWS_ACCESS_KEY = "prod-access-key"
AWS_SECRET_KEY = "prod-secret-key"
REGION = "eu-west-1"

# Snowflake Configuration
[SNOWFLAKE.default]
USERNAME = "your-username"
ACCOUNT = "your-account-identifier"
ROLE = "SYSADMIN"
SNOWFLAKE_WAREHOUSE = "COMPUTE_WH"

[SNOWFLAKE.svc_key_based]
USERNAME = "service_account"
ACCOUNT = "xy12345.us-east-1"
KEY_FILE_PATH = "/path/to/private_key.p8"
KEY_FILE_PASSWORD = "key_password"
SNOWFLAKE_DATABASE = "ANALYTICS"
SNOWFLAKE_SCHEMA = "PUBLIC"
SNOWFLAKE_WAREHOUSE = "ETL_WH"
ROLE = "ETL_ROLE"

# Required for SnowflakeLogging
[SNOWFLAKE.snowforge]
USERNAME = "logging_user"
ACCOUNT = "xy12345.us-east-1"
KEY_FILE_PATH = "/path/to/logging_key.p8"
KEY_FILE_PASSWORD = "key_password"
SNOWFLAKE_DATABASE = "LOGGING_DB"
SNOWFLAKE_SCHEMA = "TASK_LOGS"

Environment Variables

# Override config file location
export SNOWFORGE_CONFIG_PATH="/custom/path/snowforge_config.toml"
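
After setting the override, you can verify which file Snowforge actually resolves with Config.find_config_file (documented under Core Modules); a minimal sketch:

from Snowforge import Config

# Walks the search order listed above: env variable, CWD, ~/.config, package directory
config_path = Config.find_config_file(verbose=True)
print(f"Active configuration: {config_path}")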

🚀 Quick Start

Basic Workflow Example

from Snowforge import (
    AWSIntegration,
    SnowflakeIntegration,
    SnowflakeLogging,
    Debug
)
from datetime import datetime

# Initialize AWS
AWSIntegration.initialize(profile="default", verbose=True)

# Connect to Snowflake
conn = SnowflakeIntegration.connect(profile="svc_key_based", verbose=True)

# Log task execution
execution_id = SnowflakeLogging.log_start(
    task_id=1,
    process_id=12345,
    starttime=datetime.now()
)

# Your data processing logic here
Debug.log("Processing data...", level='INFO')

# Complete task logging
SnowflakeLogging.log_end(
    execution_id=execution_id,
    status="SUCCESS",
    log_path="/logs/task_1.log",
    endtime=datetime.now(),
    next_execution_time=None
)

# Clean up
SnowflakeIntegration.close_connection()

📚 Core Modules

AWSIntegration

Manages AWS S3 and Secrets Manager operations with automatic authentication and error handling.

Key Methods

initialize(profile: str = "default", verbose: bool = False)

Initialize AWS clients for S3 and Secrets Manager.

Example:

from Snowforge import AWSIntegration

# Initialize with default profile
AWSIntegration.initialize(profile="default")

# Initialize with production profile and verbose output
AWSIntegration.initialize(profile="production", verbose=True)

get_secret(secret_name: str, verbose: bool = False) -> dict | str | None

Retrieve secrets from AWS Secrets Manager.

Example:

# Get database credentials
db_creds = AWSIntegration.get_secret("production/database/credentials")

# Get API token
api_token = AWSIntegration.get_secret("powerbi/refresh_token")

if isinstance(db_creds, dict):
    username = db_creds.get("username")
    password = db_creds.get("password")

push_file_to_s3(bucket_name: str, file_to_upload: str, key: str, config: TransferConfig = None, verbose: bool = False)

Upload files to S3 with progress tracking and multipart support.

Example:

from boto3.s3.transfer import TransferConfig

# Configure for large files
transfer_config = AWSIntegration.define_s3_transfer_config(
    size_threshold=0.5,  # 500 MB threshold
    threads=10           # 10 concurrent threads
)

# Upload file
AWSIntegration.push_file_to_s3(
    bucket_name="my-data-bucket",
    file_to_upload="/tmp/large_dataset.csv",
    key="raw/2024/dataset.csv",
    config=transfer_config,
    verbose=True
)

get_bucket_contents(bucket_name: str, verbose: bool = False) -> list[str]

List all files in an S3 bucket.

Example:

# List all files in bucket
files = AWSIntegration.get_bucket_contents("my-data-bucket")

for file_key in files:
    print(f"Found: {file_key}")

SnowflakeIntegration

Establishes and manages connections to Snowflake with support for multiple authentication methods.

Key Methods

connect(user_name: str = None, account: str = None, profile: str = "default", verbose: bool = False)

Connect to Snowflake using various authentication methods.

Example:

from Snowforge import SnowflakeIntegration

# Method 1: Using profile with key-pair authentication
conn = SnowflakeIntegration.connect(profile="svc_key_based", verbose=True)

# Method 2: Using external browser authentication
conn = SnowflakeIntegration.connect(
    user_name="john.doe@company.com",
    account="xy12345.us-east-1"
)

# Method 3: Using default profile
conn = SnowflakeIntegration.connect()

# Execute queries
cursor = conn.cursor()
cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
print(cursor.fetchone())
cursor.close()

load_to_snowflake(stage: str, stage_key: str, database: str, schema: str, table: str, profile: str = "default", verbose: bool = False)

Load data from Snowflake stage into table using COPY INTO.

Example:

# Load CSV file from internal stage
SnowflakeIntegration.load_to_snowflake(
    stage="MY_STAGE",
    stage_key="/data/2024/sales.csv",
    database="ANALYTICS",
    schema="RAW",
    table="SALES_DATA",
    profile="svc_key_based",
    verbose=True
)

truncate_table(database: str, schema: str, table: str, profile: str = "default", verbose: bool = False)

Truncate a Snowflake table.

Example:

# Truncate staging table before load
SnowflakeIntegration.truncate_table(
    database="ANALYTICS",
    schema="STAGING",
    table="TEMP_SALES",
    profile="svc_key_based"
)

connect_to_pipe(profile: str = "default", client_name: str = None, db_name: str = None, schema_name: str = None, pipe_name: str = None, verbose: bool = False)

Connect to Snowpipe Streaming for real-time data ingestion.

Example:

from Snowforge import SnowflakeIntegration
from datetime import datetime

# Connect to Snowpipe
client = SnowflakeIntegration.connect_to_pipe(
    profile="svc_key_based",
    client_name="LOG_STREAM",
    db_name="LOGGING_DB",
    schema_name="LOGS",
    pipe_name="LOG_PIPE"
)

# Stream data
data = {
    "SOURCE": "APPLICATION",
    "TIMESTAMP": datetime.now().isoformat(),
    "CRITICALITY": "INFO",
    "MESSAGE": "User login successful",
    "WORKFLOW_NAME": "Authentication",
    "WORKFLOW_INSTANCE": "12345"
}

SnowflakeIntegration.stream_to_pipe(client, data, channel_name="LOG_CHANNEL")

# Close connection
SnowflakeIntegration.close_pipe_connection(client)

close_connection()

Close the active Snowflake connection.

Example:

# Always close connections when done
SnowflakeIntegration.close_connection()

SnowflakeLogging

Provides structured task execution logging directly to Snowflake for audit trails and monitoring.

Prerequisites

Before using SnowflakeLogging, you must:

  1. Create a profile named snowforge in your config file (see the [SNOWFLAKE.snowforge] block in the Configuration section above)
  2. Execute the SQL setup scripts in your Snowflake account

View setup requirements:

from Snowforge import SnowflakeLogging

# Print SQL scripts to console
sql_files = SnowflakeLogging.show_requirements(print_to_console=True)

Key Methods

log_start(task_id: int, process_id: int, starttime: datetime, verbose: bool = False) -> int

Log the start of a task execution and return an execution ID.

Example:

from Snowforge import SnowflakeLogging
from datetime import datetime
import hashlib

# Generate unique process ID
process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)

# Log task start
execution_id = SnowflakeLogging.log_start(
    task_id=42,
    process_id=process_id,
    starttime=datetime.now(),
    verbose=True
)

print(f"Task execution started: {execution_id}")

log_end(execution_id: int, status: str, log_path: str, endtime: datetime, next_execution_time: datetime = None, verbose: bool = False)

Log the completion of a task execution.

Example:

from datetime import datetime, timedelta

# Log successful completion
SnowflakeLogging.log_end(
    execution_id=execution_id,
    status="SUCCESS",
    log_path="/logs/2024/04/task_42.log",
    endtime=datetime.now(),
    next_execution_time=datetime.now() + timedelta(hours=24),
    verbose=True
)

# Log failure
SnowflakeLogging.log_end(
    execution_id=execution_id,
    status="FAILED",
    log_path="/logs/2024/04/task_42_error.log",
    endtime=datetime.now(),
    next_execution_time=None  # No next execution on failure
)

Complete Logging Example

from Snowforge import SnowflakeLogging, Debug
from datetime import datetime
import hashlib

def run_data_pipeline():
    """Example data pipeline with Snowflake logging."""

    # Generate process ID
    process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)

    # Start logging
    execution_id = SnowflakeLogging.log_start(
        task_id=100,
        process_id=process_id,
        starttime=datetime.now()
    )

    try:
        # Your pipeline logic here
        Debug.log("Extracting data...", 'INFO')
        Debug.log("Transforming data...", 'INFO')
        Debug.log("Loading to target...", 'INFO')

        # Log success
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="SUCCESS",
            log_path=f"/logs/pipeline_{execution_id}.log",
            endtime=datetime.now(),
            next_execution_time=None
        )

    except Exception as e:
        Debug.log(f"Pipeline failed: {e}", 'ERROR')

        # Log failure
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="FAILED",
            log_path=f"/logs/pipeline_{execution_id}_error.log",
            endtime=datetime.now(),
            next_execution_time=None
        )
        raise

if __name__ == "__main__":
    run_data_pipeline()

Config

Centralized configuration management with automatic file discovery and profile selection.

Key Methods

get_snowflake_credentials(profile: str = "default", verbose: bool = False) -> dict

Retrieve Snowflake credentials for a specific profile.

Example:

from Snowforge import Config

# Get default profile
creds = Config.get_snowflake_credentials()

# Get specific profile
prod_creds = Config.get_snowflake_credentials(profile="production", verbose=True)

print(f"Username: {prod_creds['USERNAME']}")
print(f"Account: {prod_creds['ACCOUNT']}")

get_aws_credentials(profile: str = "default", verbose: bool = False) -> dict

Retrieve AWS credentials for a specific profile.

Example:

# Get AWS credentials
aws_creds = Config.get_aws_credentials(profile="production")

print(f"Region: {aws_creds['REGION']}")
print(f"Access Key: {aws_creds['AWS_ACCESS_KEY'][:8]}...")

find_config_file(config_paths: list = None, verbose: bool = False) -> str | None

Locate the configuration file.

Example:

# Find config file
config_path = Config.find_config_file(verbose=True)

if config_path:
    print(f"Using config: {config_path}")
else:
    print("No config file found")

Logging

Centralized console logging with colored output for better visibility.

Key Methods

log(message: str, level: str = 'INFO', verbose_logging: bool = False)

Log messages with colored output based on severity level.

Example:

from Snowforge import Debug

# Basic logging
Debug.log("Application started", 'INFO')
Debug.log("Configuration loaded", 'SUCCESS')
Debug.log("Missing parameter", 'WARNING')
Debug.log("Connection failed", 'ERROR')
Debug.log("System crash", 'CRITICAL')

# Debug logging (only shown when verbose=True)
Debug.log("Variable value: 42", 'DEBUG', verbose_logging=True)

# Custom formatting
Debug.log(
    f"Processing file: large_dataset.csv\n"
    f"Size: 1.5 GB\n"
    f"Rows: 10,000,000",
    'INFO'
)

Log Levels

Level      Color       Usage
INFO       White       General information
SUCCESS    Green       Successful operations
WARNING    Yellow      Warning messages
ERROR      Red         Error messages
CRITICAL   Bright Red  Critical failures
DEBUG      Blue        Debug information (verbose mode only)

DataMover

Extensible data extraction and movement engine with parallel processing support.

Key Classes

Engine

Orchestrates data extraction and file operations.

Key Methods:

export_to_file(extractor: ExtractorStrategy, output_path: str, fully_qualified_table_name: str, filter_statement: str = None, verbose: bool = False) -> tuple

Example:

from Snowforge.DataMover import Engine
from Snowforge.DataMover.Extractors.NetezzaExtractor import NetezzaExtractor

# Initialize extractor
extractor = NetezzaExtractor()

# Export table to file
header, csv_file = Engine.export_to_file(
    extractor=extractor,
    output_path="/tmp/exports",
    fully_qualified_table_name="PROD_DB.SALES.TRANSACTIONS",
    filter_statement="WHERE transaction_date >= '2024-01-01'",
    verbose=True
)

print(f"Header: {header}")
print(f"Output file: {csv_file}")

parallel_process(worker_func: object, args_list: list[tuple], num_workers: int = None) -> list

Execute functions in parallel processes.

Example:

def process_chunk(chunk_id, data_file, start_offset, end_offset):
    """Process a chunk of data."""
    print(f"Processing chunk {chunk_id}: {start_offset} to {end_offset}")
    # Processing logic here

# Prepare work items
args_list = [
    (1, "/tmp/data.csv", 0, 1000000),
    (2, "/tmp/data.csv", 1000000, 2000000),
    (3, "/tmp/data.csv", 2000000, 3000000),
]

# Execute in parallel
processes = Engine.parallel_process(
    worker_func=process_chunk,
    args_list=args_list,
    num_workers=3
)

# Wait for completion
for process in processes:
    process.join()

calculate_chunks(external_table: str, compression: int = 4) -> int

Calculate optimal number of chunks for a file.

Example:

# Calculate chunks for large file
num_chunks = Engine.calculate_chunks(
    external_table="/tmp/large_export.csv",
    compression=4
)

print(f"File will be split into {num_chunks} chunks")

ExtractorStrategy

Abstract base class for database-specific extractors.

Implementing a Custom Extractor:

from Snowforge.DataMover.Extractors.ExtractorStrategy import ExtractorStrategy

class PostgreSQLExtractor(ExtractorStrategy):
    """PostgreSQL data extractor implementation."""

    def __init__(self, connection_string):
        self.connection_string = connection_string

    def extract_table_query(self, fully_qualified_table_name: str,
                           filter_statement: str = None,
                           verbose: bool = False):
        """Build extraction query for PostgreSQL."""
        query = f"SELECT * FROM {fully_qualified_table_name}"
        if filter_statement:
            query += f" {filter_statement}"
        return query

    def list_all_tables(self, database_name: str, verbose: bool = False):
        """List all tables in PostgreSQL database."""
        query = """
            SELECT table_schema, table_name
            FROM information_schema.tables
            WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
        """
        # Execute query and return results
        pass

    def export_external_table(self, output_path: str,
                            table_name: str,
                            filter_statement: str = None,
                            verbose: bool = False):
        """Export PostgreSQL table to CSV file."""
        # Implementation here
        pass

# Use custom extractor
from Snowforge.DataMover import Engine

extractor = PostgreSQLExtractor("postgresql://user:pass@host:5432/db")
header, csv_file = Engine.export_to_file(
    extractor=extractor,
    output_path="/tmp/exports",
    fully_qualified_table_name="public.customers"
)

Broadcaster

Automated BI report export and multi-channel delivery system.

Key Classes

SnowforgeBroadcaster

Main orchestrator for report broadcasting.

Example:

from Snowforge.Broadcaster import (
    SnowforgeBroadcaster,
    BroadcastConfig,
    SmtpConfig
)
from Snowforge.Broadcaster.adapters import FabricExporter
from Snowforge import FabricIntegration

# Initialize Fabric integration
fabric = FabricIntegration(
    aws_secret_name="powerbi_token",
    aws_profile="production"
)

# Create exporter
exporter = FabricExporter(fabric)

# Configure SMTP
smtp = SmtpConfig(
    host="smtp.company.com",
    port=587,
    username="reports@company.com",
    password="smtp_password",
    use_tls=True
)

# Create broadcaster
broadcaster = SnowforgeBroadcaster(
    exporter=exporter,
    smtp=smtp,
    poll_interval=5.0,
    max_poll_attempts=240
)

# Configure broadcast
config = BroadcastConfig(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    report_id="87654321-4321-4321-4321-cba987654321",
    channel="email",
    sender="reports@company.com",
    recipients=["user1@company.com", "user2@company.com"],
    subject="Monthly Sales Report",
    body="Please find attached the monthly sales report.",
    parameters={"ReportMonth": "2024-04", "Region": "North"}
)

# Execute broadcast
result = broadcaster.broadcast(config)
print(f"Report delivered: {result.filename}")

BroadcastConfig

Configuration for report broadcasts.

Example:

from Snowforge.Broadcaster import BroadcastConfig

# Email broadcast
email_config = BroadcastConfig(
    workspace_id="workspace-guid",
    report_id="report-guid",
    channel="email",
    sender="noreply@company.com",
    recipients=["team@company.com"],
    subject="Daily Dashboard - {date}",
    body="<h1>Daily Dashboard</h1><p>See attached report.</p>",
    parameters={"date": "2024-04-15"}
)

# Teams broadcast (when implemented)
teams_config = BroadcastConfig(
    workspace_id="workspace-guid",
    report_id="report-guid",
    channel="teams",
    teams_webhook_url="https://outlook.office.com/webhook/...",
    parameters={"quarter": "Q1-2024"}
)

Configuration Providers

Load broadcast configurations from multiple sources.

JSON File Provider

Example:

from Snowforge.Broadcaster import JsonFileConfigProvider

# Load from JSON file
provider = JsonFileConfigProvider("/config/broadcasts.json")
configs = provider.get_all_configs()

for config in configs:
    broadcaster.broadcast(config)

broadcasts.json:

[
  {
    "workspace_id": "12345678-1234-1234-1234-123456789abc",
    "report_id": "87654321-4321-4321-4321-cba987654321",
    "channel": "email",
    "sender": "reports@company.com",
    "recipients": ["user@company.com"],
    "subject": "Report",
    "body": "See attached",
    "parameters": {}
  }
]

Snowflake Config Provider

Example:

from Snowforge.Broadcaster import SnowflakeConfigProvider
from Snowforge import SnowflakeIntegration

# Connect to Snowflake
conn = SnowflakeIntegration.connect(profile="svc_key_based")

# Load configs from table
provider = SnowflakeConfigProvider(
    connection=conn,
    database="CONFIG_DB",
    schema="BROADCAST",
    table="REPORT_CONFIGS"
)

configs = provider.get_all_configs()

FabricIntegration

Microsoft Fabric and Power BI API integration for pipeline management, semantic model refresh, and report export.

Key Methods

test_authentication() -> bool

Verify API token validity.

Example:

from Snowforge import FabricIntegration

fabric = FabricIntegration(
    aws_secret_name="powerbi_token",
    aws_profile="default"
)

if fabric.test_authentication():
    print("Authentication successful")
else:
    print("Authentication failed")

start_pipeline(workspace_id: str, item_id: str) -> str

Start a Fabric data pipeline.

Example:

# Start pipeline
execution_id = fabric.start_pipeline(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    item_id="pipeline-item-id"
)

if execution_id != "Error":
    print(f"Pipeline started: {execution_id}")

check_pipeline_last_status(workspace_id: str, item_id: str) -> tuple[str, str]

Check pipeline execution status.

Example:

run_id, status = fabric.check_pipeline_last_status(
    workspace_id="workspace-guid",
    item_id="pipeline-item-id"
)

print(f"Pipeline {run_id}: {status}")

semantic_model_reload(workspace_id: str, item_id: str) -> str

Trigger semantic model refresh.

Example:

result = fabric.semantic_model_reload(
    workspace_id="workspace-guid",
    item_id="model-item-id"
)

if result == "Completed":
    print("Refresh initiated successfully")

export_paginated_report(access_token: str, workspace_id: str, report_id: str, input_data: dict = None, file_format: str = "PDF")

Export a paginated report.

Example:

# Export with parameters
response = fabric.export_paginated_report(
    access_token=fabric.access_token,
    workspace_id="workspace-guid",
    report_id="report-guid",
    input_data={
        "StartDate": "2024-01-01",
        "EndDate": "2024-03-31",
        "Department": "Sales"
    },
    file_format="XLSX"
)

if response and response.status_code == 202:
    export_id = response.json().get("id")
    print(f"Export initiated: {export_id}")

download_report(workspace_id: str, report_id: str, export_id: str) -> tuple[bytes, str]

Download exported report.

Example:

# Download report
content, filename = fabric.download_report(
    workspace_id="workspace-guid",
    report_id="report-guid",
    export_id="export-guid"
)

# Save to file
with open(f"/reports/{filename}", "wb") as f:
    f.write(content)

print(f"Downloaded: {filename} ({len(content)} bytes)")

Complete Fabric Workflow

from Snowforge import FabricIntegration, Debug
import time

# Initialize
fabric = FabricIntegration(aws_secret_name="powerbi_token")

# 1. Refresh data pipeline
Debug.log("Starting data pipeline...", 'INFO')
pipeline_id = fabric.start_pipeline(
    workspace_id="workspace-guid",
    item_id="pipeline-item-id"
)

# Wait for pipeline completion
while True:
    run_id, status = fabric.check_pipeline_last_status(
        workspace_id="workspace-guid",
        item_id="pipeline-item-id"
    )

    if status == "Completed":
        Debug.log("Pipeline completed successfully", 'SUCCESS')
        break
    elif status == "Failed":
        Debug.log("Pipeline failed", 'ERROR')
        break

    time.sleep(10)

# 2. Refresh semantic model
Debug.log("Refreshing semantic model...", 'INFO')
fabric.semantic_model_reload(
    workspace_id="workspace-guid",
    item_id="model-item-id"
)

# 3. Export report
Debug.log("Exporting report...", 'INFO')
response = fabric.export_paginated_report(
    access_token=fabric.access_token,
    workspace_id="workspace-guid",
    report_id="report-guid",
    file_format="PDF"
)

export_id = response.json().get("id")

# 4. Wait for export and download
while True:
    status_code = fabric.check_report_status(
        workspace_id="workspace-guid",
        report_id="report-guid",
        export_id=export_id
    )

    if status_code == 200:
        content, filename = fabric.download_report(
            workspace_id="workspace-guid",
            report_id="report-guid",
            export_id=export_id
        )

        with open(f"/reports/{filename}", "wb") as f:
            f.write(content)

        Debug.log(f"Report saved: {filename}", 'SUCCESS')
        break

    time.sleep(5)

🎓 Advanced Usage

End-to-End Data Pipeline

from Snowforge import (
    AWSIntegration,
    SnowflakeIntegration,
    SnowflakeLogging,
    Debug
)
from Snowforge.DataMover import Engine
from Snowforge.DataMover.Extractors.NetezzaExtractor import NetezzaExtractor
from datetime import datetime
import hashlib

def etl_pipeline():
    """Complete ETL pipeline example."""

    # 1. Initialize connections
    Debug.log("Initializing connections...", 'INFO')
    AWSIntegration.initialize(profile="production")
    SnowflakeIntegration.connect(profile="svc_key_based")

    # 2. Start logging
    process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)
    execution_id = SnowflakeLogging.log_start(
        task_id=200,
        process_id=process_id,
        starttime=datetime.now()
    )

    try:
        # 3. Extract data from source
        Debug.log("Extracting data from Netezza...", 'INFO')
        extractor = NetezzaExtractor()
        header, csv_file = Engine.export_to_file(
            extractor=extractor,
            output_path="/tmp/extracts",
            fully_qualified_table_name="PROD.SALES.ORDERS",
            filter_statement="WHERE order_date >= CURRENT_DATE - 7",
            verbose=True
        )

        # 4. Upload to S3
        Debug.log("Uploading to S3...", 'INFO')
        s3_key = f"raw/orders/{datetime.now().strftime('%Y/%m/%d')}/orders.csv"
        AWSIntegration.push_file_to_s3(
            bucket_name="data-lake",
            file_to_upload=csv_file,
            key=s3_key,
            verbose=True
        )

        # 5. Truncate target table
        Debug.log("Truncating target table...", 'INFO')
        SnowflakeIntegration.truncate_table(
            database="ANALYTICS",
            schema="STAGING",
            table="ORDERS_STAGING"
        )

        # 6. Load to Snowflake
        Debug.log("Loading to Snowflake...", 'INFO')
        SnowflakeIntegration.load_to_snowflake(
            stage="S3_STAGE",
            stage_key=f"/{s3_key}",
            database="ANALYTICS",
            schema="STAGING",
            table="ORDERS_STAGING",
            verbose=True
        )

        # 7. Log success
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="SUCCESS",
            log_path=f"/logs/etl_{execution_id}.log",
            endtime=datetime.now(),
            next_execution_time=None
        )

        Debug.log("Pipeline completed successfully!", 'SUCCESS')

    except Exception as e:
        Debug.log(f"Pipeline failed: {e}", 'ERROR')

        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="FAILED",
            log_path=f"/logs/etl_{execution_id}_error.log",
            endtime=datetime.now(),
            next_execution_time=None
        )
        raise

    finally:
        SnowflakeIntegration.close_connection()

if __name__ == "__main__":
    etl_pipeline()

Multi-Report Broadcasting

from Snowforge.Broadcaster import (
    SnowforgeBroadcaster,
    SnowflakeConfigProvider,
    SmtpConfig
)
from Snowforge.Broadcaster.adapters import FabricExporter
from Snowforge import FabricIntegration, SnowflakeIntegration, Debug

def broadcast_reports():
    """Broadcast multiple reports from Snowflake configuration."""

    # Initialize connections
    fabric = FabricIntegration(aws_secret_name="powerbi_token")
    sf_conn = SnowflakeIntegration.connect(profile="svc_key_based")

    # Configure broadcaster
    broadcaster = SnowforgeBroadcaster(
        exporter=FabricExporter(fabric),
        smtp=SmtpConfig(
            host="smtp.company.com",
            port=587,
            username="reports@company.com",
            password="password",
            use_tls=True
        )
    )

    # Load configurations from Snowflake
    provider = SnowflakeConfigProvider(
        connection=sf_conn,
        database="CONFIG",
        schema="REPORTS",
        table="BROADCAST_CONFIGS"
    )

    configs = provider.get_all_configs()
    Debug.log(f"Found {len(configs)} report configurations", 'INFO')

    # Broadcast each report
    for i, config in enumerate(configs, 1):
        Debug.log(f"Broadcasting report {i}/{len(configs)}: {config.subject}", 'INFO')

        try:
            result = broadcaster.broadcast(config)
            Debug.log(f"Report {i} delivered: {result.filename}", 'SUCCESS')
        except Exception as e:
            Debug.log(f"Failed to broadcast report {i}: {e}", 'ERROR')
            continue

    # Cleanup
    SnowflakeIntegration.close_connection()

if __name__ == "__main__":
    broadcast_reports()

🔧 Extending Snowforge

Creating a Custom Database Extractor

from Snowforge.DataMover.Extractors.ExtractorStrategy import ExtractorStrategy
import pyodbc
import csv

class OracleExtractor(ExtractorStrategy):
    """Oracle database extractor."""

    def __init__(self, dsn: str, username: str, password: str):
        self.dsn = dsn
        self.username = username
        self.password = password
        self.connection = None

    def connect(self):
        """Establish Oracle connection."""
        if not self.connection:
            self.connection = pyodbc.connect(
                f"DSN={self.dsn};UID={self.username};PWD={self.password}"
            )

    def extract_table_query(self, fully_qualified_table_name: str,
                           filter_statement: str = None,
                           verbose: bool = False) -> str:
        """Build Oracle extraction query."""
        parts = fully_qualified_table_name.split('.')

        if len(parts) == 3:
            schema, table = parts[1], parts[2]
        elif len(parts) == 2:
            schema, table = parts
        else:
            table = parts[0]
            schema = self.username.upper()

        query = f"SELECT * FROM {schema}.{table}"

        if filter_statement:
            query += f" {filter_statement}"

        return query

    def list_all_tables(self, database_name: str, verbose: bool = False) -> list:
        """List all accessible tables in Oracle."""
        self.connect()
        cursor = self.connection.cursor()

        query = """
            SELECT owner, table_name
            FROM all_tables
            WHERE owner NOT IN ('SYS', 'SYSTEM')
            ORDER BY owner, table_name
        """

        cursor.execute(query)
        tables = [(row[0], row[1]) for row in cursor.fetchall()]  # positional indexing; column-name attributes depend on driver casing
        cursor.close()

        return tables

    def export_external_table(self, output_path: str,
                             table_name: str,
                             filter_statement: str = None,
                             verbose: bool = False) -> tuple:
        """Export Oracle table to CSV."""
        from Snowforge import Debug

        self.connect()
        cursor = self.connection.cursor()

        # Build and execute query
        query = self.extract_table_query(table_name, filter_statement, verbose)
        Debug.log(f"Executing: {query}", 'DEBUG', verbose)

        cursor.execute(query)

        # Get column headers
        headers = [desc[0] for desc in cursor.description]

        # Export to CSV
        import os
        os.makedirs(output_path, exist_ok=True)

        table_short = table_name.split('.')[-1]
        csv_file = os.path.join(output_path, f"{table_short}.csv")

        with open(csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f, delimiter='|')
            writer.writerow(headers)

            row_count = 0
            while True:
                rows = cursor.fetchmany(10000)
                if not rows:
                    break

                writer.writerows(rows)
                row_count += len(rows)

        cursor.close()
        Debug.log(f"Exported {row_count} rows to {csv_file}", 'SUCCESS', verbose)

        return headers, csv_file

    def close(self):
        """Close Oracle connection."""
        if self.connection:
            self.connection.close()
            self.connection = None

# Usage
extractor = OracleExtractor(
    dsn="ORACLE_PROD",
    username="etl_user",
    password="password"
)

from Snowforge.DataMover import Engine

header, csv_file = Engine.export_to_file(
    extractor=extractor,
    output_path="/tmp/oracle_exports",
    fully_qualified_table_name="HR.EMPLOYEES",
    filter_statement="WHERE hire_date >= TO_DATE('2024-01-01', 'YYYY-MM-DD')"
)

extractor.close()

Creating a Custom Report Exporter

from Snowforge.Broadcaster.ReportExporter import ReportExporter, ExportResult
from typing import Any

class QlikSenseExporter(ReportExporter):
    """Qlik Sense report exporter."""

    def __init__(self, qlik_api_client):
        self.client = qlik_api_client

    def start_export(self, workspace_id: str, report_id: str,
                    parameters: dict[str, Any] | None = None) -> str:
        """Initiate Qlik report export."""

        # Build export request
        export_config = {
            "appId": report_id,
            "format": "pdf",
            "selections": parameters or {}
        }

        # Submit to Qlik API
        response = self.client.start_export(export_config)
        export_id = response.get("exportId")

        return export_id

    def poll_status(self, workspace_id: str, report_id: str,
                   export_id: str) -> str:
        """Check export status."""

        status_response = self.client.get_export_status(export_id)

        # Map Qlik status to standard status
        qlik_status = status_response.get("status")

        if qlik_status == "FINISHED":
            return "completed"
        elif qlik_status == "FAILED":
            return "failed"
        else:
            return "running"

    def download(self, workspace_id: str, report_id: str,
                export_id: str) -> ExportResult:
        """Download exported report."""

        # Download from Qlik
        file_bytes = self.client.download_export(export_id)
        filename = f"qlik_report_{export_id}.pdf"

        return ExportResult(content=file_bytes, filename=filename)

# Usage with Broadcaster
from Snowforge.Broadcaster import SnowforgeBroadcaster, BroadcastConfig, SmtpConfig

qlik_client = ...  # Initialize Qlik client
exporter = QlikSenseExporter(qlik_client)

broadcaster = SnowforgeBroadcaster(
    exporter=exporter,
    smtp=SmtpConfig(host="smtp.company.com", port=25)
)

config = BroadcastConfig(
    workspace_id="qlik-workspace",
    report_id="app-guid",
    channel="email",
    sender="reports@company.com",
    recipients=["user@company.com"],
    subject="Qlik Report",
    body="See attached",
    parameters={"Year": "2024"}
)

broadcaster.broadcast(config)

📜 License

This project is licensed under the MIT License. See the LICENSE file for details.


๐Ÿค Contributing

We welcome contributions, suggestions, and collaboration!

How to Contribute

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/my-new-feature
  3. Commit your changes: git commit -am 'Add new feature'
  4. Push to the branch: git push origin feature/my-new-feature
  5. Submit a pull request

Reporting Issues

If you encounter bugs or have feature requests, please open an issue on our GitHub repository.

Contact

For questions about using Snowforge or collaboration opportunities:

We encourage you to get in touch if you have suggestions for improvements, questions about using Snowforge, or would like to collaborate. Your contribution is always welcome!


๐Ÿ™ Acknowledgments

Snowforge is maintained by Norsk Tipping and built on top of excellent open-source libraries, including boto3, snowflake-connector-python, tqdm, and jinja2.


Happy Data Engineering! 🚀
