
Cuneiform SDK

Enterprise-grade Python SDK for data processing workflows with AWS Lambda-style interfaces.

A comprehensive toolkit for building, testing, and deploying data transformation workflows with DuckDB-powered SQL execution, automatic schema validation, and intelligent dataset discovery.


Features

⚡ Workflow Functions

  • Decorator-based workflow function definition with metadata
  • Automatic function discovery and registration
  • Lambda-style execution model

📊 Data Processing

  • DuckDB-based in-memory SQL execution engine
  • Support for CSV and Parquet files
  • Automatic dataset loading and validation
  • Seamless DataFrame/table conversion

📋 Schema Management

  • YAML-based schema definitions
  • Automatic schema validation
  • Column-level type checking
  • Schema discovery and generation

🔍 Code Analysis

  • AST-based function analysis
  • SQL dependency detection via SQLGlot
  • Dataset dependency discovery
  • Code quality insights

🎯 Developer Experience

  • Comprehensive CLI for dataset management
  • Rich error messages and validation feedback
  • Structured logging and debug support
  • Local testing capabilities

Installation

Requirements

  • Python 3.8+
  • DuckDB
  • Pandas
  • SQLGlot (for SQL analysis)

From Source

# Clone the repository
git clone <repository-url>
cd lib

# Install in development mode
pip install -e cuneiform_sdk

Basic Install

pip install cuneiform-sdk

Quick Start

1. Define a Workflow Function

from cuneiform_sdk import workflow_function, WorkflowRunContext

@workflow_function(
    name="process_customers",
    description="Process customer data and generate insights",
    version="1.0.0",
    tags=["customers", "etl"]
)
def process_customers(context: WorkflowRunContext, config: dict) -> dict:
    """
    Load customer data, transform it, and save results.
    """
    # Load input dataset
    context.load_dataset("customers", "data/customers.csv")
    
    # Execute SQL transformation, materializing the result as a new table
    context.sql("""
        CREATE TABLE processed_customers AS
        SELECT 
            id,
            name,
            email,
            UPPER(city) as city,
            age * 1.1 as adjusted_age
        FROM customers
        WHERE age > 18
    """)
    
    # Save the transformed table
    output_info = context.save_dataset("processed_customers", format="parquet")
    
    return {
        "status": "success",
        "output_path": output_info["output_path"],
        "row_count": len(context.get_dataframe("processed_customers"))
    }

2. Execute the Workflow

from cuneiform_sdk import WorkflowRunContext

# Create execution context
context = WorkflowRunContext(
    data_dir="./data",
    output_dir="./output",
    schemas_dir="./schemas"
)

# Run the function
result = process_customers(context, config={"filter": "active"})
print(result)

3. Use the CLI

# Download a dataset
cuneiform dataset download customers --format csv

# Validate dataset against schema
cuneiform dataset validate customers.csv

# Test your workflow
cuneiform workflow test process_customers.py --data-dir ./data

Core Concepts

WorkflowContext

The execution context that provides:

  • SQL Execution: Run queries on loaded datasets
  • Dataset Management: Load and save datasets
  • Schema Validation: Ensure data quality
  • I/O Operations: Read/write in multiple formats

context = WorkflowRunContext(
    data_dir="./data",           # Input datasets location
    output_dir="./output",       # Output datasets location
    schemas_dir="./schemas",     # Schema definitions location
    log_level="INFO"
)

Workflow Functions

Functions decorated with @workflow_function that:

  • Accept a WorkflowContext as the first parameter
  • Receive configuration as additional parameters
  • Return a dictionary with execution results
  • Have automatic metadata and discovery support

@workflow_function(
    name="my_workflow",
    description="Description",
    version="1.0.0",
    tags=["tag1", "tag2"]
)
def my_workflow(context: WorkflowContext, config: dict) -> dict:
    # Implementation
    return {"status": "success"}

DatasetSchema

YAML-based schema definitions for datasets:

# schemas/customers.yaml
name: customers
version: "1.0.0"
description: "Customer master data"
columns:
  - name: id
    type: int64
    nullable: false
    description: "Unique customer ID"
  - name: name
    type: string
    nullable: false
  - name: email
    type: string
    nullable: true
  - name: created_at
    type: timestamp
    nullable: false
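
A minimal sketch of putting this schema to work, using the DatasetSchema API documented in the API Reference below and assuming a context like the one created in the Quick Start:

from cuneiform_sdk import DatasetSchema

# Load the schema defined above
schema = DatasetSchema.from_yaml_file("schemas/customers.yaml")

# Validate a loaded table against it; raises ValidationError on mismatch
df = context.get_dataframe("customers")
schema.validate_dataframe(df)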

Usage Guide

Defining Workflow Functions

Basic Definition

from cuneiform_sdk import workflow_function, WorkflowRunContext

@workflow_function(
    name="transform_data",
    description="Transform raw data",
    version="1.0.0"
)
def transform_data(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("raw_data")
    
    context.sql("""
        CREATE TABLE processed_data AS
        SELECT * FROM raw_data WHERE status = 'active'
    """)
    
    return context.save_dataset("processed_data")

With Multiple Datasets

@workflow_function(name="merge_datasets", tags=["merge", "etl"])
def merge_datasets(context: WorkflowRunContext, config: dict) -> dict:
    # Load multiple datasets
    context.load_dataset("customers", "data/customers.csv")
    context.load_dataset("orders", "data/orders.parquet")
    
    # Transform
    result = context.sql("""
        SELECT 
            c.id as customer_id,
            c.name,
            COUNT(*) as order_count,
            SUM(o.amount) as total_spent
        FROM customers c
        LEFT JOIN orders o ON c.id = o.customer_id
        GROUP BY c.id, c.name
        ORDER BY total_spent DESC
    """)
    
    return context.save_dataset("customer_orders")

Working with Datasets

Loading Datasets

# Load from default location (data_dir/table_name.csv)
context.load_dataset("customers")

# Load from custom location
context.load_dataset("customers", "data/source/customers.parquet")

# Load multiple datasets
for table_name in ["customers", "orders", "products"]:
    context.load_dataset(table_name)

Saving Datasets

# Save single dataset
output = context.save_dataset("results", format="parquet")
print(output["output_path"])  # Path to saved file

# Save multiple datasets
outputs = context.save_datasets(["table1", "table2", "table3"])
for table_name, path in outputs.items():
    print(f"{table_name} -> {path}")

Working with DataFrames

# Get a pandas DataFrame
df = context.get_dataframe("customers")
print(df.head())

# Convert DataFrame to table
context.save_dataframe(df, "my_table")

Executing SQL

Simple Queries

# Execute SELECT query
result = context.sql("""
    SELECT id, name, email FROM customers LIMIT 10
""")

# Execute CREATE TABLE AS SELECT
context.sql("""
    CREATE TABLE high_value_customers AS
    SELECT * FROM customers WHERE lifetime_value > 10000
""")

# Execute UPDATE
context.sql("""
    UPDATE customers SET last_updated = CURRENT_TIMESTAMP
    WHERE status = 'active'
""")

Multi-Statement Execution

context.sql("""
    CREATE TEMP TABLE temp_data AS SELECT * FROM raw_data;
    
    CREATE TABLE processed AS
    SELECT *, ROW_NUMBER() OVER (ORDER BY id) as rn
    FROM temp_data;
    
    DROP TABLE temp_data;
""")

Complex Transformations

# Window functions
context.sql("""
    CREATE TABLE ranked_customers AS
    SELECT 
        *,
        ROW_NUMBER() OVER (ORDER BY spending DESC) as rank,
        ROUND(spending / SUM(spending) OVER () * 100, 2) as pct_of_total
    FROM customers
""")

# Aggregations with HAVING
context.sql("""
    CREATE TABLE customer_stats AS
    SELECT 
        region,
        COUNT(*) as customer_count,
        AVG(age) as avg_age,
        SUM(lifetime_value) as total_value
    FROM customers
    GROUP BY region
    HAVING COUNT(*) > 100
""")

Schema Management

Define Schemas

Create YAML files in your schemas/ directory:

# schemas/transactions.yaml
name: transactions
version: "1.0.0"
description: "Financial transaction records"
columns:
  - name: transaction_id
    type: string
    nullable: false
  - name: customer_id
    type: int64
    nullable: false
  - name: amount
    type: float64
    nullable: false
  - name: currency
    type: string
    nullable: false
  - name: transaction_date
    type: timestamp
    nullable: false
  - name: status
    type: string
    nullable: false

Validate Data Against Schema

from cuneiform_sdk import SchemaManager, ValidationError

schema_manager = SchemaManager("schemas/")
schema = schema_manager.load_schema("transactions")

# Validate DataFrame
try:
    is_valid = schema.validate_dataframe(df)
    print("✓ Data is valid")
except ValidationError as e:
    print(f"✗ Validation error: {e}")

Generate Schemas

from cuneiform_sdk import SchemaManager

schema_manager = SchemaManager("schemas/")

# Generate schema from DataFrame
df = context.get_dataframe("customers")
schema = schema_manager.generate_schema_from_dataframe(df, name="customers")

# Save schema
schema_manager.save_schema(schema, "customers.yaml")

CLI Commands

Dataset Management

# Download dataset from remote storage
cuneiform dataset download <dataset-name> --format csv

# Validate dataset against schema
cuneiform dataset validate <dataset-file>

# Generate schema from dataset
cuneiform dataset generate-schema <dataset-file> --output schema.yaml

# List available datasets
cuneiform dataset list

Workflow Management

# Scan directory for workflow functions
cuneiform workflow scan <directory>

# Test workflow locally
cuneiform workflow test <workflow-file> --data-dir ./data

# Show workflow metadata
cuneiform workflow info <workflow-function>

Configuration

# Use verbose logging
cuneiform --verbose <command>

# Specify custom data directory
cuneiform --data-dir ./custom/data <command>

API Reference

WorkflowContext

Abstract interface for workflow execution.

Methods

| Method | Description |
| --- | --- |
| sql(query: str) -> Any | Execute SQL query |
| table(table_name: str) -> Any | Get table reference |
| list_tables() -> List[str] | List all tables |
| load_dataset(name: str, path: str = None) | Load dataset |
| save_dataset(name: str, format: str = "parquet") -> dict | Save dataset |
| save_datasets(names: List[str], format: str) -> dict | Save multiple datasets |
| get_dataframe(name: str, path: str = None) -> pd.DataFrame | Get DataFrame |
| save_dataframe(df: pd.DataFrame, name: str) | Save DataFrame |
| log(msg: str, level: str = "INFO") | Log message |
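
A short, illustrative tour of these methods together (the table name and row count are hypothetical):

# Load a dataset and inspect the registered tables
context.load_dataset("customers")
print(context.list_tables())              # e.g. ['customers']

# Get a table reference and a DataFrame view
tbl = context.table("customers")
df = context.get_dataframe("customers")

# Log progress through the context
context.log(f"Loaded {len(df)} rows", "INFO")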

WorkflowRunContext

DuckDB-based implementation of WorkflowContext.

context = WorkflowRunContext(
    data_dir: str = "data",
    output_dir: str = "output", 
    schemas_dir: str = "datasets",
    log_level: str = "INFO",
    connection: Optional[object] = None
)
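
Since the constructor accepts an optional connection, you can presumably inject an existing DuckDB connection rather than letting the context create its own, for example to share state across contexts or to use an on-disk database. A sketch under that assumption (the database file name is illustrative):

import duckdb

# Reuse an existing DuckDB connection instead of the default
con = duckdb.connect("warehouse.duckdb")
context = WorkflowRunContext(data_dir="./data", connection=con)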

DatasetSchema

Schema definition for a dataset.

schema = DatasetSchema(
    name: str,                          # Dataset name
    columns: List[ColumnSchema],        # Column definitions
    description: Optional[str] = None,  # Description
    version: Optional[str] = None       # Version
)

# Load from YAML
schema = DatasetSchema.from_yaml_file("schemas/customers.yaml")

# Load from dictionary
schema = DatasetSchema.from_dict({
    "name": "customers",
    "columns": [{"name": "id", "type": "int64", "nullable": false}]
})

# Validate DataFrame
schema.validate_dataframe(df)

SchemaManager

Manage schema files and operations.

manager = SchemaManager("schemas/")

# Load schema
schema = manager.load_schema("customers")

# Save schema
manager.save_schema(schema, "customers.yaml")

# Generate from DataFrame
schema = manager.generate_schema_from_dataframe(df, "customers")

# List schemas
schemas = manager.list_schemas()

workflow_function Decorator

Mark functions as workflow functions.

@workflow_function(
    name: Optional[str] = None,           # Function name
    description: Optional[str] = None,    # Description
    version: Optional[str] = None,        # Version
    tags: Optional[List[str]] = None      # Tags for categorization
)
def my_function(context: WorkflowContext, config: dict) -> dict:
    pass

Examples

Example 1: Customer Segmentation

@workflow_function(
    name="segment_customers",
    description="Segment customers by spending patterns",
    tags=["customers", "segmentation"]
)
def segment_customers(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("customers", "data/customers.csv")
    context.load_dataset("orders", "data/orders.parquet")
    
    # Create segments based on spending
    context.sql("""
        CREATE TABLE customer_segments AS
        WITH spending_stats AS (
            SELECT 
                o.customer_id,
                COUNT(*) as order_count,
                SUM(o.total_amount) as total_spent,
                AVG(o.total_amount) as avg_order_value,
                MAX(o.order_date) as last_order_date
            FROM orders o
            GROUP BY o.customer_id
        )
        SELECT 
            c.id,
            c.name,
            c.email,
            ss.order_count,
            ss.total_spent,
            CASE 
                WHEN ss.total_spent > 10000 THEN 'VIP'
                WHEN ss.total_spent > 5000 THEN 'Premium'
                WHEN ss.total_spent > 1000 THEN 'Regular'
                ELSE 'Low Value'
            END as segment,
            DATE_DIFF('day', ss.last_order_date, CURRENT_DATE) as days_since_purchase
        FROM customers c
        LEFT JOIN spending_stats ss ON c.id = ss.customer_id
        ORDER BY ss.total_spent DESC
    """)
    
    return context.save_dataset("customer_segments", format="parquet")

Example 2: Data Quality Checks

@workflow_function(
    name="validate_data_quality",
    description="Run data quality checks",
    tags=["quality", "validation"]
)
def validate_data_quality(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("transactions")
    
    # Run quality checks and materialize the results as a report table
    context.sql("""
        CREATE TABLE quality_report AS
        SELECT 
            'null_customer_ids' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE customer_id IS NULL
        
        UNION ALL
        
        SELECT 
            'negative_amounts' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE amount < 0
        
        UNION ALL
        
        SELECT 
            'future_dates' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE transaction_date > CURRENT_DATE
    """)
    
    # Save report
    context.save_dataset("quality_report")
    
    return {"checks_completed": True, "report_saved": True}

Example 3: Time Series Analysis

@workflow_function(
    name="analyze_trends",
    description="Analyze sales trends over time",
    tags=["analysis", "timeseries"]
)
def analyze_trends(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("sales", "data/sales.parquet")
    
    context.sql("""
        CREATE TABLE sales_trends AS
        WITH daily_sales AS (
            SELECT 
                DATE(order_date) as sale_date,
                SUM(amount) as daily_total,
                COUNT(*) as transaction_count
            FROM sales
            GROUP BY DATE(order_date)
        )
        SELECT 
            sale_date,
            daily_total,
            transaction_count,
            AVG(daily_total) OVER (
                ORDER BY sale_date 
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) as moving_avg_7day,
            LAG(daily_total, 1) OVER (ORDER BY sale_date) as prev_day_total,
            ROUND(
                (daily_total - LAG(daily_total, 1) OVER (ORDER BY sale_date)) / 
                LAG(daily_total, 1) OVER (ORDER BY sale_date) * 100, 
                2
            ) as pct_change
        FROM daily_sales
        ORDER BY sale_date
    """)
    
    return context.save_dataset("sales_trends")

Advanced Features

Workflow Function Discovery

Automatically discover and register workflow functions:

from cuneiform_sdk import WorkflowRegistry, get_global_registry

# Discover functions in a module
registry = WorkflowRegistry()
registry.discover_functions_in_module("my_workflows")

# List registered functions
for func_name, metadata in registry.functions.items():
    print(f"Function: {func_name}")
    print(f"  Description: {metadata.description}")
    print(f"  Version: {metadata.version}")
    print(f"  Tags: {metadata.tags}")

# Get function by name
func_metadata = registry.get_function("segment_customers")

Code Analysis and Dependency Discovery

Analyze workflow functions for dependencies:

from cuneiform_sdk import WorkflowFunctionScanner

scanner = WorkflowFunctionScanner(".")

# Scan directory for workflow functions
analyses = scanner.scan_directory("workflows/")

for analysis in analyses:
    print(f"Function: {analysis.name}")
    print(f"Input datasets: {[d.name for d in analysis.dependencies if d.type == 'input']}")
    print(f"Output datasets: {[d.name for d in analysis.dependencies if d.type == 'output']}")
    print(f"SQL operations: {len(analysis.sql_operations)}")
    if analysis.issues:
        print(f"Issues: {analysis.issues}")

SQL Analysis

Extract table dependencies from SQL:

from cuneiform_sdk import WorkflowFunctionScanner

scanner = WorkflowFunctionScanner(".")
analyzer = scanner.sql_analyzer

# Analyze SQL query
operation = analyzer.analyze_sql_query("""
    SELECT c.id, c.name, COUNT(*) as order_count
    FROM customers c
    JOIN orders o ON c.id = o.customer_id
    WHERE c.status = 'active'
    GROUP BY c.id, c.name
""")

print(f"Input tables: {operation.inputs}")
print(f"Output tables: {operation.outputs}")
print(f"Operation type: {operation.operation_type}")

Custom Logging

context.log("Processing customer data", "INFO")
context.log("Starting data validation", "DEBUG")
context.log("Warning: Large dataset detected", "WARNING")
context.log("Critical error in transformation", "ERROR")

Error Handling

from cuneiform_sdk import (
    CuneiformError,
    ValidationError,
    DatasetError,
    ContextError
)

@workflow_function(name="robust_workflow")
def robust_workflow(context: WorkflowRunContext, config: dict) -> dict:
    try:
        context.load_dataset("customers")
    except DatasetError as e:
        return {
            "status": "error",
            "error_type": "dataset_error",
            "message": str(e),
            "error_code": e.error_code
        }
    except ValidationError as e:
        return {
            "status": "error",
            "error_type": "validation_error",
            "message": str(e)
        }
    except CuneiformError as e:
        return {
            "status": "error",
            "message": str(e),
            "context": e.context
        }
    
    return {"status": "success"}

Troubleshooting

Dataset Not Found

Problem: DatasetError: Dataset 'customers' not found

Solution:

  1. Verify file exists in data directory
  2. Check file naming: should be customers.csv, customers.parquet, or customers.pq
  3. Specify explicit path: context.load_dataset("customers", "path/to/file.csv")

Schema Validation Fails

Problem: ValidationError: Column 'age' expected int64, got float64

Solution:

  1. Update schema to match data types
  2. Add type conversion in SQL: CAST(age AS INT64)
  3. Update the DataFrame before saving (see the sketch below)
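
A minimal sketch of option 3, assuming the mismatch is a float64 column that should be int64 (the table and column names are illustrative, and the cast will fail if the column contains nulls):

# Align the DataFrame's dtype with the schema before saving
df = context.get_dataframe("customers")
df["age"] = df["age"].astype("int64")
context.save_dataframe(df, "customers")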

SQL Syntax Errors

Problem: Error: SQL parse error

Solution:

  1. Check DuckDB SQL documentation for correct syntax
  2. Test queries separately in DuckDB (see the sketch below)
  3. Use multi-line strings for clarity
  4. Enable verbose logging: context = WorkflowRunContext(log_level="DEBUG")
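
A minimal sketch of option 2, running a statement directly against DuckDB to isolate the syntax problem (assumes the duckdb package is installed):

import duckdb

# An in-memory database is enough for syntax checking;
# paste the failing statement in place of the SELECT below
con = duckdb.connect()
print(con.execute("SELECT 42 AS answer").fetchall())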

Memory Issues with Large Datasets

Problem: MemoryError: Unable to allocate memory

Solution:

  1. Process data in chunks using LIMIT and pagination (see the sketch below)
  2. Use SQL to filter data early
  3. Delete intermediate tables: DROP TABLE temp_table
  4. Increase available memory to DuckDB process
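
A minimal sketch of option 1, paging through a large table with LIMIT/OFFSET. Table and column names are illustrative, and the sketch assumes sql() returns a result with a length (e.g., a DataFrame); pick a chunk size that fits your memory budget.

chunk_size = 100_000
offset = 0
while True:
    chunk = context.sql(f"""
        SELECT * FROM transactions
        ORDER BY transaction_id
        LIMIT {chunk_size} OFFSET {offset}
    """)
    if len(chunk) == 0:
        break
    # ... process the chunk here ...
    offset += chunk_size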

Import Errors

Problem: ModuleNotFoundError: No module named 'cuneiform_sdk'

Solution:

  1. Ensure package is installed: pip install cuneiform-sdk
  2. Check Python path includes package location
  3. Use relative imports within package
  4. Install in development mode: pip install -e .

Support

For issues, questions, or contributions, please contact the project maintainers.


License

Cuneiform SDK is proprietary software. See LICENSE file for details.


Changelog

v0.1.0 (2024)

  • Initial release
  • WorkflowContext interface and DuckDB implementation
  • Schema validation system
  • CLI tools for dataset management
  • Workflow function discovery and analysis
  • SQL dependency detection


Download files

Download the file for your platform.

Source Distribution

cuneiform_sdk-0.1.3.tar.gz (42.7 kB)


Built Distribution


cuneiform_sdk-0.1.3-py3-none-any.whl (40.6 kB)


File details

Details for the file cuneiform_sdk-0.1.3.tar.gz.

File metadata

  • Download URL: cuneiform_sdk-0.1.3.tar.gz
  • Upload date:
  • Size: 42.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.6

File hashes

Hashes for cuneiform_sdk-0.1.3.tar.gz
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 57ebeb1d811418968073ef9dff74748dc6d9b99b85ae4ee9d89063784a5e54b9 |
| MD5 | 47fd3f10ab1c8e7679f62b4241fd195c |
| BLAKE2b-256 | 7d8db9ac1b8999ed9a883f1ce97d1e35d914fcebe88da7017b8d27ba4a115654 |


File details

Details for the file cuneiform_sdk-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: cuneiform_sdk-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 40.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.6

File hashes

Hashes for cuneiform_sdk-0.1.3-py3-none-any.whl
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 770bdbc514d8cfb02cc63c68ac603ffb6c0609668ddd35651f3c427bc4923036 |
| MD5 | 650e9825e5570e17a5917bb6ab94acaf |
| BLAKE2b-256 | b84ba0d0a5d415e57d8bb07ebf11893718e4b061e01589273dfe05b09addd1b9 |

