Skip to main content

A simple, developer-friendly Python package for creating AI workflows

Project description

OpenWorkflows

A simple, developer-friendly Python package for creating AI workflows.

Features

  • Declarative workflow definition - Define workflows in Python with a fluent API
  • Node-based architecture - Build workflows by connecting reusable nodes
  • Type-safe - Leverage Python type hints for validation
  • Async-first - Built on asyncio for high performance
  • Extensible - Easy to create custom nodes
  • Provider agnostic - Support any LLM provider
  • Multilingual schemas - Built-in support for internationalized node metadata
  • Zero dependencies (core) - Only requires Python 3.8+

Installation

uv pip install openworkflows

For LLM support:

uv pip install "openworkflows[llm]"

Quick Start

Basic Workflow

import asyncio
from openworkflows import Workflow

# Create a workflow
workflow = Workflow("Hello World")

# Add nodes
workflow.add_node("input", "input", {"name": "message"})
workflow.add_node("output", "output")

# Connect nodes
workflow.connect("input.value", "output.value")

# Run workflow
async def main():
    result = await workflow.run(inputs={"message": "Hello, OpenWorkflows!"})
    print(result)

asyncio.run(main())

LLM Workflow

import asyncio
from openworkflows import Workflow, MockLLMProvider

# Create workflow with LLM provider
workflow = Workflow("Text Generation")
workflow.set_llm_provider(MockLLMProvider("This is AI-generated text"))

# Add nodes
workflow.add_node("input", "input", {"name": "prompt"})
workflow.add_node("generate", "generate_text", {"temperature": 0.7})
workflow.add_node("output", "output")

# Connect nodes
workflow.connect("input.value", "generate.prompt")
workflow.connect("generate.text", "output.value")

# Run workflow
async def main():
    result = await workflow.run(inputs={"prompt": "Write a haiku about code"})
    print(result["output"]["result"])

asyncio.run(main())

Template Workflow

import asyncio
from openworkflows import Workflow, MockLLMProvider

workflow = Workflow("Prompt Template")
workflow.set_llm_provider(MockLLMProvider())

# Add nodes
workflow.add_node("topic_input", "input", {"name": "topic"})
workflow.add_node("template", "template", {
    "template": "Write a short story about {topic}"
})
workflow.add_node("generate", "generate_text")
workflow.add_node("output", "output")

# Connect nodes
workflow.connect("topic_input.value", "template.variables")
workflow.connect("template.text", "generate.prompt")
workflow.connect("generate.text", "output.value")

# Run
async def main():
    result = await workflow.run(inputs={"topic": "dragons"})
    print(result["output"]["result"])

asyncio.run(main())

Creating Custom Nodes

Using the @node Decorator

from openworkflows import node, ExecutionContext, register_node

@node(inputs={"text": str}, outputs={"length": int})
async def count_chars(ctx: ExecutionContext) -> int:
    text = ctx.input("text", "")
    return len(text)

# Register the node
register_node("count_chars")(count_chars)

# Use in workflow
workflow.add_node("counter", "count_chars")

Creating a Node Class

from openworkflows import Node, ExecutionContext, register_node
from openworkflows.parameters import Parameter
from typing import Dict, Any

@register_node("uppercase")
class UppercaseNode(Node):
    """Converts text to uppercase."""

    inputs = {"text": str}
    outputs = {"result": str}

    async def execute(self, ctx: ExecutionContext) -> Dict[str, Any]:
        text = ctx.input("text", "")
        return {"result": text.upper()}

# Use in workflow
workflow.add_node("upper", "uppercase")

Node Parameters

Nodes support configurable parameters that are set at instantiation time:

from openworkflows import Node, ExecutionContext, register_node
from openworkflows.parameters import Parameter

