Skip to main content

Async agent orchestration primitives.

Project description

PenguiFlow 🐧❄️

PenguiFlow logo

Async-first orchestration library for multi-agent and data pipelines

PenguiFlow is a lightweight Python library to orchestrate agent flows. It provides:

  • Typed, async message passing (Pydantic v2)
  • Concurrent fan-out / fan-in patterns
  • Routing & decision points
  • Retries, timeouts, backpressure
  • Dynamic loops (controller nodes)
  • Runtime playbooks (callable subflows with shared metadata)

Built on pure asyncio (no threads), PenguiFlow is small, predictable, and repo-agnostic. Product repos only define their models + node functions — the core stays dependency-light.


✨ Why PenguiFlow?

  • Orchestration is everywhere. Every Pengui service needs to connect LLMs, retrievers, SQL, or external APIs.
  • Stop rewriting glue. This library gives you reusable primitives (nodes, flows, contexts) so you can focus on business logic.
  • Typed & safe. Every hop validated with Pydantic.
  • Lightweight. Only depends on asyncio + pydantic. No broker, no server, no threads.

🏗️ Core Concepts

Message

Every payload is wrapped in a Message with headers and metadata.

from pydantic import BaseModel
from penguiflow.types import Message, Headers

class QueryIn(BaseModel):
    text: str

msg = Message(
    payload=QueryIn(text="unique reach last 30 days"),
    headers=Headers(tenant="acme")
)

Node

A node is an async function wrapped with a Node. It validates inputs/outputs (via ModelRegistry) and applies NodePolicy (timeout, retries, etc.).

from penguiflow.node import Node

class QueryOut(BaseModel):
    topic: str

async def triage(m: QueryIn) -> QueryOut:
    return QueryOut(topic="metrics")

triage_node = Node(triage, name="triage")

Flow

A flow wires nodes together in a directed graph. Edges are called Floes, and flows have two invisible contexts:

  • OpenSea 🌊 — ingress (start of the flow)
  • Rookery 🐧 — egress (end of the flow)
from penguiflow.core import create

flow = create(
    triage_node.to(packer_node)
)

Running a Flow

from penguiflow.registry import ModelRegistry

registry = ModelRegistry()
registry.register("triage", QueryIn, QueryOut)
registry.register("packer", QueryOut, PackOut)

flow.run(registry=registry)

await flow.emit(msg)          # emit into OpenSea
out = await flow.fetch()      # fetch from Rookery
print(out.payload)            # PackOut(...)
await flow.stop()

🧭 Design Principles

  1. Async-only (asyncio).

    • Flows are orchestrators, mostly I/O-bound.
    • Async tasks are cheap, predictable, and cancellable.
    • Heavy CPU work should be offloaded inside a node (process pool, Ray, etc.), not in PenguiFlow itself.
    • v1 intentionally stays in-process; scaling out or persisting state will arrive with future pluggable backends.
  2. Typed contracts.

    • In/out models per node are defined with Pydantic.
    • Validated at runtime via cached TypeAdapters.
    • flow.run(registry=...) verifies every validating node is registered so misconfigurations fail fast.
  3. Reliability first.

    • Timeouts, retries with backoff, backpressure on queues.
    • Nodes run inside error boundaries.
  4. Minimal dependencies.

    • Only asyncio + pydantic.
    • No broker, no server. Everything in-process.
  5. Repo-agnostic.

    • Product repos declare their models + node funcs, register them, and run.
    • No product-specific code in the library.

📦 Installation

pip install -e ./penguiflow

Requires Python 3.12+.

🧭 Repo Structure

penguiflow/ init.py core.py # runtime orchestrator, retries, controller helpers, playbooks node.py types.py registry.py patterns.py middlewares.py viz.py README.md pyproject.toml # build metadata tests/ # pytest suite examples/ # runnable flows (fan-out, routing, controller, playbooks)


🚀 Quickstart Example

