Cuneiform SDK
Enterprise-grade Python SDK for data processing workflows with AWS Lambda-style interfaces.
A comprehensive toolkit for building, testing, and deploying data transformation workflows with DuckDB-powered SQL execution, automatic schema validation, and intelligent dataset discovery.
Table of Contents
- Features
- Installation
- Quick Start
- Core Concepts
- Usage Guide
- CLI Commands
- API Reference
- Examples
- Advanced Features
- Troubleshooting
Features
✨ Workflow Functions
- Decorator-based workflow function definition with metadata
- Automatic function discovery and registration
- Lambda-style execution model
📊 Data Processing
- DuckDB-based in-memory SQL execution engine
- Support for CSV and Parquet files
- Automatic dataset loading and validation
- Seamless DataFrame/table conversion
✅ Schema Management
- YAML-based schema definitions
- Automatic schema validation
- Column-level type checking
- Schema discovery and generation
🔍 Code Analysis
- AST-based function analysis
- SQL dependency detection via SQLGlot
- Dataset dependency discovery
- Code quality insights
🎯 Developer Experience
- Comprehensive CLI for dataset management
- Rich error messages and validation feedback
- Structured logging and debug support
- Local testing capabilities
Installation
Requirements
- Python 3.8+
- DuckDB
- Pandas
- SQLGlot (for SQL analysis)
From Source
# Clone the repository
git clone <repository-url>
cd lib
# Install in development mode
pip install -e cuneiform_sdk
Basic Install
pip install cuneiform-sdk
Quick Start
1. Define a Workflow Function
from cuneiform_sdk import workflow_function, WorkflowRunContext
@workflow_function(
    name="process_customers",
    description="Process customer data and generate insights",
    version="1.0.0",
    tags=["customers", "etl"]
)
def process_customers(context: WorkflowRunContext, config: dict) -> dict:
    """
    Load customer data, transform it, and save results.
    """
    # Load input dataset
    context.load_dataset("customers", "data/customers.csv")

    # Execute SQL transformation
    result = context.sql("""
        SELECT
            id,
            name,
            email,
            UPPER(city) as city,
            age * 1.1 as adjusted_age
        FROM customers
        WHERE age > 18
    """)

    # Save output
    output_info = context.save_dataset("customers", format="parquet")

    return {
        "status": "success",
        "output_path": output_info["output_path"],
        "row_count": len(result)
    }
2. Execute the Workflow
from cuneiform_sdk import WorkflowRunContext
# Create execution context
context = WorkflowRunContext(
    data_dir="./data",
    output_dir="./output",
    schemas_dir="./schemas"
)
# Run the function
result = process_customers(context, config={"filter": "active"})
print(result)
3. Use the CLI
# Download a dataset
cuneiform dataset download customers --format csv
# Validate dataset against schema
cuneiform dataset validate customers.csv
# Test your workflow
cuneiform workflow test process_customers.py --data-dir ./data
Core Concepts
WorkflowContext
The execution context that provides:
- SQL Execution: Run queries on loaded datasets
- Dataset Management: Load and save datasets
- Schema Validation: Ensure data quality
- I/O Operations: Read/write in multiple formats
context = WorkflowRunContext(
    data_dir="./data",        # Input datasets location
    output_dir="./output",    # Output datasets location
    schemas_dir="./schemas",  # Schema definitions location
    log_level="INFO"
)
Workflow Functions
Functions decorated with @workflow_function that:
- Accept a WorkflowContext as the first parameter
- Receive configuration as additional parameters
- Return a dictionary with execution results
- Have automatic metadata and discovery support
@workflow_function(
    name="my_workflow",
    description="Description",
    version="1.0.0",
    tags=["tag1", "tag2"]
)
def my_workflow(context: WorkflowContext, config: dict) -> dict:
    # Implementation
    return {"status": "success"}
DatasetSchema
YAML-based schema definitions for datasets:
# schemas/customers.yaml
name: customers
version: "1.0.0"
description: "Customer master data"
columns:
  - name: id
    type: int64
    nullable: false
    description: "Unique customer ID"
  - name: name
    type: string
    nullable: false
  - name: email
    type: string
    nullable: true
  - name: created_at
    type: timestamp
    nullable: false
Usage Guide
Defining Workflow Functions
Basic Definition
from cuneiform_sdk import workflow_function, WorkflowRunContext
@workflow_function(
    name="transform_data",
    description="Transform raw data",
    version="1.0.0"
)
def transform_data(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("raw_data")
    context.sql("""
        CREATE TABLE processed_data AS
        SELECT * FROM raw_data WHERE status = 'active'
    """)
    return context.save_dataset("processed_data")
With Multiple Datasets
@workflow_function(name="merge_datasets", tags=["merge", "etl"])
def merge_datasets(context: WorkflowRunContext, config: dict) -> dict:
    # Load multiple datasets
    context.load_dataset("customers", "data/customers.csv")
    context.load_dataset("orders", "data/orders.parquet")

    # Transform into a named table so it can be saved by name
    context.sql("""
        CREATE TABLE customer_orders AS
        SELECT
            c.id as customer_id,
            c.name,
            COUNT(*) as order_count,
            SUM(o.amount) as total_spent
        FROM customers c
        LEFT JOIN orders o ON c.id = o.customer_id
        GROUP BY c.id, c.name
        ORDER BY total_spent DESC
    """)

    return context.save_dataset("customer_orders")
Working with Datasets
Loading Datasets
# Load from default location (data_dir/table_name.csv)
context.load_dataset("customers")
# Load from custom location
context.load_dataset("customers", "data/source/customers.parquet")
# Load multiple datasets
for table_name in ["customers", "orders", "products"]:
    context.load_dataset(table_name)
Saving Datasets
# Save single dataset
output = context.save_dataset("results", format="parquet")
print(output["output_path"]) # Path to saved file
# Save multiple datasets
outputs = context.save_datasets(["table1", "table2", "table3"])
for table_name, path in outputs.items():
    print(f"{table_name} -> {path}")
Working with DataFrames
# Get a pandas DataFrame
df = context.get_dataframe("customers")
print(df.head())
# Convert DataFrame to table
context.save_dataframe(df, "my_table")
Executing SQL
Simple Queries
# Execute SELECT query
result = context.sql("""
SELECT id, name, email FROM customers LIMIT 10
""")
# Execute CREATE TABLE AS SELECT
context.sql("""
CREATE TABLE high_value_customers AS
SELECT * FROM customers WHERE lifetime_value > 10000
""")
# Execute UPDATE
context.sql("""
UPDATE customers SET last_updated = CURRENT_TIMESTAMP
WHERE status = 'active'
""")
Multi-Statement Execution
context.sql("""
CREATE TEMP TABLE temp_data AS SELECT * FROM raw_data;
CREATE TABLE processed AS
SELECT *, ROW_NUMBER() OVER (ORDER BY id) as rn
FROM temp_data;
DROP TABLE temp_data;
""")
Complex Transformations
# Window functions
context.sql("""
CREATE TABLE ranked_customers AS
SELECT
*,
ROW_NUMBER() OVER (ORDER BY spending DESC) as rank,
ROUND(spending / SUM(spending) OVER () * 100, 2) as pct_of_total
FROM customers
""")
# Aggregations with HAVING
context.sql("""
CREATE TABLE customer_stats AS
SELECT
region,
COUNT(*) as customer_count,
AVG(age) as avg_age,
SUM(lifetime_value) as total_value
FROM customers
GROUP BY region
HAVING COUNT(*) > 100
""")
Schema Management
Define Schemas
Create YAML files in your schemas/ directory:
# schemas/transactions.yaml
name: transactions
version: "1.0.0"
description: "Financial transaction records"
columns:
  - name: transaction_id
    type: string
    nullable: false
  - name: customer_id
    type: int64
    nullable: false
  - name: amount
    type: float64
    nullable: false
  - name: currency
    type: string
    nullable: false
  - name: transaction_date
    type: timestamp
    nullable: false
  - name: status
    type: string
    nullable: false
Validate Data Against Schema
from cuneiform_sdk import SchemaManager, ValidationError

schema_manager = SchemaManager("schemas/")
schema = schema_manager.load_schema("transactions")

# Validate DataFrame
try:
    is_valid = schema.validate_dataframe(df)
    print("✓ Data is valid")
except ValidationError as e:
    print(f"✗ Validation error: {e}")
Generate Schemas
from cuneiform_sdk import SchemaManager
schema_manager = SchemaManager("schemas/")
# Generate schema from DataFrame
df = context.get_dataframe("customers")
schema = schema_manager.generate_schema_from_dataframe(df, name="customers")
# Save schema
schema_manager.save_schema(schema, "customers.yaml")
CLI Commands
Dataset Management
# Download dataset from remote storage
cuneiform dataset download <dataset-name> --format csv
# Validate dataset against schema
cuneiform dataset validate <dataset-file>
# Generate schema from dataset
cuneiform dataset generate-schema <dataset-file> --output schema.yaml
# List available datasets
cuneiform dataset list
Workflow Management
# Scan directory for workflow functions
cuneiform workflow scan <directory>
# Test workflow locally
cuneiform workflow test <workflow-file> --data-dir ./data
# Show workflow metadata
cuneiform workflow info <workflow-function>
Configuration
# Use verbose logging
cuneiform --verbose <command>
# Specify custom data directory
cuneiform --data-dir ./custom/data <command>
API Reference
WorkflowContext
Abstract interface for workflow execution.
Methods
| Method | Description |
|---|---|
| `sql(query: str) -> Any` | Execute SQL query |
| `table(table_name: str) -> Any` | Get table reference |
| `list_tables() -> List[str]` | List all tables |
| `load_dataset(name: str, path: str = None)` | Load dataset |
| `save_dataset(name: str, format: str = "parquet") -> dict` | Save dataset |
| `save_datasets(names: List[str], format: str) -> dict` | Save multiple datasets |
| `get_dataframe(name: str, path: str = None) -> pd.DataFrame` | Get DataFrame |
| `save_dataframe(df: pd.DataFrame, name: str)` | Save DataFrame |
| `log(msg: str, level: str = "INFO")` | Log message |
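As a quick reference, here is a minimal sketch that exercises the inspection helpers above; it uses the same example customers dataset as the rest of this guide:

# Inspect what is currently registered in the context
context.load_dataset("customers")
print(context.list_tables())                # e.g. ['customers']

customers_ref = context.table("customers")  # backend-specific table reference
row_count = context.sql("SELECT COUNT(*) AS n FROM customers")

context.log(f"Loaded tables: {context.list_tables()}", "DEBUG")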
WorkflowRunContext
DuckDB-based implementation of WorkflowContext.
context = WorkflowRunContext(
    data_dir: str = "data",
    output_dir: str = "output",
    schemas_dir: str = "datasets",
    log_level: str = "INFO",
    connection: Optional[object] = None
)
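The connection parameter is not described in detail here; the sketch below assumes it accepts a standard DuckDB connection, which would let a workflow reuse an existing (for example, on-disk) database:

import duckdb

from cuneiform_sdk import WorkflowRunContext

# Assumption: WorkflowRunContext can reuse an externally created DuckDB connection.
con = duckdb.connect("warehouse.duckdb")  # persistent, on-disk DuckDB database

context = WorkflowRunContext(
    data_dir="./data",
    output_dir="./output",
    schemas_dir="./schemas",
    log_level="DEBUG",
    connection=con
)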
DatasetSchema
Schema definition for a dataset.
schema = DatasetSchema(
    name: str,                          # Dataset name
    columns: List[ColumnSchema],        # Column definitions
    description: Optional[str] = None,  # Description
    version: Optional[str] = None       # Version
)

# Load from YAML
schema = DatasetSchema.from_yaml_file("schemas/customers.yaml")

# Load from dictionary
schema = DatasetSchema.from_dict({
    "name": "customers",
    "columns": [{"name": "id", "type": "int64", "nullable": False}]
})

# Validate DataFrame
schema.validate_dataframe(df)
SchemaManager
Manage schema files and operations.
manager = SchemaManager("schemas/")
# Load schema
schema = manager.load_schema("customers")
# Save schema
manager.save_schema(schema, "customers.yaml")
# Generate from DataFrame
schema = manager.generate_schema_from_dataframe(df, "customers")
# List schemas
schemas = manager.list_schemas()
workflow_function Decorator
Mark functions as workflow functions.
@workflow_function(
    name: Optional[str] = None,         # Function name
    description: Optional[str] = None,  # Description
    version: Optional[str] = None,      # Version
    tags: Optional[List[str]] = None    # Tags for categorization
)
def my_function(context: WorkflowContext, config: dict) -> dict:
    pass
Examples
Example 1: Customer Segmentation
@workflow_function(
    name="segment_customers",
    description="Segment customers by spending patterns",
    tags=["customers", "segmentation"]
)
def segment_customers(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("customers", "data/customers.csv")
    context.load_dataset("orders", "data/orders.parquet")

    # Create segments based on spending
    context.sql("""
        CREATE TABLE customer_segments AS
        WITH spending_stats AS (
            SELECT
                o.customer_id,
                COUNT(*) as order_count,
                SUM(o.total_amount) as total_spent,
                AVG(o.total_amount) as avg_order_value,
                MAX(o.order_date) as last_order_date
            FROM orders o
            GROUP BY o.customer_id
        )
        SELECT
            c.id,
            c.name,
            c.email,
            ss.order_count,
            ss.total_spent,
            CASE
                WHEN ss.total_spent > 10000 THEN 'VIP'
                WHEN ss.total_spent > 5000 THEN 'Premium'
                WHEN ss.total_spent > 1000 THEN 'Regular'
                ELSE 'Low Value'
            END as segment,
            DATE_DIFF('day', ss.last_order_date, CURRENT_DATE) as days_since_purchase
        FROM customers c
        LEFT JOIN spending_stats ss ON c.id = ss.customer_id
        ORDER BY ss.total_spent DESC
    """)

    return context.save_dataset("customer_segments", format="parquet")
Example 2: Data Quality Checks
@workflow_function(
    name="validate_data_quality",
    description="Run data quality checks",
    tags=["quality", "validation"]
)
def validate_data_quality(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("transactions")

    # Run quality checks and store the results as a named table
    context.sql("""
        CREATE TABLE quality_report AS
        SELECT
            'null_customer_ids' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE customer_id IS NULL
        UNION ALL
        SELECT
            'negative_amounts' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE amount < 0
        UNION ALL
        SELECT
            'future_dates' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE transaction_date > CURRENT_DATE
    """)

    # Save report
    context.save_dataset("quality_report")
    return {"checks_completed": True, "report_saved": True}
Example 3: Time Series Analysis
@workflow_function(
    name="analyze_trends",
    description="Analyze sales trends over time",
    tags=["analysis", "timeseries"]
)
def analyze_trends(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("sales", "data/sales.parquet")

    context.sql("""
        CREATE TABLE sales_trends AS
        WITH daily_sales AS (
            SELECT
                DATE(order_date) as sale_date,
                SUM(amount) as daily_total,
                COUNT(*) as transaction_count
            FROM sales
            GROUP BY DATE(order_date)
        )
        SELECT
            sale_date,
            daily_total,
            transaction_count,
            AVG(daily_total) OVER (
                ORDER BY sale_date
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) as moving_avg_7day,
            LAG(daily_total, 1) OVER (ORDER BY sale_date) as prev_day_total,
            ROUND(
                (daily_total - LAG(daily_total, 1) OVER (ORDER BY sale_date)) /
                LAG(daily_total, 1) OVER (ORDER BY sale_date) * 100,
                2
            ) as pct_change
        FROM daily_sales
        ORDER BY sale_date
    """)

    return context.save_dataset("sales_trends")
Advanced Features
Workflow Function Discovery
Automatically discover and register workflow functions:
from cuneiform_sdk import WorkflowRegistry, get_global_registry
# Discover functions in a module
registry = WorkflowRegistry()
registry.discover_functions_in_module("my_workflows")
# List registered functions
for func_name, metadata in registry.functions.items():
    print(f"Function: {func_name}")
    print(f"  Description: {metadata.description}")
    print(f"  Version: {metadata.version}")
    print(f"  Tags: {metadata.tags}")

# Get function by name
func_metadata = registry.get_function("segment_customers")
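The import above also exposes get_global_registry. The following sketch assumes it returns a shared registry that @workflow_function-decorated functions are registered into automatically; treat that behavior as an assumption rather than documented API:

from cuneiform_sdk import get_global_registry

# Assumption: decorated functions end up in a process-wide registry.
global_registry = get_global_registry()
for func_name, metadata in global_registry.functions.items():
    print(func_name, metadata.version)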
Code Analysis and Dependency Discovery
Analyze workflow functions for dependencies:
from cuneiform_sdk import WorkflowFunctionScanner
scanner = WorkflowFunctionScanner(".")
# Scan directory for workflow functions
analyses = scanner.scan_directory("workflows/")
for analysis in analyses:
    print(f"Function: {analysis.name}")
    print(f"Input datasets: {[d.name for d in analysis.dependencies if d.type == 'input']}")
    print(f"Output datasets: {[d.name for d in analysis.dependencies if d.type == 'output']}")
    print(f"SQL operations: {len(analysis.sql_operations)}")
    if analysis.issues:
        print(f"Issues: {analysis.issues}")
SQL Analysis
Extract table dependencies from SQL:
from cuneiform_sdk import WorkflowFunctionScanner
scanner = WorkflowFunctionScanner(".")
analyzer = scanner.sql_analyzer
# Analyze SQL query
operation = analyzer.analyze_sql_query("""
SELECT c.id, c.name, COUNT(*) as order_count
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE c.status = 'active'
GROUP BY c.id, c.name
""")
print(f"Input tables: {operation.inputs}")
print(f"Output tables: {operation.outputs}")
print(f"Operation type: {operation.operation_type}")
Custom Logging
context.log("Processing customer data", "INFO")
context.log("Starting data validation", "DEBUG")
context.log("Warning: Large dataset detected", "WARNING")
context.log("Critical error in transformation", "ERROR")
Error Handling
from cuneiform_sdk import (
CuneiformError,
ValidationError,
DatasetError,
ContextError
)
@workflow_function(name="robust_workflow")
def robust_workflow(context: WorkflowRunContext, config: dict) -> dict:
    try:
        context.load_dataset("customers")
    except DatasetError as e:
        return {
            "status": "error",
            "error_type": "dataset_error",
            "message": str(e),
            "error_code": e.error_code
        }
    except ValidationError as e:
        return {
            "status": "error",
            "error_type": "validation_error",
            "message": str(e)
        }
    except CuneiformError as e:
        return {
            "status": "error",
            "message": str(e),
            "context": e.context
        }
    return {"status": "success"}
Troubleshooting
Dataset Not Found
Problem: DatasetError: Dataset 'customers' not found
Solution:
- Verify file exists in data directory
- Check file naming: the file should be customers.csv, customers.parquet, or customers.pq
- Specify an explicit path: context.load_dataset("customers", "path/to/file.csv")
Schema Validation Fails
Problem: ValidationError: Column 'age' expected int64, got float64
Solution:
- Update schema to match data types
- Add a type conversion in SQL: CAST(age AS INT64)
- Update the DataFrame before saving (see the sketch below)
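For example, a minimal sketch of both approaches, assuming the mismatched column is age as in the error message above:

# Option A: convert the type in SQL before saving (BIGINT is DuckDB's 64-bit integer)
context.sql("""
    CREATE TABLE customers_typed AS
    SELECT id, name, email, CAST(age AS BIGINT) AS age
    FROM customers
""")

# Option B: fix the dtype on the pandas side before saving it back
df = context.get_dataframe("customers")
df["age"] = df["age"].astype("int64")  # assumes no nulls; use "Int64" for a nullable dtype
context.save_dataframe(df, "customers")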
SQL Syntax Errors
Problem: Error: SQL parse error
Solution:
- Check DuckDB SQL documentation for correct syntax
- Test queries separately in DuckDB before embedding them in a workflow (see the sketch below)
- Use multi-line strings for clarity
- Enable verbose logging: context = WorkflowRunContext(log_level="DEBUG")
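A quick way to test a query is to run it with the duckdb package directly, outside the SDK (the file path below is just an example):

import duckdb

# Stand-alone, in-memory DuckDB session for debugging a query
con = duckdb.connect()
con.execute("CREATE TABLE customers AS SELECT * FROM read_csv_auto('data/customers.csv')")
print(con.execute("SELECT id, name FROM customers LIMIT 5").fetchdf())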
Memory Issues with Large Datasets
Problem: MemoryError: Unable to allocate memory
Solution:
- Process data in chunks using LIMIT and pagination (see the sketch below)
- Use SQL to filter data early
- Delete intermediate tables: DROP TABLE temp_table
- Increase the memory available to the DuckDB process
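For example, a minimal sketch of LIMIT/OFFSET pagination (chunk size and ordering column are illustrative; it also assumes, as in the Quick Start example, that context.sql() returns a result with a length):

chunk_size = 100_000
offset = 0
while True:
    chunk = context.sql(f"""
        SELECT * FROM transactions
        ORDER BY transaction_id
        LIMIT {chunk_size} OFFSET {offset}
    """)
    if len(chunk) == 0:
        break
    # ... process the chunk here ...
    offset += chunk_size

# Drop intermediate tables as soon as they are no longer needed
context.sql("DROP TABLE IF EXISTS temp_table")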
Import Errors
Problem: ModuleNotFoundError: No module named 'cuneiform_sdk'
Solution:
- Ensure the package is installed: pip install cuneiform-sdk
- Check that the Python path includes the package location
- Use relative imports within the package
- Install in development mode: pip install -e .
Support
For issues, questions, or contributions:
- 📧 Email: support@peernova.com
- 🐛 Report bugs: GitHub Issues
- 📚 Documentation: Full Docs
License
Cuneiform SDK is proprietary software. See LICENSE file for details.
Changelog
v0.1.0 (2024)
- Initial release
- WorkflowContext interface and DuckDB implementation
- Schema validation system
- CLI tools for dataset management
- Workflow function discovery and analysis
- SQL dependency detection