@register_node("multiplier")
class MultiplierNode(Node):
    inputs = {"value": float}
    outputs = {"result": float}
    parameters = {
        "factor": Parameter(
            name="factor",
            type=float,
            default=2.0,
            required=False,
            description="Multiplication factor",
            validator=lambda x: x > 0,  # Custom validation
        ),
    }

    async def execute(self, ctx: ExecutionContext) -> Dict[str, Any]:
        value = ctx.input("value")
        factor = self.param("factor")  # Access parameter
        return {"result": value * factor}

# Use with custom parameters
workflow.add_node("mult", "multiplier", {"factor": 3.5})

Built-in nodes support parameters:

# GenerateTextNode with parameters
workflow.add_node("gen", "generate_text", {
    "model": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 500
})

# TemplateNode with parameters
workflow.add_node("template", "template", {
    "template": "Hello, {name}!",
    "strict": True  # Fail on missing variables
})

# TransformNode with choices validation
workflow.add_node("transform", "transform", {
    "transform": "upper"  # Must be a valid choice
})

Built-in Nodes

Input/Output Nodes

  • input - Receive workflow inputs
  • output - Collect workflow outputs

Transform Nodes

  • template - Fill text templates with variables
  • transform - Apply transformations (upper, lower, strip, etc.)
  • merge - Combine multiple inputs

LLM Nodes

  • generate_text - Generate text using an LLM

HTTP Nodes

  • http_request - Make HTTP requests with templated URLs and bodies
  • http_get - Simplified GET request node
  • http_post - Simplified POST request node

LLM Providers

Mock Provider (for testing)

from openworkflows import MockLLMProvider

provider = MockLLMProvider("Mock response")
workflow.set_llm_provider(provider)

Custom Provider

from openworkflows import LLMProvider
from typing import Optional, AsyncIterator

class MyLLMProvider(LLMProvider):
    async def generate(self, prompt: str, system: Optional[str] = None,
                      temperature: float = 0.7, max_tokens: Optional[int] = None,
                      **kwargs) -> str:
        # Your implementation
        pass

    async def stream(self, prompt: str, system: Optional[str] = None,
                    temperature: float = 0.7, max_tokens: Optional[int] = None,
                    **kwargs) -> AsyncIterator[str]:
        # Your implementation
        pass

    async def embed(self, text: str, **kwargs) -> list[float]:
        # Your implementation
        pass

workflow.set_llm_provider(MyLLMProvider())

Advanced Features

Services

Inject custom services into workflow context:

workflow.add_service("database", db_connection)
workflow.add_service("cache", redis_client)

# Access in nodes
class MyNode(Node):
    async def execute(self, ctx: ExecutionContext):
        db = ctx.service("database")
        # Use database

Metadata

Pass metadata to all nodes:

result = await workflow.run(
    inputs={"message": "Hello"},
    metadata={"user_id": "123", "request_id": "abc"}
)

# Access in nodes
class MyNode(Node):
    async def execute(self, ctx: ExecutionContext):
        user_id = ctx.metadata.get("user_id")

Error Handling

try:
    result = await workflow.run(inputs={"prompt": "test"})
except ValueError as e:
    print(f"Validation error: {e}")
except Exception as e:
    print(f"Execution error: {e}")

Frontend Integration

Multilingual Node Schemas

OpenWorkflows nodes include optional multilingual schemas for building visual workflow editors. All built-in nodes have English and Polish translations.

from openworkflows import registry

# Get schema for a specific node
info = registry.get_node_info("transform")
print(info["schema"]["label"]["en"])  # "Transform"
print(info["schema"]["label"]["pl"])  # "Przekształć"
print(info["schema"]["icon"])          # "🔄"

# Get all registered nodes
all_nodes = registry.list_nodes()
for node_type in all_nodes:
    info = registry.get_node_info(node_type)
    if info and info.get("schema"):
        print(f"{info['schema']['icon']} {node_type}: {info['schema']['label']['en']}")

Schema Format

Nodes can include an optional schema dict with multilingual labels:

from openworkflows import Node, register_node

