gnougo-flow-core — YAML Workflow DSL Engine (Python)
Python 3.10+ implementation of GnOuGo.Flow.Core, the declarative YAML workflow DSL engine.
Write YAML workflows that orchestrate LLMs, MCP servers, templates, loops, human input, and dynamic code generation — all from a single file.
Package Status and Parity
The .NET library at src/GnOuGo.Flow.Core/ is the source of truth.
This Python package mirrors its public surface as closely as Python idioms allow. See PORTING_TODO.md for the detailed parity log and remaining work items.
| Area | Status |
|---|---|
| YAML DSL parser (version:) | Yes |
| Validation + compilation pipeline | Yes |
| Expression interpolation ${...} + built-in functions | Yes (AST-based JS-subset interpreter) |
| Mustache template.render engine | Yes |
| WFScript (functions: block) | Yes, multi-statement (var/let/const, if/else, return) |
| Runtime engine + step registry | Yes |
| Step types: set, assert.non_null, emit, sequence, parallel, loop.sequential, loop.parallel, switch, template.render, llm.call, mcp.list, mcp.call, human.input, workflow.call, workflow.plan, workflow.execute | Yes |
| MCP integrations (InMemoryMcpClientFactory, ConfiguredMcpClientFactory, cache helper) | Yes |
| MCP progressEvents -> thinking telemetry + stdio JSONL real-time progress | Yes |
| MCP server-level DiscoveryTimeoutSeconds / CallTimeoutSeconds metadata | Yes |
| LLMRequest.reasoning field | Yes |
| Model metadata catalog (pricing, token limits, capabilities, overrides) | Yes |
| workflow.plan default mode="auto" classifier | Yes |
| workflow.plan defaults reasoning="medium" | Yes |
| workflow.plan repair mode for persisted workflow fixes | Yes |
| workflow.plan pipeline decomposition, structured extraction, quality reports, and strict semantic checks | Yes |
| MCP tool output_schema / example_response planning contracts | Yes |
| Workflow source telemetry (source_text / source_format) | Yes |
| JsonSchemaConverter (inputs/outputs to JSON Schema) | Yes |
| WorkflowCheckpointer + WorkflowEngine.resume_async | Yes |
| CLI: validate / inspect / run subcommands | Yes |
Table of Contents
- Package Status and Parity
- Architecture
- Get Started — One-file with mocks
- Quick Start
- Document Structure
- Step Types Reference
- Typed Inputs
- Typed Outputs
- Expressions ${...}
- WFScript - Custom JavaScript Functions
- Error Handling
- Model Metadata Catalog
- CLI
- Python Runtime Notes
Architecture
librairies/python/gnougo-flow-core/
  pyproject.toml              # Python package metadata and dependencies
  src/gnougo_flow_core/       # Publishable Python library
    models.py                 # DSL model (Document, Workflow, Step, etc.)
    parsing.py                # Parse YAML to model (PyYAML)
    expressions.py            # Expression interpolation `${...}`
    _jsmini.py                # In-tree JS-subset interpreter for expressions and WFScript
    templating.py             # Minimal Mustache-compatible renderer
    scripting.py              # WFScript helpers
    compilation.py            # Document validation + compilation
    runtime.py                # Execution engine + executor registry
    runtime_contracts.py      # Protocols for LLM, MCP, HITL, workflow fetching, telemetry
    checkpointing.py          # Workflow checkpoint contracts and in-memory implementation
    integrations/             # MCP and LLM adapter helpers
    runtime_steps/            # Executor re-export modules for step families
  tests/                      # Dedicated Python unit tests
The package is intentionally independent from the .NET assembly at runtime. It keeps the same DSL concepts and stable contracts so workflows can be shared across Python and .NET hosts.
Get Started — One-file with mocks
This example is a complete Python script that runs fully locally: the LLM client and MCP server are mocked in memory, so no API key, network call, or external MCP process is required.
Install the package:
python -m pip install gnougo-flow-core
Create one_file_flow.py:
import asyncio
import json

from gnougo_flow_core.compilation import WorkflowCompiler
from gnougo_flow_core.integrations import InMemoryMcpClientFactory, MockMcpServerConfig
from gnougo_flow_core.models import LLMResponse, McpCallResult, McpToolInfo
from gnougo_flow_core.parsing import WorkflowParser
from gnougo_flow_core.runtime import WorkflowEngine, apply_workflow_input_defaults

WORKFLOW_YAML = """
version: 1
name: one-file-mocked-flow
workflows:
  main:
    inputs:
      topic: { type: string, required: true }
    steps:
      - id: discover
        type: mcp.list
        input:
          servers: [demo]
          include: ["tools"]
      - id: facts
        type: mcp.call
        input:
          server: demo
          kind: tool
          method: get_facts
          request:
            topic: "${data.inputs.topic}"
      - id: summarize
        type: llm.call
        input:
          model: mock-gpt
          prompt: "Summarize these facts as one sentence: ${json(data.steps.facts.response)}"
      - id: final
        type: template.render
        input:
          engine: mustache
          template: "{{summary}}"
          data:
            summary: "${data.steps.summarize.text}"
          mode: text
    outputs:
      answer: "${data.steps.final.text}"
      tools_seen: "${len(data.steps.discover.tools)}"
      facts: "${data.steps.facts.response}"
"""


class MockLLMClient:
    async def call_async(self, request):
        return LLMResponse(
            text=f"[Mock {request.model}] Summary generated from MCP facts.",
            usage={"prompt_tokens": 12, "completion_tokens": 18, "total_tokens": 30},
        )


def build_mcp_factory() -> InMemoryMcpClientFactory:
    factory = InMemoryMcpClientFactory()

    def get_facts(arguments):
        topic = (arguments or {}).get("topic", "unknown")
        return McpCallResult(
            is_error=False,
            content={
                "topic": topic,
                "facts": [
                    f"{topic} is handled by a mocked MCP tool.",
                    "No network or external service is required.",
                ],
            },
        )

    factory.register_server(
        "demo",
        MockMcpServerConfig(
            description="A mock knowledge server",
            tools=[
                McpToolInfo(
                    name="get_facts",
                    description="Returns deterministic facts for a topic",
                    input_schema={
                        "type": "object",
                        "properties": {"topic": {"type": "string"}},
                        "required": ["topic"],
                    },
                    output_schema={
                        "type": "object",
                        "properties": {
                            "topic": {"type": "string"},
                            "facts": {"type": "array", "items": {"type": "string"}},
                        },
                        "additionalProperties": False,
                    },
                )
            ],
            tool_handlers={"get_facts": get_facts},
        ),
    )
    return factory


async def main() -> None:
    document = WorkflowParser.parse(WORKFLOW_YAML)
    compiled = WorkflowCompiler().compile(document)
    workflow = compiled.workflows[compiled.entrypoint]

    engine = WorkflowEngine()
    engine.llm_client = MockLLMClient()
    engine.mcp_client_factory = build_mcp_factory()

    inputs = apply_workflow_input_defaults(workflow.source, {"topic": "GnOuGo.Flow"})
    result = await engine.execute_async(workflow, inputs)
    if not result.success:
        message = result.error.message if result.error else "unknown error"
        raise RuntimeError(f"Workflow failed: {message}")
    print(json.dumps(result.outputs, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    asyncio.run(main())
Run it:
python one_file_flow.py
Expected output shape:
{
  "answer": "[Mock mock-gpt] Summary generated from MCP facts.",
  "tools_seen": 1,
  "facts": {
    "topic": "GnOuGo.Flow",
    "facts": [
      "GnOuGo.Flow is handled by a mocked MCP tool.",
      "No network or external service is required."
    ]
  }
}
When developing inside this repository, you can run against the local source tree instead of the published package:
$env:PYTHONPATH = "C:\github\GnouGo\librairies\python\gnougo-flow-core\src"
python one_file_flow.py
Quick Start
Install the published Python package:
python -m pip install gnougo-flow-core
Or add it to a local uv project:
uv add gnougo-flow-core
For repository development, install the package with its development extras from this directory:
uv sync --extra dev
Create hello.yaml:
version: 1
name: hello-world
workflows:
  main:
    inputs:
      name: { type: string, required: true }
    steps:
      - id: greet
        type: template.render
        input:
          engine: mustache
          template: "Hello {{name}}! Welcome to GnOuGo.Flow."
          data: { name: "${data.inputs.name}" }
          mode: text
    outputs:
      greeting: "${data.steps.greet.text}"
Validate it:
gnougo-flow validate hello.yaml
Inspect it:
gnougo-flow inspect hello.yaml
Run it from the CLI:
gnougo-flow run hello.yaml -i name=World
Run it from Python:
import asyncio
from pathlib import Path

from gnougo_flow_core.compilation import WorkflowCompiler
from gnougo_flow_core.parsing import WorkflowParser
from gnougo_flow_core.runtime import WorkflowEngine, apply_workflow_input_defaults


async def main() -> None:
    # Read the workflow file; read_text closes the file handle for us.
    yaml_text = Path("hello.yaml").read_text(encoding="utf-8")
    document = WorkflowParser.parse(yaml_text)
    compiled = WorkflowCompiler().compile(document)
    workflow = compiled.workflows[compiled.entrypoint]

    inputs = apply_workflow_input_defaults(workflow.source, {"name": "World"})
    result = await WorkflowEngine().execute_async(workflow, inputs)
    if not result.success:
        raise RuntimeError(result.error.message if result.error else "Workflow failed")
    print(result.outputs)


asyncio.run(main())
Runtime integrations such as LLM clients, MCP clients, human input providers, workflow fetchers, telemetry, and checkpointing are injected through Python protocols in gnougo_flow_core.runtime_contracts.
Document Structure
Every workflow file starts with:
version: 1                  # DSL version (required, always 1)
name: my-workflow           # Document name (optional)
functions: |                # Global WFScript functions (optional)
  function myHelper(x) { return x * 2; }
workflows:
  main:                     # Entrypoint workflow (by convention)
    inputs:                 # Input parameters with types (optional)
      message: { type: string, required: true }
    steps:                  # Ordered list of steps (required)
      - id: step1
        type: template.render
        input: { ... }
    outputs:                # Output expressions (optional)
      result: "${data.steps.step1.text}"
You can define multiple workflows in the same document and call them via workflow.call.
Step Common Fields
Every step supports:
- id: unique_step_id        # Required — unique within the workflow
  type: step_type           # Required — one of the step types below
  if: "${expression}"       # Optional — guard; step is skipped if false
  input: { ... }            # Step-specific input (supports ${...} at any depth)
  output: alias_name        # Optional — also expose output as data.<alias_name>
  retry:                    # Optional — automatic retry for retryable errors
    max: 3
    backoff_ms: 1000
    backoff_mult: 2.0
    jitter_ms: 100
  on_error:                 # Optional — error handler (see Error Handling)
    cases:
      - if: "${error.code == \"LLM_TIMEOUT\"}"
        action: continue
        set_output: "fallback value"
      - action: stop
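The retry fields read like an exponential backoff schedule. The exact formula is not stated here, so the Python sketch below is an assumption: the delay before retry n is taken to be backoff_ms * backoff_mult ** n plus a random jitter between 0 and jitter_ms. The function name retry_delays_ms is hypothetical and is not part of the package.

```python
import random


def retry_delays_ms(max_retries=3, backoff_ms=1000, backoff_mult=2.0,
                    jitter_ms=100, rng=None):
    """Return one delay (in milliseconds) per retry attempt.

    Assumed formula, not confirmed by the engine:
    delay_n = backoff_ms * backoff_mult ** n + uniform(0, jitter_ms)
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        base = backoff_ms * (backoff_mult ** attempt)
        delays.append(base + rng.uniform(0, jitter_ms))
    return delays


# With jitter disabled the schedule is deterministic.
print(retry_delays_ms(jitter_ms=0))  # [1000.0, 2000.0, 4000.0]
```

Under this assumption, the values in the YAML above would wait roughly 1 s, 2 s, and 4 s between attempts, each with up to 100 ms of added jitter.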
Data Access
All expressions read from a shared data context:
| Path | Content |
|---|---|
| data.inputs.* | Workflow input parameters |
| data.steps.<step_id>.* | Output of a previously executed step |
| data.env.* | Environment variables |
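To make the path convention concrete, here is a minimal Python sketch that walks a dotted path through a nested dictionary shaped like the data context. It is only an illustration: resolve_path is a hypothetical helper, and returning None for a missing segment is an assumption, not documented engine behavior.

```python
def resolve_path(context, path):
    """Walk a dotted path such as 'data.steps.greet.text' through nested dicts.

    Returns None when any segment is missing (an assumption for this sketch;
    the real engine may raise or behave differently).
    """
    current = context
    for segment in path.split("."):
        if not isinstance(current, dict) or segment not in current:
            return None
        current = current[segment]
    return current


context = {
    "data": {
        "inputs": {"name": "World"},
        "steps": {"greet": {"text": "Hello World!"}},
        "env": {"HOME": "/home/user"},
    }
}

print(resolve_path(context, "data.inputs.name"))         # World
print(resolve_path(context, "data.steps.greet.text"))    # Hello World!
print(resolve_path(context, "data.steps.missing.text"))  # None
```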
Step Types Reference
template.render — Mustache Templating
Renders a Mustache template with data from the workflow context.
- id: greet
  type: template.render
  input:
    engine: mustache
    template: "Hello {{name}}, you have {{count}} items."
    data:
      name: "${data.inputs.name}"
      count: "${len(data.inputs.items)}"
    mode: text              # "text" (default) or "json"
Output: { text: "Hello World, you have 3 items." }
llm.call — Call a Language Model
Sends a prompt to an LLM and returns the response. Supports structured JSON output.
Basic call
- id: summarize
  type: llm.call
  input:
    model: gpt-4o-mini                              # Required
    prompt: "Summarize this: ${data.inputs.text}"   # Required
    system: "You are a concise summarizer."         # Optional
    provider: openai                                # Optional (default: auto-routed)
    temperature: 0.7                                # Optional override; omit by default
    max_tokens: 2048                                # Optional
    reasoning: auto                                 # Optional — auto|minimal|low|medium|high|max
                                                    # Default: omitted (provider decides).
                                                    # Unsupported optional fields are removed by runtime metadata.
temperature, reasoning, structured_output, and tool-calling support are checked against the runtime model metadata catalog before the configured LLM client is called. For example, a request to o4-mini with temperature: 0.7 is automatically sent without temperature.
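The filtering described above can be pictured with the following Python sketch. It is not the runtime's implementation: the MODEL_CAPABILITIES table, the field names inside it, and strip_unsupported are all hypothetical stand-ins for the real model metadata catalog, and the capability sets are illustrative values only.

```python
# Hypothetical capability table; the real catalog lives in the runtime
# model metadata and may use different field names and values.
MODEL_CAPABILITIES = {
    "gpt-4o-mini": {"temperature", "structured_output", "tools"},
    "o4-mini": {"reasoning", "structured_output", "tools"},
}

OPTIONAL_FIELDS = ("temperature", "reasoning", "structured_output", "tools")


def strip_unsupported(request):
    """Return a copy of the request without optional fields the model lacks."""
    supported = MODEL_CAPABILITIES.get(request["model"])
    if supported is None:
        # Unknown model: leave the request untouched in this sketch.
        return dict(request)
    return {
        key: value
        for key, value in request.items()
        if key not in OPTIONAL_FIELDS or key in supported
    }


request = {"model": "o4-mini", "prompt": "Hi", "temperature": 0.7}
print(strip_unsupported(request))  # {'model': 'o4-mini', 'prompt': 'Hi'}
```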
Output: { text: "...", usage: { prompt_tokens, completion_tokens, total_tokens }, meta: { model } }
Structured output (JSON mode)
- id: classify
  type: llm.call
  input:
    model: gpt-4o
    prompt: "Classify this ticket and return JSON: ${data.inputs.ticket}"
    structured_output:
      schema_inline:
        type: object
        properties:
          category: { type: string }
          priority: { type: string, enum: [low, medium, high, critical] }
          confidence: { type: number }
        required: [category, priority]
      strict: true
Output: { text: "...", json: { category: "bug", priority: "high", confidence: 0.92 }, usage: {...} }
Access: data.steps.classify.json.category, data.steps.classify.json.priority
mcp.list — Discover MCP Server Capabilities
Lists tools, resources, and/or prompts exposed by one or more MCP servers.
Use a one-item array for a single server, or servers: ["*"] to discover all configured MCP servers.
- id: discover
  type: mcp.list
  input:
    servers: [github, docs]         # Required — configured MCP server names
    include: ["tools", "prompts"]   # Optional — default: ["tools"]

- id: discover_all
  type: mcp.list
  input:
    servers: ["*"]
    include: ["tools"]
Output: { status, text, servers: [...], tools: [...], resources: [...], prompts: [...] }
Flattened tools, resources, and prompts entries each include a server field so downstream steps can keep the server affinity when multiple MCP servers are discovered at once.
timeout_ms is treated as the workflow-requested timeout. When the configured MCP server metadata includes DiscoveryTimeoutSeconds, the effective timeout is the maximum of timeout_ms and the server-level value, matching the .NET behavior that prevents generated workflows from undercutting known-slow MCP servers.
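The rule above mixes two units: timeout_ms is in milliseconds and the server-level value is in seconds. A minimal Python sketch of the comparison follows. The function name effective_timeout_ms is hypothetical, and the handling of a missing timeout_ms is an assumption rather than documented behavior.

```python
def effective_timeout_ms(requested_ms, server_timeout_seconds=None):
    """Pick the larger of the workflow timeout and the server-level timeout.

    requested_ms is the step's timeout_ms; server_timeout_seconds stands in
    for DiscoveryTimeoutSeconds (or CallTimeoutSeconds for mcp.call).
    """
    if server_timeout_seconds is None:
        return requested_ms
    server_ms = int(server_timeout_seconds * 1000)
    if requested_ms is None:
        # Assumption: with no workflow timeout, the server value applies.
        return server_ms
    return max(requested_ms, server_ms)


print(effective_timeout_ms(30000, 120))  # 120000
print(effective_timeout_ms(30000, 10))   # 30000
print(effective_timeout_ms(30000))       # 30000
```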
mcp.call — Call MCP Tools or Prompts
Calls one or more capabilities on an MCP server. Three modes are available:
Direct tool call (preferred when tool names are known)
- id: weather
  type: mcp.call
  input:
    server: weather-server
    kind: tool
    method: get_weather
    request: { location: "Paris", units: "celsius" }
    timeout_ms: 30000
Output: { status: "ok", response: { temperature: 22, ... } }
Direct prompt call
- id: summarize_prompt
type: mcp.call
input:
server: my-server
kind: prompt
method: summarize_document
request: { text: "${data.inputs.document}" }
Output: { status: "ok", text: "...", messages: [...] }
LLM-assisted call (auto-selects the right tool)
Combine mcp.list → mcp.call with a prompt to let an LLM choose the best tool:
- id: discover
type: mcp.list
input:
servers: [github]
- id: smart_call
type: mcp.call
input:
server: github
model: gpt-4o-mini
temperature: 0.2
prompt: "Find and call the right tool to list my repositories"
tools: "${data.steps.discover.tools}"
prompts: "${data.steps.discover.prompts}"
structured_output:
schema_inline:
type: object
properties:
repos:
type: array
items:
type: object
properties:
name: { type: string }
url: { type: string }
required: [name, url]
required: [repos]
strict: true
Output (LLM-assisted): { status: "ok", selection_mode: "llm", text: "...", tool_calls: [...], results: [...], json: {...} }
MCP progress events -> thinking telemetry
The Python runtime mirrors the .NET GnOuGo.Flow.Core progress contract. For stdio MCP transports, ConfiguredMcpClientFactory.capture_stdio_error_line(...) can receive structured JSONL stderr messages with this shape while the tool is still running:
{
  "type": "gnougo.mcp.progress",
  "server": "GnOuGo.GithubCopilot.Mcp",
  "method": "code_agent_edit",
  "kind": "tool",
  "event": {
    "kind": "session_create",
    "level": "thinking",
    "message": "Creating Copilot agent session.",
    "timestamp": "2026-05-20T10:00:00Z",
    "file": "src/Program.cs"
  }
}
Matching messages are forwarded immediately as gnougo-flow.step.thinking telemetry events. As a fallback/history mechanism, mcp.call also scans the final tool response for progressEvents (aliases accepted: progress_events, progress, events) and forwards each item the same way. Real-time events are deduplicated against final fallback events.
progressEvents is the stable GnOuGo-facing contract. MCP servers may map provider-specific or SDK-specific events into this schema, but the Python Flow runtime does not depend on native SDK event types.
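The flow described above (parse JSONL lines in real time, then scan the final response and skip events already seen) could look roughly like the Python sketch below. None of these helpers exist in the package under these names, and the deduplication key (kind, message, timestamp) is an assumption made for illustration.

```python
import json

PROGRESS_TYPE = "gnougo.mcp.progress"
FALLBACK_KEYS = ("progressEvents", "progress_events", "progress", "events")


def parse_progress_line(line):
    """Return the event dict from a JSONL stderr line, or None if it is not one."""
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(message, dict) or message.get("type") != PROGRESS_TYPE:
        return None
    event = message.get("event")
    return event if isinstance(event, dict) else None


def event_key(event):
    # Assumed identity for deduplication: kind + message + timestamp.
    return (event.get("kind"), event.get("message"), event.get("timestamp"))


def fallback_events(response, already_seen):
    """Collect events from the final response that were not seen in real time."""
    for key in FALLBACK_KEYS:
        items = response.get(key)
        if isinstance(items, list):
            return [e for e in items
                    if isinstance(e, dict) and event_key(e) not in already_seen]
    return []


line = json.dumps({
    "type": "gnougo.mcp.progress",
    "event": {"kind": "session_create", "level": "thinking",
              "message": "Creating Copilot agent session.",
              "timestamp": "2026-05-20T10:00:00Z"},
})
live = parse_progress_line(line)
seen = {event_key(live)}
final = {"progressEvents": [
    live,
    {"kind": "done", "message": "Finished.", "timestamp": "2026-05-20T10:00:05Z"},
]}
print([e["kind"] for e in fallback_events(final, seen)])  # ['done']
```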
timeout_ms is treated as the workflow-requested call timeout. When the configured MCP server metadata includes CallTimeoutSeconds, the effective timeout is the maximum of timeout_ms and the server-level value.
Output access patterns
| Mode | Access |
|---|---|
| Single tool | data.steps.<id>.status, data.steps.<id>.response |
| Single prompt | data.steps.<id>.status, data.steps.<id>.text |
| Batch/auto | data.steps.<id>.results (array) |
| LLM-assisted | data.steps.<id>.text, data.steps.<id>.json |
Important: The response object is tool-specific. workflow.plan treats single-tool MCP responses as opaque unless the tool advertises output_schema or example_response. Access data.steps.<id>.response.<field> only for documented fields. Otherwise pass the whole response with json(data.steps.<id>.response) or add an llm.call normalization step with structured_output.
set — Initialize or Modify Variables
Sets variables in the workflow data context using expressions.
- id: init_vars
type: set
input:
total: 0
prefix: "report_"
full_name: "${data.inputs.first_name + ' ' + data.inputs.last_name}"
items_count: "${len(data.inputs.items)}"
Output: { total: 0, prefix: "report_", full_name: "...", items_count: 5 }
assert.non_null — Require Values Before Using Them
Fails if any resolved input value is null, and exposes the same object as output for downstream steps. Use it to refine nullable structured-output fields before passing them into strict MCP or workflow inputs.
- id: require_doc
type: assert.non_null
input:
id: "${data.steps.derive_doc.json.id}"
- id: fetch
type: mcp.call
input:
server: docs
method: get_doc
request:
id: "${data.steps.require_doc.id}"
emit — Send Progress Messages to the UI
Pushes real-time feedback to the user interface during long-running workflows.
- id: notify_progress
type: emit
input:
message: "Processing item ${data.steps.loop.index} of ${data.steps.loop.count}..."
level: progress # "thinking" | "info" | "progress" | "response"
| Level | Visual |
|---|---|
| thinking | Subtle animated (default) |
| info | Blue informational |
| progress | Green progress indicator |
| response | Highlighted, monospace; appears as assistant content |
human.input — Pause and Wait for User Input
Pauses the workflow and prompts the user for input. The workflow resumes when the user submits a response.
Quick choices
- id: approve
type: human.input
input:
mode: choice
prompt: "The agent wants to call API X. Approve?"
context: "${json(data.steps.plan)}"
choices:
- approve
- reject
- modify
timeout_ms: 36000000 # 10 hours (default)
Structured form fields
- id: user_config
type: human.input
input:
mode: form
prompt: "Please configure the following settings:"
fields:
- name: api_key
type: string
required: true
description: Your API key
- name: region
type: select
options: [us-east, eu-west, ap-south]
default: us-east
- name: max_retries
type: string
required: false
default: "3"
Output: The user's response as a JSON object (e.g., { "response": "approve" } or { "api_key": "...", "region": "eu-west", "max_retries": "3" }).
Modes: text, choice, form, confirm. When omitted, the engine infers form from fields, choice/confirm from choices, otherwise text.
Field types: string, text, textarea, markdown, json, yaml, number, integer, boolean, select, radio, multiselect, checkbox, password, secret, url, email, date, file, directory.
Timeout: If the user doesn't respond within timeout_ms, the step fails with error code HUMAN_INPUT_TIMEOUT.
sequence — Run Steps Sequentially
Groups sub-steps that execute one after another.
- id: pipeline
type: sequence
steps:
- id: step_a
type: llm.call
input: { model: gpt-4o-mini, prompt: "Step A" }
- id: step_b
type: llm.call
input: { model: gpt-4o-mini, prompt: "Continue from: ${data.steps.step_a.text}" }
parallel — Run Branches in Parallel
Executes independent branches concurrently.
- id: gather
type: parallel
branches:
- steps:
- id: fetch_weather
type: mcp.call
input: { server: weather, kind: tool, method: get_weather, request: { location: "Paris" } }
- steps:
- id: fetch_news
type: mcp.call
input: { server: news, kind: tool, method: get_headlines, request: { topic: "tech" } }
loop.sequential — Iterate Sequentially
Loops either while a condition holds (while) or for a fixed number of iterations (times).
# Fixed count
- id: retry_loop
type: loop.sequential
input:
times: 5
steps:
- id: attempt
type: llm.call
input: { model: gpt-4o-mini, prompt: "Attempt ${data.steps.retry_loop.index}" }
# While condition
- id: poll
type: loop.sequential
input:
while: "${data.steps.check.status != 'ready'}"
max_iterations: 20
steps:
- id: check
type: mcp.call
input: { server: my-server, kind: tool, method: check_status, request: {} }
Loop context: data.steps.<loop_id>.index (current iteration, 0-based), data.steps.<loop_id>.count (total completed).
loop.parallel — Iterate in Parallel
Loops over an array of items, executing iterations concurrently.
- id: process_all
type: loop.parallel
input:
items: "${data.inputs.urls}"
max_concurrency: 5
steps:
- id: fetch
type: mcp.call
input:
server: http-client
kind: tool
method: fetch_url
request: { url: "${data.steps.process_all.item}" }
Loop context: data.steps.<loop_id>.item (current item), data.steps.<loop_id>.index, data.steps.<loop_id>.results (collected results).
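A common way to get the max_concurrency behavior in Python is an asyncio.Semaphore around each iteration. The sketch below shows that general technique only; it is not taken from the engine, and run_parallel and fake_fetch are hypothetical names.

```python
import asyncio


async def run_parallel(items, worker, max_concurrency=5):
    """Run worker(item, index) for every item, at most max_concurrency at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(index, item):
        async with semaphore:
            return await worker(item, index)

    # gather returns results in the same order as the input items.
    return await asyncio.gather(*(guarded(i, item) for i, item in enumerate(items)))


async def fake_fetch(url, index):
    await asyncio.sleep(0)  # stand-in for real I/O
    return {"index": index, "url": url}


results = asyncio.run(run_parallel(["a", "b", "c"], fake_fetch, max_concurrency=2))
print([r["url"] for r in results])  # ['a', 'b', 'c']
```

Because gather keeps input order, the collected results line up with the original items even when iterations finish out of order.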
switch — Conditional Branching
Two forms: expression-based and when-based.
Form A — Expression/value matching
- id: route
type: switch
input:
expr: "${data.steps.classify.json.category}"
cases:
- value: bug
steps:
- id: handle_bug
type: llm.call
input: { model: gpt-4o-mini, prompt: "Triage this bug..." }
- value: feature
steps:
- id: handle_feature
type: llm.call
input: { model: gpt-4o-mini, prompt: "Plan this feature..." }
default:
- id: handle_other
type: emit
input: { message: "Unknown category, routing to human.", level: info }
Form B — When conditions
- id: priority_route
type: switch
cases:
- when: "${data.inputs.priority == 'critical'}"
steps:
- id: escalate
type: human.input
input: { mode: text, prompt: "Critical issue! Immediate action required." }
- when: "${data.inputs.priority == 'high'}"
steps:
- id: auto_handle
type: llm.call
input: { model: gpt-4o, prompt: "Handle high-priority: ${data.inputs.message}" }
default:
- id: queue
type: emit
input: { message: "Queued for later processing.", level: info }
workflow.call — Call a Sub-Workflow
Calls another workflow through one canonical shape:
- input.ref identifies the target workflow.
- input.args provides the target workflow inputs.
- The called workflow result is stored in data.steps.<step_id>.outputs.
Resolution is delegated to WorkflowEngine.workflow_call_resolver (DefaultWorkflowCallResolver by default), so applications can add their own ref.kind values without changing the workflow.call step shape.
Canonical call
- id: run_analysis
type: workflow.call
input:
ref:
kind: local
name: analysis # Name of a workflow in the same document
args:
data: "${data.inputs.raw_data}"
Input/output contract
workflow.call acts like a function call between workflows:
| Where | Meaning |
|---|---|
| Parent workflow data.inputs.* | Inputs received by the currently running workflow. In CLI/Agent usage, these are the values passed by the caller or collected by the UI. |
| workflow.call.input.args.* | Values sent to the called workflow. |
| Called workflow data.inputs.* | The called workflow reads args here. |
| Called workflow outputs.* | Values returned by the called workflow. |
| Parent workflow data.steps.<call_step_id>.outputs.* | Returned values available after the call. |
| Parent workflow data.steps.<call_step_id>.workflow | Name of the workflow that was executed. |
If the called workflow has no outputs block, the engine returns the called workflow step outputs instead. Prefer defining explicit outputs so the contract stays stable.
Complete local example
This example defines three workflows in the same file:
- main receives the application input.
- normalize_message prepares data.
- classify_message consumes normalized data and returns a classification.
version: 1
name: workflow-call-demo
workflows:
  main:
    inputs:
      message: { type: string, required: true }
    steps:
      - id: normalize
        type: workflow.call
        input:
          ref:
            kind: local
            name: normalize_message
          args:
            text: "${data.inputs.message}"
      - id: classify
        type: workflow.call
        input:
          ref:
            kind: local
            name: classify_message
          args:
            text: "${data.steps.normalize.outputs.normalized_text}"
      - id: summary
        type: template.render
        input:
          engine: mustache
          template: "Message '{{text}}' was classified as {{category}}."
          mode: text
          data:
            text: "${data.steps.normalize.outputs.normalized_text}"
            category: "${data.steps.classify.outputs.category}"
    outputs:
      normalized_text: "${data.steps.normalize.outputs.normalized_text}"
      category: "${data.steps.classify.outputs.category}"
      summary: "${data.steps.summary.text}"
  normalize_message:
    inputs:
      text: { type: string, required: true }
    steps:
      - id: normalize
        type: set
        input:
          normalized_text: "${lower(trim(data.inputs.text))}"
    outputs:
      normalized_text: "${data.steps.normalize.normalized_text}"
  classify_message:
    inputs:
      text: { type: string, required: true }
    steps:
      - id: classify
        type: set
        input:
          category: "${contains(data.inputs.text, 'urgent') ? 'critical' : 'standard'}"
    outputs:
      category: "${data.steps.classify.category}"
Run it from the CLI:
gnougo-flow run workflow-call-demo.yaml -i 'message=Urgent: please review this document'
Expected output fields:
{
  "normalized_text": "urgent: please review this document",
  "category": "critical",
  "summary": "Message 'urgent: please review this document' was classified as critical."
}
Plugging into the current system
In the current GnOuGo flow system, the outer workflow is the integration point:
- The CLI, Agent UI, API, or another workflow provides the outer workflow inputs.
- The outer workflow maps those inputs into sub-workflow args.
- Each sub-workflow declares the inputs it expects and the outputs it returns.
- The outer workflow reads sub-workflow results from data.steps.<call_id>.outputs.
- The outer workflow exposes its final contract through its own outputs block.
This keeps sub-workflows independently testable and reusable: a sub-workflow should not depend on the parent workflow's data.inputs; it should only depend on the args passed to it.
Use this same shape for every resolver-supported reference. The built-in resolver supports local, url, and workspace references, but documentation and generated workflows should prefer the local form above unless an application explicitly configures external workflow resolution.
workflow.route — Select and Run Workflows
Routes a prompt to one or more workflow candidates, resolves the selected workflows, maps inputs, executes them, and combines their outputs.
- id: route
type: workflow.route
input:
prompt: "${data.inputs.prompt}"
candidates:
- ref: { kind: database }
tags_any: [git, documents]
limit: 20
- ref: { kind: local, name: fallback }
description: General fallback.
selection: { mode: multiple, min: 1, max: 3 }
args:
passthrough: true
auto_extract:
provider: openai
model: gpt-5.4-mini
add:
history: "${data.inputs.history}"
execution:
parallel: true
max_concurrency: 3
combine:
strategy: synthesize
Output shape:
{
"selected": [{ "id": "database:DocumentAgent", "name": "DocumentAgent", "reason": "..." }],
"results": [{ "workflow": "DocumentAgent", "success": true, "outputs": { "answer": "..." } }],
"answer": "Final synthesized answer",
"text": "Final synthesized answer"
}
args.passthrough: true starts from the current workflow inputs, and args.add can add explicit values. When args.auto_extract is enabled, workflow.route resolves the selected workflow first, treats that workflow's declared YAML inputs as the authoritative target contract, and asks the LLM to map prompt and history into exactly those input names. Candidate skill.inputs metadata may be included as a hint, but it only becomes the extraction schema when the selected workflow has no declared inputs. Extracted fields and passthrough aliases that are not declared by the target input schema are ignored.
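The argument-mapping rules above can be sketched in Python as follows. This is an illustration, not the runtime's code: build_route_args is a hypothetical name, the merge order (passthrough, then extracted, then add) is an assumption, and leaving args.add values unfiltered is also an assumption since the text only says extracted fields and passthrough aliases are filtered.

```python
def build_route_args(current_inputs, declared_inputs, *, passthrough=False,
                     extracted=None, add=None):
    """Assemble arguments for a selected workflow.

    Passthrough and extracted values are kept only when the target workflow
    declares an input with that name. Explicit `add` values are applied last.
    """
    declared = set(declared_inputs)
    args = {}
    if passthrough:
        args.update({k: v for k, v in current_inputs.items() if k in declared})
    args.update({k: v for k, v in (extracted or {}).items() if k in declared})
    args.update(add or {})
    return args


current = {"prompt": "Summarize repo X", "history": [], "session_id": "abc"}
declared = ["prompt", "repo"]
extracted = {"repo": "X", "language": "en"}  # "language" is not declared

print(build_route_args(current, declared, passthrough=True, extracted=extracted))
# {'prompt': 'Summarize repo X', 'repo': 'X'}
```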
After extraction, defaults are applied and the selected workflow inputs are validated before execution. Before each selected workflow runs, workflow.route emits gnougo-flow.workflow_route.inputs_extracted plus a user-visible gnougo-flow.step.thinking event with level progress, source workflow.route, selected workflow metadata, argument keys, and resolved input keys. When ExecutionLimits.log_step_content is enabled, telemetry includes redacted/truncated resolved input values; otherwise it exposes keys only.
workflow.plan — Generate a Workflow Dynamically via LLM
The most powerful step type: asks an LLM to generate a complete YAML workflow from a natural-language instruction, then validates and compiles it before execution.
mode defaults to auto. Auto mode first asks the configured LLM to estimate the request's cyclomatic complexity and choose basic or pipeline. It chooses basic for requests under 10 meaningful branches, and pipeline when the request should be decomposed into leaf workflows before assembly.
Basic usage
- id: plan
type: workflow.plan
input:
mode: auto # default; use basic to force the single-plan path
generator:
model: gpt-4o
instruction: "Build a workflow that fetches weather for Paris and summarizes it."
context: "Available tools include weather and summarization APIs."
Full configuration
- id: plan
type: workflow.plan
input:
mode: auto # auto | basic | pipeline | repair
generator:
model: gpt-4o # LLM model for planning
provider: openai # Optional — LLM provider
instruction: "Analyze the user's request and build a workflow."
context: "${json(data.inputs)}"
# Reasoning effort for the planning LLM call (and the MCP pre-filter).
# Defaults to "medium" because planning is reasoning-heavy work.
# Set to "auto" to let the provider decide, or any of:
# "minimal" | "low" | "medium" | "high" | "max" | "auto".
# Models without thinking support ignore this field.
reasoning: medium
# MCP pre-filter: uses an LLM to select only relevant MCP servers/tools
# before injecting them into the planning prompt (reduces prompt size)
prefilter: true # true (default) | false | { model, provider }
# Policy constraints — restrict what the LLM can generate
policy:
allowed_step_types: # Whitelist of step types
- llm.call
- mcp.call
- mcp.list
- template.render
- set
- emit
- sequence
denied_step_types: # Blacklist (takes precedence)
- workflow.plan # Prevent recursive planning
allow_remote_workflow_refs: false
# Limits
limits:
max_steps_total: 20 # Maximum number of steps in the generated workflow
# Validation
validate:
compile: true # Parse + compile the generated YAML (default: true)
dry_run: true # Optional: execute once with fake providers before accepting
# Self-correction on failure
on_invalid:
action: reprompt # "reprompt" (re-send error to LLM) | "fail"
max_attempts: 3 # Number of attempts before giving up
Auto and basic modes
mode: auto is the default. It performs one classifier LLM call before generation and returns the classifier result under meta.mode_selection. The classifier estimates complexity by counting meaningful branches such as conditions, switch/case paths, loops, retries, error handling, cleanup paths, validation branches, tool-orchestration choices, and state transitions.
Use mode: basic to skip classification and run the original single workflow-generation path directly. Use mode: pipeline to force decomposition.
Repair mode
Use mode: repair to repair an existing persisted workflow. The LLM receives the current YAML plus a user repair instruction and/or structured runtime error details, then returns a full replacement YAML document. The prompt asks for the smallest patch-style change and the result still goes through parse, policy, limits, compile, semantic validation, MCP discovery coverage, and optional dry-run validation.
- id: repair_plan
type: workflow.plan
input:
mode: repair
generator:
model: gpt-4o
reasoning: medium
prefilter: true
repair:
existing_yaml: "${data.inputs.workflow_yaml}"
prompt: "Fix the final output mapping without changing public inputs."
failed_input: "${data.inputs.failed_prompt}"
error:
code: MCP_CALL_ERROR
type: mcp.call
message: "Tool request used the wrong field name."
details:
tool: issue_get
validate:
compile: true
dry_run: true
on_invalid:
action: reprompt
max_attempts: 3
repair.existing_yaml is required, and at least one of repair.prompt or repair.error.message must be present. If repair.error is provided, repair.error.message is required. In repair mode, on_invalid.max_attempts bounds validation repair retries for invalid replacement YAML.
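The three requirements above can be expressed as a small standalone check. This is a sketch of the rules as stated, not the package's own validation code; `validate_repair_input` is a hypothetical helper that takes the `repair` mapping as a plain dictionary.

```python
def validate_repair_input(repair: dict) -> list[str]:
    """Return the rule violations for a repair-mode input block."""
    errors = []
    if not repair.get("existing_yaml"):
        errors.append("repair.existing_yaml is required")
    error = repair.get("error")
    error_message = error.get("message") if isinstance(error, dict) else None
    # If an error object is supplied, it must carry a message.
    if error is not None and not error_message:
        errors.append("repair.error.message is required when repair.error is provided")
    # A repair needs some instruction: a prompt, an error message, or both.
    if not repair.get("prompt") and not error_message:
        errors.append("at least one of repair.prompt or repair.error.message must be present")
    return errors
```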
Pipeline mode
Pipeline mode normalizes the user prompt, asks the LLM to mark extractable :::subworkflow leaf blocks, generates each leaf as an independently valid workflow, then asks for a compact parent orchestration graph:
document:
name: generated-pipeline-workflow
graph:
inputs:
query: string
steps:
- id: call_collect_data
leaf: collect_data
args:
query: ${data.inputs.query}
outputs:
collect_data_outputs: ${data.steps.call_collect_data.outputs}
The runtime renders graph leaf nodes into local workflow.call steps, grafts the validated leaf workflows, moves leaf document-level functions: into that leaf workflow scope, checks required leaf arguments, and validates the final YAML. If extractable-block annotation fails validation, workflow.plan reprompts with the invalid annotated Markdown and exact validation errors.
When engine.llm_capabilities is configured and reports that the selected provider/model supports structured output, pipeline extraction uses strict structured output for the extractable-block phase and rejects markdown-only extraction. Pipeline output includes pipeline.specs, pipeline.quality_report, and pipeline.inspection with leaf contracts, planned MCP tools, main graph inspection, and validation metadata.
Pipeline mode is intentionally stricter than older Python releases: main assembly may orchestrate, branch, loop, derive deterministic values, and call generated leaves, but external work, LLM calls, raw MCP calls, human input, templates, and nested planning must stay inside leaf workflows. External-work leaves with required planned MCP tools must emit matching mcp.call steps.
Output: { workflow: { dsl, name, workflows: [...] }, yaml: "...", meta: { model, attempt?, mode, mode_selection?, repair? } }
Features:
- Automatic MCP discovery: Connects to all configured MCP servers, lists their tools/prompts, and injects them into the planning prompt so the LLM knows what's available.
- MCP pre-filter: Uses a lightweight LLM call to select only the MCP servers/tools relevant to the task instruction — reduces prompt size and cost.
- Full DSL reference injection: The LLM receives the complete DSL documentation (step types, expressions, error handling) so it can generate valid workflows.
- Policy enforcement: Generated workflows are validated against allowed/denied step types and max step limits.
- Full validation before acceptance: workflow.plan runs the validator, compiler, and semantic checks before returning a plan. This catches non-fatal validator diagnostics such as unknown step types, invalid container shapes, future step references, conditional branch/loop mapping errors, and invalid data.steps.<id>.response.<field> mappings.
- Structured repair diagnostics: Validation and dry_run failures include machine-readable details["diagnostics"] entries with stable codes, locations, hints, expected shapes, allowed paths when available, and llm_guidance for reprompt repair.
- Optional dry-run validation: Set validate.dry_run: true to execute the generated workflow once with deterministic fake LLM, MCP, human-input, and routing providers. This catches runtime input-resolution errors such as free-form llm.call.text being used where a number is required. The dry-run never calls real LLMs or MCP tools.
- MCP output contracts: MCP discovery injects complete input_schema, output_schema, and example_response metadata into the planning prompt. output_schema/example_response define which fields may be read from mcp.call single-tool response objects.
- MCP request normalization: During workflow.plan validation, static mcp.call.input.request values are normalized against discovered input_schema contracts. Numeric, integer, and boolean YAML strings are converted to typed JSON values when the schema allows it, including nested objects, arrays, additional properties, and matching oneOf/anyOf object variants.
- Nullable MCP request guardrails: Required MCP request fields reject nullable structured-output expressions such as string|null unless the exact value is first refined with assert.non_null or guarded on the same call.
- Self-correction: If the generated YAML is invalid (parse error, policy violation, compilation error, or semantic mapping error), the error is sent back to the LLM for automatic correction.
- OpenTelemetry tracing: Full GenAI convention traces for the planning LLM call, MCP discovery, and pre-filter phases.
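The self-correction behaviour in the list above amounts to a bounded generate/validate loop. The sketch below shows that control flow only, under the assumption that the previous validation error is the sole feedback passed back to the generator; `plan_with_reprompt`, `fake_generate`, and `fake_validate` are hypothetical names and no real LLM is involved.

```python
from typing import Callable, Optional


def plan_with_reprompt(
    generate: Callable[[Optional[str]], str],
    validate: Callable[[str], Optional[str]],
    max_attempts: int = 3,
) -> tuple[str, int]:
    """Call generate() until validate() reports no error or attempts run out.

    generate receives the previous validation error (None on the first call).
    Returns the accepted text and the 1-based attempt number.
    """
    last_error: Optional[str] = None
    for attempt in range(1, max_attempts + 1):
        candidate = generate(last_error)
        last_error = validate(candidate)
        if last_error is None:
            return candidate, attempt
    raise ValueError(f"still invalid after {max_attempts} attempts: {last_error}")


def fake_generate(previous_error: Optional[str]) -> str:
    # Stand-in for the LLM: only produces valid output once it sees an error.
    return "version: 1" if previous_error else "not yaml at all"


def fake_validate(text: str) -> Optional[str]:
    # Stand-in for parse/policy/compile checks.
    return None if text.startswith("version:") else "missing version"
```

With these stand-ins, `plan_with_reprompt(fake_generate, fake_validate)` accepts the second candidate.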
Semantic mapping guardrails: generated plans must not read data.steps.<id>.* from steps produced only inside a switch case, an if-guarded step, or a loop body unless that value is first mapped into a guaranteed location. Function arguments are evaluated eagerly, so coalesce(data.steps.fix.value, data.steps.question.value) is still unsafe when either step may not have executed. Prefer a common workflow-level output alias in every branch, or a guaranteed normalization step with a stable output schema.
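The eager-evaluation problem has a direct analogue in Python, where arguments are also evaluated before the function is called. The sketch below is an illustration in plain Python, not the engine's expression interpreter; the `steps` dictionary and the assumption that a missing step fails on lookup are stand-ins for the real runtime behaviour.

```python
def coalesce(*values):
    """Return the first argument that is not None."""
    for value in values:
        if value is not None:
            return value
    return None


# Only the "question" branch of a switch ran, so "fix" has no output.
steps = {"question": {"value": "What is the error?"}}

try:
    # Both arguments are evaluated before coalesce() is called, so the
    # lookup of the missing "fix" step fails first.
    result = coalesce(steps["fix"]["value"], steps["question"]["value"])
except KeyError as exc:
    result = f"unresolved step: {exc.args[0]}"

# Safer pattern: every branch writes the same alias, which is read afterwards.
outputs = {"answer": steps["question"]["value"]}
safe_result = outputs["answer"]
```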
workflow.execute — Execute a Planned Workflow
Executes a workflow that was dynamically generated by workflow.plan.
- id: plan
type: workflow.plan
input:
generator:
model: gpt-4o
instruction: "${data.inputs.task}"
- id: execute
type: workflow.execute
input:
from_step: plan # References the workflow.plan step that produced the YAML
The plan + execute pattern is the foundation of agentic workflows: the user describes a goal in natural language, the LLM plans the steps, and the engine executes them.
Typed Inputs
Workflow inputs support rich type declarations with validation at runtime.
Supported types: string, number, boolean, array, object, dictionary, any
workflows:
  main:
    inputs:
      # Simple scalar
      name:
        type: string
        required: true
        description: The user's name
      # With default value
      mode:
        type: string
        required: false
        default: standard
      # Array with typed items
      tags:
        type: array
        items: { type: string }
        required: false
        default: []
      # Nested object
      config:
        type: object
        properties:
          timeout: { type: number }
          retries: { type: number }
        required: false
      # Dictionary (string keys, typed values)
      headers:
        type: dictionary
        additionalProperties: { type: string }
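To make the declaration semantics concrete, here is a minimal sketch of how such declarations could be checked at runtime. It is not the package's validator: `matches_type` and `resolve_inputs` are hypothetical helpers, and the sketch assumes that `required` defaults to false and that undeclared object properties are permitted.

```python
def matches_type(value, decl: dict) -> bool:
    """Check a value against a declaration such as {"type": "array", "items": {...}}."""
    kind = decl.get("type", "any")
    if kind == "any":
        return True
    if kind == "string":
        return isinstance(value, str)
    if kind == "boolean":
        return isinstance(value, bool)
    if kind == "number":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if kind == "array":
        item_decl = decl.get("items", {})
        return isinstance(value, list) and all(matches_type(v, item_decl) for v in value)
    if kind == "object":
        props = decl.get("properties", {})
        return isinstance(value, dict) and all(
            matches_type(value[k], d) for k, d in props.items() if k in value
        )
    if kind == "dictionary":
        value_decl = decl.get("additionalProperties", {})
        return isinstance(value, dict) and all(
            isinstance(k, str) and matches_type(v, value_decl) for k, v in value.items()
        )
    return False


def resolve_inputs(declared: dict, provided: dict) -> dict:
    """Apply defaults, then reject missing required or mistyped inputs."""
    resolved = {}
    for name, decl in declared.items():
        if name in provided:
            value = provided[name]
        elif "default" in decl:
            value = decl["default"]
        elif decl.get("required", False):
            raise ValueError(f"missing required input: {name}")
        else:
            continue  # optional input without a default stays absent
        if not matches_type(value, decl):
            raise ValueError(f"input {name!r} is not of type {decl.get('type', 'any')}")
        resolved[name] = value
    return resolved
```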
Typed Outputs
Workflow outputs support type annotations and descriptions. This enables:
- Self-documenting workflow contracts
- Automatic JSON Schema generation (for MCP tool exposure)
- Nested type descriptors for arrays, objects, and dictionaries
Short form (expression only)
outputs:
  result: "${data.steps.step1.text}"
Long form (with type and description)
outputs:
  summary:
    expr: "${data.steps.llm_summary.text}"
    type: string
    description: LLM-generated summary text
  items_processed:
    expr: "${data.steps.process.count}"
    type: number
    description: Number of items processed
  success:
    expr: "${data.steps.result.ok}"
    type: boolean
    description: Whether the workflow succeeded
Complex types
outputs:
  # Array of strings
  tags:
    expr: "${data.steps.extract.tags}"
    type: array
    items: { type: string }
    description: Extracted tags
  # Typed object
  report:
    expr: "${data.steps.build.report}"
    type: object
    properties:
      title: { type: string }
      score: { type: number }
    description: Structured report
  # Dictionary
  metrics:
    expr: "${data.steps.collect.metrics}"
    type: dictionary
    additionalProperties: { type: number }
    description: Named metrics map
JSON Schema generation
OutputDef types are convertible to JSON Schema via JsonSchemaConverter.OutputsToJsonSchema(outputs), used for MCP tool exposure and API documentation.
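As an illustration of what such a conversion involves, the sketch below maps output declarations (as plain dictionaries) to a JSON Schema object. It is a standalone approximation, not the package's JsonSchemaConverter: the function names are hypothetical, and it assumes that `dictionary` maps to a JSON Schema object with `additionalProperties` and that short-form outputs carry no type information.

```python
def to_json_schema(decl: dict) -> dict:
    """Convert one type declaration into a JSON Schema fragment."""
    kind = decl.get("type", "any")
    schema: dict = {}
    if kind == "dictionary":
        schema["type"] = "object"
        schema["additionalProperties"] = to_json_schema(decl.get("additionalProperties", {}))
    elif kind == "object":
        schema["type"] = "object"
        schema["properties"] = {
            key: to_json_schema(value) for key, value in decl.get("properties", {}).items()
        }
    elif kind == "array":
        schema["type"] = "array"
        schema["items"] = to_json_schema(decl.get("items", {}))
    elif kind != "any":
        schema["type"] = kind  # string, number, boolean map one-to-one
    if "description" in decl:
        schema["description"] = decl["description"]
    return schema


def outputs_to_json_schema(outputs: dict) -> dict:
    """Build an object schema with one property per declared output."""
    properties = {}
    for name, decl in outputs.items():
        # Short form: a bare expression string yields an unconstrained schema.
        properties[name] = to_json_schema(decl) if isinstance(decl, dict) else {}
    return {"type": "object", "properties": properties}
```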
Expressions ${...}
Expressions are embedded in strings using ${...} syntax. They are JavaScript-style expressions evaluated by the in-tree JS-subset interpreter in gnougo_flow_core._jsmini.
Data access
- data.inputs.*: workflow input parameters
- data.steps.<step_id>.*: output of a previously executed step
- data.env.*: environment variables
- Optional chaining: data.steps.maybe_skipped?.value
Operators
&& || ! == != < <= > >= + - * / % ??
Built-in functions
| Function | Description |
|---|---|
| exists(val) | true if val is non-null |
| coalesce(a, b, ...) | Returns first non-null argument |
| len(val) | Length of string or array (0 for null) |
| length(val) | Alias for len(val) |
| lower(s) | Lowercase string |
| upper(s) | Uppercase string |
| trim(s) | Trims whitespace |
| contains(s, sub) | true if string s contains sub |
| startsWith(s, prefix) | true if s starts with prefix |
| endsWith(s, suffix) | true if s ends with suffix |
| replace(s, old, new) | Replaces all occurrences |
| substring(s, start) | Characters from position start to end |
| substring(s, start, len) | len characters starting at start |
| toNumber(val) | Converts to number |
| json(val) | Serializes value to JSON string |
| pick(obj, ...keys) | Returns a new object containing only the requested keys; keys may be separate arguments or an array |
| omit(obj, ...keys) | Returns a new object with the requested keys removed; keys may be separate arguments or an array |
| fromJson(s) | Parses a JSON string into a node |
| now() | Returns the current local date/time as an ISO-8601 string |
| base64(val) | Encodes the UTF-8 string value as Base64 |
| formatDate(dateStr, fmt) | Formats a date string (default: yyyy-MM-dd) |
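For readers who think in Python, a few of these helpers can be approximated as follows. These are rough analogues written from the descriptions in the table, not the interpreter's implementations; the `wf_len` name is chosen only to avoid shadowing Python's `len`, and skipping keys that are absent in `pick` is an assumption.

```python
def wf_len(value) -> int:
    """len(val): length of a string or array, 0 for null."""
    return 0 if value is None else len(value)


def substring(s: str, start: int, length=None) -> str:
    """substring(s, start) or substring(s, start, len)."""
    return s[start:] if length is None else s[start:start + length]


def _flatten_keys(keys: tuple) -> list:
    # Keys may arrive as separate arguments or as a single array.
    if len(keys) == 1 and isinstance(keys[0], (list, tuple)):
        return list(keys[0])
    return list(keys)


def pick(obj: dict, *keys) -> dict:
    """Return a new dict with only the requested keys (missing keys are skipped)."""
    return {key: obj[key] for key in _flatten_keys(keys) if key in obj}


def omit(obj: dict, *keys) -> dict:
    """Return a new dict with the requested keys removed."""
    dropped = set(_flatten_keys(keys))
    return {key: value for key, value in obj.items() if key not in dropped}
```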
JavaScript-style expression support
- Ternary: ${data.inputs.mode == "fast" ? 0.0 : 0.7}
- Template literals: ${`Hello ${data.inputs.name}`}
- Array methods: ${data.inputs.items.filter(i => i.active).length}
Runtime limits
Expression evaluation is sandboxed through ExecutionLimits:
| Property | Default | Description |
|---|---|---|
| max_expression_ast_nodes | 500 | Parser/validator complexity limit. |
| max_expression_statements | 100000 | JS-subset interpreter statement budget. |
| expression_timeout_seconds | 15 | Evaluation timeout. |
| expression_memory_limit_bytes | 50000000 | Parity configuration value; the Python in-tree interpreter currently enforces node/statement/time/call-depth limits. |
Increase these limits only for trusted workflows; prefer simplifying expressions or moving complex logic to WFScript functions.
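The idea behind an AST node limit can be illustrated with Python's own `ast` module. This sketch parses Python expressions rather than the workflow expression language, so the counts will not match the engine's; `count_ast_nodes` and `check_complexity` are hypothetical names used only to show the technique of rejecting an expression before it is evaluated.

```python
import ast


def count_ast_nodes(expression: str) -> int:
    """Parse a Python expression and count every node in its syntax tree."""
    tree = ast.parse(expression, mode="eval")
    return sum(1 for _ in ast.walk(tree))


def check_complexity(expression: str, max_nodes: int = 500) -> int:
    """Raise if the expression exceeds the node budget, else return its size."""
    nodes = count_ast_nodes(expression)
    if nodes > max_nodes:
        raise ValueError(f"expression has {nodes} AST nodes, limit is {max_nodes}")
    return nodes
```

Because the check runs on the parsed tree, an oversized expression is rejected without executing any of it.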
WFScript — Custom JavaScript Functions
Define reusable functions in the functions: block (document-level or workflow-level).
When workflow.plan generates custom functions, each generated function must be immediately preceded by JSDoc with typed @param entries for every parameter and a typed @returns entry for the output:
version: 1
name: smart-triage
functions: |
  /**
   * Classifies a message by urgency and issue type.
   *
   * @param {string} text - Message text to classify.
   * @returns {string} Routing label: "critical", "bug", or "general".
   */
  function classify(text) {
    if (contains(lower(text), "urgent")) return "critical";
    if (contains(lower(text), "bug")) return "bug";
    return "general";
  }
  /**
   * Truncates text to a maximum visible length.
   *
   * @param {string} text - Text to truncate.
   * @param {number} maxLen - Maximum number of characters.
   * @returns {string} Original or truncated text.
   */
  function truncate(text, maxLen) {
    if (len(text) <= maxLen) return text;
    return text.substring(0, maxLen) + "...";
  }
workflows:
main:
inputs:
message: { type: string, required: true }
steps:
- id: route
type: switch
input:
expr: "${functions.classify(data.inputs.message)}"
cases:
- value: critical
steps:
- id: escalate
type: human.input
input:
mode: text
prompt: "URGENT: ${functions.truncate(data.inputs.message, 100)}"
- value: bug
steps:
- id: triage_bug
type: llm.call
input:
model: gpt-4o-mini
prompt: "Triage this bug report: ${data.inputs.message}"
Error Handling
Retry
Automatically retries a step on transient (retryable) errors:
retry:
  max: 3             # Maximum attempts
  backoff_ms: 1000   # Initial delay between retries
  backoff_mult: 2.0  # Multiplier for exponential backoff
  jitter_ms: 100     # Random jitter added to each delay
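The delay schedule implied by these settings can be sketched as below. The exact formula is an assumption, not taken from the engine: the sketch treats `max` as the total number of attempts (so there are `max - 1` waits), multiplies the base delay by `backoff_mult` after each failure, and draws jitter uniformly from 0 to `jitter_ms`. `retry_delays_ms` is a hypothetical helper.

```python
import random
from typing import Optional


def retry_delays_ms(
    max_attempts: int = 3,
    backoff_ms: float = 1000,
    backoff_mult: float = 2.0,
    jitter_ms: float = 100,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Return the wait (in ms) before each retry under exponential backoff."""
    rng = rng or random.Random()
    delays = []
    # A wait only happens between attempts, so there are max_attempts - 1 waits.
    for retry_index in range(max_attempts - 1):
        base = backoff_ms * (backoff_mult ** retry_index)
        delays.append(base + rng.uniform(0, jitter_ms))
    return delays
```

Passing a seeded `random.Random` makes the schedule reproducible in tests, and `jitter_ms=0` yields the pure exponential sequence.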
on_error
Evaluated after retries are exhausted (or immediately for non-retryable errors):
on_error:
  cases:
    - if: "${error.code == \"LLM_TIMEOUT\" || error.code == \"LLM_NETWORK\"}"
      action: continue
      set_output:
        text: "Temporary LLM issue; using fallback"
    - if: "${error.code == \"INPUT_VALIDATION\"}"
      action: stop  # Stop the workflow immediately
    - action: stop  # Default: stop on unknown errors
Error context variables: error.code, error.message, error.retryable, step.id, step.type
Actions: continue (skip the step, optionally set a fallback output) | stop (abort the workflow)
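Case selection can be read as "first matching case wins, and a case without a condition is the default". The sketch below models that reading with Python callables in place of `${...}` expressions. It is an illustration only: `resolve_on_error` and `EXAMPLE_CASES` are hypothetical, and stopping when no case matches is an assumption.

```python
from typing import Callable, Optional


def resolve_on_error(cases: list[dict], error: dict) -> dict:
    """Pick the first case whose condition holds for the given error."""
    for case in cases:
        condition: Optional[Callable[[dict], bool]] = case.get("if")
        # A case without a condition acts as the default.
        if condition is None or condition(error):
            return {"action": case["action"], "output": case.get("set_output")}
    # Assumption: with no matching case the workflow stops.
    return {"action": "stop", "output": None}


# Mirrors the YAML cases above, with lambdas standing in for expressions.
EXAMPLE_CASES = [
    {
        "if": lambda e: e["code"] in ("LLM_TIMEOUT", "LLM_NETWORK"),
        "action": "continue",
        "set_output": {"text": "fallback"},
    },
    {"if": lambda e: e["code"] == "INPUT_VALIDATION", "action": "stop"},
    {"action": "stop"},
]
```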
Common error codes
| Code | Retryable | Description |
|---|---|---|
| INPUT_VALIDATION | No | Missing or malformed input |
| LLM_TIMEOUT | Yes | LLM request timed out |
| LLM_NETWORK | Yes | Network error reaching the LLM |
| MCP_CONNECTION_ERROR | Yes | Cannot connect to MCP server |
| MCP_TOOL_ERROR | No | MCP tool returned an error |
| TEMPLATE_PLAN | No | workflow.plan failed to generate valid YAML |
| TEMPLATE_POLICY | No | Generated workflow violates policy constraints |
| HUMAN_INPUT_TIMEOUT | No | User didn't respond within timeout_ms |
| NO_HITL_PROVIDER | No | No human input provider configured |
Full example — resilient LLM call with fallback
- id: summarize
  type: llm.call
  input:
    model: gpt-4o-mini
    prompt: "Summarize: ${json(data.inputs)}"
  retry:
    max: 3
    backoff_ms: 1000
    backoff_mult: 2
    jitter_ms: 100
  on_error:
    cases:
      - if: "${error.code == \"LLM_TIMEOUT\" || error.code == \"LLM_NETWORK\"}"
        action: continue
        set_output:
          text: "Summary temporarily unavailable."
      - action: stop
Model Metadata Catalog
The Python runtime includes a model metadata catalog aligned with the .NET implementation. It centralizes:
- token limits: context_window_tokens, max_input_tokens, max_output_tokens
- pricing: input_per_1m_tokens, output_per_1m_tokens
- capabilities: temperature, reasoning effort, structured output, tools, JSON mode, vision, embeddings
- aliases and user-provided extensions
When the package is used inside the GnOuGo mono-repo, the Python runtime automatically reads the shared builtin catalog from src/GnOuGo.AI.Core/Telemetry/model-metadata.json. This keeps the Python and .NET providers aligned on provider-specific limits, pricing, and capabilities.
WorkflowEngine.sanitize_llm_request() removes unsupported optional request fields before calling the configured LLM client. This prevents provider crashes such as sending temperature to reasoning models that reject it.
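The general idea of capability-based sanitization can be sketched without the engine. The code below is not the implementation of sanitize_llm_request(): `sanitize_request` is a hypothetical function, the field-to-capability mapping is an illustrative guess based on the capability names used in this section, and treating an absent capability flag as unsupported is an assumption.

```python
def sanitize_request(request: dict, capabilities: dict) -> dict:
    """Return a copy of the request without fields the model cannot accept."""
    # Hypothetical mapping from optional request field to capability flag.
    field_to_capability = {
        "temperature": "supports_temperature",
        "reasoning": "supports_reasoning_effort",
        "tools": "supports_tools",
    }
    cleaned = dict(request)
    for field, capability in field_to_capability.items():
        # Assumption: a missing capability flag is treated as unsupported.
        if field in cleaned and not capabilities.get(capability, False):
            del cleaned[field]
    return cleaned
```

Working on a copy keeps the caller's request intact, so the original values remain available for logging or for a retry against a different model.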
Pricing uses the same metadata resolver. try_get_pricing() and estimate_cost() read builtin pricing by default and can also use LLMOptions.model_metadata_files / LLMOptions.model_overrides when passed explicitly.
from gnougo_flow_core import WorkflowEngine, LLMOptions, LLMModelMetadata, ModelCapabilityMetadata

engine = WorkflowEngine()
engine.llm_options = LLMOptions(
    model_metadata_files=["config/my-models.json"],
    model_overrides={
        "my-local-model:latest": LLMModelMetadata(
            provider_type="ollama",
            context_window_tokens=32768,
            max_output_tokens=8192,
            capabilities=ModelCapabilityMetadata(
                supports_temperature=True,
                supports_reasoning_effort=False,
                supports_structured_output=False,
                supports_tools=False,
            ),
        )
    },
)
External metadata files can also use .NET-style camelCase field names and provider-qualified keys such as openai/gpt-4o or copilot/gpt-4o when the same model id exists on multiple providers:
{
  "models": {
    "openai/model-id": {
      "providerType": "openai",
      "contextWindowTokens": 128000,
      "maxOutputTokens": 16384,
      "pricing": { "inputPer1MTokens": 0.15, "outputPer1MTokens": 0.60 },
      "capabilities": {
        "supportsTemperature": true,
        "supportsReasoningEffort": false,
        "supportsStructuredOutput": true,
        "supportsTools": true
      }
    }
  },
  "aliases": { "short-name": "openai/model-id" }
}
Metadata precedence is:
builtin catalog < model_metadata_files < model_overrides < heuristics for missing fields
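One way to read this precedence chain is as a layered dictionary merge in which later layers override earlier ones and heuristics only fill fields that remain unset. The sketch below follows that reading; it is not the package's resolver, and `resolve_metadata` along with its plain-dictionary layers and `heuristics` callable are hypothetical.

```python
from typing import Callable


def resolve_metadata(
    model_id: str,
    builtin: dict,
    from_files: dict,
    overrides: dict,
    heuristics: Callable[[str], dict],
) -> dict:
    """Merge metadata layers for one model, lowest precedence first."""
    merged: dict = {}
    # Later layers win over earlier ones.
    for layer in (builtin, from_files, overrides):
        merged.update(layer.get(model_id, {}))
    # Heuristics only fill fields that no layer provided.
    for key, value in heuristics(model_id).items():
        merged.setdefault(key, value)
    return merged
```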
CLI
The published package exposes the gnougo-flow command.
# Validate a workflow (check syntax, types, compilation)
gnougo-flow validate examples/triage.yaml
# Inspect the structure (workflows, steps, inputs, outputs)
gnougo-flow inspect examples/triage.yaml
# Execute with key=value inputs
gnougo-flow run examples/triage.yaml -i message=hello -i priority=normal
# Execute with full JSON input
gnougo-flow run examples/triage.yaml -j '{"message":"hello","priority":"normal"}'
# Execute with full JSON input loaded from a file
gnougo-flow run examples/triage.yaml -j @inputs.json
When running directly from the repository with uv, prefix commands with uv run:
uv run gnougo-flow validate examples/triage.yaml
uv run gnougo-flow inspect examples/triage.yaml
uv run gnougo-flow run examples/triage.yaml -i message=hello
Python Runtime Notes
The Python package is not a NativeAOT binary; it is a Python 3.10+ library and CLI. It still follows the same design goals as GnOuGo.Flow.Core:
- YAML parsing uses PyYAML and typed Python models.
- JSON-like workflow data stays in Python dictionaries/lists/scalars.
- Templating is implemented in-tree with a minimal Mustache-compatible renderer.
- Expression interpolation and WFScript use gnougo_flow_core._jsmini, an in-tree JavaScript-subset interpreter with execution limits.
- Runtime services are injected through protocols instead of concrete infrastructure dependencies.
- MCP helpers live in gnougo_flow_core.integrations:
  - InMemoryMcpClientFactory and MockMcpServerConfig for tests and demos.
  - ConfiguredMcpClientFactory and McpSessionAdapter for injected MCP sessions.
  - RoutingLLMClientAdapter for adapting a routing LLM client.
- WorkflowEngine.mcp_cache defaults to McpCacheHelper, a 1-hour sliding TTL cache for MCP tools/resources/prompts per server. Set it to None to disable capability caching.
- WorkflowEngine.resume_async, WorkflowCheckpointer, and limits.run_id support resumable workflow execution.

Development commands:
uv sync --extra dev
uv run --extra dev pytest
uv run --extra dev ruff check .
python -m pip install --upgrade build
python -m build
The release pipeline injects the generated repository version into pyproject.toml before building and publishing the package to PyPI.