
SPL-Flow

Declarative LLM Orchestration — AI Symphony via SPL + PocketFlow

SPL-Flow is a platform that translates free-form natural language into SPL (Structured Prompt Language), routes each sub-task in parallel to the specialist language model best suited for it, and synthesizes the results into a composed final response — an AI Symphony where each model plays the instrument it does best.


The Vision: AI Symphony

Traditional AI tools call a single general-purpose model for every task. SPL-Flow follows a Mixture-of-Models (MoM) paradigm — the same way a symphony orchestra assigns each instrument to the right player, SPL-Flow assigns each cognitive sub-task to the right specialist LLM:

| Task | Specialist | Why |
|---|---|---|
| CJK characters (Chinese / Japanese / Korean) | qwen/qwen-2.5-72b-instruct | Leads C-Eval, CMMLU, JP-LMEH |
| European languages (translate, etc.) | mistralai/mistral-large-2411 | Leads EU multilingual MT-Bench |
| Code generation / review / debugging | deepseek/deepseek-coder-v2 | Leads HumanEval, SWE-bench |
| Math / science / proofs | deepseek/deepseek-r1 | Leads MATH, AIME, GPQA |
| Long-form reasoning / analysis | anthropic/claude-opus-4-6 | Leads MMLU-Pro reasoning |
| Synthesis / composition (final output) | anthropic/claude-opus-4-6 | Coherent long-form writing |

Just write USING MODEL auto in your SPL and the system automatically routes to the optimal model.
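For example, via the Python API (a minimal sketch; the prompt text and adapter choice are illustrative):

from src import api

# SPL with USING MODEL auto: the router resolves the specialist model
# at execution time. (Prompt text and adapter choice are illustrative.)
spl = """
PROMPT water_radicals
SELECT
    system_role('You are a Chinese language expert.'),
    GENERATE('List 10 Chinese characters with the water radical.')
USING MODEL auto;
"""
result = api.exec_spl(spl, adapter="openrouter")
print(result["primary_result"])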


What's New (2026-02)

API-First Architecture

src/api.py is the first-class public interface — enabling system-to-system integration, agent-to-agent workflows, testing, and automation. The CLI and Streamlit UI are thin wrappers over four core functions: generate, run, exec_spl, and benchmark (see API Reference):

from src import api

# Translate NL to SPL (preview, no execution)
result = api.generate("List 10 Chinese characters with water radical")

# Full pipeline: NL → SPL → validate → execute → deliver
result = api.run("Summarize this article", context_text=doc, adapter="openrouter")

# Execute pre-written SPL directly (batch / agent-to-agent)
result = api.exec_spl(spl_query, adapter="ollama", provider="deepseek")

RAG Context Store (ChromaDB)

Every valid (NL query, SPL) pair from real sessions is automatically captured to a ChromaDB vector store — gold-standard human-labeled data that improves future SPL generation via dynamic few-shot retrieval.

  • Digital twin flywheel: more usage → more captured pairs → better retrieval → better SPL → tighter human-AI partnership
  • Data quality tiers: human (gold, from sessions) > edited (gold+, user-corrected) > synthetic (silver, generated offline)
  • Human-in-the-loop curation via the RAG Store Streamlit page: review, deactivate noise, delete errors

USING MODEL auto + LLM Provider

Write USING MODEL auto in any SPL PROMPT and the model router automatically classifies the task (cjk / code / eu_lang / math / reasoning / synthesis) and resolves to the best specialist model.

LLM Provider preference lets orgs or users pin auto-routing to a specific provider's models:

# Company policy = "we use Anthropic"
api.run(query, adapter="openrouter", provider="anthropic")
# → every USING MODEL auto resolves to the best Claude model for that task

Provider preference only takes effect with openrouter (which can reach all providers). With claude_cli or ollama, the adapter-level best is used regardless.

BENCHMARK — Model Evaluation Loop

Write one SPL script and run it against N models in parallel. Every model receives an identical patched copy with its USING MODEL clause replaced. Wall-clock time ≈ slowest single model, not N × one model.

BENCHMARK compare_models
USING MODELS ['anthropic/claude-opus-4-6', 'openai/gpt-4o', auto]
PROMPT analysis
SELECT
    system_role('You are an expert analyst.'),
    GENERATE('Explain the CAP theorem in 3 bullet points.')
USING MODEL auto;

Or use CALL to reference an existing .spl file and keep the BENCHMARK block minimal:

BENCHMARK summarize_test
USING MODELS ['anthropic/claude-opus-4-6', 'openai/gpt-4o', auto]
USING ADAPTER openrouter
CALL summarize.spl(document=context.document)

Results include per-model: response, token counts, latency, cost, and a prompt_results[] breakdown for multi-CTE scripts. auto is a valid entry — the router resolves it at execution time, letting you validate your explicit choices against the router's recommendation.
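Conceptually, the fan-out looks like the sketch below (illustrative only; the real implementation lives in src/nodes/benchmark.py, and this patch_model stand-in is a simplification):

import asyncio
from src import api

def patch_model(spl: str, model: str) -> str:
    # Simplified stand-in for benchmark.py's patch_model:
    # swap the USING MODEL clause for this run's model.
    return spl.replace("USING MODEL auto", f"USING MODEL '{model}'")

async def run_benchmark(spl: str, models: list[str]) -> list[dict]:
    async def run_one(model: str) -> dict:
        # Each model gets its own patched copy; runs overlap in worker
        # threads, so wall-clock time tracks the slowest model.
        return await asyncio.to_thread(
            api.exec_spl, patch_model(spl, model), adapter="openrouter"
        )
    return await asyncio.gather(*(run_one(m) for m in models))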

Multi-Page Streamlit UI

The app now uses Streamlit's pages/ multi-page pattern:

| Page | Purpose |
|---|---|
| app.py (Home) | Architecture overview, RAG stats, recent captures |
| 1_Pipeline.py | Three-step pipeline: generate → review → execute |
| 2_RAG_Store.py | Review, curate, and manage the RAG context store |
| 3_Benchmark.py | Run one SPL script against N models in parallel; compare responses, tokens, latency, cost; mark winner |

Architecture

User Query (free-form text)
        │
        ▼
  ┌─────────────┐
  │  Text2SPL   │  claude_cli LLM translates NL → SPL syntax
  │   Node      │  + RAG retrieval (dynamic few-shot examples)
  │             │◄── retry on parse failure (up to 3x)
  └──────┬──────┘
         │
         ▼
  ┌─────────────┐
  │  Validate   │  SPL parse + semantic analysis
  │   Node      │──► retry ──► Text2SPL
  └──────┬──────┘
         │ "execute"
         ▼
  ┌─────────────┐
  │   Execute   │  parse → analyze → optimize → run
  │   Node      │  USING MODEL auto → model router → specialist LLM
  │             │  (parallel CTE dispatch via asyncio)
  └──────┬──────┘
         │
    ┌────┴────┐
    │         │
    ▼         ▼
  Sync      Async
 Deliver   Deliver
(inline)  (/tmp file
           + email*)

*Email: SMTP integration planned for v0.2.

PocketFlow graph:

text2spl >> validate
validate - "execute" >> execute
validate - "retry"   >> text2spl
validate - "error"   >> sync_deliver
execute  - "sync"    >> sync_deliver
execute  - "async"   >> async_deliver
execute  - "error"   >> sync_deliver

Quickstart

1. Install dependencies

cd /home/papagame/projects/digital-duck/SPL-Flow
pip install -r requirements.txt
# For local dev against the sibling SPL engine repo:
pip install -e /home/papagame/projects/digital-duck/SPL

2. Run the Streamlit UI

streamlit run src/ui/streamlit/app.py

3. Use the CLI

# Translate a query to SPL (preview, no LLM execution)
python -m src.cli generate "List 10 Chinese characters with water radical"

# Full pipeline with provider preference
python -m src.cli run "Analyze this article" \
    --context-file article.txt \
    --adapter openrouter \
    --provider anthropic \
    --output result.md

# Execute a pre-written .spl file directly
python -m src.cli exec examples/query.spl \
    --adapter ollama \
    --param radical=水

# JSON output (full metrics: tokens, latency, cost)
python -m src.cli exec query.spl --json > result.json

# Quiet mode (result only — ideal for shell scripts)
python -m src.cli run "Explain X" --quiet --output answer.md

# Pipe from stdin
echo "Summarize the top 3 points" | python -m src.cli run -

# Benchmark one SPL script against multiple models in parallel
python -m src.cli benchmark query.spl \
    --model "anthropic/claude-opus-4-6" \
    --model "openai/gpt-4o" \
    --model auto \
    --adapter openrouter \
    --json > results.json

LLM Adapters

| Adapter | Description | Setup |
|---|---|---|
| claude_cli (default) | Local Claude CLI | Install Claude CLI; no API key needed |
| openrouter | 100+ models via OpenRouter API | export OPENROUTER_API_KEY=... |
| ollama | Local models (qwen2.5, mistral, etc.) | ollama serve running locally |

Note: Text2SPLNode always uses claude_cli for NL→SPL translation regardless of adapter selection. The adapter setting controls only the execution step.


Model Router

The routing table (src/utils/model_router.py) maps (task × provider/adapter) to concrete model names, sourced from HuggingFace Open LLM Leaderboard v2, LMSYS Chatbot Arena, and task-specific benchmarks (2026-02).

Task classification (heuristic, zero-cost)

| Keyword / signal | Task |
|---|---|
| CJK characters in text, or words like "chinese", "japanese", "kanji" | cjk |
| "code", "function", "python", "refactor", "debug", "sql" | code |
| "german", "french", "translate", "übersetz" | eu_lang |
| "math", "equation", "proof", "calculate", "integral" | math |
| "analyze", "compare", "reason", "argue", "infer" | reasoning |
| Final PROMPT in a multi-PROMPT query | synthesis |
| Everything else | general |

Provider resolution

openrouter + provider set → pick provider's best model for task
openrouter + no provider  → pick best-of-breed for task
claude_cli / ollama       → adapter-level best (provider ignored)
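
A sketch of the combined lookup (illustrative; the real ROUTING_TABLE and signatures in src/utils/model_router.py may differ):

# Illustrative routing sketch; table contents and signature are assumptions.
ROUTING_TABLE = {
    ("cjk", "openrouter"):  "qwen/qwen-2.5-72b-instruct",
    ("code", "openrouter"): "deepseek/deepseek-coder-v2",
    # ... one entry per (task, provider-or-adapter) pair
}

def auto_route(task: str, adapter: str, provider: str | None = None) -> str:
    if adapter == "openrouter" and provider:
        # Provider pinned: that provider's best model for the task.
        return ROUTING_TABLE[(task, provider)]
    if adapter == "openrouter":
        # No preference: best-of-breed for the task.
        return ROUTING_TABLE[(task, "openrouter")]
    # claude_cli / ollama: adapter-level best; provider is ignored.
    return ROUTING_TABLE[(task, adapter)]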

RAG Context Store

Auto-capture

Every valid (NL query, SPL) pair is automatically saved to ChromaDB with metadata:

  • source: "human" (from real sessions), "edited" (user-corrected), "synthetic" (generated offline)
  • user_id: scope records per user (default: shared store)
  • active: soft-delete flag — inactive records are excluded from retrieval but not deleted
  • timestamp: ISO 8601 UTC

Dynamic few-shot retrieval

When translating a new query, the top-5 most similar historical pairs are retrieved by cosine similarity and injected into the Text2SPL prompt as dynamic few-shot examples — more accurate than static hardcoded examples.
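
A sketch of that retrieval step (assuming search returns RAGRecord objects; the example formatting is illustrative):

from src.rag.factory import get_store

# Retrieve the 5 most similar historical pairs and format them as
# few-shot examples for the Text2SPL translation prompt.
store = get_store("chroma")
records = store.search("List 10 Chinese characters with water radical", k=5)
few_shot = "\n\n".join(f"NL: {r.nl_query}\nSPL: {r.spl_query}" for r in records)
# few_shot is then injected into the translation prompt.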

Streamlit curation UI

The RAG Store page lets you:

  • View all captured pairs with source, adapter, and timestamp
  • Filter by source (human / edited / synthetic), status (active / inactive), and keyword
  • Deactivate records (soft-delete, reversible) to exclude noise from retrieval
  • Activate previously deactivated records
  • Delete records permanently
  • Bulk actions: deactivate all shown / delete all shown

Python API

from src.rag.factory import get_store
store = get_store("chroma")                  # default: ./data/rag

# Search top-5 similar pairs
records = store.search("Chinese characters water radical", k=5)

# Upsert a record
from src.rag.store import RAGRecord
store.upsert(RAGRecord(id="abc", nl_query="...", spl_query="...", source="human"))

# Soft-delete (exclude from retrieval)
store.set_active(record_id, False)

# Per-user store
store = get_store("chroma", collection_name="spl_rag_alice")

Project Structure

SPL-Flow/
├── README.md
├── README-TEST.md             # Step-by-step testing guide
├── requirements.txt
├── .gitignore
├── data/                      # ChromaDB persist dir (gitignored)
│   └── rag/
├── src/
│   ├── api.py                 # ★ Public API (first-class interface)
│   ├── cli.py                 # Click CLI (generate / run / exec / benchmark)
│   ├── flows/
│   │   ├── spl_flow.py        # PocketFlow graph builder
│   │   └── benchmark_flow.py  # Single-node benchmark flow
│   ├── nodes/
│   │   ├── text2spl.py        # NL → SPL (+ RAG few-shot retrieval)
│   │   ├── validate_spl.py    # Parse + semantic validation
│   │   ├── execute_spl.py     # SPL engine execution + model auto-routing
│   │   ├── deliver.py         # Sync + Async delivery
│   │   └── benchmark.py       # BENCHMARK node + patch_model + _run_one
│   ├── ui/
│   │   └── streamlit/         # Streamlit UI (MVP / POC layer)
│   │       ├── app.py         # Home page
│   │       └── pages/
│   │           ├── 1_Pipeline.py   # Three-step pipeline page
│   │           ├── 2_RAG_Store.py  # RAG context store curation page
│   │           └── 3_Benchmark.py  # Multi-model benchmark page
│   ├── rag/
│   │   ├── store.py           # RAGRecord dataclass + VectorStore ABC
│   │   ├── chroma_store.py    # ChromaDB backend (default)
│   │   ├── faiss_store.py     # FAISS backend (local fallback)
│   │   └── factory.py         # get_store() factory
│   └── utils/
│       ├── model_router.py    # ROUTING_TABLE + detect_task + auto_route
│       ├── page_helpers.py    # Shared sidebar, session state, RAG cache
│       └── spl_templates.py   # Text2SPL few-shot prompt builder
└── tests/                     # (planned — see README-TEST.md)

API Reference

api.generate(query, context_text="", *, save_to_rag=True, user_id="") → GenerateResult

Translate NL → SPL without executing. Safe to call for preview and testing.

{
    "spl_query":    str,    # generated SPL
    "spl_warnings": list,   # parser/analyzer warnings
    "retry_count":  int,    # LLM call attempts
    "error":        str,    # non-empty if failed
}
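
A common pattern is preview-then-execute (both functions are documented in this section; the adapter choice is illustrative):

from src import api

# Generate SPL for human review, then execute only on success.
gen = api.generate("List 10 Chinese characters with water radical")
if not gen["error"]:
    print(gen["spl_query"])  # review point
    result = api.exec_spl(gen["spl_query"], adapter="openrouter")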

api.run(query, *, adapter, provider, delivery_mode, ...) → RunResult

Full pipeline: NL → SPL → validate → execute → deliver.

{
    "spl_query":         str,
    "spl_warnings":      list,
    "primary_result":    str,           # final PROMPT content
    "execution_results": list[dict],    # per-PROMPT metrics
    "output_file":       str,           # async mode only
    "email_sent":        bool,
    "delivered":         bool,
    "error":             str,
}

api.exec_spl(spl_query, *, adapter, provider, spl_params, cache_enabled) → ExecResult

Execute pre-written SPL directly (no NL→SPL step).

{
    "primary_result":    str,
    "execution_results": list[dict],
    "error":             str,
}

Each execution_results entry:

{
    "prompt_name":   str,
    "content":       str,
    "model":         str,
    "input_tokens":  int,
    "output_tokens": int,
    "total_tokens":  int,
    "latency_ms":    float,
    "cost_usd":      float | None,
}
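
Totals across a multi-PROMPT run can be derived from this list, for example (a sketch; cost_usd may be None for adapters without cost tracking):

from src import api

# Sum tokens and cost across all per-PROMPT entries.
result = api.run("Analyze this article", adapter="openrouter")
total_tokens = sum(r["total_tokens"] for r in result["execution_results"])
total_cost = sum(r["cost_usd"] or 0.0 for r in result["execution_results"])
print(f"{total_tokens} tokens, ${total_cost:.4f}")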

api.benchmark(spl_query, *, models, adapter, provider, spl_params, cache_enabled) → BenchmarkResult

Run one SPL script against each entry in models; all N patched copies execute concurrently.

{
    "benchmark_name": str,
    "adapter":        str,
    "timestamp":      str,        # ISO 8601 UTC
    "spl_hash":       str,        # sha256[:32] of spl_query
    "params":         dict,
    "winner":         str | None, # set after human review
    "runs": [
        {
            "model_id":       str,
            "resolved_from":  "explicit" | "auto",
            "resolved_model": str | None,   # concrete model when auto
            "input_spl":      str,          # patched SPL actually sent
            "response":       str,          # final PROMPT output
            "input_tokens":   int,
            "output_tokens":  int,
            "total_tokens":   int,
            "latency_ms":     float,
            "cost_usd":       float | None,
            "prompt_results": list[dict],   # per-CTE breakdown
            "error":          str,
        },
        ...
    ],
}

input_spl makes every run independently reproducible: api.exec_spl(run["input_spl"]) replays any single run exactly.
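
For instance, replaying the fastest clean run (a sketch; passing auto as the string "auto" in the Python models list is an assumption):

from src import api

# Benchmark a pre-written script, pick the fastest clean run, replay it.
spl_query = open("examples/query.spl").read()
bench = api.benchmark(
    spl_query,
    models=["anthropic/claude-opus-4-6", "openai/gpt-4o", "auto"],
    adapter="openrouter",
)
ok = [r for r in bench["runs"] if not r["error"]]
fastest = min(ok, key=lambda r: r["latency_ms"])
replay = api.exec_spl(fastest["input_spl"], adapter="openrouter")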


Delivery Modes

| Mode | Behavior |
|---|---|
| sync (default) | Result rendered in UI / printed to stdout immediately |
| async | Result saved to /tmp/spl_flow_result_<timestamp>.md; download button shown |

Design Philosophy

human×AI — multiplicative, not additive.

SPL-Flow is modeled after Data Copilot (a RAG app for data professionals), generalized into a platform for any LLM user. The key principles:

  • API-first: every capability is accessible programmatically — no UI required
  • Declarative: SPL separates what to compute from how to compute it
  • Mixture-of-Models: routing the right task to the right specialist beats a single monolithic model
  • Human-in-the-loop: real usage data (captured as RAG records) continuously improves the system — the more you use it, the better it gets
  • Digital twin flywheel: personal usage data → personalized retrieval → personalized responses

Roadmap

| Version | Focus |
|---|---|
| v0.1 MVP | API-first, Text2SPL+RAG, MoM routing, BENCHMARK, multi-page UI (current) |
| v0.2 | SMTP email delivery, routing-store winner persistence, OpenRouter cost tracking |
| v0.3 | Multi-turn conversation, SPL template library, USING PARAMS grid search |
| v0.4 | Team workspaces, scheduled jobs, API gateway, digital twin profiles |
| Platform | Per-user RAG collections, fine-tuned Text2SPL, SPL marketplace |
