Skip to main content

A lightweight, modular, and developer-first workflow orchestration engine for AI/LLM pipelines.

Project description

Flowk 🌊

PyPI version Python CI License: MIT

Flowk is a lightweight, high-performance workflow orchestration engine specifically designed for AI and LLM pipelines. It offers a simpler, developer-first alternative to complex frameworks like LangGraph.

Everything you need to build Enterprise Agentic Workflows is packed into pure, readable Python: async execution, dynamic routing, CLI visualizers, SQLite/Redis time-travel, Pydantic type-safety, API deployments, streaming, and a local Observability UI.


🚀 All Features

Core Execution

  • Extremely Simple API: Turn standard Python functions into executable graph nodes effortlessly.
  • Node Resiliency: Configure Node retries, timeouts, and fallback policies automatically (@g.node(retries=3)).
  • Standard Routing: Route branch paths explicitly using standard Python functions (g.route()).
  • 🛡️ Type-Safety: Graph states are strictly validated upon every transition using Pydantic.
  • ⚡ Async & Streaming: Natively await APIs and stream real-time events (g.astream()).
  • Parallel Fan-Out: Split a node into three; Flowk natively runs them exactly concurrently via asyncio.gather.

Intelligence

  • 🧠 Zero-Boilerplate Auto-Routing: Eliminate if/else logic by letting OpenAI/Anthropic pick your exact graph branches using strictly validated zero-shot classification (@g.llm_router).
  • 📦 Multi-Agent Composition: Build nested agent networks by packaging entire sub-graphs as natively executable Nodes (g.as_node()).

Developer Experience & Tooling

  • 🛑 Human-in-the-Loop: Set breakpoints to pause execution and later resume the exact thread stacks.
  • 🚀 1-Click Platform Deployment: Turn any Flowk .py into a fully typed FastAPI instance (g.serve()) or spin up the full local workflow environment with flowk dev.
  • Terminal Visualization: Render beautiful CLI graphs of your execution layout (g.show()).
  • Time Travel Replays: Encounter a bug? Flowk traces everything via Event Sourcing. Replay historical executions in debug mode (g.replay()).
  • 📊 Observability Dashboard: Track sessions, step through State Diffs, and view the Event Log visually through the local React dashboard (flowk dev or flowk ui).
  • CLI Telemetry: Query your local .flowk/flowk.db persistence layer instantly using flowk runs list and flowk runs inspect <run_id>.
  • 🧩 Pluggable Metrics: Hook models (e.g. OpenAIPlugin) into MetricsRegistry to precisely track token consumption and cost.

📦 Installation

Flowk is modular by design.

# Core execution engine
pip install flowk

# Add-ons:
pip install "flowk[api]"    # 1-Click FastAPI Deployment & Async Jobs
pip install "flowk[ui]"     # React Observability Dashboard
pip install "flowk[llm]"    # Auto-Router & Token Metrics
pip install "flowk[redis]"  # Distributed Persistence

# Install Everything
pip install "flowk[all]"

⚡ Quick Start

Building your first AI agent pipeline with Flowk takes less than a minute.

import asyncio
from pydantic import BaseModel
from flowk import Graph

# 1. Define strict state
class AgentState(BaseModel):
    query: str
    processed: bool = False

g = Graph(state_schema=AgentState)

# 2. Define Nodes
@g.node(retries=3)  # Built-in resiliency
async def intake(query: str, state: dict):
    state["query"] = query
    print(f"📥 Received: {query}")
    return query

@g.node()
async def agent(query: str, state: dict):
    state["processed"] = True
    print("🤖 Processing context...")
    return f"Processed Output for {query}"

# 3. Connect nodes
g.connect(intake, agent)

# 4. View Architecture
g.show()

# 5. Run async pipeline
if __name__ == "__main__":
    result = asyncio.run(g.arun("Calculate the speed of light."))

🧠 Zero-Boilerplate LLM Auto-Routing

Why write manual if/else logic when LLMs can intelligently route workflows based directly on your docstrings? Flowk handles the prompts and the deterministic structured outputs for you.

@g.llm_router(
    model="gpt-4o-mini",
    targets={
        "math_node": "Use this if the query contains numbers or equations.",
        "search_node": "Use this if the user asks for real-time news."
    }
)
def supervisor_router(state: dict):
    return state.get("query", "")

g.connect(parse_input, supervisor_router) 

🚀 1-Click API Gen (FastAPI)

