
OpenWorkflows

A simple, developer-friendly Python package for creating AI workflows.

Features

  • Declarative workflow definition - Define workflows in Python with a fluent API
  • Node-based architecture - Build workflows by connecting reusable nodes
  • Type-safe - Leverage Python type hints for validation
  • Async-first - Built on asyncio for high performance
  • Extensible - Easy to create custom nodes
  • Provider agnostic - Support any LLM provider
  • Zero dependencies (core) - Only requires Python 3.8+

Installation

uv pip install openworkflows

For LLM support:

uv pip install "openworkflows[llm]"

Quick Start

Basic Workflow

import asyncio
from openworkflows import Workflow

# Create a workflow
workflow = Workflow("Hello World")

# Add nodes
workflow.add_node("input", "input", {"name": "message"})
workflow.add_node("output", "output")

# Connect nodes
workflow.connect("input.value", "output.value")

# Run workflow
async def main():
    result = await workflow.run(inputs={"message": "Hello, OpenWorkflows!"})
    print(result)

asyncio.run(main())

LLM Workflow

import asyncio
from openworkflows import Workflow, MockLLMProvider

# Create workflow with LLM provider
workflow = Workflow("Text Generation")
workflow.set_llm_provider(MockLLMProvider("This is AI-generated text"))

# Add nodes
workflow.add_node("input", "input", {"name": "prompt"})
workflow.add_node("generate", "generate_text", {"temperature": 0.7})
workflow.add_node("output", "output")

# Connect nodes
workflow.connect("input.value", "generate.prompt")
workflow.connect("generate.text", "output.value")

# Run workflow
async def main():
    result = await workflow.run(inputs={"prompt": "Write a haiku about code"})
    print(result["output"]["result"])

asyncio.run(main())

Template Workflow

import asyncio
from openworkflows import Workflow, MockLLMProvider

workflow = Workflow("Prompt Template")
workflow.set_llm_provider(MockLLMProvider())

# Add nodes
workflow.add_node("topic_input", "input", {"name": "topic"})
workflow.add_node("template", "template", {
    "template": "Write a short story about {topic}"
})
workflow.add_node("generate", "generate_text")
workflow.add_node("output", "output")

# Connect nodes
workflow.connect("topic_input.value", "template.variables")
workflow.connect("template.text", "generate.prompt")
workflow.connect("generate.text", "output.value")

# Run
async def main():
    result = await workflow.run(inputs={"topic": "dragons"})
    print(result["output"]["result"])

asyncio.run(main())

Creating Custom Nodes

Using the @node Decorator

from openworkflows import node, ExecutionContext, register_node

@node(inputs={"text": str}, outputs={"length": int})
async def count_chars(ctx: ExecutionContext) -> int:
    text = ctx.input("text", "")
    return len(text)

# Register the node
register_node("count_chars")(count_chars)

# Use in workflow
workflow.add_node("counter", "count_chars")

Creating a Node Class

from openworkflows import Node, ExecutionContext, register_node
from openworkflows.parameters import Parameter
from typing import Dict, Any

@register_node("uppercase")
class UppercaseNode(Node):
    """Converts text to uppercase."""

    inputs = {"text": str}
    outputs = {"result": str}

    async def execute(self, ctx: ExecutionContext) -> Dict[str, Any]:
        text = ctx.input("text", "")
        return {"result": text.upper()}

# Use in workflow
workflow.add_node("upper", "uppercase")

Node Parameters

Nodes support configurable parameters that are set at instantiation time:

from openworkflows import Node, ExecutionContext, register_node
from openworkflows.parameters import Parameter
from typing import Dict, Any

@register_node("multiplier")
class MultiplierNode(Node):
    inputs = {"value": float}
    outputs = {"result": float}
    parameters = {
        "factor": Parameter(
            name="factor",
            type=float,
            default=2.0,
            required=False,
            description="Multiplication factor",
            validator=lambda x: x > 0,  # Custom validation
        ),
    }

    async def execute(self, ctx: ExecutionContext) -> Dict[str, Any]:
        value = ctx.input("value")
        factor = self.param("factor")  # Access parameter
        return {"result": value * factor}

# Use with custom parameters
workflow.add_node("mult", "multiplier", {"factor": 3.5})
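Conceptually, a `Parameter` declaration like the one above combines a default, a type check, and an optional validator. The following standalone sketch illustrates that flow; the function name and signature are invented for the example and are not OpenWorkflows internals:

```python
from typing import Any, Callable, Optional


def resolve_param(
    value: Optional[Any],
    *,
    type_: type,
    default: Any,
    validator: Optional[Callable[[Any], bool]] = None,
) -> Any:
    """Apply default, type check, and custom validation to one parameter."""
    if value is None:
        value = default
    if not isinstance(value, type_):
        raise TypeError(f"expected {type_.__name__}, got {type(value).__name__}")
    if validator is not None and not validator(value):
        raise ValueError(f"validation failed for value {value!r}")
    return value


# Mirrors the "factor" parameter above: float, default 2.0, must be > 0.
print(resolve_param(3.5, type_=float, default=2.0, validator=lambda x: x > 0))   # 3.5
print(resolve_param(None, type_=float, default=2.0, validator=lambda x: x > 0))  # 2.0
```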

Built-in nodes support parameters:

# GenerateTextNode with parameters
workflow.add_node("gen", "generate_text", {
    "model": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 500
})

# TemplateNode with parameters
workflow.add_node("template", "template", {
    "template": "Hello, {name}!",
    "strict": True  # Fail on missing variables
})

# TransformNode with choices validation
workflow.add_node("transform", "transform", {
    "transform": "upper"  # Must be a valid choice
})
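The template node's internals aren't shown here, but the `strict` flag can be understood in terms of plain `str.format`: in strict mode a missing variable is an error, while a lenient fill can leave the placeholder in place. A standalone sketch of that behavior (not the node's actual code):

```python
import string


class _Lenient(dict):
    """Mapping that leaves unknown placeholders intact instead of raising."""

    def __missing__(self, key):
        return "{" + key + "}"


def fill_template(template: str, variables: dict, strict: bool = False) -> str:
    """Fill {placeholders}; in strict mode a missing variable raises KeyError."""
    if strict:
        return template.format(**variables)  # KeyError on missing variables
    return string.Formatter().vformat(template, (), _Lenient(variables))


print(fill_template("Hello, {name}!", {"name": "world"}))  # Hello, world!
print(fill_template("Hello, {name}!", {}))                 # Hello, {name}!
```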

Built-in Nodes

Input/Output Nodes

  • input - Receive workflow inputs
  • output - Collect workflow outputs

Transform Nodes

  • template - Fill text templates with variables
  • transform - Apply transformations (upper, lower, strip, etc.)
  • merge - Combine multiple inputs

LLM Nodes

  • generate_text - Generate text using an LLM

HTTP Nodes

  • http_request - Make HTTP requests with templated URLs and bodies
  • http_get - Simplified GET request node
  • http_post - Simplified POST request node

LLM Providers

Mock Provider (for testing)

from openworkflows import MockLLMProvider

provider = MockLLMProvider("Mock response")
workflow.set_llm_provider(provider)

Custom Provider

from openworkflows import LLMProvider
from typing import AsyncIterator, List, Optional

class MyLLMProvider(LLMProvider):
    async def generate(self, prompt: str, system: Optional[str] = None,
                       temperature: float = 0.7, max_tokens: Optional[int] = None,
                       **kwargs) -> str:
        # Your implementation
        pass

    async def stream(self, prompt: str, system: Optional[str] = None,
                     temperature: float = 0.7, max_tokens: Optional[int] = None,
                     **kwargs) -> AsyncIterator[str]:
        # Your implementation
        pass

    async def embed(self, text: str, **kwargs) -> List[float]:
        # Your implementation
        pass

workflow.set_llm_provider(MyLLMProvider())
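As a concrete, self-contained illustration of that interface, here is a duck-typed provider that echoes the prompt instead of calling a real model. The echo behavior, word-by-word chunking, and toy embedding are invented for the example; a real provider would call its API in each method:

```python
import asyncio
from typing import AsyncIterator, List, Optional


class EchoProvider:
    """Duck-typed provider: returns the prompt back instead of calling a model."""

    async def generate(self, prompt: str, system: Optional[str] = None,
                       temperature: float = 0.7, max_tokens: Optional[int] = None,
                       **kwargs) -> str:
        return f"echo: {prompt}"

    async def stream(self, prompt: str, system: Optional[str] = None,
                     temperature: float = 0.7, max_tokens: Optional[int] = None,
                     **kwargs) -> AsyncIterator[str]:
        # Yield the response word by word, as a streaming API would chunk it.
        for word in (await self.generate(prompt)).split():
            yield word + " "

    async def embed(self, text: str, **kwargs) -> List[float]:
        # Toy embedding: character-code average, just to return the right shape.
        return [sum(map(ord, text)) / max(len(text), 1)]


async def demo() -> str:
    provider = EchoProvider()
    chunks = [c async for c in provider.stream("hello world")]
    return "".join(chunks).strip()

print(asyncio.run(demo()))  # echo: hello world
```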

Advanced Features

Services

Inject custom services into workflow context:

workflow.add_service("database", db_connection)
workflow.add_service("cache", redis_client)

# Access in nodes
class MyNode(Node):
    async def execute(self, ctx: ExecutionContext):
        db = ctx.service("database")
        # Use database

Metadata

Pass metadata to all nodes:

result = await workflow.run(
    inputs={"message": "Hello"},
    metadata={"user_id": "123", "request_id": "abc"}
)

# Access in nodes
class MyNode(Node):
    async def execute(self, ctx: ExecutionContext):
        user_id = ctx.metadata.get("user_id")

Error Handling

try:
    result = await workflow.run(inputs={"prompt": "test"})
except ValueError as e:
    print(f"Validation error: {e}")
except Exception as e:
    print(f"Execution error: {e}")

Frontend Integration

Node Schema Generation

OpenWorkflows provides automatic schema generation for building visual workflow editors:

from openworkflows.schema import get_node_schema, get_all_node_schemas

# Get schema for a specific node type
schema = get_node_schema("transform")
print(schema)
# {
#   "type": "transform",
#   "name": "Transform",
#   "description": "Node that applies a transformation...",
#   "inputs": [
#     {"name": "input", "type": "Any", "component": "text"}
#   ],
#   "outputs": [
#     {"name": "output", "type": "Any"}
#   ],
#   "parameters": [
#     {
#       "name": "transform",
#       "component": "dropdown",
#       "options": ["identity", "upper", "lower", "strip", ...],
#       "default": "identity",
#       "required": false,
#       "description": "Transformation to apply"
#     }
#   ]
# }

# Get all registered node schemas
all_schemas = get_all_node_schemas()

Type to UI Component Mapping

Python types are automatically mapped to UI components:

Python Type UI Component Example
str Text input {"component": "text"}
int Number input (integer) {"component": "number", "inputType": "integer"}
float Number input (float) {"component": "number", "inputType": "float"}
bool Checkbox {"component": "checkbox"}
dict / Dict[str, Any] JSON editor {"component": "json"}
list / List[T] JSON editor {"component": "json"}
Literal["a", "b", "c"] Dropdown {"component": "dropdown", "options": ["a", "b", "c"]}
Optional[T] Same as T + optional flag {"component": "text", "optional": true}
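This kind of mapping can be reproduced with the stdlib `typing` introspection helpers. A simplified sketch of the idea (not the library's actual implementation):

```python
from typing import Any, Literal, Optional, Union, get_args, get_origin


def component_for(tp: Any) -> dict:
    """Map a Python type annotation to a UI component description."""
    origin = get_origin(tp)
    if origin is Literal:
        return {"component": "dropdown", "options": list(get_args(tp))}
    if origin is Union:  # Optional[T] is Union[T, None]
        args = [a for a in get_args(tp) if a is not type(None)]
        return {**component_for(args[0]), "optional": True}
    if tp is str:
        return {"component": "text"}
    if tp is int:
        return {"component": "number", "inputType": "integer"}
    if tp is float:
        return {"component": "number", "inputType": "float"}
    if tp is bool:
        return {"component": "checkbox"}
    if tp in (dict, list) or origin in (dict, list):
        return {"component": "json"}
    return {"component": "text"}  # fallback for unrecognized types


print(component_for(Literal["a", "b", "c"]))
# {'component': 'dropdown', 'options': ['a', 'b', 'c']}
print(component_for(Optional[str]))
# {'component': 'text', 'optional': True}
```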

Using Schemas in Your Frontend

import json
from openworkflows.schema import get_all_node_schemas

# Export schemas for frontend
schemas = get_all_node_schemas()
with open("node_schemas.json", "w") as f:
    json.dump(schemas, f, indent=2)

Your frontend can use these schemas to:

  • Render node configuration forms dynamically
  • Validate user inputs before workflow execution
  • Display appropriate UI components (dropdowns, text inputs, checkboxes, etc.)
  • Show parameter descriptions and defaults
  • Handle optional vs required fields
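For instance, the "validate user inputs" step can be done entirely from the exported schema. A minimal sketch, assuming the schema shape shown in the transform example above (the function name is illustrative, not part of the library):

```python
def validate_config(schema: dict, config: dict) -> list:
    """Check a node config against one exported node schema; return error strings."""
    errors = []
    for param in schema["parameters"]:
        name = param["name"]
        if name not in config:
            if param.get("required", False):
                errors.append(f"missing required parameter: {name}")
            continue
        value = config[name]
        options = param.get("options")
        if options is not None and value not in options:
            errors.append(f"{name}: {value!r} is not one of {options}")
    return errors


# Schema shaped like the transform example above.
transform_schema = {
    "type": "transform",
    "parameters": [
        {"name": "transform", "component": "dropdown",
         "options": ["identity", "upper", "lower", "strip"],
         "default": "identity", "required": False},
    ],
}

print(validate_config(transform_schema, {"transform": "upper"}))  # []
print(validate_config(transform_schema, {"transform": "shout"}))  # one choice error
```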

Example React/TypeScript usage:

// Load schemas
const schemas = await fetch('/api/node-schemas').then(r => r.json());

// Render node config form
function NodeConfigForm({ nodeType }) {
  const schema = schemas.find(s => s.type === nodeType);

  return (
    <form>
      {schema.parameters.map(param => {
        switch (param.component) {
          case 'dropdown':
            return <Select key={param.name} options={param.options} defaultValue={param.default} />;
          case 'text':
            return <Input key={param.name} type="text" defaultValue={param.default} />;
          case 'number':
            return <Input key={param.name} type="number" defaultValue={param.default} />;
          case 'checkbox':
            return <Checkbox key={param.name} defaultChecked={param.default} />;
          case 'json':
            return <JsonEditor key={param.name} defaultValue={param.default} />;
          default:
            return null;
        }
      })}
    </form>
  );
}

Architecture

OpenWorkflows uses a node-based directed acyclic graph (DAG) architecture:

  1. Nodes - Self-contained units of work with typed inputs/outputs
  2. Edges - Connections between nodes that pass data
  3. Workflow - Orchestrates node execution in topological order
  4. Context - Provides nodes with inputs, services, and metadata
  5. Registry - Manages node type registration and instantiation
  6. Schema - Introspects nodes to generate frontend-compatible JSON schemas
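The topological-order execution in step 3 can be sketched with the stdlib `graphlib` (Python 3.9+); the node names and edge representation here are a simplified illustration, not OpenWorkflows' actual data model:

```python
from graphlib import TopologicalSorter

# node -> set of nodes it depends on (data flows dependency -> dependent)
edges = {
    "input": set(),
    "template": {"input"},
    "generate": {"template"},
    "output": {"generate"},
}

order = list(TopologicalSorter(edges).static_order())
print(order)  # ['input', 'template', 'generate', 'output']
```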

Development

# Clone repository
git clone https://github.com/yourusername/openworkflows.git
cd openworkflows

# Install with dev dependencies
uv pip install -e ".[dev]"

# Run tests
pytest

# Format code
black openworkflows
ruff check openworkflows

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License - see LICENSE file for details
