A Python package supporting data platform migration from on-premise to cloud
Snowforge - Enterprise Data Integration Platform
Snowforge is a comprehensive Python package designed to streamline data integration and orchestration across cloud and on-premise systems. It provides a unified interface for managing data pipelines between AWS, Snowflake, Microsoft Fabric, Power BI, and various on-premise database systems.
Table of Contents
- Overview
- Architecture
- Key Features
- Installation
- Configuration
- Quick Start
- Core Modules
- Advanced Usage
- Extending Snowforge
- License
- Contributing
Overview
Snowforge addresses the complexity of modern data engineering by providing:
- Unified Configuration Management: Single TOML-based configuration for all integrations
- Multi-Cloud Support: Seamless integration with AWS S3, Secrets Manager, and Snowflake
- BI Report Distribution: Automated export and delivery of Power BI and Fabric reports
- Extensible Architecture: Strategy pattern-based extractors for any database system
- Production-Ready Logging: Console and Snowflake-backed execution tracking
- Parallel Processing: Built-in support for concurrent data operations
Architecture
Snowforge is organized into specialized modules:
Snowforge/
├── AWSIntegration.py        # AWS S3 and Secrets Manager operations
├── SnowflakeIntegration.py  # Snowflake connections and data loading
├── SnowflakeLogging.py      # Task execution logging to Snowflake
├── Config.py                # Configuration management
├── Logging.py               # Centralized console logging
├── DataMover/               # Data extraction and movement engine
│   ├── DataMover.py
│   └── Extractors/          # Database-specific extractors
├── Broadcaster/             # BI report export and distribution
│   ├── SnowforgeBroadcaster.py
│   ├── BroadcastConfig.py
│   ├── Email.py
│   └── adapters/            # Platform-specific exporters
└── FabricIntegration/       # Microsoft Fabric and Power BI API
    ├── FabricIntegration.py
    └── PowerAPI.py
Key Features
Data Integration
- Multi-Source Extraction: Extract data from Netezza, Oracle, PostgreSQL, and more
- Parallel Processing: Multi-threaded and multi-process data operations
- Smart Chunking: Automatic file segmentation for large datasets
- S3 Integration: Efficient uploads with progress tracking and multipart transfer
Snowflake Operations
- Flexible Authentication: Key-pair or browser-based authentication
- Connection Pooling: Automatic connection reuse and management
- Snowpipe Streaming: Real-time data ingestion support
- Data Loading: Direct COPY INTO operations from S3 stages
BI and Reporting
- Report Broadcasting: Automated export and delivery of paginated reports
- Multi-Channel Delivery: Email and Teams (extensible)
- Template-Based Configuration: Jinja2 templating for dynamic parameters
- Export Polling: Automatic status monitoring and download
Configuration & Security
- Centralized Configuration: Single TOML file for all credentials and profiles
- AWS Secrets Manager: Secure token and credential storage
- Multiple Profiles: Support for dev, staging, and production environments
- Environment Variable Override: Flexible configuration paths
Logging & Monitoring
- Colored Console Output: Easy-to-read log levels with color coding
- Snowflake Task Tracking: Persistent execution history in Snowflake
- Verbose Mode: Detailed debugging information on demand
- Execution Metadata: Start time, end time, status, and next execution tracking
Installation
From PyPI
pip install snowforge-package
From Source
git clone https://github.com/Norsk-Tipping/snowforge-package.git
cd snowforge-package
pip install -e .
Requirements
- Python >= 3.12
- Dependencies (automatically installed):
- boto3
- snowflake-connector-python
- snowpipe-streaming
- colored, coloredlogs
- tqdm
- toml
- requests
- jinja2
Configuration
Snowforge uses a snowforge_config.toml file for managing credentials and profiles. The file is searched for in the following order (a resolution sketch follows the list):
1. Path specified in the SNOWFORGE_CONFIG_PATH environment variable
2. Current working directory: ./snowforge_config.toml
3. User config directory: ~/.config/snowforge_config.toml
4. Package installation directory
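A minimal sketch of this search order, for illustration only (Snowforge's own Config.find_config_file, documented below, performs the actual discovery):
import os
from pathlib import Path
def resolve_config_path():
    """Mimic the documented snowforge_config.toml search order."""
    candidates = [
        os.environ.get("SNOWFORGE_CONFIG_PATH"),            # 1. env var override
        Path.cwd() / "snowforge_config.toml",               # 2. working directory
        Path.home() / ".config" / "snowforge_config.toml",  # 3. user config dir
    ]
    for candidate in candidates:
        if candidate and Path(candidate).is_file():
            return Path(candidate)
    return None  # 4. Snowforge falls back to the package installation directory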
Configuration File Structure
# AWS Configuration
[AWS.default]
AWS_ACCESS_KEY = "your-access-key-id"
AWS_SECRET_KEY = "your-secret-access-key"
REGION = "us-east-1"
[AWS.production]
AWS_ACCESS_KEY = "prod-access-key"
AWS_SECRET_KEY = "prod-secret-key"
REGION = "eu-west-1"
# Snowflake Configuration
[SNOWFLAKE.default]
USERNAME = "your-username"
ACCOUNT = "your-account-identifier"
ROLE = "SYSADMIN"
SNOWFLAKE_WAREHOUSE = "COMPUTE_WH"
[SNOWFLAKE.svc_key_based]
USERNAME = "service_account"
ACCOUNT = "xy12345.us-east-1"
KEY_FILE_PATH = "/path/to/private_key.p8"
KEY_FILE_PASSWORD = "key_password"
SNOWFLAKE_DATABASE = "ANALYTICS"
SNOWFLAKE_SCHEMA = "PUBLIC"
SNOWFLAKE_WAREHOUSE = "ETL_WH"
ROLE = "ETL_ROLE"
# Required for SnowflakeLogging
[SNOWFLAKE.snowforge]
USERNAME = "logging_user"
ACCOUNT = "xy12345.us-east-1"
KEY_FILE_PATH = "/path/to/logging_key.p8"
KEY_FILE_PASSWORD = "key_password"
SNOWFLAKE_DATABASE = "LOGGING_DB"
SNOWFLAKE_SCHEMA = "TASK_LOGS"
Environment Variables
# Override config file location
export SNOWFORGE_CONFIG_PATH="/custom/path/snowforge_config.toml"
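The same override can also be set from Python, provided it happens before Snowforge reads its configuration:
# Equivalent override from inside a script (set before Snowforge loads config)
import os
os.environ["SNOWFORGE_CONFIG_PATH"] = "/custom/path/snowforge_config.toml"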
Quick Start
Basic Workflow Example
from Snowforge import (
AWSIntegration,
SnowflakeIntegration,
SnowflakeLogging,
Debug
)
from datetime import datetime
# Initialize AWS
AWSIntegration.initialize(profile="default", verbose=True)
# Connect to Snowflake
conn = SnowflakeIntegration.connect(profile="svc_key_based", verbose=True)
# Log task execution
execution_id = SnowflakeLogging.log_start(
task_id=1,
process_id=12345,
starttime=datetime.now()
)
# Your data processing logic here
Debug.log("Processing data...", level='INFO')
# Complete task logging
SnowflakeLogging.log_end(
execution_id=execution_id,
status="SUCCESS",
log_path="/logs/task_1.log",
endtime=datetime.now(),
next_execution_time=None
)
# Clean up
SnowflakeIntegration.close_connection()
Core Modules
AWSIntegration
Manages AWS S3 and Secrets Manager operations with automatic authentication and error handling.
Key Methods
initialize(profile: str = "default", verbose: bool = False)
Initialize AWS clients for S3 and Secrets Manager.
Example:
from Snowforge import AWSIntegration
# Initialize with default profile
AWSIntegration.initialize(profile="default")
# Initialize with production profile and verbose output
AWSIntegration.initialize(profile="production", verbose=True)
get_secret(secret_name: str, verbose: bool = False) -> dict | str | None
Retrieve secrets from AWS Secrets Manager.
Example:
# Get database credentials
db_creds = AWSIntegration.get_secret("production/database/credentials")
# Get API token
api_token = AWSIntegration.get_secret("powerbi/refresh_token")
if isinstance(db_creds, dict):
username = db_creds.get("username")
password = db_creds.get("password")
push_file_to_s3(bucket_name: str, file_to_upload: str, key: str, config: TransferConfig = None, verbose: bool = False)
Upload files to S3 with progress tracking and multipart support.
Example:
from boto3.s3.transfer import TransferConfig
# Configure for large files
transfer_config = AWSIntegration.define_s3_transfer_config(
size_threshold=0.5, # 500 MB threshold
threads=10 # 10 concurrent threads
)
# Upload file
AWSIntegration.push_file_to_s3(
bucket_name="my-data-bucket",
file_to_upload="/tmp/large_dataset.csv",
key="raw/2024/dataset.csv",
config=transfer_config,
verbose=True
)
get_bucket_contents(bucket_name: str, verbose: bool = False) -> list[str]
List all files in an S3 bucket.
Example:
# List all files in bucket
files = AWSIntegration.get_bucket_contents("my-data-bucket")
for file_key in files:
print(f"Found: {file_key}")
SnowflakeIntegration
Establishes and manages connections to Snowflake with support for multiple authentication methods.
Key Methods
connect(user_name: str = None, account: str = None, profile: str = "default", verbose: bool = False)
Connect to Snowflake using various authentication methods.
Example:
from Snowforge import SnowflakeIntegration
# Method 1: Using profile with key-pair authentication
conn = SnowflakeIntegration.connect(profile="svc_key_based", verbose=True)
# Method 2: Using external browser authentication
conn = SnowflakeIntegration.connect(
user_name="john.doe@company.com",
account="xy12345.us-east-1"
)
# Method 3: Using default profile
conn = SnowflakeIntegration.connect()
# Execute queries
cursor = conn.cursor()
cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
print(cursor.fetchone())
cursor.close()
load_to_snowflake(stage: str, stage_key: str, database: str, schema: str, table: str, profile: str = "default", verbose: bool = False)
Load data from Snowflake stage into table using COPY INTO.
Example:
# Load CSV file from internal stage
SnowflakeIntegration.load_to_snowflake(
stage="MY_STAGE",
stage_key="/data/2024/sales.csv",
database="ANALYTICS",
schema="RAW",
table="SALES_DATA",
profile="svc_key_based",
verbose=True
)
truncate_table(database: str, schema: str, table: str, profile: str = "default", verbose: bool = False)
Truncate a Snowflake table.
Example:
# Truncate staging table before load
SnowflakeIntegration.truncate_table(
database="ANALYTICS",
schema="STAGING",
table="TEMP_SALES",
profile="svc_key_based"
)
connect_to_pipe(profile: str = "default", client_name: str = None, db_name: str = None, schema_name: str = None, pipe_name: str = None, verbose: bool = False)
Connect to Snowpipe Streaming for real-time data ingestion.
Example:
from Snowforge import SnowflakeIntegration
from datetime import datetime
# Connect to Snowpipe
client = SnowflakeIntegration.connect_to_pipe(
profile="svc_key_based",
client_name="LOG_STREAM",
db_name="LOGGING_DB",
schema_name="LOGS",
pipe_name="LOG_PIPE"
)
# Stream data
data = {
"SOURCE": "APPLICATION",
"TIMESTAMP": datetime.now().isoformat(),
"CRITICALITY": "INFO",
"MESSAGE": "User login successful",
"WORKFLOW_NAME": "Authentication",
"WORKFLOW_INSTANCE": "12345"
}
SnowflakeIntegration.stream_to_pipe(client, data, channel_name="LOG_CHANNEL")
# Close connection
SnowflakeIntegration.close_pipe_connection(client)
close_connection()
Close the active Snowflake connection.
Example:
# Always close connections when done
SnowflakeIntegration.close_connection()
SnowflakeLogging
Provides structured task execution logging directly to Snowflake for audit trails and monitoring.
Prerequisites
Before using SnowflakeLogging, you must:
- Create a profile named snowforge in your config file
- Execute the SQL setup scripts in your Snowflake account
View setup requirements:
from Snowforge import SnowflakeLogging
# Print SQL scripts to console
sql_files = SnowflakeLogging.show_requirements(print_to_console=True)
Key Methods
log_start(task_id: int, process_id: int, starttime: datetime, verbose: bool = False) -> int
Log the start of a task execution and return an execution ID.
Example:
from Snowforge import SnowflakeLogging
from datetime import datetime
import hashlib
# Generate unique process ID
process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)
# Log task start
execution_id = SnowflakeLogging.log_start(
task_id=42,
process_id=process_id,
starttime=datetime.now(),
verbose=True
)
print(f"Task execution started: {execution_id}")
log_end(execution_id: int, status: str, log_path: str, endtime: datetime, next_execution_time: datetime = None, verbose: bool = False)
Log the completion of a task execution.
Example:
from datetime import datetime, timedelta
# Log successful completion
SnowflakeLogging.log_end(
execution_id=execution_id,
status="SUCCESS",
log_path="/logs/2024/04/task_42.log",
endtime=datetime.now(),
next_execution_time=datetime.now() + timedelta(hours=24),
verbose=True
)
# Log failure
SnowflakeLogging.log_end(
execution_id=execution_id,
status="FAILED",
log_path="/logs/2024/04/task_42_error.log",
endtime=datetime.now(),
next_execution_time=None # No next execution on failure
)
Complete Logging Example
from Snowforge import SnowflakeLogging, Debug
from datetime import datetime
import hashlib
def run_data_pipeline():
"""Example data pipeline with Snowflake logging."""
# Generate process ID
process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)
# Start logging
execution_id = SnowflakeLogging.log_start(
task_id=100,
process_id=process_id,
starttime=datetime.now()
)
try:
# Your pipeline logic here
Debug.log("Extracting data...", 'INFO')
Debug.log("Transforming data...", 'INFO')
Debug.log("Loading to target...", 'INFO')
# Log success
SnowflakeLogging.log_end(
execution_id=execution_id,
status="SUCCESS",
log_path=f"/logs/pipeline_{execution_id}.log",
endtime=datetime.now(),
next_execution_time=None
)
except Exception as e:
Debug.log(f"Pipeline failed: {e}", 'ERROR')
# Log failure
SnowflakeLogging.log_end(
execution_id=execution_id,
status="FAILED",
log_path=f"/logs/pipeline_{execution_id}_error.log",
endtime=datetime.now(),
next_execution_time=None
)
raise
if __name__ == "__main__":
run_data_pipeline()
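The start/try/except pattern above can be packaged into a reusable context manager. The helper below is a sketch, not part of Snowforge; it only combines log_start and log_end as documented:
from contextlib import contextmanager
from datetime import datetime
import hashlib
from Snowforge import SnowflakeLogging
@contextmanager
def logged_task(task_id: int, log_path: str):
    """Wrap a block of work in log_start/log_end calls."""
    process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)
    execution_id = SnowflakeLogging.log_start(
        task_id=task_id,
        process_id=process_id,
        starttime=datetime.now()
    )
    try:
        yield execution_id
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="SUCCESS",
            log_path=log_path,
            endtime=datetime.now(),
            next_execution_time=None
        )
    except Exception:
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="FAILED",
            log_path=log_path,
            endtime=datetime.now(),
            next_execution_time=None
        )
        raise
Usage:
with logged_task(task_id=100, log_path="/logs/pipeline.log") as execution_id:
    print(f"Running execution {execution_id}")
    # pipeline logic here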
Config
Centralized configuration management with automatic file discovery and profile selection.
Key Methods
get_snowflake_credentials(profile: str = "default", verbose: bool = False) -> dict
Retrieve Snowflake credentials for a specific profile.
Example:
from Snowforge import Config
# Get default profile
creds = Config.get_snowflake_credentials()
# Get specific profile
prod_creds = Config.get_snowflake_credentials(profile="production", verbose=True)
print(f"Username: {prod_creds['USERNAME']}")
print(f"Account: {prod_creds['ACCOUNT']}")
get_aws_credentials(profile: str = "default", verbose: bool = False) -> dict
Retrieve AWS credentials for a specific profile.
Example:
# Get AWS credentials
aws_creds = Config.get_aws_credentials(profile="production")
print(f"Region: {aws_creds['REGION']}")
print(f"Access Key: {aws_creds['AWS_ACCESS_KEY'][:8]}...")
find_config_file(config_paths: list = None, verbose: bool = False) -> str | None
Locate the configuration file.
Example:
# Find config file
config_path = Config.find_config_file(verbose=True)
if config_path:
print(f"Using config: {config_path}")
else:
print("No config file found")
Logging
Centralized console logging with colored output for better visibility.
Key Methods
log(message: str, level: str = 'INFO', verbose_logging: bool = False)
Log messages with colored output based on severity level.
Example:
from Snowforge import Debug
# Basic logging
Debug.log("Application started", 'INFO')
Debug.log("Configuration loaded", 'SUCCESS')
Debug.log("Missing parameter", 'WARNING')
Debug.log("Connection failed", 'ERROR')
Debug.log("System crash", 'CRITICAL')
# Debug logging (only shown when verbose=True)
Debug.log("Variable value: 42", 'DEBUG', verbose_logging=True)
# Custom formatting
Debug.log(
f"Processing file: large_dataset.csv\n"
f"Size: 1.5 GB\n"
f"Rows: 10,000,000",
'INFO'
)
Log Levels
| Level | Color | Usage |
|---|---|---|
| INFO | White | General information |
| SUCCESS | Green | Successful operations |
| WARNING | Yellow | Warning messages |
| ERROR | Red | Error messages |
| CRITICAL | Bright Red | Critical failures |
| DEBUG | Blue | Debug information (verbose mode only) |
DataMover
Extensible data extraction and movement engine with parallel processing support.
Key Classes
Engine
Orchestrates data extraction and file operations.
Key Methods:
export_to_file(extractor: ExtractorStrategy, output_path: str, fully_qualified_table_name: str, filter_statement: str = None, verbose: bool = False) -> tuple
Example:
from Snowforge.DataMover import Engine
from Snowforge.DataMover.Extractors.NetezzaExtractor import NetezzaExtractor
# Initialize extractor
extractor = NetezzaExtractor()
# Export table to file
header, csv_file = Engine.export_to_file(
extractor=extractor,
output_path="/tmp/exports",
fully_qualified_table_name="PROD_DB.SALES.TRANSACTIONS",
filter_statement="WHERE transaction_date >= '2024-01-01'",
verbose=True
)
print(f"Header: {header}")
print(f"Output file: {csv_file}")
parallel_process(worker_func: object, args_list: list[tuple], num_workers: int = None) -> list
Execute functions in parallel processes.
Example:
def process_chunk(chunk_id, data_file, start_offset, end_offset):
"""Process a chunk of data."""
print(f"Processing chunk {chunk_id}: {start_offset} to {end_offset}")
# Processing logic here
# Prepare work items
args_list = [
(1, "/tmp/data.csv", 0, 1000000),
(2, "/tmp/data.csv", 1000000, 2000000),
(3, "/tmp/data.csv", 2000000, 3000000),
]
# Execute in parallel
processes = Engine.parallel_process(
worker_func=process_chunk,
args_list=args_list,
num_workers=3
)
# Wait for completion
for process in processes:
process.join()
calculate_chunks(external_table: str, compression: int = 4) -> int
Calculate optimal number of chunks for a file.
Example:
# Calculate chunks for large file
num_chunks = Engine.calculate_chunks(
external_table="/tmp/large_export.csv",
compression=4
)
print(f"File will be split into {num_chunks} chunks")
ExtractorStrategy
Abstract base class for database-specific extractors.
Implementing a Custom Extractor:
from Snowforge.DataMover.Extractors.ExtractorStrategy import ExtractorStrategy
class PostgreSQLExtractor(ExtractorStrategy):
"""PostgreSQL data extractor implementation."""
def __init__(self, connection_string):
self.connection_string = connection_string
def extract_table_query(self, fully_qualified_table_name: str,
filter_statement: str = None,
verbose: bool = False):
"""Build extraction query for PostgreSQL."""
query = f"SELECT * FROM {fully_qualified_table_name}"
if filter_statement:
query += f" {filter_statement}"
return query
def list_all_tables(self, database_name: str, verbose: bool = False):
"""List all tables in PostgreSQL database."""
query = """
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
"""
# Execute query and return results
pass
def export_external_table(self, output_path: str,
table_name: str,
filter_statement: str = None,
verbose: bool = False):
"""Export PostgreSQL table to CSV file."""
# Implementation here
pass
# Use custom extractor
extractor = PostgreSQLExtractor("postgresql://user:pass@host:5432/db")
header, file = Engine.export_to_file(
extractor=extractor,
output_path="/tmp/exports",
fully_qualified_table_name="public.customers"
)
Broadcaster
Automated BI report export and multi-channel delivery system.
Key Classes
SnowforgeBroadcaster
Main orchestrator for report broadcasting.
Example:
from Snowforge.Broadcaster import (
SnowforgeBroadcaster,
BroadcastConfig,
SmtpConfig
)
from Snowforge.Broadcaster.adapters import FabricExporter
from Snowforge import FabricIntegration
# Initialize Fabric integration
fabric = FabricIntegration(
aws_secret_name="powerbi_token",
aws_profile="production"
)
# Create exporter
exporter = FabricExporter(fabric)
# Configure SMTP
smtp = SmtpConfig(
host="smtp.company.com",
port=587,
username="reports@company.com",
password="smtp_password",
use_tls=True
)
# Create broadcaster
broadcaster = SnowforgeBroadcaster(
exporter=exporter,
smtp=smtp,
poll_interval=5.0,
max_poll_attempts=240
)
# Configure broadcast
config = BroadcastConfig(
workspace_id="12345678-1234-1234-1234-123456789abc",
report_id="87654321-4321-4321-4321-cba987654321",
channel="email",
sender="reports@company.com",
recipients=["user1@company.com", "user2@company.com"],
subject="Monthly Sales Report",
body="Please find attached the monthly sales report.",
parameters={"ReportMonth": "2024-04", "Region": "North"}
)
# Execute broadcast
result = broadcaster.broadcast(config)
print(f"Report delivered: {result.filename}")
BroadcastConfig
Configuration for report broadcasts.
Example:
from Snowforge.Broadcaster import BroadcastConfig
# Email broadcast
email_config = BroadcastConfig(
workspace_id="workspace-guid",
report_id="report-guid",
channel="email",
sender="noreply@company.com",
recipients=["team@company.com"],
subject="Daily Dashboard - {date}",
body="<h1>Daily Dashboard</h1><p>See attached report.</p>",
parameters={"date": "2024-04-15"}
)
# Teams broadcast (when implemented)
teams_config = BroadcastConfig(
workspace_id="workspace-guid",
report_id="report-guid",
channel="teams",
teams_webhook_url="https://outlook.office.com/webhook/...",
parameters={"quarter": "Q1-2024"}
)
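The Key Features list mentions Jinja2 templating for dynamic parameters. Exactly how Snowforge applies templates internally is not shown here, but since jinja2 is a declared dependency, a self-contained rendering step before building the config could look like this (field values are hypothetical):
from datetime import date
from jinja2 import Template
from Snowforge.Broadcaster import BroadcastConfig
# Render today's date into the subject line before constructing the config
subject = Template("Daily Dashboard - {{ run_date }}").render(
    run_date=date.today().isoformat()
)
config = BroadcastConfig(
    workspace_id="workspace-guid",
    report_id="report-guid",
    channel="email",
    sender="noreply@company.com",
    recipients=["team@company.com"],
    subject=subject,
    body="<h1>Daily Dashboard</h1><p>See attached report.</p>",
    parameters={"date": date.today().isoformat()}
)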
Configuration Providers
Load broadcast configurations from multiple sources.
JSON File Provider
Example:
from Snowforge.Broadcaster import JsonFileConfigProvider
# Load from JSON file
provider = JsonFileConfigProvider("/config/broadcasts.json")
configs = provider.get_all_configs()
for config in configs:
broadcaster.broadcast(config)
broadcasts.json:
[
{
"workspace_id": "12345678-1234-1234-1234-123456789abc",
"report_id": "87654321-4321-4321-4321-cba987654321",
"channel": "email",
"sender": "reports@company.com",
"recipients": ["user@company.com"],
"subject": "Report",
"body": "See attached",
"parameters": {}
}
]
Snowflake Config Provider
Example:
from Snowforge.Broadcaster import SnowflakeConfigProvider
from Snowforge import SnowflakeIntegration
# Connect to Snowflake
conn = SnowflakeIntegration.connect(profile="svc_key_based")
# Load configs from table
provider = SnowflakeConfigProvider(
connection=conn,
database="CONFIG_DB",
schema="BROADCAST",
table="REPORT_CONFIGS"
)
configs = provider.get_all_configs()
FabricIntegration
Microsoft Fabric and Power BI API integration for pipeline management, semantic model refresh, and report export.
Key Methods
test_authentication() -> bool
Verify API token validity.
Example:
from Snowforge import FabricIntegration
fabric = FabricIntegration(
aws_secret_name="powerbi_token",
aws_profile="default"
)
if fabric.test_authentication():
print("Authentication successful")
else:
print("Authentication failed")
start_pipeline(workspace_id: str, item_id: str) -> str
Start a Fabric data pipeline.
Example:
# Start pipeline
execution_id = fabric.start_pipeline(
workspace_id="12345678-1234-1234-1234-123456789abc",
item_id="pipeline-item-id"
)
if execution_id != "Error":
print(f"Pipeline started: {execution_id}")
check_pipeline_last_status(workspace_id: str, item_id: str) -> tuple[str, str]
Check pipeline execution status.
Example:
run_id, status = fabric.check_pipeline_last_status(
workspace_id="workspace-guid",
item_id="pipeline-item-id"
)
print(f"Pipeline {run_id}: {status}")
semantic_model_reload(workspace_id: str, item_id: str) -> str
Trigger semantic model refresh.
Example:
result = fabric.semantic_model_reload(
workspace_id="workspace-guid",
item_id="model-item-id"
)
if result == "Completed":
print("Refresh initiated successfully")
export_paginated_report(access_token: str, workspace_id: str, report_id: str, input_data: dict = None, file_format: str = "PDF")
Export a paginated report.
Example:
# Export with parameters
response = fabric.export_paginated_report(
access_token=fabric.access_token,
workspace_id="workspace-guid",
report_id="report-guid",
input_data={
"StartDate": "2024-01-01",
"EndDate": "2024-03-31",
"Department": "Sales"
},
file_format="XLSX"
)
if response and response.status_code == 202:
export_id = response.json().get("id")
print(f"Export initiated: {export_id}")
download_report(workspace_id: str, report_id: str, export_id: str) -> tuple[bytes, str]
Download exported report.
Example:
# Download report
content, filename = fabric.download_report(
workspace_id="workspace-guid",
report_id="report-guid",
export_id="export-guid"
)
# Save to file
with open(f"/reports/{filename}", "wb") as f:
f.write(content)
print(f"Downloaded: {filename} ({len(content)} bytes)")
Complete Fabric Workflow
from Snowforge import FabricIntegration, Debug
import time
# Initialize
fabric = FabricIntegration(aws_secret_name="powerbi_token")
# 1. Refresh data pipeline
Debug.log("Starting data pipeline...", 'INFO')
pipeline_id = fabric.start_pipeline(
workspace_id="workspace-guid",
item_id="pipeline-item-id"
)
# Wait for pipeline completion
while True:
run_id, status = fabric.check_pipeline_last_status(
workspace_id="workspace-guid",
item_id="pipeline-item-id"
)
if status == "Completed":
Debug.log("Pipeline completed successfully", 'SUCCESS')
break
elif status == "Failed":
Debug.log("Pipeline failed", 'ERROR')
break
time.sleep(10)
# 2. Refresh semantic model
Debug.log("Refreshing semantic model...", 'INFO')
fabric.semantic_model_reload(
workspace_id="workspace-guid",
item_id="model-item-id"
)
# 3. Export report
Debug.log("Exporting report...", 'INFO')
response = fabric.export_paginated_report(
access_token=fabric.access_token,
workspace_id="workspace-guid",
report_id="report-guid",
file_format="PDF"
)
export_id = response.json().get("id")
# 4. Wait for export and download
while True:
status_code = fabric.check_report_status(
workspace_id="workspace-guid",
report_id="report-guid",
export_id=export_id
)
if status_code == 200:
content, filename = fabric.download_report(
workspace_id="workspace-guid",
report_id="report-guid",
export_id=export_id
)
with open(f"/reports/{filename}", "wb") as f:
f.write(content)
Debug.log(f"Report saved: {filename}", 'SUCCESS')
break
time.sleep(5)
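Note that the unbounded while True loops above will spin forever if a pipeline or export hangs. A bounded polling helper is a safer pattern; the sketch below is illustrative and only reuses check_pipeline_last_status as documented:
import time
def wait_for_pipeline(fabric, workspace_id: str, item_id: str,
                      interval: float = 10.0, max_attempts: int = 60) -> str:
    """Poll until the pipeline reaches a terminal state or attempts run out."""
    for _ in range(max_attempts):
        _, status = fabric.check_pipeline_last_status(
            workspace_id=workspace_id, item_id=item_id
        )
        if status in ("Completed", "Failed"):
            return status
        time.sleep(interval)
    raise TimeoutError("Pipeline did not finish within the polling window")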
Advanced Usage
End-to-End Data Pipeline
from Snowforge import (
AWSIntegration,
SnowflakeIntegration,
SnowflakeLogging,
Debug
)
from Snowforge.DataMover import Engine
from Snowforge.DataMover.Extractors.NetezzaExtractor import NetezzaExtractor
from datetime import datetime
import hashlib
def etl_pipeline():
"""Complete ETL pipeline example."""
# 1. Initialize connections
Debug.log("Initializing connections...", 'INFO')
AWSIntegration.initialize(profile="production")
SnowflakeIntegration.connect(profile="svc_key_based")
# 2. Start logging
process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)
execution_id = SnowflakeLogging.log_start(
task_id=200,
process_id=process_id,
starttime=datetime.now()
)
try:
# 3. Extract data from source
Debug.log("Extracting data from Netezza...", 'INFO')
extractor = NetezzaExtractor()
header, csv_file = Engine.export_to_file(
extractor=extractor,
output_path="/tmp/extracts",
fully_qualified_table_name="PROD.SALES.ORDERS",
filter_statement="WHERE order_date >= CURRENT_DATE - 7",
verbose=True
)
# 4. Upload to S3
Debug.log("Uploading to S3...", 'INFO')
s3_key = f"raw/orders/{datetime.now().strftime('%Y/%m/%d')}/orders.csv"
AWSIntegration.push_file_to_s3(
bucket_name="data-lake",
file_to_upload=csv_file,
key=s3_key,
verbose=True
)
# 5. Truncate target table
Debug.log("Truncating target table...", 'INFO')
SnowflakeIntegration.truncate_table(
database="ANALYTICS",
schema="STAGING",
table="ORDERS_STAGING"
)
# 6. Load to Snowflake
Debug.log("Loading to Snowflake...", 'INFO')
SnowflakeIntegration.load_to_snowflake(
stage="S3_STAGE",
stage_key=f"/{s3_key}",
database="ANALYTICS",
schema="STAGING",
table="ORDERS_STAGING",
verbose=True
)
# 7. Log success
SnowflakeLogging.log_end(
execution_id=execution_id,
status="SUCCESS",
log_path=f"/logs/etl_{execution_id}.log",
endtime=datetime.now(),
next_execution_time=None
)
Debug.log("Pipeline completed successfully!", 'SUCCESS')
except Exception as e:
Debug.log(f"Pipeline failed: {e}", 'ERROR')
SnowflakeLogging.log_end(
execution_id=execution_id,
status="FAILED",
log_path=f"/logs/etl_{execution_id}_error.log",
endtime=datetime.now(),
next_execution_time=None
)
raise
finally:
SnowflakeIntegration.close_connection()
if __name__ == "__main__":
etl_pipeline()
Multi-Report Broadcasting
from Snowforge.Broadcaster import (
SnowforgeBroadcaster,
SnowflakeConfigProvider,
SmtpConfig
)
from Snowforge.Broadcaster.adapters import FabricExporter
from Snowforge import FabricIntegration, SnowflakeIntegration, Debug
def broadcast_reports():
"""Broadcast multiple reports from Snowflake configuration."""
# Initialize connections
fabric = FabricIntegration(aws_secret_name="powerbi_token")
sf_conn = SnowflakeIntegration.connect(profile="svc_key_based")
# Configure broadcaster
broadcaster = SnowforgeBroadcaster(
exporter=FabricExporter(fabric),
smtp=SmtpConfig(
host="smtp.company.com",
port=587,
username="reports@company.com",
password="password",
use_tls=True
)
)
# Load configurations from Snowflake
provider = SnowflakeConfigProvider(
connection=sf_conn,
database="CONFIG",
schema="REPORTS",
table="BROADCAST_CONFIGS"
)
configs = provider.get_all_configs()
Debug.log(f"Found {len(configs)} report configurations", 'INFO')
# Broadcast each report
for i, config in enumerate(configs, 1):
Debug.log(f"Broadcasting report {i}/{len(configs)}: {config.subject}", 'INFO')
try:
result = broadcaster.broadcast(config)
Debug.log(f"Report {i} delivered: {result.filename}", 'SUCCESS')
except Exception as e:
Debug.log(f"Failed to broadcast report {i}: {e}", 'ERROR')
continue
# Cleanup
SnowflakeIntegration.close_connection()
if __name__ == "__main__":
broadcast_reports()
Extending Snowforge
Creating a Custom Database Extractor
from Snowforge.DataMover.Extractors.ExtractorStrategy import ExtractorStrategy
import pyodbc
import csv
class OracleExtractor(ExtractorStrategy):
"""Oracle database extractor."""
def __init__(self, dsn: str, username: str, password: str):
self.dsn = dsn
self.username = username
self.password = password
self.connection = None
def connect(self):
"""Establish Oracle connection."""
if not self.connection:
self.connection = pyodbc.connect(
f"DSN={self.dsn};UID={self.username};PWD={self.password}"
)
def extract_table_query(self, fully_qualified_table_name: str,
filter_statement: str = None,
verbose: bool = False) -> str:
"""Build Oracle extraction query."""
parts = fully_qualified_table_name.split('.')
if len(parts) == 3:
schema, table = parts[1], parts[2]
elif len(parts) == 2:
schema, table = parts
else:
table = parts[0]
schema = self.username.upper()
query = f"SELECT * FROM {schema}.{table}"
if filter_statement:
query += f" {filter_statement}"
return query
def list_all_tables(self, database_name: str, verbose: bool = False) -> list:
"""List all accessible tables in Oracle."""
self.connect()
cursor = self.connection.cursor()
query = """
SELECT owner, table_name
FROM all_tables
WHERE owner NOT IN ('SYS', 'SYSTEM')
ORDER BY owner, table_name
"""
cursor.execute(query)
tables = [(row.owner, row.table_name) for row in cursor.fetchall()]
cursor.close()
return tables
def export_external_table(self, output_path: str,
table_name: str,
filter_statement: str = None,
verbose: bool = False) -> tuple:
"""Export Oracle table to CSV."""
from Snowforge import Debug
self.connect()
cursor = self.connection.cursor()
# Build and execute query
query = self.extract_table_query(table_name, filter_statement, verbose)
Debug.log(f"Executing: {query}", 'DEBUG', verbose)
cursor.execute(query)
# Get column headers
headers = [desc[0] for desc in cursor.description]
# Export to CSV
import os
os.makedirs(output_path, exist_ok=True)
table_short = table_name.split('.')[-1]
csv_file = os.path.join(output_path, f"{table_short}.csv")
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f, delimiter='|')
writer.writerow(headers)
row_count = 0
while True:
rows = cursor.fetchmany(10000)
if not rows:
break
writer.writerows(rows)
row_count += len(rows)
cursor.close()
Debug.log(f"Exported {row_count} rows to {csv_file}", 'SUCCESS', verbose)
return headers, csv_file
def close(self):
"""Close Oracle connection."""
if self.connection:
self.connection.close()
self.connection = None
# Usage
extractor = OracleExtractor(
dsn="ORACLE_PROD",
username="etl_user",
password="password"
)
from Snowforge.DataMover import Engine
header, csv_file = Engine.export_to_file(
extractor=extractor,
output_path="/tmp/oracle_exports",
fully_qualified_table_name="HR.EMPLOYEES",
filter_statement="WHERE hire_date >= TO_DATE('2024-01-01', 'YYYY-MM-DD')"
)
extractor.close()
Creating a Custom Report Exporter
from Snowforge.Broadcaster.ReportExporter import ReportExporter, ExportResult
from typing import Any
class QlikSenseExporter(ReportExporter):
"""Qlik Sense report exporter."""
def __init__(self, qlik_api_client):
self.client = qlik_api_client
def start_export(self, workspace_id: str, report_id: str,
parameters: dict[str, Any] | None = None) -> str:
"""Initiate Qlik report export."""
# Build export request
export_config = {
"appId": report_id,
"format": "pdf",
"selections": parameters or {}
}
# Submit to Qlik API
response = self.client.start_export(export_config)
export_id = response.get("exportId")
return export_id
def poll_status(self, workspace_id: str, report_id: str,
export_id: str) -> str:
"""Check export status."""
status_response = self.client.get_export_status(export_id)
# Map Qlik status to standard status
qlik_status = status_response.get("status")
if qlik_status == "FINISHED":
return "completed"
elif qlik_status == "FAILED":
return "failed"
else:
return "running"
def download(self, workspace_id: str, report_id: str,
export_id: str) -> ExportResult:
"""Download exported report."""
# Download from Qlik
file_bytes = self.client.download_export(export_id)
filename = f"qlik_report_{export_id}.pdf"
return ExportResult(content=file_bytes, filename=filename)
# Usage with Broadcaster
from Snowforge.Broadcaster import SnowforgeBroadcaster, BroadcastConfig, SmtpConfig
qlik_client = ... # Initialize Qlik client
exporter = QlikSenseExporter(qlik_client)
broadcaster = SnowforgeBroadcaster(
exporter=exporter,
smtp=SmtpConfig(host="smtp.company.com", port=25)
)
config = BroadcastConfig(
workspace_id="qlik-workspace",
report_id="app-guid",
channel="email",
sender="reports@company.com",
recipients=["user@company.com"],
subject="Qlik Report",
body="See attached",
parameters={"Year": "2024"}
)
broadcaster.broadcast(config)
License
This project is licensed under the MIT License. See the LICENSE file for details.
Contributing
We welcome contributions, suggestions, and collaboration!
How to Contribute
1. Fork the repository
2. Create a feature branch: git checkout -b feature/my-new-feature
3. Commit your changes: git commit -am 'Add new feature'
4. Push to the branch: git push origin feature/my-new-feature
5. Submit a pull request
Reporting Issues
If you encounter bugs or have feature requests, please open an issue on our GitHub repository.
Contact
For questions about using Snowforge or collaboration opportunities:
- Authors: Andreas Heggelund, Christophe Lebegue
- Email: andreasheggelund@gmail.com
We encourage you to get in touch if you have suggestions for improvements, questions about using Snowforge, or would like to collaborate. Your contribution is always welcome!
Acknowledgments
Snowforge is maintained by Norsk Tipping and built on top of excellent open-source libraries:
- boto3 - AWS SDK for Python
- snowflake-connector-python - Snowflake connector
- Jinja2 - Template engine
- requests - HTTP library
Happy Data Engineering!