# PocoFlow

Lightweight LLM workflow orchestration. A hardened evolution of PocketFlow.

Built with love by Claude & digital-duck.
## What It Is

PocoFlow is a minimal framework for building LLM pipelines as directed graphs of nano-ETL nodes communicating through a shared, typed Store.

It keeps PocketFlow's best idea – the `prep | exec | post` abstraction – and fixes the weaknesses that surface in production:
| Weakness | PocoFlow fix |
|---|---|
| Raw dict store – no type safety | `Store` with optional schema + `TypeError` on bad writes |
| Ambiguous `>>` edge API | Single clear API: `.then("action", next_node)` |
| No built-in async support | `AsyncNode.exec_async()` – framework handles `asyncio.run()` |
| No observability | Hook system: `node_start` / `node_end` / `node_error` / `flow_end` |
| No checkpointing | JSON snapshots + SQLite backend with full event log |
| No long-running support | `run_background()` → `RunHandle` with status, wait, cancel |
| Inconsistent logging | dd-logging integration – structured, file-backed, namespaced |
| No workflow visibility | Streamlit monitor UI – live runs table, timeline, store inspector |

Dependencies: `pocketflow` + `dd-logging`.
## Install

```bash
# Core
pip install pocoflow

# With Streamlit monitor UI
pip install "pocoflow[ui]"

# Local dev (from the digital-duck monorepo)
pip install -e ~/projects/digital-duck/dd-logging
pip install -e ~/projects/digital-duck/pocoflow"[ui,dev]"
```
## Quick Start

```python
from pocoflow import Node, Flow, Store

class SummariseNode(Node):
    def prep(self, store):
        return store["document"]

    def exec(self, text):
        return llm.summarise(text)  # your LLM call here

    def post(self, store, prep, summary):
        store["summary"] = summary
        return "done"

store = Store({"document": "...", "summary": ""})
Flow(start=SummariseNode(), db_path="pocoflow.db", flow_name="summarise").run(store)
print(store["summary"])
```

Then open the monitor:

```bash
streamlit run pocoflow/ui/monitor.py -- pocoflow.db
```
## Core Concepts

### Node – nano-ETL

Every node is a three-phase processing unit that maps directly to Extract → Transform → Load:

- `prep(store)` → Extract: read what this node needs from the store
- `exec(prep_result)` → Transform: do the work (pure – no store side-effects)
- `post(store, prep, exec)` → Load: write results back, return next action string
| Phase | ETL step | Purity |
|---|---|---|
| `prep` | Extract | reads store |
| `exec` | Transform | pure function – retryable, testable without a store |
| `post` | Load + Route | writes store, returns action string |
```python
from pocoflow import Node

class CallLLMNode(Node):
    max_retries = 3    # retry exec() automatically on failure
    retry_delay = 1.0  # seconds between retries

    def prep(self, store):
        return store["prompt"]

    def exec(self, prompt):
        return llm.call(prompt)  # retried up to 3× on exception

    def post(self, store, prep, response):
        store["response"] = response
        return "done"
```
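The retry semantics described above can be sketched as a plain loop. This is only an illustration of the documented behaviour, not PocoFlow's internals; `run_with_retries` and `flaky` are hypothetical names:

```python
import time

def run_with_retries(fn, arg, max_retries=3, retry_delay=0.0):
    # Call fn(arg); on exception, retry up to max_retries attempts total,
    # sleeping retry_delay seconds between attempts. Re-raise the last
    # exception if every attempt fails.
    last_exc = None
    for attempt in range(max_retries):
        try:
            return fn(arg)
        except Exception as exc:
            last_exc = exc
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
    raise last_exc

calls = {"n": 0}
def flaky(prompt):
    # Fails twice, then succeeds - simulates a transient LLM error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return f"ok: {prompt}"

print(run_with_retries(flaky, "hi"))  # ok: hi
```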
### Store – typed shared state

```python
from pocoflow import Store

store = Store(
    data={"query": "", "result": ""},
    schema={"query": str, "result": str},  # type-checked on every write
    name="my_pipeline",
)

store["query"] = "explain quantum entanglement"
store["query"] = 42  # → raises TypeError immediately

# Observer: fired on every write (logging, tracing, UI updates)
store.add_observer(lambda key, old, new: print(f"{key}: {old!r} → {new!r}"))

# JSON snapshot / restore (lightweight backup)
store.snapshot("/tmp/run_42/step_002.json")
store2 = Store.restore("/tmp/run_42/step_002.json")
```
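A schema-checked, observable store like this fits in a few lines. The sketch below illustrates the idea only; `TypedStore` is hypothetical and not PocoFlow's actual implementation:

```python
class TypedStore(dict):
    # Minimal sketch: a dict that validates writes against a schema
    # and notifies observers on every change.
    def __init__(self, data, schema=None):
        self.schema = schema or {}
        self.observers = []
        super().__init__(data)

    def add_observer(self, fn):
        self.observers.append(fn)

    def __setitem__(self, key, value):
        expected = self.schema.get(key)
        if expected is not None and not isinstance(value, expected):
            raise TypeError(f"{key}: expected {expected.__name__}, "
                            f"got {type(value).__name__}")
        old = self.get(key)
        super().__setitem__(key, value)
        for fn in self.observers:
            fn(key, old, value)

s = TypedStore({"query": ""}, schema={"query": str})
events = []
s.add_observer(lambda k, old, new: events.append((k, old, new)))
s["query"] = "hello"   # accepted; observer fires
try:
    s["query"] = 42    # rejected by the schema
except TypeError as e:
    print(e)           # query: expected str, got int
```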
### Flow – directed graph with hooks

```python
from pocoflow import Flow, Store

# Wire nodes with unambiguous named edges
a.then("ok", b)
a.then("error", c)
a.then("*", fallback)  # wildcard: matches any unhandled action

# Build with SQLite persistence
flow = Flow(
    start=a,
    flow_name="my_pipeline",     # label shown in the monitor UI
    db_path="pocoflow.db",       # SQLite: runs, events, checkpoints
    checkpoint_dir="/tmp/ckpt",  # also write JSON snapshots (optional)
    max_steps=50,                # guard against infinite loops
)

# Hooks – wire to any logger, metrics sink, or progress bar
flow.on("node_start", lambda name, store: print(f"▶ {name}"))
flow.on("node_end", lambda name, action, elapsed, store:
        print(f"✓ {name} → {action} ({elapsed:.2f}s)"))
flow.on("node_error", lambda name, exc, store: alert(name, exc))
flow.on("flow_end", lambda steps, store: print(f"Done in {steps} steps"))

store = Store({"query": "..."})
flow.run(store)
```
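At its core, a flow like this amounts to a simple action-routing loop. The sketch below shows the mechanism with hypothetical stand-ins (`MiniNode`, `run_flow`); it is not PocoFlow's actual runner:

```python
class MiniNode:
    # Stand-in node: records its name in the store and returns a fixed action.
    def __init__(self, name, action):
        self.name, self.action, self.edges = name, action, {}

    def then(self, action, node):
        self.edges[action] = node
        return node

    def run(self, store):
        store.setdefault("trace", []).append(self.name)
        return self.action

def run_flow(start, store, max_steps=50):
    # Follow named edges until no edge matches or max_steps is hit.
    node, steps = start, 0
    while node is not None and steps < max_steps:
        action = node.run(store)
        # Exact action match first, then the "*" wildcard edge.
        node = node.edges.get(action) or node.edges.get("*")
        steps += 1
    return steps

a, b = MiniNode("a", "ok"), MiniNode("b", "done")
a.then("ok", b)
store = {}
run_flow(a, store)
print(store["trace"])  # ['a', 'b']
```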
### AsyncNode – parallel sub-tasks

```python
from pocoflow import AsyncNode
import asyncio

class FetchNode(AsyncNode):
    def prep(self, store):
        return store["urls"]

    async def exec_async(self, urls):
        return await asyncio.gather(*[fetch(u) for u in urls])

    def post(self, store, prep, results):
        store["pages"] = results
        return "done"
```

Implement `exec_async()` – the framework calls it via `asyncio.run()`. Use `asyncio.gather()` inside for true parallel sub-tasks.
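The fan-out pattern `exec_async()` enables can be shown standalone; here `fetch` is a hypothetical stand-in for a real HTTP call:

```python
import asyncio

async def fetch(url):
    # Stand-in for an HTTP request; sleeps briefly instead of doing I/O.
    await asyncio.sleep(0.01)
    return f"page:{url}"

async def fetch_all(urls):
    # gather() runs all coroutines concurrently and preserves input order.
    return await asyncio.gather(*[fetch(u) for u in urls])

pages = asyncio.run(fetch_all(["a", "b", "c"]))
print(pages)  # ['page:a', 'page:b', 'page:c']
```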
## SQLite Backend

When `db_path` is set, every run is fully recorded in a SQLite database:

- `pf_runs` – one row per flow execution (run_id, status, timing, error)
- `pf_checkpoints` – Store snapshot after every node (restorable at any step)
- `pf_events` – ordered event log (flow_start → node_start/end/error → flow_end)

```python
from pocoflow import WorkflowDB

db = WorkflowDB("pocoflow.db")

# List all runs
for run in db.list_runs():
    print(run["run_id"], run["status"], run["total_steps"])

# Inspect events for a run
for event in db.get_events("my_pipeline-3f9a1b2c"):
    print(event["event"], event["node_name"], event["elapsed_ms"])

# Restore Store from any checkpoint
store = db.load_checkpoint("my_pipeline-3f9a1b2c", step=2)
```

WAL mode is enabled so the Streamlit monitor can poll while a flow is running.
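Concretely, WAL mode lets a second connection read while a writer holds the database open. A minimal stdlib-only illustration (the `runs` table here is made up for the demo, not PocoFlow's schema):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

writer = sqlite3.connect(path)
writer.execute("PRAGMA journal_mode=WAL")  # readers no longer block on the writer
writer.execute("CREATE TABLE runs (run_id TEXT, status TEXT)")
writer.execute("INSERT INTO runs VALUES ('r1', 'running')")
writer.commit()

# A second connection (e.g. the Streamlit monitor) polls concurrently.
reader = sqlite3.connect(path)
status = reader.execute("SELECT status FROM runs WHERE run_id = 'r1'").fetchone()[0]
print(status)  # running
```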
## Long-Running Workflows

For flows that take minutes or hours, use `run_background()` to avoid blocking:

```python
flow = Flow(start=my_node, db_path="pocoflow.db", flow_name="research")

# Returns immediately – flow runs in a daemon thread
handle = flow.run_background(store)
print(handle.run_id)  # e.g. "research-3f9a1b2c"
print(handle.status)  # "running" (reads live from SQLite)

# Block until done (optional timeout)
result = handle.wait(timeout=300)
print(handle.status)  # "completed"

# Cooperative cancel – stops between nodes
handle.cancel()
```
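The run-in-a-daemon-thread idea behind `run_background()` can be sketched with stdlib threading. `MiniHandle` is a hypothetical simplification, without the SQLite-backed status, error handling, or cooperative cancellation of the real `RunHandle`:

```python
import threading

class MiniHandle:
    # Sketch: run a callable in a daemon thread, expose status and wait().
    def __init__(self, target):
        self.status = "running"
        self._thread = threading.Thread(target=self._run, args=(target,), daemon=True)
        self._thread.start()

    def _run(self, target):
        target()
        self.status = "completed"

    def wait(self, timeout=None):
        # Block until the worker finishes (or the timeout elapses).
        self._thread.join(timeout)
        return self.status

handle = MiniHandle(lambda: None)
print(handle.wait())  # completed
```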
### Resume after crash

```python
from pocoflow import WorkflowDB, Flow

db = WorkflowDB("pocoflow.db")

# Find the failed run
runs = [r for r in db.list_runs() if r["status"] == "failed"]
failed = runs[0]

# Restore store from the last successful checkpoint
checkpoints = db.get_checkpoints(failed["run_id"])
last = checkpoints[-1]
store = db.load_checkpoint(failed["run_id"], step=last["step"])

# Resume from the node after the last checkpoint
flow = Flow(start=my_flow_start, db_path="pocoflow.db")
flow.run(store, resume_from=node_after_crash)
```
## Streamlit Monitor UI

Visualise and manage all workflow runs from a browser.

Standalone:

```bash
streamlit run pocoflow/ui/monitor.py -- pocoflow.db
```

Embedded in any Streamlit page:

```python
from pocoflow.ui.monitor import render_workflow_monitor

render_workflow_monitor("pocoflow.db")
```

Features:

- Runs table – run ID, flow name, status badge (completed / running / failed), started time, duration, step count
- Auto-refresh – toggle on with 5 / 10 / 30 s intervals; updates live while flows run
- Timeline tab – ordered event log per run: node names, actions, per-node latency (ms), errors
- Store Inspector tab – step slider to view the Store state at any checkpoint as a key/value table + raw JSON
- Resume tab – generates a ready-to-paste Python code snippet for resuming from the selected checkpoint
## Logging

PocoFlow uses dd-logging for structured, namespaced, file-backed log output.

```python
from pocoflow.logging import setup_logging, get_logger

# Set up once at app start (e.g. in CLI entry point or Streamlit cache_resource)
log_path = setup_logging("run", log_level="debug", adapter="openrouter")
# → logs/run-openrouter-20260217-143022.log

# In any module
_log = get_logger("nodes.summarise")  # → pocoflow.nodes.summarise
_log.info("summarising len=%d", len(text))
```

Logger hierarchy:

```
pocoflow
├── pocoflow.store
├── pocoflow.node
├── pocoflow.flow
├── pocoflow.db
└── pocoflow.runner
```
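The `pocoflow.*` namespace follows standard Python logger-hierarchy semantics: records emitted by a child logger propagate up to handlers attached to the `pocoflow` root. A stdlib-only illustration of that propagation:

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s %(message)s"))

# One handler on the namespace root catches all child loggers.
root = logging.getLogger("pocoflow")
root.setLevel(logging.DEBUG)
root.addHandler(handler)

log = logging.getLogger("pocoflow.nodes.summarise")
log.info("summarising len=%d", 42)

print(stream.getvalue().strip())  # pocoflow.nodes.summarise summarising len=42
```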
## Migrating from PocketFlow

```python
# Before
from pocketflow import Node, Flow

node_a >> node_b             # creates "default" edge – causes UserWarning
node_a - "action" >> node_b  # named edge (correct but inconsistent)
shared = {}                  # raw dict – no type safety

# After
from pocoflow import Node, Flow, Store

node_a.then("action", node_b)     # single unambiguous API, always
shared = Store(data=shared_dict)  # typed, observable, checkpointable
flow.run(shared)                  # plain dict also accepted for backward compat
```
## Project Layout

```
pocoflow/
    __init__.py       – public API: Store, Node, AsyncNode, Flow, WorkflowDB, RunHandle
    store.py          – typed, observable, JSON-checkpointable shared state
    node.py           – Node (sync) + AsyncNode (async) + retry
    flow.py           – directed graph runner: hooks, JSON + SQLite checkpoints, background
    db.py             – WorkflowDB: SQLite schema, CRUD for runs / checkpoints / events
    logging.py        – dd-logging wrapper (pocoflow.* namespace)
    runner.py         – RunHandle: status, wait, cancel
    ui/
        monitor.py    – Streamlit workflow monitor (standalone + embeddable)
examples/
    hello.py          – minimal two-node flow with hooks
tests/
    test_pocoflow.py  – 25 tests: Store, Node, Flow, WorkflowDB, RunHandle
docs/
    design.md         – architecture, design decisions, migration guide
```
## Comparison with PocketFlow

| Feature | PocketFlow | PocoFlow v0.2 |
|---|---|---|
| Core size | ~100 lines | ~600 lines |
| Shared state | raw dict | typed `Store` with schema |
| Edge API | `>>` and `- "action" >>` (confusing) | `.then("action", node)` only |
| Async nodes | manual `asyncio.run()` per node | `AsyncNode.exec_async()` |
| Observability | none | 4-event hook system |
| Checkpointing | none | JSON + SQLite (`WorkflowDB`) |
| Event log | none | `pf_events` table – full audit trail |
| Long-running | none | `run_background()` → `RunHandle` |
| Retry | none | `max_retries` + `retry_delay` on any Node |
| Wildcard edges | none | `.then("*", fallback)` |
| Logging | manual | dd-logging (`pocoflow.*` namespace) |
| Monitor UI | none | Streamlit monitor with auto-refresh |
| External deps | 0 | pocketflow + dd-logging (both stdlib-only) |
## Relationship to PocketFlow

PocoFlow is spiritually a child of PocketFlow. We kept:

- The `prep | exec | post` nano-ETL abstraction – beautiful and correct
- Zero vendor lock-in – bring your own LLM client
- No framework magic – every behaviour is traceable to code you can read in minutes

We added what production LLM workflows actually demand:

- Typed, observable, checkpointable `Store`
- Unambiguous `.then()` edge API (no more `UserWarning`)
- `AsyncNode` with `exec_async()`
- Hook system for pluggable observability
- SQLite backend – full audit log, queryable checkpoints, crash recovery
- `run_background()` for long-running agentic workflows
- Streamlit monitor – see every run, every node, every store state
- dd-logging – structured, file-backed, namespaced logs out of the box

PocketFlow stays listed as a dependency – as a nod to its inspiration and to ease migration for projects already using it.
## License

MIT – see LICENSE. Copyright © 2026 digital-duck.