SDK for building and managing Kubiya workflows
Kubiya SDK
Build Deterministic AI Workflows That Actually Work™
Get Started • MCP Server • Documentation • Examples • API Reference
🚀 The Future of AI is Deterministic
Kubiya SDK is a serverless workflow platform that transforms unpredictable AI agents into reliable, production-grade automation. Every workflow step runs as an independent Docker container, giving you the power to run ANY software while maintaining deterministic execution.
Why We Built This
After watching teams struggle with free-wheeling agent frameworks that promise magic but deliver chaos, we took a different approach. Instead of hoping an AI will figure out the right sequence of actions, we provide the tools to define the right sequence – with AI filling in the intelligent parts. Read more about our architecture →
Core Principles
- 🐳 Serverless Containers: Every step runs in its own Docker container - use ANY language, tool, or software
- 🎯 Deterministic Execution: Same inputs → Same workflow → Same outputs, every time
- 🏗️ Stateless Architecture: Each execution starts fresh with zero state pollution
- 🚀 Infinite Scale: From 1 to 1,000,000 executions without infrastructure changes
- 🤖 MCP Compatible: Works with Claude Desktop, ChatGPT, and any MCP client
- 🏠 Your Infrastructure: Runs entirely on-premise with zero vendor lock-in
✨ Key Features
🎯 Stateless & Serverless Orchestration
```yaml
# Workflows are pure schemas - no hidden state
name: incident-response
steps:
  - name: detect
    executor: docker
    image: monitoring:latest
  - name: analyze
    executor: inline_agent
    depends: [detect]
  - name: remediate
    executor: shell
    depends: [analyze]
```
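The `depends` fields above form a dependency graph, which is what makes the execution order deterministic. As an illustration only (this is not the Kubiya scheduler), an engine could derive that order with a topological sort:

```python
from collections import deque

def execution_order(steps):
    """Derive a deterministic run order from each step's depends list
    using Kahn's algorithm over the dependency DAG."""
    names = [s["name"] for s in steps]
    deps = {s["name"]: list(s.get("depends", [])) for s in steps}
    dependents = {n: [] for n in names}
    for name, reqs in deps.items():
        for r in reqs:
            dependents[r].append(name)
    ready = deque(n for n in names if not deps[n])  # keeps schema order for ties
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in dependents[current]:
            deps[child].remove(current)
            if not deps[child]:
                ready.append(child)
    if len(order) != len(names):
        raise ValueError("dependency cycle detected")
    return order

steps = [
    {"name": "detect"},
    {"name": "analyze", "depends": ["detect"]},
    {"name": "remediate", "depends": ["analyze"]},
]
print(execution_order(steps))  # ['detect', 'analyze', 'remediate']
```

Because the schema fully determines the graph, the same manifest always yields the same order.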
🔌 Universal Integration
```python
# Via Kubiya API
client.execute_workflow("deploy-app", params={"version": "2.0"})

# Via MCP Server (works with ANY agent system)
mcp_client.call_tool("execute_workflow", workflow_input="deploy-app")

# Via Agent Server (OpenAI-compatible)
response = openai.chat.completions.create(
    model="kubiya-workflow-agent",
    messages=[{"role": "user", "content": "Deploy version 2.0"}]
)

# Direct in your code
result = workflow.run(params={"env": "production"})
```
📦 Installation
```bash
# Basic installation
pip install kubiya-sdk

# With all features (includes MCP server and agent capabilities)
pip install "kubiya-sdk[all]"

# For development
pip install "kubiya-sdk[dev]"
```
🐳 Docker Installation
```bash
# Using Docker Compose (recommended)
docker-compose up -d

# Or run the MCP Agent Server directly
docker run -p 8000:8000 \
  -e KUBIYA_API_KEY=$KUBIYA_API_KEY \
  -e TOGETHER_API_KEY=$TOGETHER_API_KEY \
  kubiya/workflow-sdk:latest \
  mcp agent --provider together --port 8000
```
🤖 MCP (Model Context Protocol)
Kubiya SDK includes a powerful MCP implementation that enables ANY AI system to create and execute workflows.
Quick Start: MCP Agent Server
The fastest way to get started is with our Agent Server - an OpenAI-compatible API that any AI can use:
```bash
# Start the agent server
kubiya mcp agent --provider together --port 8000

# Or with a specific model
kubiya mcp agent --provider anthropic --model claude-3-5-sonnet-20241022 --port 8000
```
Now ANY OpenAI-compatible client can create workflows:
```python
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed"  # Uses env vars for actual API keys
)

response = client.chat.completions.create(
    model="kubiya-workflow-agent",
    messages=[{
        "role": "user",
        "content": "Create a workflow that backs up all databases to S3"
    }],
    stream=True
)

for chunk in response:
    # delta.content can be None on some chunks (e.g. role-only deltas)
    print(chunk.choices[0].delta.content or "", end="")
```
MCP Tools Available
The MCP server provides these tools to AI agents:
1. compile_workflow - Convert DSL to workflow manifest
```python
# AI agents can write simple DSL code
dsl_code = """
from kubiya.dsl import Workflow

wf = Workflow("backup-databases")
wf.description("Backup all databases to S3")
wf.step("backup-postgres", "pg_dump -h $DB_HOST > backup.sql")
wf.step("upload-to-s3", "aws s3 cp backup.sql s3://backups/")
"""

result = compile_workflow(dsl_code=dsl_code)
# Returns: {"success": true, "manifest": {...}}
```
2. execute_workflow - Run workflows with real-time streaming
```python
# Execute with streaming events
result = execute_workflow(
    workflow_input={"name": "backup-databases", "steps": [...]},
    stream_format="vercel"  # or "raw" for standard events
)
# Streams: step_running, step_complete, workflow_complete events
```
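The `vercel` stream format interleaves text and structured events as prefixed lines, e.g. `0:"text"` for text deltas and `2:{...}` for event objects. A hedged sketch of splitting such a line (the exact prefix codes are a convention of the format, not something this SDK documents here):

```python
import json

def parse_stream_line(line: str):
    """Split a prefixed stream line such as 0:"text" or
    2:{"type":"step_running","step":"detect"} into (code, JSON payload)."""
    code, _, payload = line.partition(":")
    return code, json.loads(payload)

code, text = parse_stream_line('0:"Backing up postgres..."')
event_code, event = parse_stream_line('2:{"type":"step_running","step":"backup-postgres"}')
```

The same parser works for the Vercel AI SDK example further down, where the raw chunks are logged directly.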
3. get_workflow_runners - List available execution environments
```python
runners = get_workflow_runners()
# Returns Docker-enabled runners, Kubernetes runners, etc.
```
4. get_integrations - Discover available integrations
```python
integrations = get_integrations(category="cloud")
# Returns AWS, GCP, Azure integrations with configs
```
5. get_workflow_secrets - Manage secure credentials
```python
secrets = get_workflow_secrets(pattern="AWS_*")
# Returns available secrets for workflows
```
Claude Desktop Integration
Add to your Claude Desktop config (on macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`):
```json
{
  "mcpServers": {
    "kubiya": {
      "command": "kubiya",
      "args": ["mcp", "server"],
      "env": {
        "KUBIYA_API_KEY": "your-api-key"
      }
    }
  }
}
```
Now Claude can create and execute workflows directly!
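If you already have other MCP servers configured, a small helper like this can merge the entry without clobbering the rest of the file (illustrative only; the `mcpServers` key and entry shape are as shown above):

```python
import json

def add_mcp_server(config: dict, name: str, command: str, args, env) -> dict:
    """Return a copy of a claude_desktop_config.json structure with one
    extra entry under "mcpServers", preserving any existing servers."""
    merged = dict(config)
    servers = dict(merged.get("mcpServers", {}))
    servers[name] = {"command": command, "args": list(args), "env": dict(env)}
    merged["mcpServers"] = servers
    return merged

existing = {"mcpServers": {"other": {"command": "other-tool", "args": []}}}
updated = add_mcp_server(
    existing, "kubiya", "kubiya", ["mcp", "server"],
    {"KUBIYA_API_KEY": "your-api-key"},
)
print(json.dumps(updated, indent=2))
```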
Vercel AI SDK Integration
```typescript
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';

const result = await streamText({
  model: openai('kubiya-workflow-agent', {
    baseURL: 'http://localhost:8000/v1',
  }),
  messages: [
    {
      role: 'user',
      content: 'Create a CI/CD pipeline for my Node.js app',
    },
  ],
});

// Handle streaming with proper event parsing
for await (const chunk of result.textStream) {
  // Vercel format: 0:"text" or 2:{"type":"step_running",...}
  console.log(chunk);
}
```
Direct MCP Server Usage
For lower-level control, use the MCP server directly:
```bash
# Start MCP server (stdio transport)
kubiya mcp server

# The server communicates via stdio, perfect for tool integration
```
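The stdio transport speaks JSON-RPC 2.0, one message per line. A sketch of the framing a client would emit (the method names follow the MCP specification; this snippet only builds the messages and does not spawn the server):

```python
import json

def jsonrpc_request(request_id: int, method: str, params: dict) -> str:
    """Frame a JSON-RPC 2.0 request as one line of the stdio transport."""
    return json.dumps({"jsonrpc": "2.0", "id": request_id,
                       "method": method, "params": params})

# Typical MCP handshake, followed by tool discovery
init = jsonrpc_request(1, "initialize", {
    "protocolVersion": "2024-11-05",
    "capabilities": {},
    "clientInfo": {"name": "example-client", "version": "0.1.0"},
})
tools = jsonrpc_request(2, "tools/list", {})
```

In practice an MCP client library handles this framing for you; the sketch just shows what travels over stdio.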
🎯 Quick Start
1. Start the Agent Server
```bash
# Set your API keys
export KUBIYA_API_KEY="your-key"
export TOGETHER_API_KEY="your-key"  # Or OPENAI_API_KEY, ANTHROPIC_API_KEY

# Start the server
kubiya mcp agent --provider together --port 8000
```
2. Create a Workflow with AI
```python
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")

# Ask AI to create a workflow
response = client.chat.completions.create(
    model="kubiya-workflow-agent",
    messages=[{
        "role": "user",
        "content": """
        Create a workflow that:
        1. Checks disk space on all servers
        2. Alerts if any disk is over 80% full
        3. Automatically cleans up old logs if needed
        """
    }]
)

print(response.choices[0].message.content)
```
3. Execute the Workflow
The AI will automatically execute the workflow and stream results in real-time!
🏗️ Architecture
MCP Server Architecture
```
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   AI Clients    │────▶│   Agent Server   │────▶│   MCP Server    │
│ (Claude, GPT-4) │     │   (OpenAI API)   │     │     (Tools)     │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │                        │
                                 ▼                        ▼
                        ┌──────────────────┐     ┌─────────────────┐
                        │    Kubiya API    │     │    Workflow     │
                        │   (Execution)    │     │     Engine      │
                        └──────────────────┘     └─────────────────┘
```
Workflow Execution Flow
- AI generates DSL → Simple, readable workflow code
- MCP compiles → Validates and converts to manifest
- Kubiya executes → Runs in Docker containers
- Streams events → Real-time progress updates
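To make the streamed lifecycle concrete, here is an illustrative checker for the event ordering implied above (the event names come from the MCP tools section; this helper is not part of the SDK):

```python
def check_event_order(events):
    """Verify that each step_complete follows a matching step_running and
    that workflow_complete arrives exactly once, as the final event."""
    running = set()
    for i, ev in enumerate(events):
        kind = ev["type"]
        if kind == "step_running":
            running.add(ev["step"])
        elif kind == "step_complete":
            if ev["step"] not in running:
                return False
        elif kind == "workflow_complete":
            return i == len(events) - 1
    return False  # stream ended without workflow_complete

events = [
    {"type": "step_running", "step": "detect"},
    {"type": "step_complete", "step": "detect"},
    {"type": "workflow_complete"},
]
print(check_event_order(events))  # True
```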
🛠️ CLI Commands
MCP Commands
```bash
# Start agent server (OpenAI-compatible API)
kubiya mcp agent --provider anthropic --model claude-3-opus --port 8000

# Start MCP server (stdio transport for tools)
kubiya mcp server

# Interactive chat mode for testing
kubiya mcp chat --provider together

# Test MCP tools
kubiya mcp test
```
Workflow Commands
```bash
# Validate a workflow
kubiya validate workflow.py

# Execute a workflow
kubiya run workflow.py --params KEY=value

# List executions
kubiya list --limit 10

# Stream execution logs
kubiya logs <execution-id> --follow
```
📊 Examples
Create a Monitoring Workflow
```python
# The AI can generate this from a simple description
from kubiya.dsl import Workflow

wf = Workflow("system-monitor")
wf.description("Monitor system health and alert on issues")

# Check CPU usage
wf.step("check-cpu", """
cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)
if (( $(echo "$cpu_usage > 80" | bc -l) )); then
    echo "HIGH_CPU_ALERT: ${cpu_usage}%"
fi
""")

# Check memory
wf.step("check-memory", """
mem_usage=$(free | grep Mem | awk '{print ($3/$2) * 100.0}')
if (( $(echo "$mem_usage > 80" | bc -l) )); then
    echo "HIGH_MEMORY_ALERT: ${mem_usage}%"
fi
""")

# Send alerts
(wf.step("send-alerts")
    .condition("${check-cpu.output} contains 'ALERT' or ${check-memory.output} contains 'ALERT'")
    .shell("curl -X POST $SLACK_WEBHOOK -d '{\"text\": \"System Alert: $OUTPUT\"}'"))
```
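The `.condition(...)` string above combines `${step.output}` references, `contains`, and `or`. As a purely illustrative evaluator for that small expression shape (the real engine's condition grammar is not documented here):

```python
import re

def evaluate_condition(expr: str, outputs: dict) -> bool:
    """Evaluate a condition like
    "${check-cpu.output} contains 'ALERT' or ${check-memory.output} contains 'ALERT'"
    against a dict of step outputs. Handles only `contains` clauses joined by `or`."""
    def clause_true(clause: str) -> bool:
        m = re.match(r"\s*\$\{(.+?)\.output\}\s+contains\s+'(.+?)'\s*", clause)
        if not m:
            raise ValueError(f"unsupported clause: {clause!r}")
        step, needle = m.groups()
        return needle in outputs.get(step, "")
    return any(clause_true(c) for c in expr.split(" or "))

cond = "${check-cpu.output} contains 'ALERT' or ${check-memory.output} contains 'ALERT'"
print(evaluate_condition(cond, {"check-cpu": "HIGH_CPU_ALERT: 91%", "check-memory": ""}))  # True
```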
Multi-Language Data Pipeline
```python
# AI can orchestrate complex multi-language workflows
from kubiya.dsl import Workflow

wf = Workflow("data-pipeline")

# Python for data extraction
(wf.step("extract")
    .docker("python:3.11-slim")
    .packages(["pandas", "requests"])
    .code("""
import pandas as pd
data = pd.read_csv('https://data.source/file.csv')
data.to_parquet('/tmp/data.parquet')
"""))

# R for statistical analysis
(wf.step("analyze")
    .docker("r-base:latest")
    .code("""
library(arrow)
data <- read_parquet('/tmp/data.parquet')
summary_stats <- summary(data)
write.csv(summary_stats, '/tmp/analysis.csv')
"""))

# Node.js for API upload
(wf.step("upload")
    .docker("node:20-slim")
    .code("""
const fs = require('fs');
const axios = require('axios');
const data = fs.readFileSync('/tmp/analysis.csv');
await axios.post('https://api.destination/upload', data);
"""))
```
🚀 Production Deployment
Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kubiya-agent-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kubiya-agent-server
  template:
    metadata:
      labels:
        app: kubiya-agent-server
    spec:
      containers:
        - name: agent-server
          image: kubiya/workflow-sdk:latest
          command: ["kubiya", "mcp", "agent"]
          args: ["--provider", "anthropic", "--port", "8000"]
          env:
            - name: KUBIYA_API_KEY
              valueFrom:
                secretKeyRef:
                  name: kubiya-secrets
                  key: api-key
          ports:
            - containerPort: 8000
```
Docker Compose
```yaml
version: '3.8'
services:
  agent-server:
    image: kubiya/workflow-sdk:latest
    command: kubiya mcp agent --provider together --port 8000
    ports:
      - "8000:8000"
    environment:
      - KUBIYA_API_KEY=${KUBIYA_API_KEY}
      - TOGETHER_API_KEY=${TOGETHER_API_KEY}
    restart: unless-stopped
```
📚 Documentation
🚀 Getting Started
🤖 MCP Documentation
- MCP Overview - Understanding Model Context Protocol
- Agent Server Guide - OpenAI-compatible API
- MCP Tools Reference - Available MCP tools
- Authentication - API keys and security
- Integration Examples - Claude, ChatGPT, Vercel AI
🏗️ Workflow Development
- DSL Reference - Workflow syntax
- Docker Steps - Container execution
- Testing Workflows - Test and debug
📡 API Reference
- REST API - HTTP endpoints
- Streaming Events - SSE and Vercel formats
- Client SDK - Python client
🤝 Support
📄 License
AGPL-3.0 - See LICENSE for details.
Stop hoping AI agents will work. Start shipping workflows that do.