from pydantic import BaseModel
from penguiflow import Headers, Message, ModelRegistry, Node, NodePolicy, create


class TriageIn(BaseModel):
    text: str


class TriageOut(BaseModel):
    text: str
    topic: str


class RetrieveOut(BaseModel):
    topic: str
    docs: list[str]


class PackOut(BaseModel):
    prompt: str


async def triage(msg: TriageIn, ctx) -> TriageOut:
    topic = "metrics" if "metric" in msg.text else "general"
    return TriageOut(text=msg.text, topic=topic)


async def retrieve(msg: TriageOut, ctx) -> RetrieveOut:
    docs = [f"doc_{i}_{msg.topic}" for i in range(2)]
    return RetrieveOut(topic=msg.topic, docs=docs)


async def pack(msg: RetrieveOut, ctx) -> PackOut:
    prompt = f"[{msg.topic}] summarize {len(msg.docs)} docs"
    return PackOut(prompt=prompt)


triage_node = Node(triage, name="triage", policy=NodePolicy(validate="both"))
retrieve_node = Node(retrieve, name="retrieve", policy=NodePolicy(validate="both"))
pack_node = Node(pack, name="pack", policy=NodePolicy(validate="both"))

registry = ModelRegistry()
registry.register("triage", TriageIn, TriageOut)
registry.register("retrieve", TriageOut, RetrieveOut)
registry.register("pack", RetrieveOut, PackOut)

flow = create(
    triage_node.to(retrieve_node),
    retrieve_node.to(pack_node),
)
flow.run(registry=registry)

message = Message(
    payload=TriageIn(text="show marketing metrics"),
    headers=Headers(tenant="acme"),
)

await flow.emit(message)
out = await flow.fetch()
print(out.prompt)  # PackOut(prompt='[metrics] summarize 2 docs')

await flow.stop()

Patterns Toolkit

PenguiFlow ships a handful of composable patterns to keep orchestration code tidy without forcing you into a one-size-fits-all DSL. Each helper is opt-in and can be stitched directly into a flow adjacency list:

  • map_concurrent(items, worker, max_concurrency=8) — fan a single message out into many in-memory tasks (e.g., batch document enrichment) while respecting a semaphore.
  • predicate_router(name, mapping) — route messages to successor nodes based on simple boolean functions over payload or headers. Perfect for guardrails or conditional tool invocation without building a full controller.
  • union_router(name, discriminated_model) — accept a Pydantic discriminated union and forward each variant to the matching typed successor node. Keeps type-safety even when multiple schema branches exist.
  • join_k(name, k) — aggregate k messages per trace_id before resuming downstream work. Useful for fan-out/fan-in batching, map-reduce style summarization, or consensus.

All helpers are regular Node instances under the hood, so they inherit retries, timeouts, and validation just like hand-written nodes.

Dynamic Controller Loops

Long-running agents often need to think, plan, and act over multiple hops. PenguiFlow models this with a controller node that loops on itself:

  1. Define a controller Node with allow_cycle=True and wire controller.to(controller).
  2. Emit a Message whose payload is a WM (working memory). PenguiFlow increments the hops counter automatically and enforces budget_hops + deadline_s so controllers cannot loop forever.
  3. The controller can attach intermediate Thought artifacts or emit PlanSteps for transparency/debugging. When it is ready to finish, it returns a FinalAnswer which is immediately forwarded to Rookery.

Deadlines and hop budgets turn into automated FinalAnswer error messages, making it easy to surface guardrails to downstream consumers.


Playbooks & Subflows

Sometimes a controller or router needs to execute a mini flow — for example, retrieval → rerank → compress — without polluting the global topology. call_playbook spawns a brand-new PenguiFlow on demand and wires it into the parent message context:

  • Trace IDs and headers are reused so observability stays intact.
  • The helper respects optional timeouts and always stops the subflow (even on cancel).
  • The first payload emitted to the playbook's Rookery is returned to the caller, allowing you to treat subflows as normal async functions.