Skip writing API boilerplate. Flowk automatically converts your Graph and Pydantic models into a fully validated FastAPI instance with /docs, /invoke, and /stream.

# Launch app
g = Graph(state_schema=MyState)
g.connect(A, B)

if __name__ == "__main__":
    g.serve(host="0.0.0.0", port=8000)

Invoke your pipeline instantaneously:

curl -X POST "http://localhost:8000/invoke" \
     -H "Content-Type: application/json" \
     -d '{"initial_state": {"user_id": 123}, "input_data": "Search for X"}'

🛑 Human-in-The-Loop (Interrupts)

Create breakpoints in your workflows. Execution suspends gracefully to allow human review (e.g. paying an invoice), letting you resurrect the session precisely where you left off.

# Set visual breakpoint
g.compile(interrupt_before=["commit_payment_node"])

# Run pipeline until suspended
for event in g.astream(input_data, session_id="user_john"):
    if event["type"] == "interrupt":
        print("Payment halted. Waiting for human approval...")

# Resume from checkpoint using identical session_id
g.arun(None, session_id="user_john")


📊 Observability Dashboard & Persistence

Flowk effortlessly saves run-histories and execution events. By default, Flowk standardizes its local storage to a .flowk/flowk.db directory structure, automatically keeping your projects clean.

# Native Memory Configurations
g = Graph()                                         # Auto-creates .flowk/flowk.db
g = Graph(checkpoint_db="local_traces.db")          # Custom SQLite Storage
g = Graph(checkpoint_db="redis://localhost:6379/0") # Redis

Spin up the native Production-Grade Dashboard (v2) to review these checkpoints visually with interactive graph topology and state diffing. The recommended workflow is using the dev command to launch the API, the UI, and automatically open your browser:

flowk dev
# Automatically opens http://localhost:8502

The new dashboard provides:

  • Interactive SVG Graphs: Visualize your workflow logic and execution paths.
  • Event Sourcing Timeline: See every immutable node transition as it happened.
  • State Diff Engine: Compare state snapshots perfectly step-by-step.
  • Session History: Browse and resume historical agent runs directly from SQLite/Redis.

📦 Multi-Agent Composition

Build powerful hierarchical orchestrations by compiling smaller sub-graphs and mounting them identically as nodes within a massive supervisor pipeline.

# Internal Research Graph
research_graph = Graph()
research_graph.connect(search_web, summarize)

# Packaged perfectly as a Node
research_node = research_graph.as_node(state_key="research_metadata")

# Plugged into Chief Editor Agent
main_graph = Graph()
main_graph.connect(plan_outline, research_node)

🐞 Time Travel & Execution Telemetry

If a run fails in production, you can trace exactly what inputs hit what nodes.

# Run your pipeline in debug mode
g.debug("input", session_id="user_1")

# Encountered a crash? Replay the precise global trajectory:
g.replay("run_id_outputted_by_telemetry")

# Track Cost Metrics via extensible Plugins
from flowk.plugins.llm import OpenAIPlugin
from flowk import MetricsRegistry

PluginManager.register(OpenAIPlugin(model="gpt-4o"))
print(MetricsRegistry.get_summary()) # => Evaluated 4040 tokens ($0.12)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flowk-0.4.6.tar.gz (38.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flowk-0.4.6-py3-none-any.whl (40.4 kB view details)

Uploaded Python 3

File details

Details for the file flowk-0.4.6.tar.gz.

File metadata

  • Download URL: flowk-0.4.6.tar.gz
  • Upload date:
  • Size: 38.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for flowk-0.4.6.tar.gz
Algorithm Hash digest
SHA256 93c0b2fff5c56a77501e012afbe3d37194a5bb5f9c62a6e9b0b522f9660362b4
MD5 09c9ec94f837b40ca6824cbc9ce7882d
BLAKE2b-256 ae891df7b584da01609d724b6ebc16296929add4cf15c16ca15722ee099d5f6b

See more details on using hashes here.

File details

Details for the file flowk-0.4.6-py3-none-any.whl.

File metadata

  • Download URL: flowk-0.4.6-py3-none-any.whl
  • Upload date:
  • Size: 40.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for flowk-0.4.6-py3-none-any.whl
Algorithm Hash digest
SHA256 02c7bf05f19dcbca7431d1520d2e72fbb3d6859ee992ba21676765d574ca15e3
MD5 4151b520c12e95a834def5d4289de71d
BLAKE2b-256 62591e75d7f4e27c95d099b7db9f03f899175ab405a61e1d65bbb62567bdecde

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page