A lightweight, modular, and developer-first workflow orchestration engine for AI/LLM pipelines.

Flowk 🌊

Flowk is a lightweight, high-performance workflow orchestration engine designed specifically for AI and LLM pipelines. It offers a simpler, developer-first alternative to heavyweight frameworks while providing first-class support for conditional routing, shared state, observability, and debugging.


🚀 Key Features

  • Extremely Simple API: Turn standard Python functions into executable graph nodes seamlessly.
  • Node Retries & Fallbacks: Built-in resilience out-of-the-box.
  • Dynamic Routing: Direct execution paths at runtime based on node outputs.
  • Stepping & Time Travel: Pause execution step-by-step and replay full traces.
  • Telemetry & Visualization: Live terminal tracking, mock cost metrics, and readable CLI flow rendering.
  • Pluggable Architecture: Tap into lifecycle hooks using Plugins effortlessly.

🛠️ Core Concepts

1. The Graph

The Graph is the brain of Flowk. It wires nodes together sequentially or through condition-based routers:

from flowk import Graph
g = Graph()

2. Nodes & State

Nodes are ordinary Python functions decorated with @g.node(). A mutable GraphState dictionary is implicitly shared across your pipeline.

# Pass `state` as an argument to read/write shared data across the lifecycle map
@g.node(retries=3)
def prepare_prompt(input_text: str, state: dict):
    state["original_query"] = input_text
    return input_text.upper()
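The retries=3 behavior above can be illustrated in plain Python. This is a simplified sketch of retry semantics, not Flowk's actual implementation, and the with_retries helper is hypothetical:

```python
# Simplified sketch of retry semantics (not Flowk's implementation):
# call the node up to `retries` times, re-raising on the final failure.
def with_retries(fn, retries):
    def wrapper(*args, **kwargs):
        for attempt in range(retries):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == retries - 1:
                    raise
    return wrapper

calls = {"n": 0}

def flaky(text):
    # fails twice, then succeeds, to exercise the retry loop
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return text.upper()

result = with_retries(flaky, retries=3)("hello")
# result == "HELLO" after two failed attempts
```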

3. Connections

Connect nodes in sequence. The Graph auto-detects the first configured node as the entrypoint, and whatever Node A returns is automatically piped into Node B as its first argument (input_text above).

g.connect(prepare_prompt, call_llm)
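The piping semantics can be illustrated without Flowk. This is a plain-Python sketch, not Flowk's internals; run_chain and the stand-in call_llm are hypothetical helpers:

```python
# Plain-Python sketch of the piping semantics: each node's return
# value becomes the next node's first argument; `state` is shared.
def run_chain(nodes, value, state):
    for node in nodes:
        value = node(value, state)
    return value

def prepare_prompt(input_text, state):
    state["original_query"] = input_text
    return input_text.upper()

def call_llm(prompt, state):
    # stand-in for a real LLM call
    return "echo:" + prompt

state = {}
result = run_chain([prepare_prompt, call_llm], "hello", state)
# result == "echo:HELLO"; state["original_query"] == "hello"
```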

4. Routing (Conditional Branching)

When the execution path depends on context (e.g., a standard request vs. a priority request), use g.route().

def check_priority(result_from_previous_node: str):
    return "fast" if "URGENT" in result_from_previous_node else "standard"

# Map condition strings to actual handling Nodes
router_node = g.route(check_priority, {
    "fast": priority_handler_node,
    "standard": normal_handler_node
})

g.connect(prepare_prompt, router_node)
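The routing semantics can be sketched in plain Python. The condition function returns a key, and the mapped handler receives the value; the handler bodies below are hypothetical stand-ins, not Flowk code:

```python
# Plain-Python sketch of routing: the condition function picks a key,
# and the node mapped to that key handles the value.
def check_priority(result_from_previous_node):
    return "fast" if "URGENT" in result_from_previous_node else "standard"

def priority_handler(text, state):
    return "fast-lane:" + text

def normal_handler(text, state):
    return "standard-lane:" + text

routes = {"fast": priority_handler, "standard": normal_handler}

def route(value, state):
    return routes[check_priority(value)](value, state)

fast = route("URGENT restock", {})
slow = route("weekly report", {})
# fast == "fast-lane:URGENT restock"; slow == "standard-lane:weekly report"
```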

🔍 Tooling & Observability

Flowk ships with tooling suited to both fast prototyping and robust production monitoring.

Visualizing Graphs

Inspect exactly how your graph is wired using g.show():

==================================================
📊 FLOWK EXECUTION FLOW
==================================================

[ prepare_prompt ]
  │
  ▼
⟪ priority_check ⟫ (Router)
  │
  ├─[fast]──────► [ priority_handler ]
  │                 │
  │                 ▼
  │               [ cleanup ]
  │
  └─[standard]──► [ standard_handler ]
                    │
                    ▼
                  [ cleanup ] 🔄 (already visited)

==================================================

Metrics Tracking

Built-in per-node timing alongside mock LLM usage tracking:

g.run("Hello!")

from flowk import MetricsRegistry
print(MetricsRegistry.get_summary())

🧠 Session Memory Management

Flowk supports execution memory persistence across multiple .run() calls via the session_id parameter. This is especially useful for multi-turn chat workflows, where each turn appends messages to the GraphState instead of wiping the slate clean.

# Turns persist data appended into state automatically
r1 = g.run("Hello", session_id="user_john")
r2 = g.run("Are you there?", session_id="user_john")

r3_anon = g.run("Who am I?") # Anonymous runs use empty states
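Session-scoped state can be sketched in plain Python: each session_id keys a persistent state dict, and anonymous runs get a fresh one. The run helper below is a hypothetical stand-in for g.run, not Flowk's implementation:

```python
# Plain-Python sketch of session memory: state persists per session_id;
# runs without a session_id start from an empty state.
sessions = {}

def run(message, session_id=None):
    if session_id is not None:
        state = sessions.setdefault(session_id, {"messages": []})
    else:
        state = {"messages": []}
    state["messages"].append(message)
    return state

run("Hello", session_id="user_john")
state = run("Are you there?", session_id="user_john")
# state["messages"] == ["Hello", "Are you there?"]

anon = run("Who am I?")
# anon["messages"] == ["Who am I?"]
```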

⚡ Async, Streaming, and Parallel Execution (v2)

Flowk builds on asyncio primitives for high-throughput workloads:

  • Define any node as async def and Flowk natively awaits it without blocking the thread pool.
  • Use g.arun() to await a full run asynchronously.
  • Stream node outputs in real time using async for event in g.astream(...). This is a natural fit for piping LLM outputs to WebSocket frontends.
  • Fan-Out Parallelism: when a node fans out into multiple downstream nodes, Flowk executes the branches concurrently using asyncio.gather.
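The fan-out behavior can be illustrated with plain asyncio, independent of Flowk. The branch functions are hypothetical stand-ins for downstream nodes:

```python
import asyncio

# Plain-asyncio sketch of fan-out: when one node feeds several
# branches, they can be awaited concurrently with asyncio.gather.
async def branch_a(value):
    await asyncio.sleep(0.01)
    return value + "-a"

async def branch_b(value):
    await asyncio.sleep(0.01)
    return value + "-b"

async def fan_out(value):
    # both branches run concurrently; results come back in order
    return await asyncio.gather(branch_a(value), branch_b(value))

results = asyncio.run(fan_out("node"))
# results == ["node-a", "node-b"]
```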

🛑 Human-in-The-Loop (Breakpoints)

Need a human to review an action before it commits to a database? Interrupt the graph!

# 1. Compile the graph with a breakpoint
g.compile(interrupt_before=["commit_to_database"])

# 2. Execution will stop and exit when reaching the node
for event in g.astream(input_data, session_id="user_1"):
    if event["type"] == "interrupt":
        print("Waiting for human...")
        
# 3. Later, resume using the exact same session_id!
g.arun(None, session_id="user_1")
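The interrupt-and-resume mechanism can be sketched in plain Python: execution stops before a flagged node, the position is saved per session_id, and a later call with the same session_id picks up where it left off. The run helper and node bodies below are hypothetical, not Flowk's implementation:

```python
# Plain-Python sketch of human-in-the-loop breakpoints:
# sessions maps session_id -> (next node index, in-flight value).
sessions = {}

def run(nodes, value, session_id, interrupt_before=()):
    resumed = session_id in sessions
    i, value = sessions.pop(session_id, (0, value))
    while i < len(nodes):
        node = nodes[i]
        if node.__name__ in interrupt_before and not resumed:
            # pause: remember where to resume, then hand control back
            sessions[session_id] = (i, value)
            return ("interrupt", node.__name__)
        value = node(value)
        i += 1
    return ("done", value)

def prepare(text):
    return text.upper()

def commit_to_database(text):
    return "committed:" + text

first = run([prepare, commit_to_database], "order", "user_1",
            interrupt_before=("commit_to_database",))
# first == ("interrupt", "commit_to_database")

second = run([prepare, commit_to_database], None, "user_1",
             interrupt_before=("commit_to_database",))
# second == ("done", "committed:ORDER")
```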

🛡️ Pydantic Safe-State Validation

Never let a silent property typo crash a 20-minute LLM pipeline again. Wrap your shared state in a Pydantic schema:

from pydantic import BaseModel
class AgentState(BaseModel):
    messages: list
    cost: float

g = Graph(state_schema=AgentState)
# Flowk will validate `AgentState(**state)` between EVERY node execution.

Debug & Time Travel

Encountering bugs in a complex run? Flowk saves runs by default!

  • For verbose step-by-step logging, replace g.run() with g.debug().
  • To replay a historic trace step-by-step in the terminal, grab the run_id returned from any run:
    g.replay("run-123-abc")

🧩 Plugins (Extensions)

Under the hood, every run fires lifecycle hooks (on_run_start, on_node_start, on_node_end, on_run_end). Subclass flowk.plugins.base.Plugin to extend the system yourself, for example by intercepting runs to store trace files via FileStoragePlugin:

from flowk.plugins.base import PluginManager
from flowk.plugins.storage import FileStoragePlugin

PluginManager.register(FileStoragePlugin("server_logs.jsonl"))
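The hook dispatch pattern can be sketched in plain Python. The plugin class below implements two of the lifecycle hooks named above, but the dispatcher is an illustrative stand-in, not Flowk's PluginManager:

```python
# Plain-Python sketch of lifecycle-hook dispatch (not Flowk's internals):
# registered plugins are notified before and after each node runs.
class RecordingPlugin:
    def __init__(self):
        self.events = []

    def on_node_start(self, node_name):
        self.events.append(("start", node_name))

    def on_node_end(self, node_name):
        self.events.append(("end", node_name))

def run_with_plugins(nodes, value, plugins):
    for node in nodes:
        for plugin in plugins:
            plugin.on_node_start(node.__name__)
        value = node(value)
        for plugin in plugins:
            plugin.on_node_end(node.__name__)
    return value

plugin = RecordingPlugin()
run_with_plugins([str.upper], "hi", [plugin])
# plugin.events == [("start", "upper"), ("end", "upper")]
```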