from penguiflow import call_playbook
from penguiflow.types import Message

async def controller(msg: Message, ctx) -> Message:
    playbook_result = await call_playbook(build_retrieval_playbook, msg)
    return msg.model_copy(update={"payload": playbook_result})

Playbooks are ideal for deploying frequently reused toolchains while keeping the main flow focused on high-level orchestration logic.


🛡️ Reliability & Observability

  • NodePolicy: set validation scope plus per-node timeout, retries, and backoff curves.
  • Structured logs: enrich every node event with {ts, trace_id, node_name, event, latency_ms, q_depth_in, attempt}.
  • Middleware hooks: subscribe observers (e.g., MLflow) to the structured event stream.
  • See examples/reliability_middleware/ for a concrete timeout + retry walkthrough.

⚠️ Current Constraints

  • In-process runtime: there is no built-in distribution layer yet. Long-running CPU work should be delegated to your own pools or services.
  • Registry-driven typing: nodes default to validation. Provide a ModelRegistry when calling flow.run(...) or set validate="none" explicitly for untyped hops.
  • Observability: structured logs + middleware hooks are available, but integrations with third-party stacks (OTel, Prometheus) are DIY for now.
  • Roadmap: v2 targets streaming, distributed backends, richer observability, and test harnesses. Contributions and proposals are welcome!

🔮 Roadmap

  • v1 (current): safe core runtime, type-safety, retries, timeouts, routing, controller loops, playbooks via examples.
  • v2 (future): streaming support, per-trace cancel, deadlines/budgets, observability hooks, visualizer, testing harness.

🧪 Testing

pytest -q
  • Unit tests cover core runtime, type safety, routing, retries.
  • Example flows under examples/ are runnable end-to-end.

🐧 Naming Glossary

  • Node: an async function + metadata wrapper.
  • Floe: an edge (queue) between nodes.
  • Context: context passed into each node to fetch/emit.
  • OpenSea 🌊: ingress context.
  • Rookery 🐧: egress context.

📖 Examples

  • examples/quickstart/: hello world pipeline.
  • examples/routing_predicate/: branching with predicates.
  • examples/routing_union/: discriminated unions with typed branches.
  • examples/fanout_join/: split work and join with join_k.
  • examples/map_concurrent/: bounded fan-out work inside a node.
  • examples/controller_multihop/: dynamic multi-hop agent loop.
  • examples/reliability_middleware/: retries, timeouts, and middleware hooks.
  • examples/playbook_retrieval/: retrieval → rerank → compress playbook.

🤝 Contributing

  • Keep the library lightweight and generic.

  • Product-specific playbooks go into examples/, not core.

  • Every new primitive requires:

    • Unit tests in tests/
    • Runnable example in examples/
    • Docs update in README

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

penguiflow-1.0.2.tar.gz (24.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

penguiflow-1.0.2-py3-none-any.whl (17.9 kB view details)

Uploaded Python 3

File details

Details for the file penguiflow-1.0.2.tar.gz.

File metadata

  • Download URL: penguiflow-1.0.2.tar.gz
  • Upload date:
  • Size: 24.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.13

File hashes

Hashes for penguiflow-1.0.2.tar.gz
Algorithm Hash digest
SHA256 4e90fa008f92c159330b7003a3656ffdc4811edf28555fb47667302ad412b343
MD5 d772ac2ba59c2df07a938cd34de79ae3
BLAKE2b-256 96fae9584a2e986a2379566fc26ec104f1c4295dc59bd627fa4e20b8651e1759

See more details on using hashes here.

File details

Details for the file penguiflow-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: penguiflow-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 17.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.13

File hashes

Hashes for penguiflow-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 48fabbb192ca96dcf616fc0ff24f8e28597da95b6fbafc316eb196a42bcd4497
MD5 1982c0147b4457c9084b82993f7e4751
BLAKE2b-256 7e1e32592f549ed76792f2814de085543e4239be723edcb94981390ec99461b2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page