Kailash Python SDK

A Pythonic SDK for the Kailash container-node architecture

Build workflows that seamlessly integrate with Kailash's production environment while maintaining the flexibility to prototype quickly and iterate locally.


✨ Highlights

  • 🚀 Rapid Prototyping: Create and test workflows locally without containerization
  • 🏗️ Architecture-Aligned: Automatically ensures compliance with Kailash standards
  • 🔄 Seamless Handoff: Export prototypes directly to production-ready formats
  • 📊 Real-time Monitoring: Live dashboards with WebSocket streaming and performance metrics
  • 🧩 Extensible: Easy to create custom nodes for domain-specific operations
  • ⚡ Fast Installation: Uses uv for lightning-fast Python package management

🎯 Who Is This For?

The Kailash Python SDK is designed for:

  • AI Business Coaches (ABCs) who need to prototype workflows quickly
  • Data Scientists building ML pipelines compatible with production infrastructure
  • Engineers who want to test Kailash workflows locally before deployment
  • Teams looking to standardize their workflow development process

🚀 Quick Start

Installation

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# For users: Install from PyPI
pip install kailash

# For developers: Clone and sync
git clone https://github.com/integrum/kailash-python-sdk.git
cd kailash-python-sdk
uv sync

Your First Workflow

from kailash.workflow import Workflow
from kailash.nodes.data import CSVReader
from kailash.nodes.code import PythonCodeNode
from kailash.runtime.local import LocalRuntime
import pandas as pd

# Create a workflow
workflow = Workflow("customer_analysis", name="customer_analysis")

# Add data reader
reader = CSVReader(file_path="customers.csv")
workflow.add_node("read_customers", reader)

# Add custom processing using Python code
def analyze_customers(data):
    """Analyze customer data and compute metrics."""
    df = pd.DataFrame(data)
    # Convert total_spent to numeric
    df['total_spent'] = pd.to_numeric(df['total_spent'])
    return {
        "total_customers": len(df),
        "avg_spend": df["total_spent"].mean(),
        "top_customers": df.nlargest(10, "total_spent").to_dict("records")
    }

analyzer = PythonCodeNode.from_function(analyze_customers, name="analyzer")
workflow.add_node("analyze", analyzer)

# Connect nodes
workflow.connect("read_customers", "analyze", {"data": "data"})

# Run locally
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow)
print(f"Analysis complete! Results: {results}")

# Export for production
from kailash.utils.export import WorkflowExporter
exporter = WorkflowExporter()
workflow.save("customer_analysis.yaml", format="yaml")
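
To try the example above you need a customers.csv file in the working directory. The following is a minimal sketch that writes one using only the standard library. Only the total_spent column is taken from the example; the other column names and all row values are invented for illustration, and the helper name write_sample_customers is hypothetical.

```python
import csv
from pathlib import Path

# Hypothetical sample rows. Only "total_spent" is required by the quick-start
# function; "customer_id" and "name" are assumptions made for this sketch.
SAMPLE_ROWS = [
    {"customer_id": "C001", "name": "Ada", "total_spent": "120.50"},
    {"customer_id": "C002", "name": "Grace", "total_spent": "89.99"},
    {"customer_id": "C003", "name": "Linus", "total_spent": "240.00"},
]


def write_sample_customers(path="customers.csv"):
    """Write a small CSV file and return the number of data rows written."""
    fieldnames = ["customer_id", "name", "total_spent"]
    with Path(path).open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(SAMPLE_ROWS)
    return len(SAMPLE_ROWS)


if __name__ == "__main__":
    count = write_sample_customers()
    print(f"Wrote {count} rows to customers.csv")
```

The values are written as strings on purpose, since a CSV reader typically yields text and the example converts total_spent with pd.to_numeric.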

SharePoint Integration Example

from kailash.workflow import Workflow
from kailash.nodes.data import SharePointGraphReader, CSVWriter
import os

# Create workflow for SharePoint file processing
workflow = Workflow("sharepoint_processor", name="sharepoint_processor")

# Configure SharePoint reader (using environment variables)
sharepoint = SharePointGraphReader()
workflow.add_node("read_sharepoint", sharepoint)

# Process downloaded files
csv_writer = CSVWriter()
workflow.add_node("save_locally", csv_writer)

# Connect nodes
workflow.connect("read_sharepoint", "save_locally")

# Execute with credentials
from kailash.runtime.local import LocalRuntime

inputs = {
    "read_sharepoint": {
        "tenant_id": os.getenv("SHAREPOINT_TENANT_ID"),
        "client_id": os.getenv("SHAREPOINT_CLIENT_ID"),
        "client_secret": os.getenv("SHAREPOINT_CLIENT_SECRET"),
        "site_url": "https://yourcompany.sharepoint.com/sites/YourSite",
        "operation": "list_files",
        "library_name": "Documents"
    }
}

runtime = LocalRuntime()
results, run_id = runtime.execute(workflow, inputs=inputs)

📚 Documentation

  • 📖 User Guide - Comprehensive guide for using the SDK
  • 📋 API Reference - Detailed API documentation
  • 🌐 API Integration Guide - Complete API integration documentation
  • 🎓 Examples - Working examples and tutorials
  • 🤝 Contributing - Contribution guidelines

🛠️ Features

📦 Pre-built Nodes

The SDK includes a rich set of pre-built nodes for common operations:

Data Operations

  • CSVReader - Read CSV files
  • JSONReader - Read JSON files
  • SQLDatabaseNode - Query databases
  • CSVWriter - Write CSV files
  • JSONWriter - Write JSON files

Processing Nodes

  • PythonCodeNode - Custom Python logic
  • DataTransformer - Transform data
  • Filter - Filter records
  • Aggregator - Aggregate data
  • TextProcessor - Process text

AI/ML Nodes

  • EmbeddingNode - Generate embeddings
  • VectorDatabaseNode - Vector search
  • ModelPredictorNode - ML predictions
  • LLMNode - LLM integration

API Integration Nodes

  • HTTPRequestNode - HTTP requests
  • RESTAPINode - REST API client
  • GraphQLClientNode - GraphQL queries
  • OAuth2AuthNode - OAuth 2.0 authentication
  • RateLimitedAPINode - Rate-limited API calls

Other Integration Nodes

  • KafkaConsumerNode - Kafka streaming
  • WebSocketNode - WebSocket connections
  • EmailNode - Send emails

SharePoint Integration

  • SharePointGraphReader - Read SharePoint files
  • SharePointGraphWriter - Upload to SharePoint

Real-time Monitoring

  • RealTimeDashboard - Live workflow monitoring
  • WorkflowPerformanceReporter - Comprehensive reports
  • SimpleDashboardAPI - REST API for metrics
  • DashboardAPIServer - WebSocket streaming server

🔧 Core Capabilities

Workflow Management

from kailash.workflow import Workflow

# Create complex workflows with branching logic
workflow = Workflow("data_pipeline", name="data_pipeline")

# Add conditional branching
# ValidationNode, processor_a, and error_handler are placeholders for
# nodes defined elsewhere in your project
validator = ValidationNode()
workflow.add_node("validate", validator)

# Different paths based on validation
workflow.add_node("process_valid", processor_a)
workflow.add_node("handle_errors", error_handler)

# Connect with conditions
workflow.connect("validate", "process_valid", condition="is_valid")
workflow.connect("validate", "handle_errors", condition="has_errors")
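
The idea behind conditional connections can be sketched in plain Python, without the SDK: a validating step emits boolean flags, and each outgoing edge names the flag that must be truthy for its target to run. This is an illustration of the routing concept only, not how Kailash implements it; every function and variable name below is hypothetical.

```python
def validate(record):
    """Return flags that downstream edges can test."""
    ok = isinstance(record.get("amount"), (int, float)) and record["amount"] >= 0
    return {"is_valid": ok, "has_errors": not ok, "record": record}


def process_valid(output):
    return f"processed {output['record']['id']}"


def handle_errors(output):
    return f"rejected {output['record']['id']}"


# Each edge is (target_name, condition_key). A target runs only when the
# validator's output holds a truthy value under condition_key.
EDGES = [("process_valid", "is_valid"), ("handle_errors", "has_errors")]
HANDLERS = {"process_valid": process_valid, "handle_errors": handle_errors}


def route(record):
    """Run the validator, then every target whose condition flag is set."""
    output = validate(record)
    return {
        target: HANDLERS[target](output)
        for target, condition in EDGES
        if output[condition]
    }


if __name__ == "__main__":
    print(route({"id": "a1", "amount": 10}))
    print(route({"id": "b2", "amount": "oops"}))
```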

Immutable State Management

from kailash.workflow.state import WorkflowStateWrapper
from pydantic import BaseModel

# Define state model
class MyStateModel(BaseModel):
    counter: int = 0
    status: str = "pending"
    nested: dict = {}

# Create and wrap state object
# (assumes `workflow` is an existing Workflow instance)
state = MyStateModel()
state_wrapper = workflow.create_state_wrapper(state)

# Single path-based update
updated_wrapper = state_wrapper.update_in(
    ["counter"],
    42
)

# Batch update multiple fields atomically
updated_wrapper = state_wrapper.batch_update([
    (["counter"], 10),
    (["status"], "processing")
])

# Execute workflow with state management
final_state, results = workflow.execute_with_state(state_model=state)
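
For readers unfamiliar with path-based immutable updates, here is a minimal sketch of the concept on plain nested dicts. It is not the SDK's implementation: the functions below only borrow the names update_in and batch_update from the example above, and their behavior (deep copy, creation of missing intermediate keys) is an assumption chosen for illustration.

```python
import copy


def update_in(state, path, value):
    """Return a copy of `state` with the value at `path` replaced.

    `state` is a nested dict and `path` a list of keys. The input is never
    mutated, and missing intermediate keys are created as empty dicts.
    """
    if not path:
        raise ValueError("path must contain at least one key")
    new_state = copy.deepcopy(state)
    cursor = new_state
    for key in path[:-1]:
        cursor = cursor.setdefault(key, {})
    cursor[path[-1]] = value
    return new_state


def batch_update(state, updates):
    """Apply (path, value) pairs in order and return one new state."""
    for path, value in updates:
        state = update_in(state, path, value)
    return state


if __name__ == "__main__":
    original = {"counter": 0, "status": "pending", "nested": {}}
    updated = batch_update(original, [(["counter"], 10), (["nested", "step"], "a")])
    print(original)  # the original dict is left untouched
    print(updated)
```

Returning a new object on every update means earlier states stay valid, which makes it straightforward to compare a state before and after a step.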

Task Tracking

from kailash.tracking import TaskManager

# Initialize task manager
task_manager = TaskManager()

# Create a sample workflow
from kailash.workflow import Workflow
workflow = Workflow("sample_workflow", name="Sample Workflow")

# Run workflow with tracking
from kailash.runtime.local import LocalRuntime
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow, task_manager=task_manager)

# Query execution history
runs = task_manager.list_runs(status="completed", limit=10)
details = task_manager.get_run(run_id)

Local Testing

from kailash.runtime.local import LocalRuntime

# Create test runtime with debugging enabled
runtime = LocalRuntime(debug=True)

# Execute with test data
test_data = {"customers": [...]}
# execute() returns a (results, run_id) tuple, as in the other examples
results, run_id = runtime.execute(workflow, inputs=test_data)

# Validate results
assert results["node_id"]["output_key"] == expected_value

Performance Monitoring & Real-time Dashboards

from kailash.visualization.performance import PerformanceVisualizer
from kailash.visualization.dashboard import RealTimeDashboard, DashboardConfig
from kailash.visualization.reports import WorkflowPerformanceReporter, ReportFormat
from kailash.tracking import TaskManager
from kailash.runtime.local import LocalRuntime

# Run workflow with task tracking
task_manager = TaskManager()
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow, task_manager=task_manager)

# Static performance analysis
perf_viz = PerformanceVisualizer(task_manager)
outputs = perf_viz.create_run_performance_summary(run_id, output_dir="performance_report")
# run_id_1 and run_id_2 are run IDs returned by earlier runtime.execute() calls
perf_viz.compare_runs([run_id_1, run_id_2], output_path="comparison.png")

# Real-time monitoring dashboard
config = DashboardConfig(
    update_interval=1.0,
    max_history_points=100,
    auto_refresh=True,
    theme="light"
)

dashboard = RealTimeDashboard(task_manager, config)
dashboard.start_monitoring(run_id)

# Add real-time callbacks
def on_metrics_update(metrics):
    print(f"Tasks: {metrics.completed_tasks} completed, {metrics.active_tasks} active")

dashboard.add_metrics_callback(on_metrics_update)

# Generate live HTML dashboard
dashboard.generate_live_report("live_dashboard.html", include_charts=True)
dashboard.stop_monitoring()

# Comprehensive performance reports
reporter = WorkflowPerformanceReporter(task_manager)
report_path = reporter.generate_report(
    run_id,
    output_path="workflow_report.html",
    format=ReportFormat.HTML,
    compare_runs=[run_id_1, run_id_2]
)

Real-time Dashboard Features:

  • ⚡ Live Metrics Streaming: Real-time task progress and resource monitoring
  • 📊 Interactive Charts: CPU, memory, and throughput visualizations with Chart.js
  • 🔌 API Endpoints: REST and WebSocket APIs for custom integrations
  • 📈 Performance Reports: Multi-format reports (HTML, Markdown, JSON) with insights
  • 🎯 Bottleneck Detection: Automatic identification of performance issues
  • 📱 Responsive Design: Mobile-friendly dashboards with auto-refresh

Performance Metrics Collected:

  • Execution Timeline: Gantt charts showing node execution order and duration
  • Resource Usage: Real-time CPU and memory consumption
  • I/O Analysis: Read/write operations and data transfer volumes
  • Performance Heatmaps: Identify bottlenecks across workflow runs
  • Throughput Metrics: Tasks per minute and completion rates
  • Error Tracking: Failed task analysis and error patterns
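
To make the throughput and bottleneck metrics listed above concrete, the sketch below derives tasks per minute, completion rate, and the slowest node from a list of task timings. It is independent of the SDK: the TaskRecord type, its fields, and the summarize function are hypothetical and do not reflect how Kailash stores or computes its metrics.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskRecord:
    """Hypothetical record of one finished task (times in seconds)."""
    node_id: str
    started_at: float
    ended_at: float
    succeeded: bool


def summarize(records):
    """Compute throughput, completion rate, and the slowest node."""
    if not records:
        return {"tasks_per_minute": 0.0, "completion_rate": 0.0, "slowest_node": None}
    # Wall-clock window covered by the run, from first start to last end.
    window = max(r.ended_at for r in records) - min(r.started_at for r in records)
    completed = sum(1 for r in records if r.succeeded)
    slowest = max(records, key=lambda r: r.ended_at - r.started_at)
    return {
        "tasks_per_minute": completed * 60.0 / window if window > 0 else 0.0,
        "completion_rate": completed / len(records),
        "slowest_node": slowest.node_id,
    }


if __name__ == "__main__":
    run = [
        TaskRecord("read", 0.0, 10.0, True),
        TaskRecord("analyze", 10.0, 40.0, True),
        TaskRecord("write", 40.0, 60.0, False),
    ]
    print(summarize(run))
```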

API Integration

from kailash.nodes.api import (
    HTTPRequestNode as RESTAPINode,
    # OAuth2AuthNode,
    # RateLimitedAPINode,
    # RateLimitConfig
)

# OAuth 2.0 authentication (commented out in this example)
# auth_node = OAuth2AuthNode(
#     client_id="your_client_id",
#     client_secret="your_client_secret",
#     token_url="https://api.example.com/oauth/token"
# )

# Rate-limited API client (configuration commented out in this example)
rate_config = None
# rate_config = RateLimitConfig(
#     max_requests=100,
#     time_window=60.0,
#     strategy="token_bucket"
# )

api_client = RESTAPINode(
    base_url="https://api.example.com"
    # auth_node=auth_node
)

# rate_limited_client = RateLimitedAPINode(
#     wrapped_node=api_client,
#     rate_limit_config=rate_config
# )
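
The commented-out configuration above names a "token_bucket" strategy with max_requests and time_window parameters. As background, here is a minimal sketch of the general token bucket technique in plain Python. It is not the SDK's RateLimitedAPINode: the TokenBucket class, its try_acquire method, and the injectable clock argument are all hypothetical names introduced for this illustration.

```python
import time


class TokenBucket:
    """Allow at most `max_requests` per `time_window` seconds on average."""

    def __init__(self, max_requests, time_window, clock=time.monotonic):
        self.capacity = float(max_requests)
        self.refill_rate = max_requests / time_window  # tokens per second
        self.tokens = self.capacity  # start with a full bucket
        self.clock = clock
        self.last_refill = clock()

    def try_acquire(self):
        """Consume one token if available; return True when the call may proceed."""
        now = self.clock()
        elapsed = now - self.last_refill
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(max_requests=100, time_window=60.0)
    allowed = sum(bucket.try_acquire() for _ in range(150))
    print(f"{allowed} of 150 immediate calls were allowed")
```

Passing the clock as a parameter keeps the class testable with a fake time source. A caller that gets False would typically wait and retry rather than drop the request.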

Export Formats

from kailash.utils.export import WorkflowExporter, ExportConfig

exporter = WorkflowExporter()

# Export to different formats
workflow.save("workflow.yaml", format="yaml")  # Kailash YAML format
workflow.save("workflow.json", format="json")  # JSON representation

# Export with custom configuration
config = ExportConfig(
    include_metadata=True,
    container_tag="latest"
)
workflow.save("deployment.yaml", format="yaml")
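
To give a rough idea of what a serialized workflow could contain, the sketch below turns node and connection lists into JSON with the standard library. The schema (keys name, nodes, connections, from, to, mapping) is invented for this illustration and is not the Kailash YAML or JSON format; workflow_to_dict and dumps_workflow are hypothetical helpers.

```python
import json


def workflow_to_dict(name, nodes, connections):
    """Build a plain-dict description of a workflow (hypothetical schema)."""
    return {
        "name": name,
        "nodes": [{"id": node_id, "type": node_type} for node_id, node_type in nodes],
        "connections": [
            {"from": source, "to": target, "mapping": mapping}
            for source, target, mapping in connections
        ],
    }


def dumps_workflow(description):
    """Serialize with sorted keys so repeated exports produce identical text."""
    return json.dumps(description, indent=2, sort_keys=True)


if __name__ == "__main__":
    description = workflow_to_dict(
        "customer_analysis",
        nodes=[("read_customers", "CSVReader"), ("analyze", "PythonCodeNode")],
        connections=[("read_customers", "analyze", {"data": "data"})],
    )
    print(dumps_workflow(description))
```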

🎨 Visualization

from kailash.workflow.visualization import WorkflowVisualizer

# Visualize workflow structure
visualizer = WorkflowVisualizer(workflow)
visualizer.visualize(output_path="workflow.png")

# Show in Jupyter notebook
visualizer.show()

💻 CLI Commands

The SDK includes a comprehensive CLI for workflow management:

# Project initialization
kailash init my-project --template data-pipeline

# Workflow operations
kailash validate workflow.yaml
kailash run workflow.yaml --inputs data.json
kailash export workflow.py --format kubernetes

# Task management
kailash tasks list --status running
kailash tasks show run-123
kailash tasks cancel run-123

# Development tools
kailash test workflow.yaml --data test_data.json
kailash debug workflow.yaml --breakpoint node-id

๐Ÿ—๏ธ Architecture

The SDK follows a clean, modular architecture:

kailash/
├── nodes/           # Node implementations and base classes
│   ├── base.py      # Abstract Node class
│   ├── data/        # Data I/O nodes
│   ├── transform/   # Transformation nodes
│   ├── logic/       # Business logic nodes
│   └── ai/          # AI/ML nodes
├── workflow/        # Workflow management
│   ├── graph.py     # DAG representation
│   └── visualization.py  # Visualization tools
├── visualization/   # Performance visualization
│   └── performance.py    # Performance metrics charts
├── runtime/         # Execution engines
│   ├── local.py     # Local execution
│   └── docker.py    # Docker execution (planned)
├── tracking/        # Monitoring and tracking
│   ├── manager.py   # Task management
│   ├── metrics_collector.py  # Performance metrics
│   └── storage/     # Storage backends
├── cli/             # Command-line interface
└── utils/           # Utilities and helpers
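
The layout above lists a DAG representation under workflow/. As background on why a workflow graph must be acyclic, the sketch below computes an execution order from connection pairs and rejects cycles, using graphlib from the standard library (Python 3.9+). It illustrates the general technique only and makes no claim about the contents of graph.py; execution_order is a hypothetical function.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(connections):
    """Return node ids in an order where every source precedes its targets.

    `connections` is an iterable of (source, target) pairs. Raises ValueError
    if the connections contain a cycle, since a cyclic graph is not a DAG.
    """
    sorter = TopologicalSorter()
    for source, target in connections:
        sorter.add(target, source)  # target depends on source
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # The second element of args holds the nodes that form the cycle.
        raise ValueError(f"workflow contains a cycle: {exc.args[1]}") from exc


if __name__ == "__main__":
    print(execution_order([("read_customers", "analyze"), ("analyze", "export")]))
```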

🧪 Testing

The SDK is thoroughly tested with comprehensive test suites:

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=kailash --cov-report=html

# Run specific test categories
uv run pytest tests/unit/
uv run pytest tests/integration/
uv run pytest tests/e2e/

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

# Clone the repository
git clone https://github.com/integrum/kailash-python-sdk.git
cd kailash-python-sdk

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Sync dependencies (creates venv automatically and installs everything)
uv sync

# Run commands using uv (no need to activate venv)
uv run pytest
uv run kailash --help

# Or activate the venv if you prefer
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install development dependencies
uv add --dev pre-commit detect-secrets doc8

# Install Trivy (macOS with Homebrew)
brew install trivy

# Set up pre-commit hooks
pre-commit install
pre-commit install --hook-type pre-push

# Run initial setup (formats code and fixes issues)
pre-commit run --all-files

Code Quality & Pre-commit Hooks

We use automated pre-commit hooks to ensure code quality:

Hooks Include:

  • Black: Code formatting
  • isort: Import sorting
  • Ruff: Fast Python linting
  • pytest: Unit tests
  • Trivy: Security vulnerability scanning
  • detect-secrets: Secret detection
  • doc8: Documentation linting
  • mypy: Type checking

Manual Quality Checks:

# Format code
black src/ tests/
isort src/ tests/

# Linting and fixes
ruff check src/ tests/ --fix

# Type checking
mypy src/

# Run all pre-commit hooks manually
pre-commit run --all-files

# Run specific hooks
pre-commit run black
pre-commit run pytest-check

📈 Project Status

✅ Completed

  • Core node system with 15+ node types
  • Workflow builder with DAG validation
  • Local & async execution engines
  • Task tracking with metrics
  • Multiple storage backends
  • Export functionality (YAML/JSON)
  • CLI interface
  • Immutable state management
  • API integration with rate limiting
  • OAuth 2.0 authentication
  • SharePoint Graph API integration
  • Real-time performance metrics collection
  • Performance visualization dashboards
  • Real-time monitoring dashboard with WebSocket streaming
  • Comprehensive performance reports (HTML, Markdown, JSON)
  • 100% test coverage (544 tests)
  • 15 test categories all passing
  • 21+ working examples

🚧 In Progress

  • Comprehensive API documentation
  • Security audit & hardening
  • Performance optimizations
  • Docker runtime finalization

📋 Planned

  • Cloud deployment templates
  • Visual workflow editor
  • Plugin system
  • Additional integrations

🎯 Test Suite Status

  • Total Tests: 544 passing (100%)
  • Test Categories: 15/15 at 100%
  • Integration Tests: 65 passing
  • Examples: 21/21 working
  • Code Coverage: Comprehensive

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • The Integrum team for the Kailash architecture
  • All contributors who have helped shape this SDK
  • The Python community for excellent tools and libraries

📞 Support


Made with ❤️ by the Integrum Team


