# FlowGraph-AI

YAML-driven AI workflow engine powered by LangGraph — build conversational, multi-step AI apps without boilerplate.

FlowGraph-AI is a YAML-driven AI workflow engine for building conversational, multi-step AI applications. Define your entire workflow in a simple YAML file — the engine handles LLM calls, user input collection, validation, routing, and state persistence automatically.

Built on top of LangGraph with native interrupt/resume support, SQLite/PostgreSQL persistence, and a built-in FastAPI server.
## Table of Contents
- Why FlowGraph-AI?
- Features
- Quick Start
- Installation
- Core Concepts — 3 Node Types
- Smart Features
- Kanban Board
- Task Tracking & Correlation IDs
- YAML Workflow Reference
- Routing Reference
- State Reference
- Custom Actions
- Custom Validators
- CLI Reference
- REST API Reference
- Python Builder API
- Configuration Reference
- LLM Provider Setup
- Checkpointing & Persistence
- Observability
- Visual Workflow Builder
- Project Structure
- Developer & Agent Reference
- Testing
- Architecture
- Changelog
- Roadmap
- License
## Why FlowGraph-AI?
Most AI frameworks make you write hundreds of lines of code to build a simple conversational flow. FlowGraph-AI flips this — you write a YAML file, the engine does the rest.
| Normal chatbot | FlowGraph-AI |
|---|---|
| Free-form conversation, no structure | YAML-defined flow with clear start → end |
| Developer writes all conversation logic | Define fields + routing in YAML |
| Hard to maintain, one giant prompt | Modular nodes, each does one thing |
| Ask questions one at a time | Extract 3 fields from one natural message |
| Safety logic must be custom-built | Built-in guardrails on every output |
| No trace / observability | Langfuse + Jaeger built in |
| User must re-answer if they said it already | Pre-extraction: bot never re-asks what it knows |
```yaml
workflow: support_ticket
description: "Create a support ticket"

state:
  name: str
  email: str
  issue: str

start_node: collect_info

nodes:
  collect_info:
    type: data_collector
    initial_message: "I'll get that ticket created. What's your name, email, and issue?"
    fields:
      - field: name
        description: "Customer full name"
      - field: email
        description: "Email address"
        validation:
          method: regex
          pattern: "^[\\w.-]+@[\\w.-]+\\.\\w+$"
      - field: issue
        description: "Brief description of the issue"
    max_retry: 3
    next: submit
    on_max_retry: error_response

  submit:
    type: custom_action
    method: "actions.tickets.create"
    routes:
      - value: "created"
        next: success
      - default: error_response

  success:
    type: response
    template: "Ticket created! We'll email {state.email} shortly."

  error_response:
    type: response
    error_field: error
```

That is the entire application. Run it with `flowgraph run support_ticket` or `flowgraph serve`.
## Features
- **3-node architecture** — `data_collector`, `custom_action`, `response` — simple enough to reason about, powerful enough for production
- **Pre-extraction** — if the user's first message already contains a field value, the bot extracts it immediately and never re-asks
- **Tools in DataCollector** — LLM can call Python tools (DB lookups, APIs, system info) during data collection
- **Follow-up handling** — after a workflow ends, users can keep asking questions; the bot answers in context
- **Hallucination detection** — two-stage grounding check with auto-correction, configurable per node via `hallucination_check:`
- **LLM parameter customization** — per-node `llm_params:` overrides `temperature`, `max_tokens`, `top_p`, `streaming`, `timeout`, and `model`
- **`args_from_state:` for custom actions** — clean kwarg-based function calling; maps state fields directly to function parameters
- **Real-time Kanban board** — React app (`frontend/`) showing all workflow executions across 8 status columns with live SSE updates
- **Task tracking** — SQLite-backed task store with pub/sub and correlation IDs linking API, Kanban, Jaeger, and Langfuse
- **YAML-first** — define complete multi-step AI workflows without boilerplate
- **Python builder API** — fluent code alternative to YAML
- **LangGraph-powered** — native interrupt/resume, stateful execution, checkpointer persistence
- **SQLite + PostgreSQL** — conversations persist across restarts
- **Multi-language** — auto-detects user language, responds in kind
- **Intent classification** — automatically routes users to the right workflow
- **Built-in validation** — regex, length, numeric, one_of, not_empty, plus custom Python validators
- **Guardrails** — LLM safety post-processing on every response, configurable at 3 levels
- **5 LLM providers** — Claude, OpenAI, OpenRouter, Ollama, LM Studio; per-node overrides
- **FastAPI server** — production-ready REST API out of the box
- **Developer CLI** — `flowgraph init --provider <name>` to scaffold, plus `validate`, `run`, `serve` — all from `flowgraph`
- **Observability** — Langfuse (LLM tracing) + Jaeger (distributed tracing) with one-command setup
- **Visual builder** — drag-and-drop workflow editor (React app in `workflow-builder/`)
## Quick Start

### Option 1 — Scaffold a new project (recommended)

```bash
pip install flowgraph-ai

# Scaffold a ready-to-run project
flowgraph init my-chatbot --provider ollama
cd my-chatbot

# Start Ollama and pull a model
ollama pull llama3.1:8b

# Chat with the example order tracking workflow
flowgraph run order_tracking
```

Try saying: "track my order ORD-001 from yesterday" — both fields extracted at once, no follow-up questions.
### Option 2 — From source

```bash
git clone https://github.com/simpletoolsindia/flowgraphai
cd flowgraph-ai
uv sync

# Set your LLM
flowgraph llm set claude   # or ollama, openai, etc.

# Run a workflow
flowgraph run order_tracking
```
### Option 3 — API server

```bash
flowgraph serve
# API at http://localhost:8000 — see /docs for Swagger UI

curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "track my order ORD-001 from yesterday"}'
```
## Installation

With pip:

```bash
pip install flowgraph-ai
```

With uv (recommended):

```bash
uv add flowgraph-ai
```

With UI extras (Streamlit):

```bash
pip install "flowgraph-ai[ui]"
```

From source:

```bash
git clone https://github.com/simpletoolsindia/flowgraphai
cd flowgraph-ai
uv sync
```
## Core Concepts — 3 Node Types

FlowGraph-AI has exactly 3 node types. That is intentional.

### data_collector — Collect information from the user

Asks questions, waits for replies, validates, extracts, and routes.
Single-field:

```yaml
collect_sr:
  type: data_collector
  role: "You are a helpful support agent."
  initial_message: "Please provide your SR number (format: SR-XXXXX)."
  field: sr_number
  description: "SR number in format SR-XXXXX"
  max_retry: 3
  next: process
  on_max_retry: error_response
  allow_intent_escape: true
  validation:
    method: "validations.sr.check_sr_format"
```
Multi-field (collects multiple fields in a single conversation turn):

```yaml
collect_order:
  type: data_collector
  role: "You are a helpful order support agent."
  initial_message: "Please share your order ID and order date."
  max_retry: 3
  next: lookup_order
  on_max_retry: error_response
  allow_intent_escape: true
  fields:
    - field: order_id
      description: "Order ID (e.g. ORD-12345)"
    - field: order_date
      description: "Order date or approximate date (e.g. yesterday, 10th Dec)"
```
With tools (the LLM calls Python functions for external data):

```yaml
collect_system_info:
  type: data_collector
  field: cpu_temp
  description: "Current CPU temperature in Celsius"
  tools:
    - method: "actions.system_tools.get_cpu_temp"
      name: "get_cpu_temp"
      description: "Get current CPU temperature. Call when user asks about CPU or system stats."
  next: respond
```
All `data_collector` options:

| Option | Default | Description |
|---|---|---|
| `field` | — | Single field to collect |
| `fields` | — | List of fields (multi-field mode) |
| `description` | field name | Tells LLM what to extract |
| `initial_message` | LLM-generated | First question shown to user |
| `role` | "You are a helpful assistant." | System role for LLM extraction |
| `max_retry` | `3` | Max attempts before routing to `on_max_retry` |
| `next` | `"END"` | Next node on success |
| `on_max_retry` | `"END"` | Node if max retries exceeded |
| `allow_intent_escape` | `false` | Answer off-topic questions inline before re-asking |
| `humanize` | `false` | Run bot messages through humanizer LLM |
| `validation` | — | Validator config (see Custom Validators) |
| `tools` | `[]` | LangChain tools LLM can call during extraction |
| `guardrails` | global | Per-node guardrails override |
| `llm_provider` | global | Per-node LLM provider override |
### custom_action — Run your Python code

Executes any Python function. The function receives the full state dict, may mutate it, and returns the next node name.

```yaml
check_account:
  type: custom_action
  method: "actions.billing.check_status"
  result_field: billing_result
  routes:
    - value: "active"
      next: success_response
    - value: "suspended"
      next: suspended_response
    - default: error_response
```

```python
# actions/billing.py
def check_status(state: dict) -> str:
    account_id = state["account_id"]
    result = billing_api.lookup(account_id)
    state["billing_result"] = result.status
    state["output"] = f"Account {account_id}: {result.status}"
    return result.status  # matched against routes[]
```
#### args_from_state — Function keyword arguments (new in v0.2.0)

Clean, kwarg-based function calling: maps state fields directly to function parameters. The function's return value is stored in `result_field`.

```yaml
lookup_order:
  type: custom_action
  method: "actions.order_tools.get_order_status"
  args_from_state:
    order_id: order_id        # function_kwarg: state_field
  result_field: order_status  # stores return value
  next: success_response      # fixed routing
  on_error: error_response    # routing on exception
```

```python
# actions/order_tools.py
def get_order_status(order_id: str) -> str:
    """Explicit args, no state dict needed."""
    status = db.query(order_id)
    return status  # stored in state["order_status"]
```
### response — Send the final reply

The terminal node. Applies guardrails automatically. Supports templates and field references.

```yaml
success_response:
  type: response
  template: "Done! Your request {state.request_id} has been submitted."
```

```yaml
# Or use fields set by custom_action
final:
  type: response
  output_field: output  # state["output"] → shown to user
  error_field: error    # state["error"] → shown if set
```
## Smart Features

### Pre-Extraction — No Repeat Questions

Before asking the user anything, the data_collector node checks whether the user's opening message already contains the needed field values. If found, extraction happens immediately — no `interrupt()` is triggered.

Example:

- User sends: "I want to track my order ORD-12345 placed yesterday"
- Node needs: `order_id` + `order_date`
- Result: both fields are extracted from the first message — the bot never asks either question
- If only `order_id` is found, the bot asks only for `order_date`

This works for both single-field and multi-field nodes. Validation runs on pre-extracted values; if validation fails, the node falls back to asking normally.
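The extract-validate-fall-back behavior can be sketched as a small pure function. This is an illustration of the idea, not the engine's actual implementation — `extract` and `validate` are stand-ins for the engine's LLM extraction call and validator pipeline:

```python
def pre_extract(opening_message: str, needed: list[str],
                extract, validate) -> tuple[dict, list[str]]:
    """Try to pull every needed field out of the opening message.

    Returns (accepted, missing): fields that extracted and validated
    successfully are never re-asked; the rest are asked normally.
    """
    found = extract(opening_message, needed)  # e.g. LLM structured output
    accepted, missing = {}, []
    for field in needed:
        value = found.get(field)
        if value is not None and validate(field, value):
            accepted[field] = value   # never re-ask this field
        else:
            missing.append(field)     # fall back to asking normally
    return accepted, missing
```

With the example above, an opening message containing only the order ID would yield `accepted = {"order_id": ...}` and `missing = ["order_date"]`, so the bot asks a single follow-up question.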
### Tools in DataCollector

When a field value requires external data (live system info, DB lookups, API calls), define `tools:` on the node. The LLM decides when to call them based on the user's message.

```yaml
collect_info:
  type: data_collector
  field: server_status
  description: "Current server status (online/offline/degraded)"
  tools:
    - method: "actions.ops_tools.check_server_status"
      name: "check_server_status"
      description: "Check if a server is online. Provide server_name as argument."
  initial_message: "Which server would you like to check?"
  next: respond
```

```python
# actions/ops_tools.py
def check_server_status(server_name: str) -> str:
    """Check server health via API."""
    result = ops_api.ping(server_name)
    return result.status  # e.g. "online", "offline", "degraded"
```

Tool call flow:

1. The user says something that needs external data
2. The LLM calls the tool (up to 4 rounds of tool calls supported)
3. The tool result is injected into the extraction context
4. The LLM uses the tool result to populate the field

Tools also work in off-topic answers — if `allow_intent_escape: true` and the user's off-topic question needs live data, the same tools are available.
### Follow-Up After Workflow Ends

After a workflow completes, users can continue asking related questions. The bot answers in the context of the completed conversation.

- **API behavior:** `POST /message` on a completed `session_id` returns `status: "follow_up"` instead of "already completed".
- **CLI behavior:** the user can keep typing and the bot answers in context; a new workflow starts only if the topic is clearly different.

Example:

```text
Bot:  Your order ORD-12345 is processing. Delivery expected Dec 15.
      [workflow completes]
User: Why is it taking so long?
Bot:  I understand your concern. Your order was placed yesterday and is currently
      in the processing stage — this typically takes 1-2 business days. Would you
      like me to raise a priority query?
User: Yes please
Bot:  I've flagged your order for review. You'll receive an update within 24 hours.
```
### Guardrails

Every `response` node output passes through an LLM safety review before being shown to the user. Configurable at 3 levels (highest priority wins):

node-level `guardrails` > workflow-level `guardrails` > global `config.yaml`

```yaml
# Global (config.yaml)
guardrails:
  enabled: true
  tone: "warm and professional"
```

```yaml
# Workflow-level (top of workflow YAML)
guardrails:
  tone: "casual and friendly"
  custom_rules:
    - "Never mention competitor products"
    - "Always offer to escalate to human support"
```

```yaml
# Node-level (inside any response or data_collector node)
my_response:
  type: response
  guardrails:
    enabled: false      # Disable for this node only
    # OR:
    custom_prompt: |    # Fully replaces all guardrail logic
      You are a strict reviewer. Return only the corrected response.
```
### Intent Escape

When `allow_intent_escape: true`, users can ask off-topic questions during data collection. The bot answers inline, then re-asks the original question.

```yaml
collect_order:
  type: data_collector
  allow_intent_escape: true
  field: order_id
  # ...
```

Example:

```text
Bot:  Please provide your order ID.
User: Do you ship to Canada?
Bot:  Yes, we ship to Canada! Standard delivery is 5-7 business days.
      Now, could you please provide your order ID?
```
### Language Detection
FlowGraph-AI auto-detects the user's language from their first message and injects language instructions into all subsequent LLM prompts. The bot responds in the user's language without any configuration.
Supported: any language the underlying LLM supports (Claude, GPT-4o, Llama 3.1, etc.)
### Hallucination Detection

Every data collector node can check extracted values for hallucinations before accepting them.

```yaml
collect_order_info:
  type: data_collector
  hallucination_check: true   # default: true
  fields:
    - field: order_id
      description: "Order ID in format ORD-XXXXX"
```

The check runs in two stages:

1. **Deterministic** — a substring match verifies the extracted value actually appeared in the user's message.
2. **LLM fact-check** — if there is no substring match, an LLM grounding call confirms the value is supported by the conversation context.

Example:

- User says: "My order is ORD-12345"
- LLM extracts: `order_id: "ORD-99999"` (hallucination)
- Engine:
  1. Sees "ORD-99999" is NOT in "My order is ORD-12345".
  2. Runs the `check_grounding` LLM call.
  3. The LLM confirms "ORD-99999" is not supported.
- Result: re-extracts or re-asks the user.

When a hallucination is detected:

1. Auto-correction using the LLM-suggested value (if available)
2. Strict re-extraction with a deterministic prompt
3. A retry question to the user if both stages fail

Disable per node with `hallucination_check: false`.
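The two-stage grounding check described above can be sketched as follows. This is an illustration of the documented behavior, not the engine's code; `llm_check` stands in for the LLM grounding call:

```python
def is_grounded(extracted: str, user_message: str, llm_check) -> bool:
    """Two-stage grounding: cheap substring check, then LLM fallback."""
    # Stage 1: deterministic — did the value literally appear?
    if extracted.lower() in user_message.lower():
        return True
    # Stage 2: ask an LLM whether the value is still supported
    # (handles paraphrases, e.g. "twelve" extracted as "12")
    return llm_check(extracted, user_message)
```

In the ORD-99999 example above, stage 1 fails and the LLM confirms the value is unsupported, so the engine moves on to auto-correction or re-asking.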
### LLM Parameter Customization

Customize LLM behaviour globally in `config.yaml` or per node in workflow YAML.

Global (`config.yaml`):

```yaml
llm:
  provider: "claude"
  temperature: 0.7
  max_tokens: 2048
  top_p: 1.0
  streaming: false
  timeout: 30
```

Per-node (workflow YAML):

```yaml
collect_order_info:
  type: data_collector
  llm_params:
    temperature: 0.1   # precise extraction
    max_tokens: 512
    model: "gpt-4o"    # override model for this node only
```

Any `llm_params:` key overrides the global config for that node only. All other nodes continue to use the global settings.
## Kanban Board

Track agent execution in real time with the built-in React Kanban board.

### Setup

```bash
cd frontend
npm install
npm run dev   # Opens on http://localhost:5173 (usually)
```

Note: ensure the backend API is running (`flowgraph serve`) on port 8000.

### Usage

The board connects to the backend API (http://localhost:8000) and shows all workflow executions across 8 status columns: Backlog, Planned, In Progress, Waiting For Input, Tool Running, Retry/Recovery, Completed, Failed.

Features:

- Live SSE updates (no polling)
- Click any card to see the full activity timeline
- Correlation ID + Trace ID linking to Jaeger/Langfuse
- Filter by task title, workflow name, or task ID

Every workflow start automatically creates a task card. The card advances through columns as the workflow progresses. The SSE endpoint is `GET /tasks/stream/events`.
## Task Tracking & Correlation IDs

Every workflow execution automatically creates a task card backed by SQLite. Tasks are linked across all systems via a `correlation_id`:

```text
API Response
└── correlation_id ─────┬──→ Kanban board card
                        ├──→ Jaeger trace root span
                        └──→ Langfuse trace session
```

### API usage

Every `POST /workflow/start` and `POST /message` response includes:

```json
{
  "session_id": "order_tracking:abc123",
  "status": "waiting_input",
  "task_id": "A1B2C3D4",
  "correlation_id": "3f8a9c12e4b07d56a1f2e3d4c5b6a7f8"
}
```

Use `task_id` to track execution:

```bash
# Get task + full event timeline
curl http://localhost:8000/tasks/A1B2C3D4

# List all tasks (filter by status)
curl "http://localhost:8000/tasks?status=in_progress"

# Live SSE stream (used by Kanban board)
curl -N http://localhost:8000/tasks/stream/events
```

### Task lifecycle

Tasks move through these statuses automatically:

```text
planned → in_progress → waiting_input ↔ in_progress → completed or failed
```

- When a hallucination is detected: `retry_recovery` → back to `in_progress`
- When a tool runs: `tool_running` → back to `in_progress`
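The lifecycle above can be expressed as an allowed-transition table. Status names come from this section; how the engine actually models transitions internally is an assumption here:

```python
# Which status moves the lifecycle diagram permits.
TRANSITIONS: dict[str, set[str]] = {
    "planned":        {"in_progress"},
    "in_progress":    {"waiting_input", "tool_running", "retry_recovery",
                       "completed", "failed"},
    "waiting_input":  {"in_progress"},   # user replied
    "tool_running":   {"in_progress"},   # tool finished
    "retry_recovery": {"in_progress"},   # hallucination handled
    "completed":      set(),             # terminal
    "failed":         set(),             # terminal
}

def can_transition(current: str, new: str) -> bool:
    """True if the lifecycle diagram allows this status move."""
    return new in TRANSITIONS.get(current, set())
```

Terminal states (`completed`, `failed`) have no outgoing moves, which matches the Kanban board's final columns.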
### Configure persistence

```yaml
# config.yaml
tracking:
  db_path: "./tasks.db"   # SQLite file — omit for in-memory only
```
## YAML Workflow Reference

```yaml
# ── Required ──────────────────────────────────────────────────────────────────
workflow: my_workflow               # Must match the .yaml filename
description: "..."                  # Used by intent classifier — describe what this handles

# ── Optional ──────────────────────────────────────────────────────────────────
initial_input_field: user_message   # Which state field gets the user's opening message

# Workflow-level guardrails (override global config.yaml)
guardrails:
  enabled: true
  tone: "casual and friendly"

# Entry point
start_node: first_node

# ── Custom state fields (auto-injected fields do not need to be listed) ───────
state:
  order_id: str
  order_date: str
  order_status: str
  count: int
  items: list
  metadata: dict

# ── Nodes ─────────────────────────────────────────────────────────────────────
nodes:
  # data_collector — single field
  collect_order_id:
    type: data_collector
    role: "You are a helpful support agent."
    initial_message: "Please provide your order ID."
    field: order_id
    description: "Order ID (e.g. ORD-12345)"
    max_retry: 3
    next: process
    on_max_retry: error_response
    allow_intent_escape: true
    humanize: false
    validation:
      method: regex
      pattern: "^ORD-\\d{5}$"
    tools:
      - method: "actions.my_module.my_tool"
        name: "my_tool"
        description: "When and why to call this tool"
    guardrails:
      tone: "very formal"

  # data_collector — multi-field
  collect_contact:
    type: data_collector
    role: "You are an HR assistant."
    initial_message: "Please provide your name and email."
    max_retry: 3
    next: submit
    on_max_retry: error_response
    fields:
      - field: customer_name
        description: "Full name"
      - field: customer_email
        description: "Email address"
        validation:
          method: regex
          pattern: "^[\\w.-]+@[\\w.-]+\\.\\w+$"
      - field: preferred_date
        description: "Preferred contact date"
        default: "any day"          # Optional: used when not provided

  # custom_action
  process:
    type: custom_action
    method: "actions.my_actions.process_request"
    result_field: action_result
    next: success                   # Unconditional (if no routes)
    routes:
      - value: "success"
        next: success_response
      - value: "error"
        next: error_response
      - default: error_response     # Fallback for any other return value
    conditional_edges:              # Alternative to routes
      field: action_result
      paths:
        "ok": success_response
        "fail": error_response

  # response
  success_response:
    type: response
    template: "Done! Your request {state.order_id} has been submitted."
    output_field: output            # Read from state["output"] if set
    error_field: error              # Read from state["error"] if set (shown instead)
    guardrails:
      enabled: false

  error_response:
    type: response
    error_field: error
    output_field: output
```
## Routing Reference

| Pattern | Where | YAML |
|---|---|---|
| Unconditional | data_collector, custom_action | `next: node_name` |
| On failure | data_collector | `on_max_retry: node_name` |
| Routes list | custom_action | `routes: [{value: "x", next: "y"}, {default: "z"}]` |
| Conditional | custom_action | `conditional_edges: {field: "f", paths: {v1: n1, v2: n2}}` |
| Direct return | custom_action | Return value IS the next node name |
## State Reference

Auto-injected fields (always available, no need to declare):

| Field | Type | Description |
|---|---|---|
| `user_message` | str | User's opening message |
| `user_language` | str | Auto-detected language |
| `output` | str | Final output shown to user |
| `error` | str | Error message (shown instead of output if set) |
| `messages` | list | Full conversation history |
| `_dc_next_node` | str | Internal routing signal |
| `_dc_failed` | bool | Internal: max retry exceeded |
| `_dc_node` | str | Internal: which node hit max retry |

### Template syntax

```yaml
template: "Your order {state.order_id} placed on {state.order_date} is ready."
```

All variants work:

- `{state.field}` — primary (V3)
- `{{state.field}}` — also supported
- `{field}` — backward compat
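How the three variants could resolve against state can be sketched with three regex passes. This is illustrative only — the engine's actual renderer may behave differently, e.g. around unknown references (this sketch leaves them intact):

```python
import re

def render_template(template: str, state: dict) -> str:
    """Resolve {{state.field}}, {state.field}, and bare {field}."""
    def sub(pattern: str, text: str) -> str:
        # Replace each match with the state value, or leave it as-is
        # when the field is missing from state.
        return re.sub(pattern,
                      lambda m: str(state.get(m.group(1), m.group(0))),
                      text)
    text = sub(r"\{\{state\.(\w+)\}\}", template)  # {{state.field}}
    text = sub(r"\{state\.(\w+)\}", text)          # {state.field} (primary)
    text = sub(r"\{(\w+)\}", text)                 # {field} backward compat
    return text
```

The double-brace pass runs first so `{{state.x}}` is consumed whole rather than being half-matched by the single-brace pattern.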
## Custom Actions

```python
# actions/my_actions.py
def process_request(state: dict) -> str:
    """
    Receives the full workflow state.
    Mutate state to set output/error values.
    Return a string matched against routes[] in YAML.
    """
    sr = state["sr_number"]
    email = state.get("email", "")
    try:
        result = my_api.submit(sr, email)
        state["output"] = f"Request {sr} submitted. Ref: {result.ref_id}"
        return "success"
    except MyAPIError as e:
        state["error"] = f"Failed: {e.message}"
        return "error"
```

```yaml
submit:
  type: custom_action
  method: "actions.my_actions.process_request"
  result_field: action_result
  routes:
    - value: "success"
      next: success_response
    - value: "error"
      next: error_response
```
## Custom Validators

```python
# validations/my_validators.py
def validate_account_id(value: str, config: dict) -> tuple[bool, str]:
    """
    Returns (is_valid, error_message).
    error_message is shown to the user on failure.
    """
    if not value.startswith("ACC-"):
        return False, "Account ID must start with ACC- (e.g. ACC-12345)"
    return True, ""
```

```yaml
collect_account:
  type: data_collector
  field: account_id
  validation:
    method: "validations.my_validators.validate_account_id"
```

### Built-in validators

| Validator | Config | Example |
|---|---|---|
| `regex` | `pattern: "..."` | `pattern: "^\\d{5}$"` |
| `length` | `min: N, max: N` | `min: 5, max: 200` |
| `numeric` | `min: N, max: N` | `min: 1, max: 100` |
| `one_of` | `options: [...]` | `options: [Annual, Sick, Unpaid]` |
| `not_empty` | — | — |
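For intuition, the built-ins in the table above could behave roughly like the sketch below, using the same `(is_valid, error_message)` return shape as custom validators. The engine ships its own implementations and error messages; everything here is illustrative:

```python
import re

def run_builtin(method: str, value: str, cfg: dict) -> tuple[bool, str]:
    """Sketch of built-in validator dispatch (illustrative only)."""
    if method == "not_empty":
        return (bool(value.strip()), "This field cannot be empty.")
    if method == "regex":
        ok = re.fullmatch(cfg["pattern"], value) is not None
        return (ok, "Value does not match the expected format.")
    if method == "length":
        ok = cfg.get("min", 0) <= len(value) <= cfg.get("max", float("inf"))
        return (ok, "Value has the wrong length.")
    if method == "numeric":
        try:
            n = float(value)
        except ValueError:
            return (False, "Please enter a number.")
        ok = cfg.get("min", float("-inf")) <= n <= cfg.get("max", float("inf"))
        return (ok, "Number out of range.")
    if method == "one_of":
        return (value in cfg["options"], f"Choose one of: {cfg['options']}")
    raise ValueError(f"Unknown validator: {method}")
```

The error message is only surfaced to the user when the first element of the tuple is `False`.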
## CLI Reference

```bash
# ── Project scaffold ──────────────────────────────────────────────────────────
flowgraph init [dir] [--provider ollama|claude|openai|...]
    # Creates workflows/, actions/, validations/, config.yaml, .env.example
    # Includes 2 ready-to-run examples (order tracking + FAQ)

# ── Health & diagnostics ──────────────────────────────────────────────────────
flowgraph health                    # Full check: DB, LLM, workflows, observability

# ── Database ──────────────────────────────────────────────────────────────────
flowgraph db set <url>              # postgresql://... | ./file.db | (empty = memory)
flowgraph db test                   # Test database connection

# ── LLM provider ──────────────────────────────────────────────────────────────
flowgraph llm set <provider>        # claude | openai | openrouter | lmstudio | ollama
flowgraph llm set claude --model claude-opus-4-6
flowgraph llm test                  # Send a test prompt

# ── Workflows ─────────────────────────────────────────────────────────────────
flowgraph list                      # List all workflows with descriptions
flowgraph show <name>               # Show node graph + state schema
flowgraph validate [name]           # Validate YAML syntax and routing
flowgraph new <name>                # Scaffold a new workflow interactively
flowgraph run <name>                # Chat with a workflow in the terminal
flowgraph run <name> --thread <id>  # Resume an existing conversation

# ── API server ────────────────────────────────────────────────────────────────
flowgraph serve                     # Start FastAPI server (default: 0.0.0.0:8000)
flowgraph serve --port 9000 --reload

# ── Observability stack (Docker) ──────────────────────────────────────────────
flowgraph stack up                  # Start PostgreSQL + Langfuse + Jaeger
flowgraph stack down                # Stop the stack
flowgraph stack down --volumes      # Stop + delete all data
flowgraph stack status              # Container health + service URLs
flowgraph stack logs                # Tail all service logs
flowgraph stack logs langfuse       # Tail a specific service
flowgraph stack configure           # Save Langfuse API keys + enable observability

# ── Dev mode ──────────────────────────────────────────────────────────────────
flowgraph dev                       # stack up + API server + show all URLs

# ── Testing ───────────────────────────────────────────────────────────────────
flowgraph test                      # Run unit tests
flowgraph test --unit               # Unit tests only
flowgraph test --integration        # Integration tests (needs DB + LLM)
flowgraph test --coverage           # With coverage report
flowgraph test -p tests/unit/test_intent.py   # Specific file
flowgraph test -v                   # Verbose output
```
## REST API Reference

Start with `flowgraph serve`, then use the Swagger UI at http://localhost:8000/docs.

### POST /workflow/start

Start a new conversation. Intent is auto-classified if `workflow_name` is omitted.

```bash
curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "I need to check my SR status"}'

# With explicit workflow
curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "track ORD-001 from yesterday", "workflow_name": "order_tracking"}'
```

Response:

```json
{
  "session_id": "order_tracking:abc123",
  "workflow_name": "order_tracking",
  "status": "waiting_input",
  "question": {"field": "order_id", "question": "Please provide your order ID."}
}
```

Status values: `waiting_input` | `completed` | `error`

### POST /message

Resume a session or send a follow-up after completion.

```bash
curl -X POST http://localhost:8000/message \
  -H "Content-Type: application/json" \
  -d '{"session_id": "order_tracking:abc123", "message": "ORD-12345"}'
```

When the session is already completed, returns `status: "follow_up"` with a contextual answer.

### GET /session/{session_id}

Inspect the current state of a session.

```bash
curl http://localhost:8000/session/order_tracking:abc123
```

### GET /workflows

List all workflows with descriptions.

### GET /health

System health + configuration summary.

### GET /workflow/{name}/mermaid

Mermaid diagram for a workflow (for visualization).

### GET /tasks

List all tasks (workflow execution cards). Supports an optional `status` query param filter.

### GET /tasks/{task_id}

Get a single task by ID, including its full event timeline.

### GET /tasks/session/{session_id}

Get all tasks associated with a session ID.

### GET /tasks/stream/events

Server-Sent Events stream. The Kanban board connects here for live updates. Each event is a JSON-encoded task or task-event object.

All `WorkflowResponse` objects (from `/workflow/start` and `/message`) include `task_id` and `correlation_id` fields.
## Python Builder API

Programmatic workflow construction without YAML:

```python
from flowgraph_ai.engine.workflow_builder import workflow
from flowgraph_ai.llm.provider import get_llm
from flowgraph_ai.config import load_config
from flowgraph_ai.storage.checkpointer import get_checkpointer

cfg = load_config()
llm = get_llm(cfg)
checkpointer = get_checkpointer(cfg)

graph = (
    workflow()
    .state(account_id="str", result="str")
    .collect(
        name="collect_id",
        field="account_id",
        ask="Please provide your account ID.",
        next_node="process",
        on_max_retry="error",
    )
    .action(
        name="process",
        method="actions.billing.check_status",
        result_field="result",
        routes=[{"value": "ok", "next": "done"}, {"default": "error"}],
    )
    .respond(name="done", template="All good! Account: {state.account_id}")
    .respond(name="error", error_field="error")
    .start("collect_id")
    .compile(llm=llm, app_config=cfg, checkpointer=checkpointer)
)
```
## Configuration Reference

`config.yaml` controls all runtime behaviour:

```yaml
app:
  name: FlowGraph-AI
  version: 0.1.0

# ── LLM Provider ──────────────────────────────────────────────────────────────
llm:
  provider: ollama            # claude | openai | openrouter | lmstudio | ollama
  temperature: 0.7
  max_tokens: 2048            # Global default (per-node llm_params: overrides)
  top_p: 1.0
  streaming: false
  timeout: 30                 # Seconds

  ollama_model: llama3.1:8b
  ollama_base_url: http://localhost:11434

  claude_model: claude-sonnet-4-6
  # ANTHROPIC_API_KEY env var

  openai_model: gpt-4o-mini
  # OPENAI_API_KEY env var

  openrouter_model: openai/gpt-4o-mini
  openrouter_base_url: https://openrouter.ai/api/v1
  # OPENROUTER_API_KEY env var

  lmstudio_model: local-model
  lmstudio_base_url: http://localhost:1234/v1

# ── Engine ────────────────────────────────────────────────────────────────────
engine:
  max_steps: 50               # Max LangGraph execution steps per invocation

# ── Intent Classifier ─────────────────────────────────────────────────────────
intent:
  fallback_workflow: greeting # Workflow used when no intent matches
  # custom_prompt: |          # Optional: replace the classification prompt

# ── Guardrails ────────────────────────────────────────────────────────────────
guardrails:
  enabled: true
  tone: "warm and professional"
  # custom_rules:
  #   - "Never mention competitor products"
  # custom_prompt: |          # Fully replaces tone + custom_rules
  #   You are a strict reviewer...

# ── Language Detection ────────────────────────────────────────────────────────
language_detection:
  enabled: true               # Detect language + respond in kind

# ── Logging ───────────────────────────────────────────────────────────────────
logging:
  level: WARNING              # DEBUG | INFO | WARNING | ERROR
  format: text                # text | json
  # file: logs/app.log

# ── Checkpointing ─────────────────────────────────────────────────────────────
checkpoint:
  connection_string: ""       # Empty = in-memory (dev)
  # connection_string: "./flowgraph.db"                              # SQLite
  # connection_string: "postgresql://user:pass@localhost:5432/mydb"  # PostgreSQL

# ── Task Tracking ─────────────────────────────────────────────────────────────
tracking:
  db_path: "./tasks.db"       # SQLite file for task history; omit = in-memory

# ── Observability ─────────────────────────────────────────────────────────────
observability:
  langfuse:
    enabled: false
    host: http://localhost:3000
    public_key: ""            # or env: LANGFUSE_PUBLIC_KEY
    secret_key: ""            # or env: LANGFUSE_SECRET_KEY
  tracing:
    enabled: false
    endpoint: http://localhost:4317   # Jaeger OTLP gRPC
    service_name: flowgraph-ai
```
### Environment variables

| Variable | Purpose |
|---|---|
| `ANTHROPIC_API_KEY` | Claude API key |
| `OPENAI_API_KEY` | OpenAI API key |
| `OPENROUTER_API_KEY` | OpenRouter API key |
| `LANGFUSE_PUBLIC_KEY` | Langfuse public key |
| `LANGFUSE_SECRET_KEY` | Langfuse secret key |
| `FLOWGRAPH_LOG_LEVEL` | Override log level |

Set these in a `.env` file in the project root — it is auto-loaded at startup.

### Per-node LLM override

```yaml
my_node:
  type: data_collector
  llm_provider: claude   # Override global provider for this node only
  field: name
```
## LLM Provider Setup

### Ollama (local, free — default)

```bash
brew install ollama                             # macOS
curl -fsSL https://ollama.ai/install.sh | sh    # Linux
ollama pull llama3.1:8b
ollama serve
```

```yaml
llm:
  provider: ollama
  ollama_model: llama3.1:8b
```

Recommended models:

- `llama3.1:8b` — fast, works for most flows
- `llama3.1:70b` — much better JSON extraction (needed for multi-field on small models)
- `mistral:7b` — good structured output

### Claude (best quality)

```bash
echo "ANTHROPIC_API_KEY=sk-ant-..." >> .env
```

```yaml
llm:
  provider: claude
  claude_model: claude-sonnet-4-6
```

### OpenAI

```bash
echo "OPENAI_API_KEY=sk-..." >> .env
```

```yaml
llm:
  provider: openai
  openai_model: gpt-4o-mini
```

### OpenRouter (access any model via one API)

```bash
echo "OPENROUTER_API_KEY=sk-or-..." >> .env
```

```yaml
llm:
  provider: openrouter
  openrouter_model: anthropic/claude-3.5-sonnet
  openrouter_base_url: https://openrouter.ai/api/v1
```

### LM Studio (local)

Start LM Studio, load a model, then:

```yaml
llm:
  provider: lmstudio
  lmstudio_model: local-model
  lmstudio_base_url: http://localhost:1234/v1
```
Checkpointing & Persistence
Conversations persist across server restarts via LangGraph checkpointers.
# SQLite (dev — zero setup, auto-created)
flowgraph db set ./flowgraph.db
# PostgreSQL (production)
flowgraph db set postgresql://user:pass@localhost:5432/mydb
# In-memory (testing only — lost on restart)
flowgraph db set ""
# Test connection
flowgraph db test
Observability
Quick setup
flowgraph stack up # Start PostgreSQL + Langfuse + Jaeger via Docker
# Open http://localhost:3000 → sign up → create project → copy API keys
flowgraph stack configure # Enter keys → saved to config.yaml
flowgraph dev # Full dev mode: stack + API server
Services
| Service | URL | Purpose |
|---|---|---|
| Langfuse UI | http://localhost:3000 | LLM prompts, completions, tokens, cost, latency |
| Jaeger UI | http://localhost:16686 | Distributed traces: API → workflow → node spans |
| OTLP gRPC | localhost:4317 | App sends OTel spans here |
| PostgreSQL | localhost:5433 | Langfuse storage (separate from app DB) |
What gets tracked
Langfuse — every LLM call:
- Full prompt + completion
- Token usage + cost estimate
- Latency per call
- Session grouping by `thread_id`
- Trace name: `workflow:<name>`
Jaeger — every workflow execution:
- Root span per API request
- Node-level child spans
- Workflow status + session ID attributes
Visual Workflow Builder
A drag-and-drop React app for building YAML workflows visually — no code required.
cd workflow-builder
npm install
npm run dev
# Open http://localhost:5174
Features:
- Drag-and-drop canvas (n8n-style)
- Three node types: Data Collector (blue), Custom Action (orange), Response (green)
- Click any node to configure in a side panel
- Export to YAML → drop into the `workflows/` directory
- Dark/light mode
- Pre-built templates for common patterns
Building a workflow in the UI
1. Drag node types from the sidebar onto the canvas
2. Connect nodes by dragging from a node's bottom handle to another's top handle
3. Click a node to configure it (name, fields, routes, validation, etc.)
4. Click Export YAML → save to `workflows/my_workflow.yaml`
5. Test with `flowgraph run my_workflow`
Project Structure
flowgraph-ai/
├── flowgraph_ai/
│ ├── engine/
│ │ ├── graph_builder.py # Builds LangGraph StateGraph from YAML
│ │ ├── workflow_loader.py # YAML loader + normaliser + validator
│ │ ├── workflow_builder.py # Programmatic workflow builder API
│ │ ├── guardrails.py # LLM safety post-processing (3-level)
│ │ ├── hallucination.py # Two-stage hallucination grounding check
│ │ ├── template.py # {state.field} template rendering
│ │ └── json_repair.py # Auto-repair malformed LLM JSON
│ ├── nodes/
│ │ ├── data_collector.py # Pre-extraction, tools, interrupt/resume
│ │ ├── custom_action.py # Business logic execution + routing
│ │ └── response_node.py # End node with guardrails
│ ├── tracking/
│ │ ├── models.py # Task + event data models
│ │ ├── tracker.py # SQLite task store + pub/sub
│ │ └── middleware.py # FastAPI middleware for auto task creation
│ ├── validators/
│ │ ├── builtin.py # regex, length, numeric, one_of, not_empty
│ │ └── registry.py # run_validation() dispatch
│ ├── actions/ # Example custom actions (incl. store.py)
│ ├── workflows/ # YAML workflow definitions (auto-discovered)
│ ├── storage/
│ │ └── checkpointer.py # SqliteSaver / PostgresSaver / MemorySaver
│ ├── llm/
│ │ └── provider.py # get_llm() — 5 providers + llm_params support
│ ├── api/
│ │ └── server.py # FastAPI: workflow + task endpoints (10 total)
│ ├── observability/
│ │ ├── langfuse_handler.py # Langfuse callback factory
│ │ └── tracing.py # OTel / Jaeger spans (incl. hallucination + follow_up)
│ ├── docker/ # docker-compose.yml + init-db.sql
│ ├── core/
│ │ ├── language.py # Language detection + caching
│ │ └── history.py # Conversation history helpers
│ ├── cli/
│ │ └── app.py # All CLI commands (Typer)
│ ├── validations/ # Your custom validators go here
│ ├── config.py # load_config() — YAML + .env
│ ├── config.yaml # Global runtime config
│ ├── intent.py # classify_intent() → workflow name
│ └── main.py # Interactive CLI entry point
├── frontend/ # Real-time Kanban board (React)
├── workflows/ # YAML files (add yours here)
├── actions/ # Your Python business logic goes here
├── tests/
│ ├── unit/ # Unit tests (no LLM/DB required)
│ └── integration/ # Integration tests
├── workflow-builder/ # Visual drag-and-drop builder (React)
├── .github/workflows/ci.yml # GitHub Actions CI/CD
└── README.md # ← This file (single source of truth)
As a developer, you only work in:
- `workflows/` — define conversations in YAML
- `actions/` — write Python business logic
- `validations/` — write custom validators
- `config.yaml` — configure LLM, guardrails, logging
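As a sketch of that split: business logic in `actions/` is plain Python. The function below is a hypothetical example in the `args_from_state` style mentioned in the changelog — state fields arrive as kwargs and the returned string drives routing; see the Custom Actions section for the exact contract.

```python
# actions/my_actions.py — hypothetical example action.
# With args_from_state, the engine passes state fields as kwargs;
# the string returned here is matched against the routes in YAML.

def create_ticket(name: str, email: str, issue: str) -> str:
    """Pretend to file a support ticket and route on the result."""
    if "@" not in email:
        return "invalid_email"  # route name, handled in the YAML routing block
    print(f"Ticket filed for {name} <{email}>: {issue}")
    return "success"
```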
Developer & Agent Reference
For AI agents: this section is the authoritative reference. Read before starting any work.
Workspace
- Root: `/Users/sridhar/projects/langgrpah/flowgraph-ai/`
- Package manager: `uv` (pyproject.toml)
- Python: 3.12+
- Venv: `.venv/`
Run commands
cd flowgraph-ai
# Interactive CLI
uv run python flowgraph_ai/main.py
# API server
uv run uvicorn flowgraph_ai.api.server:app --reload
# Run unit tests
uv run pytest tests/unit/ -v
Key files for agents
| File | Purpose |
|---|---|
| `flowgraph_ai/nodes/data_collector.py` | Pre-extraction, tools, single/multi-field logic |
| `flowgraph_ai/engine/graph_builder.py` | Builds LangGraph StateGraph from workflow config |
| `flowgraph_ai/engine/workflow_loader.py` | Loads and normalises YAML workflows |
| `flowgraph_ai/engine/guardrails.py` | 3-level guardrails merge + apply |
| `flowgraph_ai/engine/hallucination.py` | Two-stage hallucination check + auto-correction |
| `flowgraph_ai/tracking/tracker.py` | SQLite task store + pub/sub |
| `flowgraph_ai/tracking/middleware.py` | FastAPI middleware for auto task creation |
| `flowgraph_ai/api/server.py` | FastAPI endpoints + follow-up + task endpoints |
| `flowgraph_ai/main.py` | CLI entry point + follow-up in CLI |
| `flowgraph_ai/cli/app.py` | All CLI commands including `flowgraph init` |
| `flowgraph_ai/intent.py` | Intent classification |
| `flowgraph_ai/llm/provider.py` | LLM provider factory + `llm_params` support |
| `flowgraph_ai/storage/checkpointer.py` | Checkpointer singleton |
Adding a new workflow
1. Create `workflows/my_workflow.yaml`
2. Add custom actions in `actions/my_actions.py` (if needed)
3. Add custom validators in `validations/my_validators.py` (if needed)
4. Test: `flowgraph validate my_workflow` → `flowgraph run my_workflow`
5. Update this README if architecture changes
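Step 1's YAML can start as small as the snippet from the introduction (a sketch; see the YAML Workflow Reference for the full node schema):

```yaml
# workflows/my_workflow.yaml — minimal starting point
workflow: my_workflow
description: "Describe what this flow does"
state:
  name: str
start_node: collect_info
# Then define collect_info and any further nodes
# following the YAML Workflow Reference above.
```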
For detailed session-by-session development history, architectural decisions, and file change logs, see agent_work_logs.md.
Testing
# Unit tests (no LLM or DB required — all LLM calls are mocked)
uv run pytest tests/unit/ -v
uv run pytest tests/unit/ --cov=flowgraph_ai --cov-report=term-missing
# Integration tests (requires running LLM + DB)
uv run pytest tests/integration/ -v
# Via CLI
flowgraph test
flowgraph test --coverage
flowgraph test --integration
Tests live in:
- `tests/unit/` — fast, isolated, no API keys needed (754 tests, 86% coverage)
- `tests/integration/` — test full graph execution with real LLM + DB
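The unit-test style is plain pytest with every LLM call mocked out. The snippet below is an illustrative, self-contained sketch of that pattern — `extract_name` is invented for the example and stands in for engine code that calls a provider:

```python
# Illustrative sketch of the mocked-LLM unit-test style.
# The function under test is hypothetical, not part of flowgraph_ai.
import json
from unittest.mock import MagicMock

def extract_name(llm, message: str) -> str:
    """Ask the LLM for JSON and pull out the 'name' field."""
    raw = llm.invoke(f"Extract the name as JSON from: {message}")
    return json.loads(raw)["name"]

def test_extract_name_with_mocked_llm():
    # No real provider: the mock returns a canned JSON string.
    fake_llm = MagicMock()
    fake_llm.invoke.return_value = '{"name": "Ada"}'
    assert extract_name(fake_llm, "Hi, I'm Ada") == "Ada"
    fake_llm.invoke.assert_called_once()

test_extract_name_with_mocked_llm()
```

Because nothing touches the network or disk, these tests run in milliseconds and need no API keys.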
Architecture
User / Client
↓
FastAPI (api/server.py)
↓
Intent Classifier (intent.py) — routes message to correct workflow YAML
↓
Graph Builder (engine/graph_builder.py) — builds LangGraph StateGraph from YAML
↓
LangGraph StateGraph
├── data_collector — interrupt() / resume, pre-extraction, tools, validation
├── custom_action — executes Python, routes on return value
└── response — renders output, applies LLM guardrails
↓
Checkpointer (storage/checkpointer.py) — SQLite / PostgreSQL / memory
↓
LLM Provider (llm/provider.py) — Claude / OpenAI / Ollama / OpenRouter / LM Studio
↓
Observability
├── Langfuse — LLM call tracing (callbacks injected at invocation)
└── Jaeger — Distributed request tracing (OpenTelemetry spans)
Changelog
See CHANGELOG.md for the full history.
[0.2.0] — 2026-03-12
- Real-time Kanban board (`frontend/`) — 8-column board with SSE live updates and activity timeline
- Task tracking (`flowgraph_ai/tracking/`) — SQLite pub/sub task store; auto-creates a card on every workflow start
- Hallucination detection (`engine/hallucination.py`) — two-stage grounding check with auto-correction
- LLM parameter customization — per-node `llm_params:` for `temperature`, `max_tokens`, `top_p`, `model`, etc.
- `args_from_state` for custom_action — clean kwarg-based function calling; maps state fields to function params
- Correlation IDs — links API response, Kanban card, Jaeger trace, and Langfuse trace
- 4 new task API endpoints — `GET /tasks`, `GET /tasks/{id}`, `GET /tasks/session/{id}`, `GET /tasks/stream/events`
- 754 unit tests passing; 86% code coverage
- Backward compatible — no breaking changes from v0.1.0
[0.1.0] — 2026-03-12
- Pre-extraction — extracts field values from the user's first message before asking
- Tools in DataCollector — LLM calls Python tools during data collection (multi-round loop)
- Follow-up handling — `POST /message` on a completed session returns `status: "follow_up"`
- `flowgraph init` command — scaffolds a complete project with two ready-to-run examples
- GitHub Actions CI/CD — unit tests, lint, package build on push/PR
- 155 unit tests passing
[0.0.1] — 2026-03-11
Initial public release — 3 node types, LangGraph engine, FastAPI, 5 LLM providers, observability.
Roadmap
| Item | Priority |
|---|---|
| Push to remote git repo | High |
| Multi-field extraction improvement for small models (8B) | Medium |
| Redis checkpointer (horizontal scaling) | Medium |
| Workflow versioning | Low |
| Analytics dashboard (Langfuse-backed) | Low |
License
MIT — see LICENSE
Contributing
Contributions are welcome. Open an issue before submitting a PR.
Bug reports: include flowgraph health output + full error message.