Python SDK for the Kailash container-node architecture
Kailash Python SDK
A Pythonic SDK for the Kailash container-node architecture
Build workflows that seamlessly integrate with Kailash's production environment while maintaining the flexibility to prototype quickly and iterate locally.
Highlights
- Rapid Prototyping: Create and test workflows locally without containerization
- Architecture-Aligned: Automatically ensures compliance with Kailash standards
- Seamless Handoff: Export prototypes directly to production-ready formats
- Real-time Monitoring: Live dashboards with WebSocket streaming and performance metrics
- Extensible: Easy to create custom nodes for domain-specific operations
- Fast Installation: Uses uv for lightning-fast Python package management
Who Is This For?
The Kailash Python SDK is designed for:
- AI Business Coaches (ABCs) who need to prototype workflows quickly
- Data Scientists building ML pipelines compatible with production infrastructure
- Engineers who want to test Kailash workflows locally before deployment
- Teams looking to standardize their workflow development process
Quick Start
Installation
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# For users: Install from PyPI
uv pip install kailash
# For developers: Clone and sync
git clone https://github.com/integrum/kailash-python-sdk.git
cd kailash-python-sdk
uv sync
Your First Workflow
from kailash.workflow import Workflow
from kailash.nodes.data import CSVReader
from kailash.nodes.code import PythonCodeNode
from kailash.runtime.local import LocalRuntime
import pandas as pd
# Create a workflow
workflow = Workflow("customer_analysis", name="customer_analysis")
# Add data reader
reader = CSVReader(file_path="customers.csv")
workflow.add_node("read_customers", reader)
# Add custom processing using Python code
def analyze_customers(data):
    """Analyze customer data and compute metrics."""
    df = pd.DataFrame(data)
    # Convert total_spent to numeric
    df['total_spent'] = pd.to_numeric(df['total_spent'])
    return {
        "total_customers": len(df),
        "avg_spend": df["total_spent"].mean(),
        "top_customers": df.nlargest(10, "total_spent").to_dict("records")
    }
analyzer = PythonCodeNode.from_function(analyze_customers, name="analyzer")
workflow.add_node("analyze", analyzer)
# Connect nodes
workflow.connect("read_customers", "analyze", {"data": "data"})
# Run locally
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow)
print(f"Analysis complete! Results: {results}")
# Export for production
from kailash.utils.export import WorkflowExporter
exporter = WorkflowExporter()
workflow.save("customer_analysis.yaml", format="yaml")
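To make the `connect` mapping above concrete, here is a minimal, SDK-independent sketch of how a mapping such as `{"data": "data"}` can route one node's output into the next node's input. The names `run_pipeline` and `read_customers`, and the sample rows, are hypothetical stand-ins rather than Kailash APIs, and the real runtime may work differently.

```python
from statistics import mean


def read_customers():
    """Stand-in for a CSV reader node: returns its outputs as a dict."""
    return {"data": [
        {"name": "Ana", "total_spent": "120.50"},
        {"name": "Ben", "total_spent": "80.00"},
        {"name": "Cy", "total_spent": "99.50"},
    ]}


def analyze_customers(data):
    """Stand-in for the analyzer node, computing two metrics without pandas."""
    spend = [float(row["total_spent"]) for row in data]
    return {"total_customers": len(data), "avg_spend": mean(spend)}


def run_pipeline(source, target, mapping):
    """Run source, then pass its outputs to target according to mapping.

    mapping is {source_output_name: target_input_name} (an assumed convention).
    """
    outputs = source()
    kwargs = {dst: outputs[src] for src, dst in mapping.items()}
    return target(**kwargs)


result = run_pipeline(read_customers, analyze_customers, {"data": "data"})
print(result)
```

The point of the mapping is that the two nodes never need to know about each other; only the connection names which output feeds which input.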
SharePoint Integration Example
from kailash.workflow import Workflow
from kailash.nodes.data import SharePointGraphReader, CSVWriter
import os
# Create workflow for SharePoint file processing
workflow = Workflow("sharepoint_processor", name="sharepoint_processor")
# Configure SharePoint reader (using environment variables)
sharepoint = SharePointGraphReader()
workflow.add_node("read_sharepoint", sharepoint)
# Process downloaded files
csv_writer = CSVWriter()
workflow.add_node("save_locally", csv_writer)
# Connect nodes
workflow.connect("read_sharepoint", "save_locally")
# Execute with credentials
from kailash.runtime.local import LocalRuntime
inputs = {
    "read_sharepoint": {
        "tenant_id": os.getenv("SHAREPOINT_TENANT_ID"),
        "client_id": os.getenv("SHAREPOINT_CLIENT_ID"),
        "client_secret": os.getenv("SHAREPOINT_CLIENT_SECRET"),
        "site_url": "https://yourcompany.sharepoint.com/sites/YourSite",
        "operation": "list_files",
        "library_name": "Documents"
    }
}
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow, inputs=inputs)
Documentation
| Resource | Description |
|---|---|
| User Guide | Comprehensive guide for using the SDK |
| Architecture | Architecture Decision Records |
| API Reference | Detailed API documentation |
| API Integration Guide | Complete API integration documentation |
| Examples | Working examples and tutorials |
| Contributing | Contribution guidelines |
Features
Pre-built Nodes
The SDK includes a rich set of pre-built nodes for common operations:
- Data Operations
- Processing Nodes
- AI/ML Nodes
- API Integration Nodes
- Other Integration Nodes
- SharePoint Integration
- Real-time Monitoring
Core Capabilities
Workflow Management
from kailash.workflow import Workflow
# Create complex workflows with branching logic
workflow = Workflow("data_pipeline", name="data_pipeline")
# Add conditional branching
validator = ValidationNode()
workflow.add_node("validate", validator)
# Different paths based on validation
workflow.add_node("process_valid", processor_a)
workflow.add_node("handle_errors", error_handler)
# Connect with conditions
workflow.connect("validate", "process_valid", condition="is_valid")
workflow.connect("validate", "handle_errors", condition="has_errors")
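The conditional connections above can be pictured with a small, SDK-independent sketch. It assumes a validation step emits boolean flags and that each connection names the flag it depends on; `validate`, `route`, and the required field names are hypothetical and not part of the Kailash API.

```python
def validate(record):
    """Return flags that downstream connections can branch on."""
    errors = [key for key in ("id", "email") if not record.get(key)]
    return {"is_valid": not errors, "has_errors": bool(errors), "errors": errors}


def route(flags, connections):
    """Pick the target of every connection whose condition flag is truthy."""
    return [target for condition, target in connections if flags.get(condition)]


# (condition, target) pairs mirroring the two connect() calls above
connections = [("is_valid", "process_valid"), ("has_errors", "handle_errors")]

good = route(validate({"id": 1, "email": "a@example.com"}), connections)
bad = route(validate({"id": 2, "email": ""}), connections)
print(good, bad)
```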
Immutable State Management
from kailash.workflow.state import WorkflowStateWrapper
from pydantic import BaseModel
# Define state model
class MyStateModel(BaseModel):
    counter: int = 0
    status: str = "pending"
    nested: dict = {}
# Create and wrap state object
state = MyStateModel()
state_wrapper = workflow.create_state_wrapper(state)
# Single path-based update
updated_wrapper = state_wrapper.update_in(
    ["counter"],
    42
)
# Batch update multiple fields atomically
updated_wrapper = state_wrapper.batch_update([
    (["counter"], 10),
    (["status"], "processing")
])
# Execute workflow with state management
final_state, results = workflow.execute_with_state(state_model=state)
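As an illustration of what path-based immutable updates mean, the following sketch implements the idea on plain nested dicts. It is not the SDK's `WorkflowStateWrapper`; the free functions `update_in` and `batch_update` here are hypothetical and only mirror the method names used above.

```python
from copy import deepcopy


def update_in(state, path, value):
    """Return a new nested dict with value set at path; state is untouched."""
    new_state = deepcopy(state)
    node = new_state
    for key in path[:-1]:
        node = node.setdefault(key, {})
    node[path[-1]] = value
    return new_state


def batch_update(state, updates):
    """Apply several (path, value) updates and return one new state."""
    new_state = state
    for path, value in updates:
        new_state = update_in(new_state, path, value)
    return new_state


state = {"counter": 0, "status": "pending", "nested": {}}
updated = batch_update(state, [(["counter"], 10), (["nested", "step"], 1)])
print(state)    # the original object is unchanged
print(updated)
```

Copying on every update is the simplest way to guarantee immutability; a production implementation would more likely share unchanged substructures instead of deep-copying.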
Task Tracking
from kailash.tracking import TaskManager
# Initialize task manager
task_manager = TaskManager()
# Create a sample workflow
from kailash.workflow import Workflow
workflow = Workflow("sample_workflow", name="Sample Workflow")
# Run workflow with tracking
from kailash.runtime.local import LocalRuntime
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow, task_manager=task_manager)
# Query execution history
runs = task_manager.list_runs(status="completed", limit=10)
details = task_manager.get_run(run_id)
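For readers who want a mental model of run tracking, here is a minimal in-memory sketch of a run history that can be filtered by status and capped with a limit. `InMemoryRunLog` and `Run` are hypothetical illustrations; the SDK's `TaskManager` and its storage backends may behave differently.

```python
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class Run:
    workflow_name: str
    status: str = "running"
    run_id: str = field(default_factory=lambda: uuid4().hex)


class InMemoryRunLog:
    """Hypothetical stand-in for a task manager's run history."""

    def __init__(self):
        self._runs = {}

    def start(self, workflow_name):
        run = Run(workflow_name)
        self._runs[run.run_id] = run
        return run.run_id

    def finish(self, run_id, status="completed"):
        self._runs[run_id].status = status

    def get_run(self, run_id):
        return self._runs[run_id]

    def list_runs(self, status=None, limit=10):
        """Return up to limit runs, optionally filtered by status."""
        runs = [r for r in self._runs.values() if status in (None, r.status)]
        return runs[:limit]


log = InMemoryRunLog()
first = log.start("sample_workflow")
second = log.start("sample_workflow")
log.finish(first)
completed = log.list_runs(status="completed")
print(len(completed), log.get_run(second).status)
```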
Local Testing
from kailash.runtime.local import LocalRuntime
# Create test runtime with debugging enabled
runtime = LocalRuntime(debug=True)
# Execute with test data
test_data = {"customers": [...]}
results, run_id = runtime.execute(workflow, inputs=test_data)
# Validate results
assert results["node_id"]["output_key"] == expected_value
Performance Monitoring & Real-time Dashboards
from kailash.visualization.performance import PerformanceVisualizer
from kailash.visualization.dashboard import RealTimeDashboard, DashboardConfig
from kailash.visualization.reports import WorkflowPerformanceReporter, ReportFormat
from kailash.tracking import TaskManager
from kailash.runtime.local import LocalRuntime
# Run workflow with task tracking
task_manager = TaskManager()
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow, task_manager=task_manager)
# Static performance analysis
perf_viz = PerformanceVisualizer(task_manager)
outputs = perf_viz.create_run_performance_summary(run_id, output_dir="performance_report")
perf_viz.compare_runs([run_id_1, run_id_2], output_path="comparison.png")
# Real-time monitoring dashboard
config = DashboardConfig(
    update_interval=1.0,
    max_history_points=100,
    auto_refresh=True,
    theme="light"
)
dashboard = RealTimeDashboard(task_manager, config)
dashboard.start_monitoring(run_id)
# Add real-time callbacks
def on_metrics_update(metrics):
    print(f"Tasks: {metrics.completed_tasks} completed, {metrics.active_tasks} active")
dashboard.add_metrics_callback(on_metrics_update)
# Generate live HTML dashboard
dashboard.generate_live_report("live_dashboard.html", include_charts=True)
dashboard.stop_monitoring()
# Comprehensive performance reports
reporter = WorkflowPerformanceReporter(task_manager)
report_path = reporter.generate_report(
    run_id,
    output_path="workflow_report.html",
    format=ReportFormat.HTML,
    compare_runs=[run_id_1, run_id_2]
)
Real-time Dashboard Features:
- Live Metrics Streaming: Real-time task progress and resource monitoring
- Interactive Charts: CPU, memory, and throughput visualizations with Chart.js
- API Endpoints: REST and WebSocket APIs for custom integrations
- Performance Reports: Multi-format reports (HTML, Markdown, JSON) with insights
- Bottleneck Detection: Automatic identification of performance issues
- Responsive Design: Mobile-friendly dashboards with auto-refresh
Performance Metrics Collected:
- Execution Timeline: Gantt charts showing node execution order and duration
- Resource Usage: Real-time CPU and memory consumption
- I/O Analysis: Read/write operations and data transfer volumes
- Performance Heatmaps: Identify bottlenecks across workflow runs
- Throughput Metrics: Tasks per minute and completion rates
- Error Tracking: Failed task analysis and error patterns
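Two of the metrics listed above, throughput and bottleneck detection, can be derived from nothing more than per-node start and end times. The sketch below shows one plausible way to compute them; the `summarize` function and the tuple layout are assumptions for illustration, not the SDK's metrics collector.

```python
def summarize(tasks):
    """Compute throughput and the slowest node from (node_id, start, end) tuples.

    Times are seconds since the run began.
    """
    durations = {node: end - start for node, start, end in tasks}
    wall_clock = max(end for _, _, end in tasks) - min(start for _, start, _ in tasks)
    bottleneck = max(durations, key=durations.get)
    return {
        "tasks_per_minute": len(tasks) / wall_clock * 60,
        "bottleneck": bottleneck,
        "bottleneck_share": durations[bottleneck] / sum(durations.values()),
    }


tasks = [("read", 0.0, 2.0), ("transform", 2.0, 10.0), ("write", 10.0, 12.0)]
summary = summarize(tasks)
print(summary)
```

Here the `transform` node accounts for two thirds of the total node time, which is the kind of signal a bottleneck report would surface.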
API Integration
from kailash.nodes.api import (
    HTTPRequestNode as RESTAPINode,
    # OAuth2AuthNode,
    # RateLimitedAPINode,
    # RateLimitConfig
)
# OAuth 2.0 authentication
# auth_node = OAuth2AuthNode(
#     client_id="your_client_id",
#     client_secret="your_client_secret",
#     token_url="https://api.example.com/oauth/token"
# )
# Rate-limited API client
rate_config = None  # RateLimitConfig(
#     max_requests=100,
#     time_window=60.0,
#     strategy="token_bucket"
# )
api_client = RESTAPINode(
    base_url="https://api.example.com"
    # auth_node=auth_node
)
# rate_limited_client = RateLimitedAPINode(
#     wrapped_node=api_client,
#     rate_limit_config=rate_config
# )
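The configuration above names a `token_bucket` strategy with `max_requests` and `time_window` parameters. As a rough illustration of that strategy, here is a self-contained token bucket; the `TokenBucket` class is a hypothetical sketch and does not reflect how the SDK's rate limiter is implemented.

```python
import time


class TokenBucket:
    """Allow at most max_requests per time_window seconds, refilled continuously."""

    def __init__(self, max_requests, time_window, clock=time.monotonic):
        self.capacity = float(max_requests)
        self.rate = max_requests / time_window  # tokens added per second
        self.tokens = self.capacity
        self.clock = clock
        self.last = clock()

    def try_acquire(self):
        """Consume one token if available; return whether the call may proceed."""
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# A fake clock keeps the demonstration deterministic.
fake_now = [0.0]
bucket = TokenBucket(max_requests=2, time_window=4.0, clock=lambda: fake_now[0])
burst = [bucket.try_acquire() for _ in range(3)]  # third call exceeds the budget
fake_now[0] = 2.0  # half a window later, one token has been refilled
after_wait = bucket.try_acquire()
print(burst, after_wait)
```

Compared with a fixed-window counter, a token bucket permits short bursts up to the bucket capacity while still enforcing the average rate.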
Export Formats
from kailash.utils.export import WorkflowExporter, ExportConfig
exporter = WorkflowExporter()
# Export to different formats
workflow.save("workflow.yaml", format="yaml") # Kailash YAML format
workflow.save("workflow.json", format="json") # JSON representation
# Export with custom configuration
config = ExportConfig(
    include_metadata=True,
    container_tag="latest"
)
workflow.save("deployment.yaml", format="yaml")
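To give a feel for what a JSON representation of a workflow might contain, the sketch below serializes a small node-and-connection description and checks that every connection points at a defined node. The schema (`nodes`, `connections`, `mapping`) and the `validate_spec` helper are assumptions made for illustration; the file actually written by `workflow.save` may be structured differently.

```python
import json

# Hypothetical workflow description; the field names are illustrative only.
workflow_spec = {
    "name": "customer_analysis",
    "nodes": {
        "read_customers": {"type": "CSVReader", "config": {"file_path": "customers.csv"}},
        "analyze": {"type": "PythonCodeNode", "config": {"name": "analyzer"}},
    },
    "connections": [
        {"from": "read_customers", "to": "analyze", "mapping": {"data": "data"}},
    ],
}


def validate_spec(spec):
    """Return connection endpoints that refer to undefined nodes."""
    return [
        end
        for conn in spec["connections"]
        for end in (conn["from"], conn["to"])
        if end not in spec["nodes"]
    ]


serialized = json.dumps(workflow_spec, indent=2, sort_keys=True)
restored = json.loads(serialized)
missing = validate_spec(restored)
print(serialized)
```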
Visualization
from kailash.workflow.visualization import WorkflowVisualizer
# Visualize workflow structure
visualizer = WorkflowVisualizer(workflow)
visualizer.visualize(output_path="workflow.png")
# Show in Jupyter notebook
visualizer.show()
CLI Commands
The SDK includes a comprehensive CLI for workflow management:
# Project initialization
kailash init my-project --template data-pipeline
# Workflow operations
kailash validate workflow.yaml
kailash run workflow.yaml --inputs data.json
kailash export workflow.py --format kubernetes
# Task management
kailash tasks list --status running
kailash tasks show run-123
kailash tasks cancel run-123
# Development tools
kailash test workflow.yaml --data test_data.json
kailash debug workflow.yaml --breakpoint node-id
Architecture
The SDK follows a clean, modular architecture:
kailash/
├── nodes/                    # Node implementations and base classes
│   ├── base.py               # Abstract Node class
│   ├── data/                 # Data I/O nodes
│   ├── transform/            # Transformation nodes
│   ├── logic/                # Business logic nodes
│   └── ai/                   # AI/ML nodes
├── workflow/                 # Workflow management
│   ├── graph.py              # DAG representation
│   └── visualization.py      # Visualization tools
├── visualization/            # Performance visualization
│   └── performance.py        # Performance metrics charts
├── runtime/                  # Execution engines
│   ├── local.py              # Local execution
│   └── docker.py             # Docker execution (planned)
├── tracking/                 # Monitoring and tracking
│   ├── manager.py            # Task management
│   ├── metrics_collector.py  # Performance metrics
│   └── storage/              # Storage backends
├── cli/                      # Command-line interface
└── utils/                    # Utilities and helpers
Testing
The SDK includes unit, integration, and end-to-end test suites:
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=kailash --cov-report=html
# Run specific test categories
uv run pytest tests/unit/
uv run pytest tests/integration/
uv run pytest tests/e2e/
Contributing
We welcome contributions! Please see our Contributing Guide for details.
Development Setup
# Clone the repository
git clone https://github.com/integrum/kailash-python-sdk.git
cd kailash-python-sdk
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Sync dependencies (creates venv automatically and installs everything)
uv sync
# Run commands using uv (no need to activate venv)
uv run pytest
uv run kailash --help
# Or activate the venv if you prefer
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install development dependencies
uv add --dev pre-commit detect-secrets doc8
# Install Trivy (macOS with Homebrew)
brew install trivy
# Set up pre-commit hooks
pre-commit install
pre-commit install --hook-type pre-push
# Run initial setup (formats code and fixes issues)
pre-commit run --all-files
Code Quality & Pre-commit Hooks
We use automated pre-commit hooks to ensure code quality:
Hooks Include:
- Black: Code formatting
- isort: Import sorting
- Ruff: Fast Python linting
- pytest: Unit tests
- Trivy: Security vulnerability scanning
- detect-secrets: Secret detection
- doc8: Documentation linting
- mypy: Type checking
Manual Quality Checks:
# Format code
black src/ tests/
isort src/ tests/
# Linting and fixes
ruff check src/ tests/ --fix
# Type checking
mypy src/
# Run all pre-commit hooks manually
pre-commit run --all-files
# Run specific hooks
pre-commit run black
pre-commit run pytest-check
Project Status
Test Suite Status
- Total Tests: 544 passing (100%)
- Test Categories: 15/15 at 100%
- Integration Tests: 65 passing
- Examples: 21/21 working
- Code Coverage: Comprehensive
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- The Integrum team for the Kailash architecture
- All contributors who have helped shape this SDK
- The Python community for excellent tools and libraries
Support
- GitHub Issues
- Email: support@integrum.com
- Slack: Join our community
Made with ❤️ by the Integrum Team