Production-grade workflow orchestration system with LLM cost tracking, performance analytics, and dependency management.

Socratic Workflow

Why Socratic Workflow?

Building multi-step AI workflows is complex. Socratic Workflow handles the production challenges:

  • Cost Tracking - Track LLM costs across 16+ models from 5 providers (Anthropic, OpenAI, Google, Meta, Mistral)
  • Dependency Management - Define complex task graphs with automatic dependency resolution
  • Parallel Execution - Run independent tasks concurrently for maximum performance
  • Error Recovery - Automatic retry logic with exponential backoff and graceful degradation
  • Performance Analytics - Measure execution time, bottlenecks, and success rates in real time

Features

  • 🔄 Workflow Orchestration - Define and execute multi-step AI workflows
  • 💰 Cost Tracking - Track LLM costs across providers (Claude, GPT-4, Gemini, etc.)
  • 📊 Performance Analytics - Measure execution time, success rates, bottlenecks
  • 🔗 Task Dependencies - Manage complex task graphs with dependencies
  • ⚡ Parallel Execution - Run independent tasks concurrently for speed
  • 💾 State Management - Save/restore workflow state for resilience
  • 🔄 Error Recovery - Retry logic and graceful degradation
  • 🔌 Framework Integration - Works with Openclaw, LangChain, Socratic Agents

Quick Start

Installation

# Basic installation
pip install socratic-workflow

# With LangChain integration (quotes keep shells such as zsh from expanding the brackets)
pip install "socratic-workflow[langchain]"

# With Openclaw integration
pip install "socratic-workflow[openclaw]"

# With everything
pip install "socratic-workflow[all]"

Simple Workflow

from socratic_workflow import Workflow, Task, WorkflowEngine

# Define a custom task
class GreetingTask(Task):
    def execute(self, context):
        name = self.config.get("name", "World")
        return {"greeting": f"Hello, {name}!"}

# Build workflow
workflow = Workflow("Greeting Pipeline")
workflow.add_task("greet", GreetingTask(name="Alice"))

# Execute
engine = WorkflowEngine()
result = engine.execute(workflow)

print(result.success)  # True
print(result.task_results)  # {"greet": {"greeting": "Hello, Alice!"}}
print(f"Execution time: {result.duration:.2f}s")

Workflow with Dependencies

from socratic_workflow import Workflow, SimpleTask, WorkflowEngine

# Create workflow with dependencies
workflow = Workflow("Multi-Step Pipeline")
workflow.add_task("step1", SimpleTask(result={"data": "processed"}))
workflow.add_task("step2", SimpleTask(result={"analysis": "complete"}), depends_on=["step1"])
workflow.add_task("step3", SimpleTask(result={"summary": "done"}), depends_on=["step2"])

# Execute
engine = WorkflowEngine()
result = engine.execute(workflow)

print(result.success)
print(result.duration)
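
The ordering implied by depends_on can be illustrated with the standard library alone. The following is a minimal sketch, assuming dependency resolution amounts to a topological sort; it uses Python's graphlib (3.9+) and is not the engine's actual implementation. The dependencies mapping is a hypothetical stand-in for the workflow above.

```python
from graphlib import TopologicalSorter

# Task id -> ids it depends on, mirroring the depends_on arguments above.
# This mapping is an illustrative stand-in, not a socratic_workflow structure.
dependencies = {
    "step1": [],
    "step2": ["step1"],
    "step3": ["step2"],
}

# static_order() yields each task only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['step1', 'step2', 'step3']
```

A circular dependency (for example step1 depending on step3) would make static_order() raise graphlib.CycleError, which is a natural point to reject an invalid workflow.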

Async Execution

import asyncio
from socratic_workflow import Workflow, SimpleTask, WorkflowEngine

async def main():
    workflow = Workflow("Async Pipeline")
    workflow.add_task("task1", SimpleTask(result={"value": 1}))
    workflow.add_task("task2", SimpleTask(result={"value": 2}))

    engine = WorkflowEngine()
    result = await engine.execute_async(workflow)

    print(f"Success: {result.success}")
    print(f"Duration: {result.duration:.2f}s")

asyncio.run(main())

Core Concepts

Workflow

A workflow defines a set of tasks and the dependencies between them. Build one by calling add_task for each task; add_task returns the workflow, so calls can be chained:

workflow = Workflow("My Pipeline", description="Optional description")
workflow.add_task("task1", MyTask())
workflow.add_task("task2", MyTask(), depends_on=["task1"])
workflow.add_task("task3", MyTask(), depends_on=["task1"])  # Parallel with task2
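
Which tasks can run side by side follows from the dependency graph. As a rough sketch (assuming a scheduler that releases every task whose dependencies are finished; the helper name parallel_batches is hypothetical and not part of the library), graphlib can group the tasks above into batches:

```python
from graphlib import TopologicalSorter

def parallel_batches(dependencies):
    """Group task ids into batches whose members can run concurrently."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    batches = []
    while sorter.is_active():
        # Every task returned here has all of its dependencies completed.
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)
    return batches

batches = parallel_batches({"task1": [], "task2": ["task1"], "task3": ["task1"]})
print(batches)  # [['task1'], ['task2', 'task3']]
```

task2 and task3 land in the same batch because both depend only on task1.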

Tasks

Create custom tasks by inheriting from Task:

from socratic_workflow import Task

class CustomTask(Task):
    def execute(self, context):
        # context contains results from previous tasks
        return {"result": "value"}

    async def execute_async(self, context):
        # Optional: override for true async execution.
        # execute() is synchronous, so call it directly rather than awaiting it.
        return self.execute(context)
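
One way to keep a blocking execute() from stalling the event loop is to run it in a worker thread. The sketch below uses a stand-in base class (SketchTask, hypothetical) instead of the library's Task, and asyncio.to_thread from Python 3.9+; whether the library's default execute_async behaves this way is not confirmed here.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict

class SketchTask(ABC):
    """Stand-in for a task base class (hypothetical, not the library's Task)."""

    @abstractmethod
    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        ...

    async def execute_async(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Run the blocking execute() in a worker thread so the loop stays free.
        return await asyncio.to_thread(self.execute, context)

class UppercaseTask(SketchTask):
    def execute(self, context):
        # context carries results or inputs produced earlier in the run.
        return {"result": context["text"].upper()}

output = asyncio.run(UppercaseTask().execute_async({"text": "hello"}))
print(output)  # {'result': 'HELLO'}
```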

WorkflowEngine

The engine orchestrates task execution:

engine = WorkflowEngine()

# Sync execution
result = engine.execute(workflow)

# Async execution (call from inside an async function)
result = await engine.execute_async(workflow)

# Save/load state
engine.save_state("workflow.json")
engine.load_state("workflow.json")
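
The on-disk format written by save_state is not documented here. As a minimal sketch of the idea, assuming the state is a JSON-serializable dictionary (the keys below are hypothetical), a save/restore round trip looks like this:

```python
import json
import os
import tempfile

def save_state(path, state):
    """Write a JSON-serializable state dictionary to disk."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)

def load_state(path):
    """Read a state dictionary back from disk."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)

# Hypothetical state layout: finished results plus the tasks still to run.
state = {
    "workflow": "My Pipeline",
    "completed": {"task1": {"result": "value"}},
    "pending": ["task2"],
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "workflow.json")
    save_state(path, state)
    restored = load_state(path)

print(restored["pending"])  # ['task2']
```

Keeping completed results in the file is what lets a restarted run skip work that has already succeeded.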

Results

Get detailed execution results:

result = engine.execute(workflow)

print(result.success)          # bool
print(result.duration)         # float (seconds)
print(result.task_results)     # Dict[str, Any]
print(result.errors)           # Dict[str, str]
print(result.to_dict())        # Full result as dict
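
A common follow-up is to report which tasks failed. The sketch below uses a stand-in dataclass (SketchResult, hypothetical) carrying the same field names as the result object above, so it runs without the library installed; the helper failed_tasks is likewise illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class SketchResult:
    """Stand-in with the documented result fields (not the library class)."""
    success: bool
    duration: float
    task_results: Dict[str, Any] = field(default_factory=dict)
    errors: Dict[str, str] = field(default_factory=dict)

def failed_tasks(result: SketchResult) -> List[str]:
    """Return 'task_id: message' lines for every failed task, sorted by id."""
    return [f"{task_id}: {message}" for task_id, message in sorted(result.errors.items())]

result = SketchResult(
    success=False,
    duration=1.25,
    task_results={"step1": {"data": "processed"}},
    errors={"step2": "timeout"},
)
report = failed_tasks(result)
print(report)  # ['step2: timeout']
```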

API Reference

Workflow

class Workflow:
    def __init__(self, name: str, description: str = "")
    def add_task(self, task_id: str, task: Task, depends_on: Optional[List[str]] = None) -> Workflow
    def get_task(self, task_id: str) -> Optional[Task]
    def list_tasks(self) -> List[str]
    def get_dependencies(self, task_id: str) -> List[str]
    def to_dict(self) -> Dict[str, Any]

Task

class Task(ABC):
    def __init__(self, name: Optional[str] = None, **kwargs)
    @abstractmethod
    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]
    async def execute_async(self, context: Dict[str, Any]) -> Dict[str, Any]
    def to_dict(self) -> Dict[str, Any]

WorkflowEngine

class WorkflowEngine:
    def __init__(self, llm_client: Optional[Any] = None)
    def execute(self, workflow: Workflow) -> WorkflowResult
    async def execute_async(self, workflow: Workflow) -> WorkflowResult
    def save_state(self, path: str) -> None
    def load_state(self, path: str) -> None

WorkflowResult

class WorkflowResult:
    success: bool
    duration: float
    task_results: Dict[str, Any]
    errors: Dict[str, str]
    metrics: Dict[str, Any]
    def to_dict(self) -> Dict[str, Any]

Support Development

If you find this package useful, consider supporting its development. Your support helps fund development of the entire Socratic ecosystem.

Examples

Example 1: Basic Workflow

See examples/01_basic_workflow.py for a complete example.

Example 2: Cost Tracking

See examples/02_cost_tracking.py (Phase 2).

Example 3: Parallel Execution

See examples/03_parallel_execution.py (Phase 3).

Example 4: Error Recovery

See examples/04_error_recovery.py (Phase 3).

Advanced Features

Cost Tracking

Track LLM costs across multiple providers:

from socratic_workflow import CostTracker

tracker = CostTracker()

# Track individual calls
cost1 = tracker.track_call("claude-opus-4", 1000, 500)
cost2 = tracker.track_call("gpt-4", 1000, 500)

# Get summary
summary = tracker.get_summary()
print(f"Total cost: ${summary['total_cost_usd']:.4f}")
print(f"By provider: {summary['cost_by_provider']}")

# Get recommendations
recommendations = tracker.get_recommendations()
for rec in recommendations:
    print(f"- {rec}")

Supports 16+ models across 5 providers, including:

  • Anthropic: Claude Opus-4, Sonnet-3.5, Haiku-4.5
  • OpenAI: GPT-4, GPT-4o, GPT-3.5-turbo
  • Google: Gemini 1.5 Pro, Gemini 1.5 Flash
  • Meta: Llama-2-70b, Llama-3-70b
  • Mistral: Mistral-7b, Mistral-Large
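
The arithmetic behind per-call cost tracking is simple to sketch. The example assumes the two numeric arguments to track_call are input and output token counts and that prices are quoted per million tokens; the model names and prices below are made up for illustration and are not real provider pricing.

```python
# Hypothetical prices in USD per million tokens: (input, output).
# These numbers are placeholders, NOT real pricing for any provider.
PRICES = {
    "model-a": (3.00, 15.00),
    "model-b": (0.50, 1.50),
}

def call_cost(model, input_tokens, output_tokens):
    """Cost of one call: tokens times the per-million price for each direction."""
    input_price, output_price = PRICES[model]
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000

cost = call_cost("model-a", 1000, 500)
print(f"${cost:.4f}")  # $0.0105
```

Output tokens are priced separately because providers typically charge more for generated tokens than for prompt tokens.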

Parallel Execution

Run independent tasks concurrently:

from socratic_workflow.execution.executor import ParallelExecutor

executor = ParallelExecutor(max_workers=5)
# Call from inside an async function
result = await executor.execute_parallel(workflow)

metrics = executor.get_execution_metrics()
print(f"Success rate: {metrics['success_rate']:.1%}")
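
The effect of a max_workers cap can be sketched with plain asyncio. This is an illustration of bounded concurrency using a semaphore, not ParallelExecutor's actual implementation; run_limited and square are hypothetical names.

```python
import asyncio

async def run_limited(coros, max_workers):
    """Await all coroutines, with at most max_workers running at once."""
    semaphore = asyncio.Semaphore(max_workers)

    async def guarded(coro):
        async with semaphore:  # blocks while max_workers tasks are in flight
            return await coro

    # gather() returns results in the order the coroutines were passed in.
    return await asyncio.gather(*(guarded(c) for c in coros))

async def square(n):
    await asyncio.sleep(0.01)  # stands in for an I/O-bound task
    return n * n

results = asyncio.run(run_limited([square(n) for n in range(5)], max_workers=2))
print(results)  # [0, 1, 4, 9, 16]
```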

Error Recovery

Automatic retry with exponential backoff:

from socratic_workflow.execution.retry import RetryConfig, retry

config = RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    exponential_base=2.0,
    jitter=True
)

@retry(config)
def risky_operation():
    # Put the failure-prone call here; it is retried automatically on failure
    ...
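
To make the backoff schedule concrete, here is a minimal sketch of a retry decorator, assuming the delay before retry n is initial_delay * exponential_base**n and that jitter scales the delay by a random factor. The names retry_sketch and backoff_delays and the jitter range are assumptions, not the library's implementation.

```python
import random
import time
from functools import wraps

def backoff_delays(max_attempts, initial_delay, exponential_base):
    """Delays slept between attempts: initial_delay * exponential_base**n."""
    return [initial_delay * exponential_base ** n for n in range(max_attempts - 1)]

def retry_sketch(max_attempts=3, initial_delay=1.0, exponential_base=2.0,
                 jitter=True, sleep=time.sleep):
    """Hypothetical decorator that retries a function with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delays = backoff_delays(max_attempts, initial_delay, exponential_base)
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the last error
                    delay = delays[attempt]
                    if jitter:
                        # Randomize to avoid many clients retrying in lockstep.
                        delay *= random.uniform(0.5, 1.5)
                    sleep(delay)
        return wrapper
    return decorator

calls = []

# sleep is replaced by a no-op so the example finishes instantly.
@retry_sketch(max_attempts=3, jitter=False, sleep=lambda seconds: None)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient failure")
    return "ok"

outcome = flaky()
print(backoff_delays(3, 1.0, 2.0))  # [1.0, 2.0]
print(outcome, len(calls))  # ok 3
```

With max_attempts=3 there are only two sleeps, because no delay is needed after the final attempt.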

Documentation

  • Workflow Optimization Guide - Complete guide to the Quality Controller mechanism, path optimization, cost calculation, and risk assessment algorithms

Performance Metrics

Collect and analyze performance data:

from socratic_workflow.analytics import MetricsCollector

collector = MetricsCollector()
# ... execute workflow ...
summary = collector.get_summary()

print(f"Total duration: {summary['total_duration']:.2f}s")
print(f"Bottlenecks: {summary['bottlenecks']}")
print(f"Success rate: {summary['success_rate']:.1%}")
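
The summary fields above can be derived from per-task timings. The sketch below assumes sequential execution (so total duration is the sum of task durations) and treats the single slowest task as the bottleneck; the summarize helper and its input shape are hypothetical, not MetricsCollector's API.

```python
def summarize(task_durations, failed):
    """Derive total duration, slowest task, and success rate from raw timings."""
    total = sum(task_durations.values())  # assumes tasks ran one after another
    bottleneck = max(task_durations, key=task_durations.get)
    succeeded = len(task_durations) - len(failed)
    return {
        "total_duration": total,
        "bottleneck": bottleneck,
        "success_rate": succeeded / len(task_durations),
    }

summary = summarize(
    {"fetch": 0.5, "analyze": 2.0, "report": 0.25},  # seconds per task
    failed={"report"},
)
print(f"Total duration: {summary['total_duration']:.2f}s")  # Total duration: 2.75s
print(f"Bottleneck: {summary['bottleneck']}")  # Bottleneck: analyze
print(f"Success rate: {summary['success_rate']:.1%}")  # Success rate: 66.7%
```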

Integrations

LangChain

Integrate with LangChain agents:

from langchain.agents import Tool, initialize_agent
from socratic_workflow.integrations.langchain import SocraticWorkflowTool

tool = SocraticWorkflowTool(track_costs=True)

# Create LangChain tool
langchain_tool = Tool(
    name="execute_workflow",
    func=tool.execute_sync,
    description=tool.get_tool_description()
)

# Use in agent (llm is a LangChain LLM instance that you create beforehand)
agent = initialize_agent([langchain_tool], llm, agent="zero-shot-react-description")

Openclaw

Create Openclaw skills:

from socratic_workflow.integrations.openclaw import SocraticWorkflowSkill

skill = SocraticWorkflowSkill(
    use_parallel_executor=True,
    track_costs=True,
    track_metrics=True
)

# Get capabilities for discovery
capabilities = skill.get_capabilities()

# Execute workflow (call from inside an async function)
result = await skill.execute_workflow({
    "name": "My Workflow",
    "tasks": [
        {"id": "task1", "type": "SimpleTask", "result": {"status": "ok"}}
    ]
})

Roadmap

Phase 1: Core Infrastructure ✅

  • Workflow definition and execution
  • Task base class
  • State management
  • Sync and async execution

Phase 2: Cost Tracking & Analytics ✅

  • Cost tracker with provider pricing (16+ models)
  • Performance metrics collection
  • Cost recommendations

Phase 3: Advanced Execution ✅

  • Parallel task execution with asyncio
  • Task scheduler with dependency resolution
  • Retry logic with exponential backoff

Phase 4: Integrations ✅

  • Openclaw skill integration
  • LangChain tool integration
  • Socratic Agents compatibility

Phase 5: Polish & Release ✅

  • Comprehensive documentation
  • 4 complete examples
  • PyPI publication ready
  • 95% test coverage (188 tests)
  • All quality checks passing

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

Testing

# Run tests
pytest

# With coverage
pytest --cov=src/socratic_workflow

# Specific test file
pytest tests/unit/test_engine.py -v

Quality

# Format code
black src/ tests/ examples/

# Lint
ruff check src/ tests/ examples/

# Type check
mypy src/socratic_workflow

License

MIT License - see LICENSE file for details.

Project Status

Version: 0.1.0 (MVP) ✅

Statistics

  • Lines of Code: ~3,400
  • Test Coverage: 95% (188 tests)
  • Supported Python: 3.9, 3.10, 3.11, 3.12
  • Quality: Black, Ruff, MyPy ✅
  • CI/CD: GitHub Actions (automated testing and quality checks)

Implementation Progress

  • ✅ Phase 1: Core Infrastructure (100%)
  • ✅ Phase 2: Cost Tracking & Analytics (100%)
  • ✅ Phase 3: Advanced Execution (100%)
  • ✅ Phase 4: Integrations (100%)
  • ✅ Phase 5: Polish & Release (100%)

Built with ❤️ by the Socratic Ecosystem
