PocoFlow

Lightweight LLM workflow orchestration. A hardened evolution of PocketFlow.

Built with love by Claude & digital-duck 🦆


What It Is

PocoFlow is a minimal framework for building LLM pipelines as directed graphs of nano-ETL nodes communicating through a shared, typed Store.

It keeps PocketFlow's best idea - the prep | exec | post abstraction - and fixes the weaknesses that surface in production:

Weakness                          PocoFlow fix
Raw dict store - no type safety   Store with optional schema + TypeError on bad writes
Ambiguous >> edge API             Single clear API: .then("action", next_node)
No built-in async support         AsyncNode.exec_async() - framework handles asyncio.run()
No observability                  Hook system: node_start / node_end / node_error / flow_end
No checkpointing                  JSON snapshots + SQLite backend with full event log
No long-running support           run_background() → RunHandle with status, wait, cancel
Inconsistent logging              dd-logging integration - structured, file-backed, namespaced
No workflow visibility            Streamlit monitor UI - live runs table, timeline, store inspector

Dependencies: pocketflow + dd-logging


Install

# Core
pip install pocoflow

# With Streamlit monitor UI
pip install "pocoflow[ui]"

# Local dev (from the digital-duck monorepo)
pip install -e ~/projects/digital-duck/dd-logging
pip install -e ~/projects/digital-duck/pocoflow"[ui,dev]"

Quick Start

from pocoflow import Node, Flow, Store

class SummariseNode(Node):
    def prep(self, store):
        return store["document"]

    def exec(self, text):
        return llm.summarise(text)          # your LLM call here

    def post(self, store, prep, summary):
        store["summary"] = summary
        return "done"

store = Store({"document": "...", "summary": ""})
Flow(start=SummariseNode(), db_path="pocoflow.db", flow_name="summarise").run(store)
print(store["summary"])

Then open the monitor:

streamlit run pocoflow/ui/monitor.py -- pocoflow.db

Core Concepts

Node - nano-ETL

Every node is a three-phase processing unit that maps directly to Extract → Transform → Load:

prep(store)              → Extract:   read what this node needs from the store
exec(prep_result)        → Transform: do the work (pure - no store side-effects)
post(store, prep, exec)  → Load:      write results back, return next action string

Phase   ETL step       Purity
prep    Extract        reads store
exec    Transform      pure function - retryable, testable without a store
post    Load + Route   writes store, returns action string

from pocoflow import Node

class CallLLMNode(Node):
    max_retries = 3       # retry exec() automatically on failure
    retry_delay = 1.0     # seconds between retries

    def prep(self, store):
        return store["prompt"]

    def exec(self, prompt):
        return llm.call(prompt)   # retried up to 3× on exception

    def post(self, store, prep, response):
        store["response"] = response
        return "done"

Store - typed shared state

from pocoflow import Store

store = Store(
    data={"query": "", "result": ""},
    schema={"query": str, "result": str},   # type-checked on every write
    name="my_pipeline",
)
store["query"] = "explain quantum entanglement"
store["query"] = 42          # โ† raises TypeError immediately

# Observer: fired on every write (logging, tracing, UI updates)
store.add_observer(lambda key, old, new: print(f"{key}: {old!r} โ†’ {new!r}"))

# JSON snapshot / restore (lightweight backup)
store.snapshot("/tmp/run_42/step_002.json")
store2 = Store.restore("/tmp/run_42/step_002.json")

Flow - directed graph with hooks

from pocoflow import Flow, Store

# Wire nodes with unambiguous named edges
a.then("ok",    b)
a.then("error", c)
a.then("*",     fallback)   # wildcard: matches any unhandled action

# Build with SQLite persistence
flow = Flow(
    start=a,
    flow_name="my_pipeline",    # label shown in the monitor UI
    db_path="pocoflow.db",      # SQLite: runs, events, checkpoints
    checkpoint_dir="/tmp/ckpt", # also write JSON snapshots (optional)
    max_steps=50,               # guard against infinite loops
)

# Hooks โ€” wire to any logger, metrics sink, or progress bar
flow.on("node_start", lambda name, store: print(f"โ–ถ {name}"))
flow.on("node_end",   lambda name, action, elapsed, store:
                          print(f"โœ“ {name} โ†’ {action}  ({elapsed:.2f}s)"))
flow.on("node_error", lambda name, exc, store: alert(name, exc))
flow.on("flow_end",   lambda steps, store: print(f"Done in {steps} steps"))

store = Store({"query": "..."})
flow.run(store)

AsyncNode - parallel sub-tasks

from pocoflow import AsyncNode
import asyncio

class FetchNode(AsyncNode):
    def prep(self, store):
        return store["urls"]

    async def exec_async(self, urls):
        return await asyncio.gather(*[fetch(u) for u in urls])

    def post(self, store, prep, results):
        store["pages"] = results
        return "done"

Implement exec_async() - the framework calls it via asyncio.run(). Use asyncio.gather() inside for true parallel sub-tasks.
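
A minimal end-to-end sketch; fetch() is a hypothetical coroutine (swap in aiohttp or httpx in real code), and the bare Flow(start=...) constructor is assumed from the examples above:

import asyncio
from pocoflow import Flow, Store

async def fetch(url):                   # hypothetical fetcher - replace with a real HTTP client
    await asyncio.sleep(0.1)            # simulate network latency
    return f"<html>{url}</html>"

store = Store({"urls": ["https://a.example", "https://b.example"], "pages": []})
Flow(start=FetchNode()).run(store)      # the framework wraps exec_async() in asyncio.run()
print(len(store["pages"]))              # 2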


SQLite Backend

When db_path is set, every run is fully recorded in a SQLite database:

pf_runs        - one row per flow execution (run_id, status, timing, error)
pf_checkpoints - Store snapshot after every node (restorable at any step)
pf_events      - ordered event log (flow_start → node_start/end/error → flow_end)

from pocoflow import WorkflowDB

db = WorkflowDB("pocoflow.db")

# List all runs
for run in db.list_runs():
    print(run["run_id"], run["status"], run["total_steps"])

# Inspect events for a run
for event in db.get_events("my_pipeline-3f9a1b2c"):
    print(event["event"], event["node_name"], event["elapsed_ms"])

# Restore Store from any checkpoint
store = db.load_checkpoint("my_pipeline-3f9a1b2c", step=2)

WAL mode is enabled so the Streamlit monitor can poll while a flow is running.
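
Since it is a plain SQLite file in WAL mode, any process can poll it with the stdlib sqlite3 module while a flow is writing. A sketch, assuming only the pf_runs columns listed above (the rowid ordering is an assumption):

import sqlite3

con = sqlite3.connect("pocoflow.db")
con.row_factory = sqlite3.Row

# WAL readers never block the writer, so this is safe mid-run.
for row in con.execute(
        "SELECT run_id, status FROM pf_runs ORDER BY rowid DESC LIMIT 5"):
    print(row["run_id"], row["status"])
con.close()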


Long-Running Workflows

For flows that take minutes or hours, use run_background() to avoid blocking:

flow = Flow(start=my_node, db_path="pocoflow.db", flow_name="research")

# Returns immediately - flow runs in a daemon thread
handle = flow.run_background(store)

print(handle.run_id)          # e.g. "research-3f9a1b2c"
print(handle.status)          # "running"   (reads live from SQLite)

# Block until done (optional timeout)
result = handle.wait(timeout=300)
print(handle.status)          # "completed"

# Cooperative cancel - stops between nodes
handle.cancel()

Resume after crash

from pocoflow import WorkflowDB, Flow

db = WorkflowDB("pocoflow.db")

# Find the failed run
runs = [r for r in db.list_runs() if r["status"] == "failed"]
failed = runs[0]

# Restore store from the last successful checkpoint
checkpoints = db.get_checkpoints(failed["run_id"])
last = checkpoints[-1]
store = db.load_checkpoint(failed["run_id"], step=last["step"])

# Rebuild the same graph, then resume from the node after the last checkpoint
flow = Flow(start=my_flow_start, db_path="pocoflow.db")   # my_flow_start: your graph's entry node
flow.run(store, resume_from=node_after_crash)             # pass the node instance to resume at

Streamlit Monitor UI

Visualise and manage all workflow runs from a browser.

Standalone:

streamlit run pocoflow/ui/monitor.py -- pocoflow.db

Embedded in any Streamlit page:

from pocoflow.ui.monitor import render_workflow_monitor

render_workflow_monitor("pocoflow.db")

Features:

  • Runs table - run ID, flow name, status badge (✅ 🔄 ❌), started time, duration, step count
  • Auto-refresh - toggle on with 5 / 10 / 30 s intervals; updates live while flows run
  • Timeline tab - ordered event log per run: node names, actions, per-node latency (ms), errors
  • Store Inspector tab - step slider to view the Store state at any checkpoint as a key/value table + raw JSON
  • Resume tab - generates a ready-to-paste Python code snippet for resuming from the selected checkpoint

Logging

PocoFlow uses dd-logging for structured, namespaced, file-backed log output.

from pocoflow.logging import setup_logging, get_logger

# Set up once at app start (e.g. in CLI entry point or Streamlit cache_resource)
log_path = setup_logging("run", log_level="debug", adapter="openrouter")
# → logs/run-openrouter-20260217-143022.log

# In any module
_log = get_logger("nodes.summarise")   # → pocoflow.nodes.summarise
_log.info("summarising  len=%d", len(text))

Logger hierarchy:

pocoflow
├── pocoflow.store
├── pocoflow.node
├── pocoflow.flow
├── pocoflow.db
└── pocoflow.runner
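
Hooks and loggers compose naturally. A sketch (given a flow built as in the Flow section) that routes flow events into the pocoflow.flow namespace, using only the hook signatures shown earlier:

from pocoflow.logging import get_logger

_log = get_logger("flow")    # → pocoflow.flow

flow.on("node_start", lambda name, store: _log.debug("start %s", name))
flow.on("node_end",   lambda name, action, elapsed, store:
                          _log.info("%s → %s (%.2fs)", name, action, elapsed))
flow.on("node_error", lambda name, exc, store: _log.error("%s failed: %s", name, exc))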

Migrating from PocketFlow

# Before
from pocketflow import Node, Flow

node_a >> node_b                 # creates "default" edge - causes UserWarning
node_a - "action" >> node_b      # named edge (correct but inconsistent)
shared = {}                      # raw dict โ€” no type safety

# After
from pocoflow import Node, Flow, Store

node_a.then("action", node_b)    # single unambiguous API, always
shared = Store(data=shared)      # typed, observable, checkpointable
flow.run(shared)                 # plain dict also accepted for backward compat

Project Layout

pocoflow/
  __init__.py      - public API: Store, Node, AsyncNode, Flow, WorkflowDB, RunHandle
  store.py         - typed, observable, JSON-checkpointable shared state
  node.py          - Node (sync) + AsyncNode (async) + retry
  flow.py          - directed graph runner: hooks, JSON + SQLite checkpoints, background
  db.py            - WorkflowDB: SQLite schema, CRUD for runs / checkpoints / events
  logging.py       - dd-logging wrapper (pocoflow.* namespace)
  runner.py        - RunHandle: status, wait, cancel
  ui/
    monitor.py     - Streamlit workflow monitor (standalone + embeddable)
examples/
  hello.py         - minimal two-node flow with hooks
tests/
  test_pocoflow.py - 25 tests: Store, Node, Flow, WorkflowDB, RunHandle
docs/
  design.md        - architecture, design decisions, migration guide

Comparison with PocketFlow

Feature          PocketFlow                         PocoFlow v0.2
Core size        ~100 lines                         ~600 lines
Shared state     raw dict                           typed Store with schema
Edge API         >> and - "action" >> (confusing)   .then("action", node) only
Async nodes      manual asyncio.run() per node      AsyncNode.exec_async()
Observability    none                               4-event hook system
Checkpointing    none                               JSON + SQLite (WorkflowDB)
Event log        none                               pf_events table - full audit trail
Long-running     none                               run_background() → RunHandle
Retry            none                               max_retries + retry_delay on any Node
Wildcard edges   none                               .then("*", fallback)
Logging          manual                             dd-logging (pocoflow.* namespace)
Monitor UI       none                               Streamlit monitor with auto-refresh
External deps    0                                  pocketflow + dd-logging (both stdlib-only)

Relationship to PocketFlow

PocoFlow is spiritually a child of PocketFlow. We kept:

  • The prep | exec | post nano-ETL abstraction - beautiful and correct
  • Zero vendor lock-in - bring your own LLM client
  • No framework magic - every behaviour is traceable to code you can read in minutes

We added what production LLM workflows actually demand:

  • Typed, observable, checkpointable Store
  • Unambiguous .then() edge API (no more UserWarning)
  • AsyncNode with exec_async()
  • Hook system for pluggable observability
  • SQLite backend - full audit log, queryable checkpoints, crash recovery
  • run_background() for long-running agentic workflows
  • Streamlit monitor - see every run, every node, every store state
  • dd-logging - structured, file-backed, namespaced logs out of the box

PocketFlow stays listed as a dependency - as a nod to its inspiration and to ease migration for projects already using it.


License

MIT - see LICENSE. Copyright © 2026 digital-duck.

