YAML-driven AI workflow engine powered by LangGraph — build conversational multi-step AI apps without boilerplate

FlowGraph-AI

FlowGraph-AI is a YAML-driven AI workflow engine for building conversational, multi-step AI applications. Define your entire workflow in a simple YAML file — the engine handles LLM calls, user input collection, validation, routing, and state persistence automatically.

Built on top of LangGraph with native interrupt/resume support, SQLite/PostgreSQL persistence, and a built-in FastAPI server.


Why FlowGraph-AI?

Most AI frameworks make you write hundreds of lines of code to build a simple conversational flow. FlowGraph-AI flips this — you write a YAML file, the engine does the rest.

workflow: support_ticket
description: "Create a support ticket"

state:
  name: str
  email: str
  issue: str

start_node: collect_name

nodes:
  collect_name:
    type: data_collector
    initial_message: "What is your name?"
    field: name
    next: collect_email
    on_max_retry: error_response

  collect_email:
    type: data_collector
    initial_message: "What is your email address?"
    field: email
    validation:
      method: regex
      pattern: "^[\\w.-]+@[\\w.-]+\\.\\w+$"
    next: submit
    on_max_retry: error_response

  submit:
    type: custom_action
    method: "actions.tickets.create"
    result_field: ticket_id
    routes:
      - value: "created"
        next: success
      - default: error_response

  success:
    type: response
    template: "Ticket created! ID: {state.ticket_id}. We'll email {state.email}."

  error_response:
    type: response
    error_field: error

That is the entire application. Run it with flowgraph run support_ticket or expose it via the built-in API server.


Features

  • 3-node architecture (data_collector, custom_action, response) — simple enough to reason about, powerful enough for production
  • YAML-first — define complete multi-step AI workflows without boilerplate
  • Python builder API — fluent code alternative to YAML for programmatic workflow construction
  • LangGraph-powered — native interrupt/resume, stateful execution, checkpointer persistence
  • SQLite + PostgreSQL — conversations persist across restarts, zero extra config for SQLite
  • Multi-language — auto-detects user language, responds in kind across all LLM calls
  • Intent classification — automatically routes users to the right workflow based on what they say
  • Built-in validation — regex, length, numeric, one_of, not_empty — plus custom Python validators
  • Guardrails — LLM safety post-processing on every response, configurable
  • 5 LLM providers — Claude, OpenAI, OpenRouter, Ollama, LM Studio; per-node overrides
  • FastAPI server — production-ready REST API out of the box
  • Developer CLI — scaffold, validate, run, serve — all from flowgraph
  • Observability — Langfuse (LLM tracing) + Jaeger (distributed tracing) with one-command setup

Installation

With pip

pip install flowgraph-ai

With uv (recommended)

uv add flowgraph-ai

From source

git clone https://github.com/simpletoolsindia/flowgraphai
cd flowgraphai
uv sync

Quick Start

1. Set your LLM provider

# Claude (Anthropic)
export ANTHROPIC_API_KEY=sk-ant-...
flowgraph llm set claude

# OpenAI
export OPENAI_API_KEY=sk-...
flowgraph llm set openai

# Local (Ollama, no key needed)
flowgraph llm set ollama

2. Create a workflow

flowgraph new my_workflow

Or write your own workflows/my_workflow.yaml.

3. Run it

# Interactive CLI
flowgraph run my_workflow

# API server
flowgraph serve

# Full dev mode (API + observability stack)
flowgraph dev

Core Concepts

Node Types

FlowGraph-AI has exactly 3 node types. That is intentional — simplicity over abstraction.

data_collector — Collect information from the user

Asks a question, waits for the user's reply, validates it, and routes to the next node.

collect_sr:
  type: data_collector
  role: "You are a helpful support agent."
  initial_message: "Please provide your SR number (format: SR-XXXXX)."
  field: sr_number
  description: "SR number in format SR-XXXXX"
  max_retry: 3
  next: process
  on_max_retry: error_response
  allow_intent_escape: true    # Answer off-topic questions before re-asking
  validation:
    method: "validations.sr.check_sr_format"

Multi-field collection (collects multiple fields in one prompt):

collect_dates:
  type: data_collector
  role: "You are an HR assistant."
  initial_message: "Please provide your leave start and end dates."
  fields:
    - field: start_date
      description: "Leave start date (YYYY-MM-DD)"
      validation:
        method: regex
        pattern: "^\\d{4}-\\d{2}-\\d{2}$"
    - field: end_date
      description: "Leave end date (YYYY-MM-DD)"
      validation:
        method: regex
        pattern: "^\\d{4}-\\d{2}-\\d{2}$"
  max_retry: 3
  next: submit
  on_max_retry: error_response

custom_action — Run your Python code

Executes any Python function. The function receives the full state dict, may mutate it, and returns a string that is matched against the node's routes to pick the next node.

check_account:
  type: custom_action
  method: "actions.billing.check_status"
  result_field: billing_result
  routes:
    - value: "active"
      next: success_response
    - value: "suspended"
      next: suspended_response
    - default: error_response

The Python function:

# actions/billing.py
def check_status(state: dict) -> str:
    account_id = state["account_id"]
    result = billing_api.lookup(account_id)
    state["billing_result"] = result.status
    state["output"] = f"Account {account_id}: {result.status}"
    return result.status   # matched against routes[]
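Conceptually, the engine matches the returned string against the routes list, falling back to the default entry when nothing matches. A minimal sketch of that idea (illustrative only — not the engine's actual code; the function name is hypothetical):

```python
def route(return_value: str, routes: list) -> str:
    """Pick the next node: first matching value wins, default is the fallback."""
    default = None
    for r in routes:
        if "default" in r:
            default = r["default"]
        elif r.get("value") == return_value:
            return r["next"]
    if default is not None:
        return default
    raise ValueError(f"No route for {return_value!r} and no default given")

routes = [
    {"value": "active", "next": "success_response"},
    {"value": "suspended", "next": "suspended_response"},
    {"default": "error_response"},
]
print(route("active", routes))     # success_response
print(route("whatever", routes))   # error_response
```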

response — Send the final reply

The terminal node. Applies guardrails automatically. Supports templates and field references.

success_response:
  type: response
  template: "Done! Your request {state.request_id} has been submitted."

# Or use fields set by custom_action
final:
  type: response
  output_field: output     # state["output"] → shown to user
  error_field: error       # state["error"]  → shown if set

Routing

Pattern       | YAML                                                | When to use
Unconditional | next: node_name                                     | Always go to one node
On-failure    | on_max_retry: node_name                             | data_collector max retries exceeded
Routes list   | routes: [{value: "x", next: "y"}, {default: "z"}]   | Match custom_action return value
Conditional   | conditional_edges: {field: "result", paths: {...}}  | Route on field value
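The conditional_edges pattern amounts to a lookup of a state field's value in a paths map. A sketch of the semantics (the function name and default handling here are illustrative, not engine internals):

```python
def conditional_route(state: dict, field: str, paths: dict, default=None):
    """Route on a state field's value: look it up in paths, else use default."""
    return paths.get(state.get(field), default)

print(conditional_route({"result": "ok"}, "result", {"ok": "done", "fail": "error"}))
# done
```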

State

Every workflow has auto-injected standard state fields. Add your own custom fields in the state: block.

state:
  # Your custom fields
  account_id: str
  invoice_total: float
  items: list

# Auto-injected (always available):
#   user_message   — the user's first message
#   user_language  — detected language (e.g. "English", "Spanish")
#   output         — final response to show user
#   error          — error message (shown instead of output if set)
#   messages       — full conversation history list

Access state in templates: {state.account_id}
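Template references like {state.account_id} amount to placeholder substitution from the state dict. A minimal sketch of the idea (illustrative, not the engine's actual renderer):

```python
import re

def render_template(template: str, state: dict) -> str:
    # Replace each {state.field} placeholder with the field's value from state
    return re.sub(
        r"\{state\.(\w+)\}",
        lambda m: str(state.get(m.group(1), "")),
        template,
    )

print(render_template("Ticket created! ID: {state.ticket_id}.", {"ticket_id": "T-42"}))
# Ticket created! ID: T-42.
```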


YAML Workflow Reference

workflow: my_workflow          # Internal name (must match filename)
description: "..."             # Used by intent classifier to route users here

# Initial field to populate with the user's first message (default: user_message)
initial_input_field: user_message

# Custom state fields
state:
  my_field: str
  count: int
  items: list

# Entry point
start_node: first_node

nodes:
  first_node:
    type: data_collector | custom_action | response
    # ... node-specific config

Python Builder API

For programmatic workflow construction without YAML:

from engine.workflow_builder import workflow
from llm.provider import get_llm
from config import load_config
from storage.checkpointer import get_checkpointer

cfg = load_config()
llm = get_llm(cfg)
checkpointer = get_checkpointer(cfg)

graph = (
    workflow()
    .state(account_id="str", result="str")
    .collect(
        name="collect_id",
        field="account_id",
        ask="Please provide your account ID.",
        next_node="process",
        on_max_retry="error",
    )
    .action(
        name="process",
        method="actions.billing.check_status",
        result_field="result",
        routes=[{"value": "ok", "next": "done"}, {"default": "error"}],
    )
    .respond(name="done",  template="All good! Account: {state.account_id}")
    .respond(name="error", error_field="error")
    .start("collect_id")
    .compile(llm=llm, app_config=cfg, checkpointer=checkpointer)
)

Custom Actions

# actions/my_actions.py

def process_request(state: dict) -> str:
    """
    Receives full workflow state.
    Mutate state to set output values.
    Return a string matched against routes[] in YAML.
    """
    sr = state["sr_number"]
    email = state.get("email", "")

    try:
        result = my_api.submit(sr, email)
        state["output"] = f"Request {sr} submitted. Ref: {result.ref_id}"
        return "success"
    except MyAPIError as e:
        state["error"] = f"Failed to submit: {e.message}"
        return "error"
The matching YAML node:

submit:
  type: custom_action
  method: "actions.my_actions.process_request"
  result_field: action_result
  routes:
    - value: "success"
      next: success_response
    - value: "error"
      next: error_response

Custom Validators

# validations/my_validators.py

def validate_account_id(value: str, config: dict) -> tuple[bool, str]:
    """
    Returns (is_valid, error_message).
    error_message is shown to user on failure.
    """
    if not value.startswith("ACC-"):
        return False, "Account ID must start with ACC- (e.g. ACC-12345)"
    if len(value) != 9:
        return False, "Account ID must be exactly 9 characters (e.g. ACC-12345)"
    return True, ""
Reference it from YAML:

collect_account:
  type: data_collector
  field: account_id
  validation:
    method: "validations.my_validators.validate_account_id"
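Because validators are plain functions with a (value, config) → (is_valid, error_message) contract, they can be unit-tested without running the engine. For example, using the validator defined above:

```python
def validate_account_id(value: str, config: dict) -> tuple[bool, str]:
    """Same validator as above, reproduced here so the example is self-contained."""
    if not value.startswith("ACC-"):
        return False, "Account ID must start with ACC- (e.g. ACC-12345)"
    if len(value) != 9:
        return False, "Account ID must be exactly 9 characters (e.g. ACC-12345)"
    return True, ""

# Exercise it the way the engine would on user input
assert validate_account_id("ACC-12345", {}) == (True, "")
assert validate_account_id("12345", {})[0] is False       # wrong prefix
assert validate_account_id("ACC-123456", {})[0] is False  # wrong length
```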

Built-in Validators

Validator | Config           | Example
regex     | pattern: "..."   | pattern: "^\\d{5}$"
length    | min: N, max: N   | min: 5, max: 200
numeric   | min: N, max: N   | min: 1, max: 100
one_of    | options: [...]   | options: [Annual, Sick, Unpaid]
not_empty | (no config)      |
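As a mental model, the built-in regex validator behaves roughly like this sketch (a hypothetical implementation, assuming only the (value, config) → (is_valid, error_message) contract shown for custom validators):

```python
import re

def regex_validate(value: str, config: dict) -> tuple[bool, str]:
    # Hypothetical sketch: accept the value only if it matches config["pattern"]
    pattern = config["pattern"]
    if re.search(pattern, value):
        return True, ""
    return False, f"Input does not match the required format: {pattern}"

print(regex_validate("12345", {"pattern": r"^\d{5}$"}))   # (True, '')
```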

CLI Reference

# Health & diagnostics
flowgraph health               # Full system health check (DB, LLM, workflows, observability)

# Database
flowgraph db set <url>         # Set checkpoint DB (postgresql://... or ./file.db or empty for memory)
flowgraph db test              # Test database connection

# LLM provider
flowgraph llm set <provider>   # Switch provider: claude | openai | openrouter | lmstudio | ollama
flowgraph llm set claude --model claude-opus-4-6
flowgraph llm test             # Send test prompt to LLM

# Workflows
flowgraph list                 # List all workflows with descriptions
flowgraph show <name>          # Show workflow node graph + state schema
flowgraph validate [name]      # Validate YAML syntax and routing
flowgraph new <name>           # Scaffold a new workflow interactively
flowgraph run <name>           # Chat with a workflow in the terminal
flowgraph run <name> --thread <id>  # Resume an existing conversation

# API server
flowgraph serve                # Start the FastAPI server (default: 0.0.0.0:8000)
flowgraph serve --port 9000 --reload

# Observability stack (Docker)
flowgraph stack up             # Start PostgreSQL + Langfuse + Jaeger
flowgraph stack down           # Stop the stack
flowgraph stack status         # Show container health + service URLs
flowgraph stack logs           # Tail all service logs
flowgraph stack logs langfuse  # Tail specific service logs
flowgraph stack configure      # Save Langfuse API keys + enable observability

# One-command full dev start
flowgraph dev                  # stack up + API server + show all URLs

REST API

Start the server with flowgraph serve, then call:

POST /workflow/start

Start a new conversation. Intent is auto-classified if workflow_name is omitted.

curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "I need to check my SR status"}'
Response:

{
  "session_id": "service_request:abc123",
  "workflow_name": "service_request",
  "status": "waiting_input",
  "question": {"field": "sr_number", "question": "Please provide your SR number (SR-XXXXX)."}
}

POST /message

Resume a session with the user's reply.

curl -X POST http://localhost:8000/message \
  -H "Content-Type: application/json" \
  -d '{"session_id": "service_request:abc123", "message": "SR-12345"}'

GET /session/{session_id}

Inspect current state of a session.

GET /workflows

List all available workflows.

GET /health

System health + configuration summary.

GET /workflow/{name}/mermaid

Get Mermaid diagram for a workflow (for visualization).


Configuration

config.yaml controls all runtime behaviour:

app:
  name: FlowGraph-AI
  version: 0.1.0

llm:
  provider: claude              # claude | openai | openrouter | lmstudio | ollama
  temperature: 0.7
  claude_model: claude-sonnet-4-6
  openai_model: gpt-4o-mini
  ollama_model: llama3.1:8b
  ollama_base_url: http://localhost:11434

engine:
  max_steps: 50                 # Max LangGraph execution steps per invocation

intent:
  fallback_workflow: greeting   # Workflow used when no intent matches

guardrails:
  enabled: true                 # LLM safety review on every response node output

language_detection:
  enabled: true                 # Auto-detect user language; respond in kind

checkpoint:
  connection_string: ./flowgraph.db       # SQLite (dev)
  # connection_string: postgresql://user:pass@localhost:5432/mydb  # PostgreSQL (prod)

observability:
  langfuse:
    enabled: false              # Enable after: flowgraph stack configure
    host: http://localhost:3000
    public_key: ""              # or env: LANGFUSE_PUBLIC_KEY
    secret_key: ""              # or env: LANGFUSE_SECRET_KEY
  tracing:
    enabled: false              # Enable after: flowgraph stack up
    endpoint: http://localhost:4317
    service_name: flowgraph-ai

Per-node LLM override

Any node can use a different LLM provider:

collect_name:
  type: data_collector
  llm_provider: claude          # Override global provider for this node only
  field: name
  ...

Observability

FlowGraph-AI includes first-class observability with zero manual Docker setup.

Langfuse — LLM Tracing

Tracks every LLM call: full prompts, completions, token usage, cost, latency.

flowgraph stack up             # Starts Langfuse (+ PostgreSQL + Jaeger) via Docker
# Open http://localhost:3000 → create account → copy API keys
flowgraph stack configure      # Saves keys to config.yaml, enables tracing
flowgraph serve                # API now sends all LLM traces to Langfuse

Jaeger — Distributed Tracing

Traces every API request → workflow execution → node step as nested spans.

Starts automatically with flowgraph stack up. View traces at http://localhost:16686.

What gets tracked

System   | Captures
Langfuse | LLM prompts, completions, tokens, cost, latency per call, session grouping
Jaeger   | API request spans, workflow spans, node spans, error recording

Adding a New Workflow

  1. Create workflows/my_workflow.yaml
  2. Add custom actions in actions/my_actions.py (if needed)
  3. Add custom validators in validations/my_validators.py (if needed)
  4. Test: flowgraph validate my_workflow then flowgraph run my_workflow

The intent classifier discovers new workflows automatically — no code changes needed.


Architecture

User / Client
    ↓
FastAPI  (api/server.py)
    ↓
Intent Classifier  (intent.py)  — routes message to correct workflow
    ↓
Graph Builder  (engine/graph_builder.py)  — builds LangGraph StateGraph from YAML
    ↓
LangGraph StateGraph
    ├── data_collector   — interrupt() / resume, field extraction, validation
    ├── custom_action    — executes Python method, routes on return value
    └── response         — renders output, applies LLM guardrails
    ↓
Checkpointer  (storage/checkpointer.py)  — SQLite / PostgreSQL / memory
    ↓
LLM Provider  (llm/provider.py)  — Claude / OpenAI / Ollama / ...
    ↓
Observability
    ├── Langfuse  — LLM call tracing (callbacks injected at invocation)
    └── Jaeger    — Distributed request tracing (OpenTelemetry spans)

Supported LLM Providers

Provider         | Key        | How
Anthropic Claude | claude     | ANTHROPIC_API_KEY
OpenAI           | openai     | OPENAI_API_KEY
OpenRouter       | openrouter | No key? No — OPENROUTER_API_KEY
LM Studio        | lmstudio   | No key (local)
Ollama           | ollama     | No key (local)

License

MIT — see LICENSE


Contributing

Contributions are welcome. Please open an issue before submitting a PR.

Bug reports: include flowgraph health output and the full error message.
Email: support@simpletools.in
Issues: https://github.com/simpletoolsindia/flowgraphai/issues
