A lightweight, modular, and developer-first workflow orchestration engine for AI/LLM pipelines.
Flowk 🌊
Flowk is a lightweight, high-performance workflow orchestration engine designed for AI and LLM pipelines. It offers a simpler, developer-first alternative to heavier frameworks while still providing first-class support for conditional routing, shared state memory, observability, and debugging.
🚀 Key Features
- Extremely Simple API: Turn standard Python functions into executable graph nodes with a decorator.
- Node Retries & Fallbacks: Built-in resilience out of the box.
- Dynamic Routing: Choose execution paths at runtime based on node outputs.
- Stepping & Time Travel: Pausable execution steps and full trace replay.
- Telemetry & Visualization: Live terminal tracking, cost metric emulation, and readable CLI flow rendering.
- Pluggable Architecture: Tap into lifecycle hooks using Plugins.
🛠️ Core Concepts
1. The Graph
The Graph is the brain of Flowk. It wires nodes together either sequentially or through condition-based routers:
from flowk import Graph
g = Graph()
2. Nodes & State
Nodes are ordinary Python functions decorated with @g.node(). A mutable GraphState dictionary is shared across the whole pipeline.
# Accept `state` as an argument to read and write data shared across nodes
@g.node(retries=3)
def prepare_prompt(input_text: str, state: dict):
    state["original_query"] = input_text
    return input_text.upper()
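To make the retry behaviour concrete, here is a minimal standard-library sketch of what a setting like `retries=3` could imply. The `node` decorator below is a hypothetical stand-in, not Flowk's implementation, and it assumes that `retries` counts total attempts, which this README does not specify.

```python
import functools


def node(retries: int = 1):
    """Hypothetical stand-in for a node decorator; NOT Flowk's implementation.

    Assumption: `retries` is the total number of attempts.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(max(1, retries)):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:  # a real engine would likely filter error types
                    last_error = exc
            # Every attempt failed: surface the most recent error.
            raise last_error
        return wrapper
    return decorator


attempts = {"count": 0}


@node(retries=3)
def flaky_prepare_prompt(input_text: str, state: dict) -> str:
    # Fails on the first two calls, then succeeds, to exercise the retry loop.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    state["original_query"] = input_text
    return input_text.upper()


state = {}
result = flaky_prepare_prompt("hello", state)
```

Note that the state write happens only on the successful attempt here; a node that mutates state before failing would leave partial writes behind, which is worth keeping in mind when combining retries with shared state.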
3. Connections
Connect nodes in sequence. The Graph auto-detects the first configured node as the entrypoint. Whatever Node A returns is automatically piped into Node B as its input_text.
g.connect(prepare_prompt, call_llm)
4. Routing (Conditional Branching)
When the execution path depends on context (e.g., standard request vs. priority request), use g.route().
def check_priority(result_from_previous_node: str):
    return "fast" if "URGENT" in result_from_previous_node else "standard"
# Map condition strings to the nodes that handle them
router_node = g.route(check_priority, {
    "fast": priority_handler_node,
    "standard": normal_handler_node,
})
g.connect(prepare_prompt, router_node)
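The dispatch idea behind a router can be sketched in a few lines of plain Python. `make_router` and the two handler functions below are hypothetical names used only for illustration; this is not how Flowk implements g.route(), just a sketch of the condition-to-handler mapping described above.

```python
from typing import Callable, Dict


def make_router(
    condition: Callable[[str], str],
    routes: Dict[str, Callable[[str], str]],
) -> Callable[[str], str]:
    """Hypothetical stand-in for a router; NOT Flowk's implementation."""
    def router(value: str) -> str:
        key = condition(value)
        if key not in routes:
            # Fail loudly on an unmapped condition instead of silently dropping the run.
            raise KeyError(f"no route registered for {key!r}")
        return routes[key](value)
    return router


def check_priority(result_from_previous_node: str) -> str:
    return "fast" if "URGENT" in result_from_previous_node else "standard"


def priority_handler(text: str) -> str:
    return f"[priority] {text}"


def normal_handler(text: str) -> str:
    return f"[standard] {text}"


route = make_router(
    check_priority,
    {"fast": priority_handler, "standard": normal_handler},
)
urgent = route("URGENT: SERVER DOWN")
routine = route("WEEKLY REPORT")
```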
🔍 Tooling & Observability
Flowk ships with tooling designed for both fast prototyping and production monitoring.
Visualizing Graphs
Check exactly how your configuration looks using g.show().
==================================================
📊 FLOWK EXECUTION FLOW
==================================================
[ prepare_prompt ]
        │
        ▼
⟪ priority_check ⟫ (Router)
        │
        ├─[fast]──────► [ priority_handler ]
        │                       │
        │                       ▼
        │                  [ cleanup ]
        │
        └─[standard]──► [ standard_handler ]
                                │
                                ▼
                           [ cleanup ] 🔄 (already visited)
==================================================
Metrics Tracking
Flowk records per-node timing alongside mock LLM usage tracking:
from flowk import MetricsRegistry
g.run("Hello!")
print(MetricsRegistry.get_summary())
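For a sense of what per-node timing involves, the following is a small standard-library sketch. `TimingRegistry` and the shape of its summary are assumptions made for illustration; the actual output format of MetricsRegistry.get_summary() is not documented here.

```python
import functools
import time
from collections import defaultdict


class TimingRegistry:
    """Hypothetical stand-in; NOT Flowk's MetricsRegistry."""

    def __init__(self):
        self._durations = defaultdict(list)

    def timed(self, func):
        """Wrap a function so each call's wall-clock duration is recorded."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Record the duration even if the node raised.
                elapsed = time.perf_counter() - start
                self._durations[func.__name__].append(elapsed)
        return wrapper

    def get_summary(self):
        return {
            name: {"calls": len(values), "total_seconds": sum(values)}
            for name, values in self._durations.items()
        }


registry = TimingRegistry()


@registry.timed
def prepare_prompt(input_text: str) -> str:
    return input_text.upper()


prepare_prompt("hello")
prepare_prompt("again")
summary = registry.get_summary()
```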
🧠 Session Memory Management
Flowk can persist execution memory across multiple .run() calls via the session_id parameter. This is especially useful for multi-turn chat workflows, where each turn needs to append messages to the existing GraphState instead of starting from an empty one.
# State written during one turn is available to later turns with the same session_id
r1 = g.run("Hello", session_id="user_john")
r2 = g.run("Are you there?", session_id="user_john")
r3_anon = g.run("Who am I?") # Anonymous runs use empty states
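The session behaviour described above amounts to keying state by session_id. The sketch below shows that idea with a hypothetical in-memory `SessionStore` and a toy `run` function; it is an assumption about the general approach, not Flowk's storage mechanism.

```python
from typing import Dict, Optional


class SessionStore:
    """Hypothetical stand-in for session-keyed state; NOT Flowk's implementation."""

    def __init__(self) -> None:
        self._states: Dict[str, dict] = {}

    def get_state(self, session_id: Optional[str] = None) -> dict:
        # Anonymous runs get a fresh, throwaway state.
        if session_id is None:
            return {}
        # Named sessions reuse (or lazily create) their persistent state.
        return self._states.setdefault(session_id, {})


store = SessionStore()


def run(message: str, session_id: Optional[str] = None) -> int:
    """Toy run: append the message and return how many messages the state holds."""
    state = store.get_state(session_id)
    state.setdefault("messages", []).append(message)
    return len(state["messages"])


r1 = run("Hello", session_id="user_john")
r2 = run("Are you there?", session_id="user_john")
r3_anon = run("Who am I?")  # no session_id, so nothing carries over
```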
⚡ Async, Streaming, and Parallel Execution (v2)
Flowk builds on asynchronous primitives for concurrent workloads:
- Define any node as async def and Flowk natively awaits it without blocking the thread pool.
- Use g.arun() for standard async resolution.
- Stream node outputs in real time using async for event in g.astream(...). This works well for forwarding LLM outputs to WebSocket frontends.
- Fan-Out Parallelism: If a node splits into multiple separate nodes, Flowk executes all branches concurrently using asyncio.gather.
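The fan-out pattern itself can be shown with asyncio alone. The two branch coroutines below are hypothetical placeholders; the sketch demonstrates only how asyncio.gather runs branches concurrently and returns their results in call order, not how Flowk schedules nodes internally.

```python
import asyncio


async def summarize(text: str) -> str:
    # Stand-in for a slow branch, e.g. an LLM call.
    await asyncio.sleep(0.01)
    return f"summary:{text}"


async def classify(text: str) -> str:
    # A second branch that runs while the first one is waiting.
    await asyncio.sleep(0.01)
    return f"label:{text}"


async def fan_out(text: str) -> list:
    # gather schedules both coroutines concurrently and returns the results
    # in the order the awaitables were passed, not in completion order.
    return await asyncio.gather(summarize(text), classify(text))


results = asyncio.run(fan_out("hello"))
```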
🛑 Human-in-The-Loop (Breakpoints)
Need a human to review an action before it commits to a database? Interrupt the graph!
# 1. Compile the graph with a breakpoint
g.compile(interrupt_before=["commit_to_database"])
# 2. Execution stops and exits when it reaches the node
#    (async for and await must run inside an async function)
async for event in g.astream(input_data, session_id="user_1"):
    if event["type"] == "interrupt":
        print("Waiting for human...")
# 3. Later, resume using the exact same session_id!
await g.arun(None, session_id="user_1")
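One way to picture the interrupt-and-resume cycle is a checkpoint keyed by session_id. The sketch below is a self-contained toy: `astream`, `CHECKPOINTS`, `NODES`, and the event dictionaries are all hypothetical and only mirror the names used above; Flowk's real event shapes and persistence may differ.

```python
import asyncio

CHECKPOINTS = {}  # session_id -> (index of the next node, pending value)


async def draft(value):
    return f"draft({value})"


async def commit_to_database(value):
    return f"committed({value})"


NODES = [draft, commit_to_database]
INTERRUPT_BEFORE = {"commit_to_database"}


async def astream(value, session_id):
    """Hypothetical stand-in for a streaming runner; NOT Flowk's implementation."""
    start, pending = CHECKPOINTS.pop(session_id, (0, value))
    resumed = start > 0
    for index in range(start, len(NODES)):
        node = NODES[index]
        # Pause before a breakpoint node, unless this call is resuming at it.
        if node.__name__ in INTERRUPT_BEFORE and not (resumed and index == start):
            CHECKPOINTS[session_id] = (index, pending)
            yield {"type": "interrupt", "node": node.__name__}
            return
        pending = await node(pending)
        yield {"type": "node_end", "node": node.__name__, "output": pending}


async def collect(value, session_id):
    return [event async for event in astream(value, session_id)]


# First call stops at the breakpoint; second call resumes from the checkpoint.
first = asyncio.run(collect("order-42", "user_1"))
second = asyncio.run(collect(None, "user_1"))
```

The resumed call ignores its input and continues from the saved value, which matches the pattern of passing None when resuming.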
🛡️ Pydantic Safe-State Validation
Never let a silent property typo crash a 20-minute LLM pipeline again. Wrap your shared state in a Pydantic schema:
from pydantic import BaseModel
class AgentState(BaseModel):
    messages: list
    cost: float
g = Graph(state_schema=AgentState)
# Flowk will validate `AgentState(**state)` between EVERY node execution.
Debug & Time Travel
Encountering bugs in a complex run? Flowk saves runs by default!
- To run with highly verbose sequential logging, replace g.run() with g.debug().
- To replay a historic trace step by step in the terminal, use the run_id output by any run:
g.replay("run-123-abc")
🧩 Plugins (Extensions)
Under the hood, flow runs are evaluated through hooks (on_run_start, on_node_start, on_node_end, on_run_end). See flowk.plugins.base.Plugin to extend the system yourself, for example by intercepting runs to store trace files via FileStoragePlugin:
from flowk.plugins.base import PluginManager
from flowk.plugins.storage import FileStoragePlugin
PluginManager.register(FileStoragePlugin("server_logs.jsonl"))
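To illustrate how the four hooks fit around a run, here is a self-contained sketch. The `Plugin` base class, the hook argument lists, and `run_with_plugins` are assumptions made for this example; check flowk.plugins.base.Plugin for the real signatures before writing a plugin against them.

```python
class Plugin:
    """Hypothetical stand-in base class; the real flowk.plugins.base.Plugin may differ."""

    def on_run_start(self, run_id): ...
    def on_node_start(self, run_id, node_name): ...
    def on_node_end(self, run_id, node_name, output): ...
    def on_run_end(self, run_id): ...


class RecordingPlugin(Plugin):
    """Collects (hook, node) pairs in memory instead of writing a trace file."""

    def __init__(self):
        self.events = []

    def on_run_start(self, run_id):
        self.events.append(("run_start", None))

    def on_node_start(self, run_id, node_name):
        self.events.append(("node_start", node_name))

    def on_node_end(self, run_id, node_name, output):
        self.events.append(("node_end", node_name))

    def on_run_end(self, run_id):
        self.events.append(("run_end", None))


def run_with_plugins(nodes, value, plugins, run_id="run-demo"):
    """Toy sequential runner that fires each hook around node execution."""
    for plugin in plugins:
        plugin.on_run_start(run_id)
    for node in nodes:
        for plugin in plugins:
            plugin.on_node_start(run_id, node.__name__)
        value = node(value)
        for plugin in plugins:
            plugin.on_node_end(run_id, node.__name__, value)
    for plugin in plugins:
        plugin.on_run_end(run_id)
    return value


def shout(text):
    return text.upper()


recorder = RecordingPlugin()
output = run_with_plugins([shout], "hello", [recorder])
```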
File details
Details for the file flowk-0.1.0.tar.gz.
File metadata
- Download URL: flowk-0.1.0.tar.gz
- Upload date:
- Size: 17.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | eacbe35db2f5f96f62b6c8101975864eef80f441002c9e97e1995eb5a28fb001 |
| MD5 | a9c613b7d1b46e958ea18c178cc30593 |
| BLAKE2b-256 | 8f46e8aa8b7df4d583b5d1d4835873cf69ad860d2ca9c3d8a190d77411d1ff62 |
File details
Details for the file flowk-0.1.0-py3-none-any.whl.
File metadata
- Download URL: flowk-0.1.0-py3-none-any.whl
- Upload date:
- Size: 17.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | af548822974772f80d068fe01c7d32c7ef7048377755f6272a3661720b7e23bc |
| MD5 | 89d82b231901e3657af27bcafc8afdd2 |
| BLAKE2b-256 | aa849bb871f367cf639d6c535f4baa54b17c1d485dfc974aa0b2751a75d05bf8 |