@register_node("my_node")
class MyNode(Node):
    inputs = {"text": str}
    outputs = {"result": str}

    schema = {
        "label": {"en": "My Node", "pl": "Mój Węzeł"},
        "description": {"en": "Process text", "pl": "Przetwarzaj tekst"},
        "category": "transform",
        "icon": "⚡",
        "inputs": {
            "text": {
                "label": {"en": "Input Text", "pl": "Tekst Wejściowy"},
                "description": {"en": "Text to process", "pl": "Tekst do przetworzenia"}
            }
        },
        "outputs": {
            "result": {
                "label": {"en": "Result", "pl": "Wynik"}
            }
        },
        "parameters": {
            "mode": {
                "label": {"en": "Mode", "pl": "Tryb"},
                "choices": {
                    "fast": {"en": "Fast", "pl": "Szybki"},
                    "accurate": {"en": "Accurate", "pl": "Dokładny"}
                }
            }
        }
    }

    async def execute(self, ctx):
        return {"result": ctx.input("text").upper()}

Using Schemas in Your Frontend

import json
from openworkflows import registry

# Export all node metadata for frontend
all_nodes = registry.list_nodes()
schemas = {
    name: registry.get_node_info(name)
    for name in all_nodes
}

with open("node_schemas.json", "w") as f:
    json.dump(schemas, f, indent=2)

Your frontend can use these schemas to:

  • Display node names/descriptions in multiple languages
  • Show icons for visual node identification
  • Render localized input/output labels
  • Display parameter choices in the user's language
  • Organize nodes by category

Example frontend usage:

// Load schemas
const schemas = await fetch('/api/node-schemas').then(r => r.json());

// Get localized node name
const userLang = 'pl';  // or 'en'
const nodeInfo = schemas['transform'];
const nodeName = nodeInfo.schema.label[userLang];  // "Przekształć"
const nodeIcon = nodeInfo.schema.icon;              // "🔄"

// Render parameter choices in user's language
const choices = nodeInfo.schema.parameters.transform.choices;
// { "upper": {"en": "UPPERCASE", "pl": "WIELKIE LITERY"}, ... }

Architecture

OpenWorkflows uses a node-based directed acyclic graph (DAG) architecture:

  1. Nodes - Self-contained units of work with typed inputs/outputs and optional multilingual schemas
  2. Edges - Connections between nodes that pass data
  3. Workflow - Orchestrates node execution in topological order
  4. Context - Provides nodes with inputs, services, and metadata
  5. Registry - Manages node type registration, instantiation, and schema export

Development

# Clone repository
git clone https://github.com/yourusername/openworkflows.git
cd openworkflows

# Install with dev dependencies
uv pip install -e ".[dev]"

# Run tests
pytest

# Format code
black openworkflows
ruff check openworkflows

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License - see LICENSE file for details

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

openworkflows-0.1.1.tar.gz (1.2 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

openworkflows-0.1.1-py3-none-any.whl (34.8 kB view details)

Uploaded Python 3

File details

Details for the file openworkflows-0.1.1.tar.gz.

File metadata

  • Download URL: openworkflows-0.1.1.tar.gz
  • Upload date:
  • Size: 1.2 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.21

File hashes

Hashes for openworkflows-0.1.1.tar.gz
Algorithm Hash digest
SHA256 78238d756af1f6fbcf114c8be10a43a2b0e8d9fd77b86efe7c59f62db08f5747
MD5 3bbc368b5f34fe9994ce030720679adf
BLAKE2b-256 f6572b4290b5f7f43861e3071d40f5f116023512a0adbc4933c8f94979b2844c

See more details on using hashes here.

File details

Details for the file openworkflows-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for openworkflows-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3db4756220fe4a67f559f09148f5f88e54ad93a09c521bc9b68e3c19af5b4fd1
MD5 ff2ba4bd527317ed075b519c7a2757d4
BLAKE2b-256 41146ae26c45d666627db886f1890fd6697e9f9f97e3347dfd9b2fda0deb22c0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page