Framework for authoring custom agentic applications, treating agents as first-class functions.
netflux is a minimalist Python framework for authoring custom agentic applications. Its core idea is simple but powerful: treat agents exactly like functions — with inputs, outputs, composition (by calling other functions), and side‑effects on stateful structures.
Our goal is a framework that is:
- semantically flexible enough to express workflows or dynamic, open‑ended problem solvers, or any hybrid of the two.
- Agents are first‑class functions. They take typed arguments, return results, can call other functions, and leave a trace of their work.
- Task Decomposition by design: Compose higher-level behavior from cohesive, reusable building blocks—mirroring how we structure libraries and helper functions in traditional programming. This hierarchy is key to building reliable agents with current LLM limitations.
- Exceptions for agents: let agents raise or bubble up exceptions, or attempt to handle and recover, just like traditional code.
Quick-Start Demos & Development
To build an agentic app on netflux, just add the PyPI dependency to your project. The demos/ are a good guide to getting started quickly.
To run the demos/, you will need the provider-specific dependencies installed.
Once you make a venv, see demos/README.md to run any demo.
python -m venv .venv
source .venv/bin/activate
# Install the library in "editable" mode (`-e`), meaning your source code changes
# are immediately reflected. It also installs the `test` and `all` dependency groups,
# which include `pytest` and all the provider SDKs (Anthropic, Gemini, etc).
pip install -e ".[test,all]"
# Run all tests.
pytest tests/ -v
The central idea: Function
Everything in netflux is a Function. There are two concrete kinds:
- CodeFunction: Deterministic Python code (your callable) with a declared signature. Think basic utilities, orchestrators, and agent decorators.
  Example utility: TextEditor (func_lib/text_editor.py) provides file viewing and editing commands.
- AgentFunction: An LLM‑backed function with a schema (arguments), a system prompt, and an initial user prompt template. Under the hood it runs an agent loop and can call other Functions in between thinking. We casually say “Agent” to mean an instance of AgentFunction.
  Example agent: ApplyDiffPatch (func_lib/apply_diff.py) applies unified diff patches (including diffs fenced in markdown) to files. It focuses on applying a patch (tolerating small whitespace/indentation drift and other minor fuzz) while keeping patch creation as a separate concern. This separation lets you review diffs or delegate patch generation to a different agent, avoiding context window dilution for the patch producer (an example of what we mean by "task decomposition").
Because both kinds are just Functions, any Function can call any Function:
- code → code (classic programming),
- code → agent,
- agent → code,
- agent → agent.
Calling an agent is simply invoking a function whose implementation happens to be an LLM reasoning-with-tools loop.
When we say an agent “invokes tools,” that’s physically the LLM issuing tool calls (Anthropic calls them "tools", Gemini calls them "functions"); semantically in netflux, those tools are just Functions made available to the agent. They may map to AgentFunctions or CodeFunctions.
Why treat agents like functions? Because then composition is uniform: code can call code or agents; agents can call code or other agents. Classic programming already covers “code → code”; netflux enables the other three combinations in a principled way that looks consistent.
What the framework gives you
- A clear convention for specifying agents, workflows, orchestrators, utilities, etc (your building blocks)
- A minimal execution runtime for running, monitoring, debugging, and tracing agent instances.
- A library of well-tested built-ins (func_lib).
Task decomposition, by design
Good programs decompose behavior into cohesive functions. We encourage the same pattern for agents:
- A high‑level AgentFunction breaks work into sub‑tasks and delegates to more specialized sub‑Functions (including other agents).
- This disciplined decomposition matters for agents because giving each sub‑task a focused, deliberately limited context is often what makes LLM execution reliable.
- Circular references are forbidden and recursion is disallowed to prevent runaway scenarios.
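The no-cycles rule can be checked statically over the declared `uses` lists. The following is a minimal sketch of that idea, not netflux's actual validation code: `Spec` and `find_cycle` are hypothetical stand-ins introduced only for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)
class Spec:
    """Hypothetical stand-in for a Function spec: a name plus its declared callees."""
    name: str
    uses: List["Spec"] = field(default_factory=list)


def find_cycle(root: Spec) -> Optional[List[str]]:
    """Depth-first search over `uses`; returns the names along a cycle, or None."""
    path: List[Spec] = []   # current DFS stack
    done: set = set()       # ids of specs whose subtree is known to be acyclic

    def visit(spec: Spec) -> Optional[List[str]]:
        if any(spec is p for p in path):
            start = next(i for i, p in enumerate(path) if p is spec)
            return [p.name for p in path[start:]] + [spec.name]
        if id(spec) in done:
            return None
        path.append(spec)
        for callee in spec.uses:
            cycle = visit(callee)
            if cycle:
                return cycle
        path.pop()
        done.add(id(spec))
        return None

    return visit(root)


leaf = Spec("text_editor")
worker = Spec("worker_agent", uses=[leaf])
boss = Spec("boss_agent", uses=[worker, leaf])
print(find_cycle(boss))       # None: the graph is a DAG

worker.uses.append(boss)      # introduce boss -> worker -> boss
print(find_cycle(boss))       # ['boss_agent', 'worker_agent', 'boss_agent']
```

A spec that lists itself in `uses` is reported the same way, which covers direct recursion as well as longer cycles.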
There is no notion of “top” vs. “non‑top” functions. Any Function (code or agent) can be invoked as the top‑level entry from your app, or it can be a deeply nested sub‑task inside a broader agent.
Long‑running, workflow‑like agents typically act as orchestrators, breaking problems into sub‑tasks that can change as progress is made.
What defines an Agent
Every AgentFunction specifies:
- Invocation schema: typed args, so it can be called like a regular function. Seen by other agents.
- Short description: purpose and usage explanation. Seen by other agents.
- System prompt (usually static).
- First (and only) user turn (templated; substitutions using the input args).
- How to inject specifics—typically string substitution, but any deterministic transform is fine as long as args ⇒ concrete prompt is well‑defined.
- Allowed Functions it may call (task decomposition, sub‑agents, actuators i.e. leaf tools).
- Opt-in to the built‑in RaiseException function so the agent can proactively signal failure by raising an AgentException.
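To make the "args ⇒ concrete prompt" mapping concrete, here is a small sketch using plain `str.format`. The helper `render_user_prompt` is hypothetical and only illustrates the idea of a deterministic transform; netflux's own substitution mechanism may work differently.

```python
from typing import Any, Dict

USER_PROMPT_TEMPLATE = (
    "Target project subtree: {root}\n"
    "Observed error: {error_message}\n---\n"
)


def render_user_prompt(template: str, args: Dict[str, Any]) -> str:
    """Deterministically map validated args to the concrete first user turn."""
    try:
        return template.format(**args)
    except KeyError as missing:
        # The template names a placeholder that was never declared as an arg.
        raise ValueError(f"template references an undeclared arg: {missing}") from None


prompt = render_user_prompt(
    USER_PROMPT_TEMPLATE,
    {"root": "/repos/my_repo", "error_message": "KeyError: 'user_id'"},
)
print(prompt)
```

The same args always produce the same prompt, which is what lets an agent be invoked like a regular function.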
Design note: The agent’s logical reasoning replaces a function’s fixed code body. Otherwise, we treat agents and code functions uniformly—which is the foundation of netflux.
Example: find_bug_agent
find_bug_agent is an instance of AgentFunction (notice that usually it is not even necessary to subtype AgentFunction). It is an agent that inspects one or more source files in a repository, given an error message, searches for likely root causes, and writes a short report under /tmp. It returns the absolute path to the report file it created.
This is an example of task decomposition: the same agent need not be concerned with both RCA'ing the bug and resolving it, in case either or both tasks require substantial effort. This minimizes the context rot and dilution that one sub-task would have on the other, much like how a human organization may delegate these tasks separately.
It uses:
- the built‑in text_editor tool to read/write files,
- the built‑in raise_exception to fail honestly when appropriate,
- a repo_search function (impl not shown for brevity).
find_bug_agent = AgentFunction(
    name="find_bug_agent",
    desc="Inspects an error message (with as many details as possible e.g. stacktrace), "
         "searches workspace for context, and authors an in-depth bug report. "
         "Returns the absolute path to the report written under `/tmp`. "
         "Raises if details are insufficient, has trouble exploring the workspace, or lacks confidence.",
    args=[
        FunctionArg("root", str, "Absolute path to the project or a finer-scoped dir subtree."),
        FunctionArg("error_message", str, "Observed error/stack trace or description of failing behavior."),
    ],
    system_prompt=(
        "You are an extremely thorough bug investigator agent (non-conversational). "
        "When invoked with an investigation, run autonomously for an extended period "
        "to explore the project subtree provided, and use extreme critical thinking to "
        # ... (mirror the intent of the `desc`; make sure your prompts are a superset)
    ),
    user_prompt_template=(
        # User prompt is usually composed of the instance-specific details made from `args`.
        "Target project subtree: {root}\n"
        "Observed error: {error_message}\n---\n"
    ),
    # Here we can list any instances of `AgentFunction` or `CodeFunction`.
    # Each callee's schematized entrypoint is automatically exposed to the agent as a tool/function.
    uses=[text_editor, repo_search, raise_exception],
    # Every LLM has types of tasks it is top-ranked for. Set a default or
    # set based on your availability constraints. Can override on each `ctx.invoke()`.
    default_model=Provider.Gemini,
)
What exactly is a CodeFunction?
Many frameworks use the word tool for what we call leaf CodeFunctions: file viewers, string replacers, shell runners, etc. These are your lowest‑level building blocks. But higher‑level CodeFunctions are just as important:
- Deterministic orchestrators: fixed logic that fans out work to one or more AgentFunctions (or to other CodeFunctions via direct call, or via the runtime just for observability).
- Decorators/wrappers around agents that enhance behavior. The prime example is Ensemble (func_lib/ensemble.py): it decorates any AgentFunction; when invoked, it launches multiple independent agent runs (optionally across providers) and then forces a follow-up reconciliation of the alternative responses for enhanced results.
Example: fix_bug_workflow (a CodeFunction orchestrator)
This orchestrator performs two steps deterministically, in order:
- Invoke find_bug_agent (wrapped in an Ensemble) to produce a report path under /tmp.
- Invoke bugfixer_agent to generate a unified diff.
The second agent then performs a further step one or more times:
- Agent→agent: have bugfixer_agent write the diff to a file and invoke the built‑in apply_diff_patch agent itself to apply it. If apply_diff_patch raises, bugfixer_agent sees the exception, can revise the diff, and retry.
First we define another agent needed, the bugfixer_agent, and then we complete the workflow.
bugfixer_agent = AgentFunction(
    name="bugfixer_agent",
    desc=(
        "Given a bug report and project (sub-)dir, emit a minimal unified diff (`git`-like) that fixes the issue, "
        "save it to a file, and apply it by calling the built-in `apply_diff_patch` agent."
    ),
    args=[
        FunctionArg("root", str, "Absolute path to the project or a finer-scoped dir subtree."),
        FunctionArg("bug_report", str, "Absolute path to the bug report."),
    ],
    system_prompt=(
        # Author a prompt that clearly instructs the agent to **plan** a fix
        # and consider alternatives, before proceeding to implement the fix.
        # This usually leads to much better results on difficult problems.
        # Emphasize: no workarounds, minimal changes but keeping cohesive architecture, etc.
        # Then instruct to use the `apply_diff_patch` as sub-agent to apply the diff.
        # This lets this agent focus on the implementation instead of getting bogged down
        # by file editor calls and without needing whitespace perfection (sub-agent handles well).
        # If `apply_diff_patch` fails, this agent can re-try.
        "..."  # placeholder: your prompt text goes here (comments alone would leave an empty tuple)
    ),
    user_prompt_template=(
        "Target project subtree: {root}\n"
        "Bug report path: {bug_report}\n---\n"
    ),
    uses=[text_editor, apply_diff_patch, raise_exception],
    default_model=Provider.Anthropic,
)
# Wrap `find_bug_agent` with an Ensemble-of-Answers for extra reliability.
find_bug_ensemble = Ensemble(
    agent=find_bug_agent,
    instances={Provider.Anthropic: 1, Provider.Gemini: 3},
    allow_fail={Provider.Anthropic: 0, Provider.Gemini: 1},
)

def _fix_bug_workflow(ctx: RunContext, *, root: str, error_message: str) -> str:
    # 1) Find the bug (ensembled)
    report_node = ctx.invoke(find_bug_ensemble, {
        "root": root,
        "error_message": error_message,
    })
    report_path = report_node.result()  # path to report returned by the agent
    # 2) Fix the bug: generate a unified diff, write it to a file, and apply it by invoking `apply_diff_patch`.
    fix_node = ctx.invoke(bugfixer_agent, {
        "root": root,
        "bug_report": report_path,
    })
    summary = fix_node.result()  # e.g. "Target successfully patched N hunks." (and/or patch path)
    return f"{summary}\nReport: {report_path}"

# `CodeFunction`s are often just defined as instances, but sometimes it is convenient to define
# them as subclasses of `CodeFunction` (see `func_lib/text_editor.py` example).
fix_bug_workflow = CodeFunction(
    name="fix_bug_workflow",
    desc="Find the bug (ensembled), generate a minimal diff, and apply it.",
    args=[
        FunctionArg("root", str, "Absolute path to the project or a finer-scoped dir subtree."),
        FunctionArg("error_message", str, "Observed error (stack trace or message)."),
    ],
    callable=_fix_bug_workflow,
    # For `CodeFunction`s, the `uses` should still be populated because it
    # helps enforce function hierarchy to prevent risk of agent causing
    # infinite cycles or recursion runaway.
    uses=[find_bug_ensemble, bugfixer_agent, apply_diff_patch],
)
runtime: definition and execution
We model each Function invocation as a task executed by a Node.
A Node represents the state and history of a call both while it is executing and after it completes.
We often use the terms "function invocation", "task", and Node interchangeably.
- CodeNode runs a CodeFunction (a single Python call).
- AgentNode runs an AgentFunction (the provider‑specific LLM loop), tracking:
  - the ordered sequence of child Function calls it makes,
  - a full transcript of the LLM session (user/model messages, tool calls and results, and thinking blocks when available) for traceability,
  - token usage accumulated throughout the loop.
A tree of Nodes represents a top‑level task and all of its sub‑tasks. This tree persists after completion (until you delete it) so you can debug and trace what happened.
At any point, if you snapshot the call hierarchy, it looks like a traditional call stack—except a frame may be an agent instead of a piece of deterministic code. Deeper frames tend to be more specialized, and at the bottom you’ll typically find leaf CodeFunctions (e.g., file IO, text replacement, running a shell command).
Key perspectives
- The logical reasoning of an agent replaces the fixed code logic of a function, but otherwise we treat them the same.
- At any moment, a snapshot of the invocation tree reads like a traditional call stack—except that stack frames can be agents or code.
- Highly specialized agents often use only leaf tools (e.g., read/write files) or no tools at all (analysis‑only). Such agents appear deeper in the call stack.
Observability: NodeView and snapshots
External consumers (e.g., your UI) do not read Node objects directly—those mutate as tasks run. Instead, you consume NodeView, an immutable, consistent, atomic snapshot of a node and its entire subtree at a single global version.
A minimal watch loop facility is provided for event-driven UI and looks like this:
# TerminalNodeStates is used below; it is assumed here to be exported alongside NodeState.
from netflux.core import NodeState, TerminalNodeStates
prev = 0
while True:
    view = node.watch(as_of_seq=prev)  # blocks until there is a newer snapshot
    prev = view.update_seqnum
    # read view.state / view.outputs / view.exception / view.children safely
    print(f"[{view.update_seqnum}] node={view.id} state={view.state.name}")
    if view.state in TerminalNodeStates:
        break
This ensures your UI only sees consistent views of the task tree.
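The blocking behaviour of `watch(as_of_seq=...)` can be pictured with a condition variable guarding a sequence-numbered, immutable snapshot. This is only a conceptual sketch: `View`, `Watchable`, and the state strings are hypothetical, and netflux's internals may be organised quite differently.

```python
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class View:
    """Immutable snapshot; `update_seqnum` grows with every published change."""
    update_seqnum: int
    state: str


class Watchable:
    """Hypothetical publisher: writers swap in a new View, readers block for a newer one."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._view = View(update_seqnum=1, state="Waiting")

    def publish(self, state: str) -> None:
        with self._cond:
            self._view = View(self._view.update_seqnum + 1, state)
            self._cond.notify_all()

    def watch(self, as_of_seq: int = 0, timeout: Optional[float] = None) -> Optional[View]:
        with self._cond:
            newer = self._cond.wait_for(
                lambda: self._view.update_seqnum > as_of_seq, timeout=timeout
            )
            return self._view if newer else None


node = Watchable()
worker = threading.Thread(
    target=lambda: [node.publish(s) for s in ("Running", "Success")]
)
worker.start()

seen = []
prev = 0
while True:
    view = node.watch(as_of_seq=prev, timeout=5.0)
    if view is None:          # timed out; not expected in this demo
        break
    prev = view.update_seqnum
    seen.append(view.state)
    if view.state == "Success":
        break
worker.join()
print(seen)   # always ends with "Success"; intermediate states may be skipped
```

A reader may miss intermediate snapshots if several updates land between two `watch` calls, but it never sees a half-updated one.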
SessionBag & Objects
In OOP, methods are functions that mutate an object’s state. netflux supports similar patterns with a SessionBag, a scoped object store with the lifetime of a task (and handy access to parent/root scopes). There are two important use cases in mind:
- Functions can stash and retrieve objects to act like methods scoped to their parent or root ancestor. A good example of this is an agent that uses bash to perform its task, where it needs to persist a BashSession across command invocations (in this case, the BashSession is kept in the agent-scope SessionBag and the child invocations retrieve it).
- Functions may use the SessionBag objects to pass input/output across sub‑tasks without serializing through text.
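The first use case can be sketched with a tiny scoped store. `Bag`, `get_or_put`, and `FakeBashSession` below are hypothetical names chosen for illustration; the real SessionBag API may look different.

```python
from typing import Any, Callable, Dict, Optional


class Bag:
    """Hypothetical scoped object store; netflux's SessionBag API may differ."""

    def __init__(self, parent: Optional["Bag"] = None) -> None:
        self.parent = parent
        self._objects: Dict[str, Any] = {}

    def get_or_put(self, key: str, factory: Callable[[], Any]) -> Any:
        """Return the object stored under `key`, creating it on first use."""
        if key not in self._objects:
            self._objects[key] = factory()
        return self._objects[key]


class FakeBashSession:
    """Stand-in for a long-lived shell session that remembers state across commands."""

    def __init__(self) -> None:
        self.history: list = []

    def run(self, command: str) -> int:
        self.history.append(command)
        return len(self.history)


def bash_tool(own_bag: Bag, command: str) -> int:
    """A child invocation acting like a method on its parent's session object."""
    session = own_bag.parent.get_or_put("bash", FakeBashSession)
    return session.run(command)


agent_bag = Bag()                                  # lives as long as the agent task
bash_tool(Bag(parent=agent_bag), "cd /tmp")        # first child call creates the session
count = bash_tool(Bag(parent=agent_bag), "ls")     # second child call reuses it
print(count)  # 2: both child calls reached the same session
```

Each child gets its own short-lived bag, while the object that must outlive individual calls sits one scope up.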
Concurrency (fan‑out)
A task and its sub‑tasks form a tree where each node’s children are ordered by invocation (child edges record the call sequence). Like Futures, you can launch multiple children in parallel and defer collecting each node.result() until you’re ready to block. For AgentFunctions, this also works when the underlying model supports parallel tool calling.
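The Futures analogy can be shown with the standard library alone. This sketch uses `concurrent.futures` rather than netflux itself; `analyze` is a hypothetical stand-in for a child task.

```python
from concurrent.futures import ThreadPoolExecutor


def analyze(path: str) -> str:
    """Stand-in for a child task; a real child could be an agent run."""
    return f"{path}: ok"


paths = ["a.py", "b.py", "c.py"]
with ThreadPoolExecutor(max_workers=3) as pool:
    # Launch every child first; submission order mirrors the recorded call order.
    children = [pool.submit(analyze, p) for p in paths]
    # Block only when the results are needed, in invocation order.
    results = [child.result() for child in children]

print(results)  # ['a.py: ok', 'b.py: ok', 'c.py: ok']
```

Collecting in submission order keeps results aligned with the order of the child edges, regardless of which child finishes first.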
Top‑level tasks vs. sub‑tasks
A top‑level task is any Function call initiated by your app, which is external to and consuming the framework (e.g., a web handler or CLI tool). It can be a coarse orchestrator or a fine‑grained utility; there’s no special status—any Function can be called from the top or from deep inside a tree.
Long‑running, workflow‑like agents usually act as orchestrators, dynamically redefining sub‑tasks as progress is made. Highly specialized agents tend to live deeper in the stack and may use only leaf CodeFunctions (or even no tools if purely analytical). However, nothing stops you from invoking specialized agents directly to create top-level tasks; the framework is agnostic to this.
Exceptions
Another core idea is that Exceptions flow fluidly through Functions as in regular programming:
- A CodeFunction can raise for ordinary reasons (bad args, invariants, business logic). Exceptions may be raised by the framework during argument validation, or by the function itself while executing its business logic. They are presented to agents in function call results. Most LLM providers support some way to indicate an error, and we put the stringified Exception in the response (providers/ extensions must do this correctly).
- An AgentFunction can decide to fail by calling the built‑in raise_exception(), which raises an AgentException. Example reasons include missing context, unavailable sub‑functions, irrecoverable or repeated child errors, or determining the task is unsolvable. This reduces hallucinations by encouraging honest failure.
- Downstream code can catch and handle exceptions, or let them bubble to the caller. This is true for both CodeFunctions, via normal try/except, and AgentFunctions, which can be instructed how to handle various problems (or not). The smarter models get, the more they can handle exceptions autonomously, provided the messages are sufficiently descriptive.
See the detailed Exception Model below for guidance on when agents should raise, bubble, or retry.
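As an illustration of how a child's exception can end up in front of the calling agent, the sketch below packages either the return value or the stringified exception into a result dictionary. The `run_tool_call` helper and the `is_error`/`content` keys are hypothetical; each provider SDK has its own result format.

```python
from typing import Any, Callable, Dict


def run_tool_call(fn: Callable[..., Any], args: Dict[str, Any]) -> Dict[str, Any]:
    """Run a child call and package the outcome the way a tool result might be."""
    try:
        return {"is_error": False, "content": str(fn(**args))}
    except Exception as exc:
        # The stringified exception becomes the result the model sees next turn.
        return {"is_error": True, "content": f"{type(exc).__name__}: {exc}"}


def read_config(path: str) -> str:
    if not path.endswith(".toml"):
        raise ValueError(f"expected a .toml file, got {path!r}")
    return "loaded"


ok = run_tool_call(read_config, {"path": "settings.toml"})
bad_value = run_tool_call(read_config, {"path": "settings.yaml"})
bad_args = run_tool_call(read_config, {"file": "settings.toml"})  # wrong arg name

print(ok)         # {'is_error': False, 'content': 'loaded'}
print(bad_value)  # error result whose content starts with "ValueError:"
print(bad_args)   # error result whose content starts with "TypeError:"
```

A descriptive message matters here: it is the only information the agent has when deciding whether to retry, work around the failure, or raise in turn.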
Cooperative Cancellation
Cooperative Cancellation uses cancellation token chaining, similar to that seen in languages like C# (CancellationTokens) and Go (Contexts). For now we simply use mp.Event for these. Since tasks can be long-running, especially when they are agents, it is imperative to be able to interrupt entire trees or sub-trees promptly: to save resources when agents are not going in the desired direction or progress is not meeting deadlines, and to keep the user experience responsive.
By "cooperative", we refer to the pattern of cancellation chaining that requires framework consumers to properly adhere to the pattern in order to get the benefit. This means:
- New providers/ extensions (AgentNode subtypes) should check for cancellation at opportune times (before invoking children; before the initial remote model invocation or before following up with function results). AgentNodes should post_cancel() in their agent loops and then simply exit the loop (return), or alternatively they can raise cancellationException. See providers/anthropic.py for an example.
- CodeFunction callables should similarly be responsive to self.is_cancel_requested() and simply raise cancellationException at opportune times.
- Always check for cancellation before invoking children tasks.
- Always collect running children (e.g. block on each child node.result()) before responding to a cancellation request.
- If an agent loop or callable is able to determine a success/exception outcome at or near the same time it would respond to cancellation, it should always prioritize concluding with success/exception instead of reacting to the cancellation request. This is because the work was done anyway, so you want the transcripts to show whatever was actually done at the time of cancellation.
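The rules above can be sketched with a plain `threading.Event` standing in for the cancellation token. Everything here (`Cancelled`, `orchestrate`, `child`) is hypothetical and only illustrates the ordering of checks, not netflux's own classes.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List


class Cancelled(Exception):
    """Hypothetical stand-in for the framework's cancellation exception."""


def child(n: int) -> int:
    return n * n


def orchestrate(cancel: threading.Event, batches: List[List[int]]) -> int:
    total = 0
    with ThreadPoolExecutor(max_workers=4) as pool:
        for batch in batches:
            # Check the token before launching any new children.
            if cancel.is_set():
                raise Cancelled(f"stopped with partial total {total}")
            running = [pool.submit(child, n) for n in batch]
            # Always collect children that are already running.
            total += sum(f.result() for f in running)
    # Work finished: report success even if a cancel request arrived just now.
    return total


token = threading.Event()
print(orchestrate(token, [[1, 2], [3]]))   # 14

token.set()
try:
    orchestrate(token, [[1, 2], [3]])
except Cancelled as exc:
    print(f"cancelled: {exc}")             # cancelled: stopped with partial total 0
```

Note that the token is never consulted after the last batch completes, which matches the guidance to prefer a success outcome when the work is already done.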
Providers
Providers are subtypes of AgentNode that bridge the framework’s pattern to each model’s SDK (Anthropic, Gemini, etc.). Each provider is the driver that runs the agent loop and manages function calls, forwarding them through the framework. Adding a new provider means implementing a new AgentNode subtype. See more details below on writing a new providers/ extension.
Token accounting
Each agent instance tracks token usage over its lifetime (updated on every request/response), including input tokens (e.g., cache hits/writes vs. regular), and output tokens (e.g., reasoning vs. final text where available). See the section below for the full accounting fields and how to access them.
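A running tally of this kind might look like the sketch below. The class and every field name are assumptions made for illustration; refer to the full accounting section for the fields netflux actually exposes.

```python
from dataclasses import dataclass, fields


@dataclass
class TokenUsage:
    """Hypothetical running totals; netflux's real field names may differ."""
    input_regular: int = 0
    input_cache_read: int = 0
    input_cache_write: int = 0
    output_reasoning: int = 0
    output_text: int = 0

    def add(self, other: "TokenUsage") -> None:
        """Fold one request/response cycle into the lifetime totals."""
        for f in fields(self):
            setattr(self, f.name, getattr(self, f.name) + getattr(other, f.name))

    @property
    def total(self) -> int:
        return sum(getattr(self, f.name) for f in fields(self))


usage = TokenUsage()
# First turn: nothing cached yet.
usage.add(TokenUsage(input_regular=1200, output_text=300))
# Second turn: the earlier prompt is served from cache.
usage.add(TokenUsage(input_cache_read=1200, input_regular=150,
                     output_reasoning=80, output_text=40))
print(usage.total)  # 2970
```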
A tiny end‑to‑end example
Below we reuse the earlier definitions:
- find_bug_agent (AgentFunction)
- bugfixer_agent (AgentFunction)
- find_bug_ensemble (CodeFunction as decorator/wrapper around AgentFunction)
- fix_bug_workflow (CodeFunction as simple static workflow orchestrator)
- built‑ins: text_editor, apply_diff_patch, raise_exception
…and wire them up in a minimal end‑to‑end run. We also show a simple watcher using NodeView to print progress.
# --- Imports from netflux ---
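# TerminalNodeStates is used by the watcher below; it is assumed here to be exported alongside NodeState.
from netflux.core import NodeState, TerminalNodeStates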
from netflux.providers import Provider
from netflux.runtime import Runtime
# Built-ins
from netflux.func_lib.text_editor import text_editor # CodeFunction (leaf tool)
from netflux.func_lib.apply_diff import apply_diff_patch # AgentFunction (built-in)
from netflux.func_lib.raise_exception import raise_exception # CodeFunction (to raise AgentException)
from netflux.func_lib.ensemble import Ensemble # CodeFunction decorator
# Introduced earlier in the sections above (already defined):
# - repo_search (CodeFunction)
# - find_bug_agent (AgentFunction)
# - bugfixer_agent (AgentFunction)
# - find_bug_ensemble = Ensemble(agent=find_bug_agent, ...)
# - fix_bug_workflow (CodeFunction orchestrator)
# Demo auth factories (reads api keys for Anthropic & Gemini from file).
# Consumer must always specify the factory functions to create the LLM SDK clients
# since this configures endpoint, authorization mechanism, etc.
from netflux.demos.client_factory import CLIENT_FACTORIES
# Register everything we intend to use.
runtime = Runtime(
    specs=[
        # Our app's custom building blocks:
        repo_search, find_bug_agent, bugfixer_agent, find_bug_ensemble, fix_bug_workflow,
        # Built-ins we depend on:
        text_editor, apply_diff_patch, raise_exception
    ],
    client_factories=CLIENT_FACTORIES,
)
# Invoke the top-level `CodeFunction` task.
# We also could directly invoke any of the agents if we wanted.
root = runtime.get_ctx().invoke(
    fix_bug_workflow,
    {"root": "/repos/my_repo/sub/problem_library", "error_message": "..."}
)
# Optional: simple watcher to show progress (consistent snapshots via NodeView).
prev = 0
while True:
    view = root.watch(as_of_seq=prev)
    prev = view.update_seqnum
    print(f"[{view.update_seqnum}] node={view.id} fn={view.fn.name} state={view.state.name}")
    for child in view.children:
        print(f"  └─ child id={child.id} fn={child.fn.name} state={child.state.name}")
    if view.state in TerminalNodeStates:
        break
# Finally, print the result or surface the exception.
try:
    print(root.result())  # blocks until done; returns output or raises the exception.
except Exception as e:
    print(f"Workflow failed: {e}")
This example shows:
- agent → code (find_bug_agent and bugfixer_agent use text_editor and raise_exception; find_bug_agent also uses repo_search)
- code → agent (the fix_bug_workflow orchestrator invokes the ensembled find_bug_agent and then bugfixer_agent)
- agent → agent (bugfixer_agent invoking apply_diff_patch one or more times)
- decorator CodeFunction (Ensemble) wrapping an agent to improve reliability
- watcher loop using NodeView to print consistent progress updates
Tips & Tricks
Context Engineering
- The framework tries to make it easy to do effective context engineering. Usually higher-level agents will have the role of orchestration or workflow. Lower-level agents will solve more concrete problem patterns. As LLMs become more sophisticated, a single agent can take on a broader set of responsibilities (more Functions at its disposal, and longer-running). Partition sub-tasks as finely as needed, but don't over-partition, as this can degrade your evals.
- user prompt: (1) all the agent-specific context of the generic problem background (even if common to all instances this is still not system prompt), (2) the specific problem instance the agent is being invoked to do now.
- may help to be slightly repetitive of system prompt elements in user prompt to get better adherence of critical instructions.
- system prompts kept focused, relevant, small and stable: agent’s role declaration, generic task explanation, non‑negotiable rules/guardrails, output contract, meticulosity, verbosity/brevity, tool‑use policy (steer how often and when to use certain tools, beyond tool schema).
- "you focus on performance optimization of the algorithm already selected; do not propose new algorithms, just optimize impl using the one chosen."
- "you must use tools to test performance and confirm speedups. You cannot just be speculative -- your results need to be backed up by numbers and you can admit lack of improvement."
- agents may use files for input(s)/output(s). Input filepaths would be given as args, and output filepaths may be returned (Write File tool used prior to returning).
- Allows the same intermediate output to be re-used by multiple Functions without needing to repeat tokens.
Misc
- Structured outputs are discouraged since LLMs are sophisticated enough to parse unstructured outputs from their sub-tasks. However, sometimes strict structured outputs are critical, and this can be enforced by defining CodeFunctions where the arguments are the schema and the Callable performs serialization and/or verification, depending on the reason for the structured output, and returns empty or provides a filepath with the serialized data, etc. You can leverage the framework's Exceptions Model to propagate an Exception if verification fails.
- When authoring agents, place the common prompt before the specifics of the task instance. This is because LLMs are known to pay greater attention to the beginning and end of the context window. Particularly, when giving background information, whatever will most heavily influence the specific actions the agent will take should be placed closer to the end of the prompt.
- human_in_loop()
  - becomes blocking for human input. Human can interject and this content will present forward guidance in the "function" output.
  - implement the hook to human UI using whatever mechanism is particular to your application.
  - various reasons why the model may choose to invoke it: (a) sign-off at key points; (b) lacking confidence and needing guidance on the task; (c) on the verge of raise_exception() and seeking an opinion on what to try before doing so.
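The structured-output pattern from the first Misc item can be sketched as a callable whose keyword arguments are the schema. This is an illustrative, framework-free version: `submit_bug_report` and its fields are hypothetical, and a real netflux callable would additionally take a RunContext as its first parameter.

```python
import json
import os
import tempfile


def submit_bug_report(*, title: str, severity: int, file_path: str) -> str:
    """Hypothetical callable body: the keyword args are the output schema."""
    if not title.strip():
        raise ValueError("title must be non-empty")
    if not 1 <= severity <= 5:
        raise ValueError(f"severity must be in 1..5, got {severity}")
    record = {"title": title, "severity": severity, "file_path": file_path}
    fd, out_path = tempfile.mkstemp(prefix="bug_report_", suffix=".json")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump(record, fh)
    return out_path  # the caller receives a path, not the serialized text


path = submit_bug_report(title="Null deref in parser", severity=4, file_path="src/parser.py")
with open(path, encoding="utf-8") as fh:
    print(json.load(fh)["severity"])  # 4
os.remove(path)

try:
    submit_bug_report(title="Too severe", severity=9, file_path="x.py")
except ValueError as exc:
    print(exc)  # severity must be in 1..5, got 9
```

Because verification failures are raised as ordinary exceptions, the agent that produced the malformed output sees the message and can correct itself.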
Entities & Architecture
Function
Function is the central abstraction whereby code or agents are both abstracted as merely being kinds of function calls. Specification / metadata describing the agent or code.
- Concrete subtypes must override abstract property uses() -> List[Function], specifying any Functions that can be invoke()d by the Function.
- AgentFunction: can be invoked by any Function.
  - user subtypes to define their own agents (could use abstract properties that user must override).
  - subtype must specify: input vars, system prompt, templated user prompt (var substitution). Each var may be given as strings or filepaths (upon instantiation of the agent, files would have to be loaded and then substitution done by the runner infra instead of asking the agent to do it).
  - subtype specifies uses: List[Function], the Functions that the agent may invoke.
- CodeFunction: can also be invoked by any Function.
  - some framework built-in subtypes (Ensemble, ThinkMoreDecorator).
  - mostly user subtypes to define any plain python functions that do some deterministic logic, intended to be invoked most often by AgentFunctions or as the top-level request, to coordinate sub-agents doing sub-tasks.
  - may also invoke another CodeFunction within their code although this will be less common.
  - points to a python function Callable. First arg is a RunContext which is used to invoke the framework to run a Function.
  - spec gives the arguments (names, types, description) without the RunContext. Framework will later check that the Callable matches the spec + the RunContext arg present. For now only allow basic primitive types (string, int, float, bool). Use python primitives to indicate types.
  - in user python code (inside the Callable), user can invoke other Functions by doing this:
    - invoke another CodeFunction via:
      - Just call the callable directly (regular python code calling a function); framework does not see this happening and it's perfectly allowed.
        - in the case of Ensemble, user could theoretically use: <Ensemble instance>.callable after they have one.
        - Pass the same RunContext through.
        - No need to include the invoked function in the uses() property.
      - Use the framework runner infra to invoke, via RunContext.invoke(). Possible invocation of a CodeFunction must be declared in uses.
    - invoke an AgentFunction:
      - use the framework runner infra to invoke, via RunContext.invoke(). Possible invocation of an AgentFunction must be declared in uses.
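The check that a Callable matches its spec (a leading context parameter, then exactly the declared primitive-typed arguments) could be done with `inspect.signature`. The sketch below is one possible approach under that assumption; `check_callable` is a hypothetical helper, not the framework's own validator.

```python
import inspect
from typing import Callable, Dict


def check_callable(fn: Callable, declared: Dict[str, type]) -> None:
    """Raise TypeError unless `fn` takes a context first, then exactly the declared args."""
    params = list(inspect.signature(fn).parameters.values())
    if not params:
        raise TypeError("callable must accept a context as its first parameter")
    actual = {p.name: p.annotation for p in params[1:]}   # skip the context parameter
    if set(actual) != set(declared):
        raise TypeError(f"arg names differ: {sorted(actual)} vs {sorted(declared)}")
    for name, expected in declared.items():
        if expected not in (str, int, float, bool):
            raise TypeError(f"{name}: only str, int, float, bool are allowed")
        if actual[name] is not expected:
            raise TypeError(f"{name}: annotated {actual[name]!r}, declared {expected!r}")


def count_lines(ctx, *, path: str, skip_blank: bool) -> int:
    return 0


check_callable(count_lines, {"path": str, "skip_blank": bool})   # passes silently

try:
    check_callable(count_lines, {"path": str, "skip_blank": int})
except TypeError as exc:
    print(exc)
```

The comparison relies on real type objects in the annotations, so it would need adjusting for modules that use postponed (string) annotations.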
Runtime
Runtime is the top-level runner responsible for execution of trees and managing their state.
- Framework-provided object encapsulating all runner infra.
- Created with a collection of user-defined Functions (hierarchy) that may be invoked directly or indirectly; the author defines Functions fully before creating a Runtime.
  - runtime = Runtime(specs: List[Function], client_factories: Mapping[Provider, Callable[[], Any]])
  - client_factories are factory functions that return an instance of the client type expected by each provider.
    - Used in AgentNode provider-specific subtypes.
    - Support pluggable configuration and authentication mechanisms unique to the consumer's app.
  - runtime.get_ctx() -> RunContext: return a special RunContext that is outside the scope of any Task (Function invocation).
- During registration, the runtime automatically performs a BFS over each Function's uses graph to discover and register all transitively referenced Functions. Consumers may seed with a partial set; transitives are added automatically. Duplicate names that point to different Function instances are rejected.
- Responsible for creating trees of Nodes that execute Functions.
  - RunContext.invoke() posts Function invocations to the Runtime.
  - Runtime creates a child Node for the invocation, updates the relationship in the caller Node, and updates its own Node-indexing data structures.
  - Runtime drives the child Node to start when resources are available (i.e. agent concurrency control is managed by the runtime). A CodeNode will always be started immediately.
- Provides consumer interface for querying trees of Nodes.
  - e.g. visualization
  - list_toplevel_views() -> List[NodeView]: get consistent snapshot of all root tasks
  - get_view(node_id: int) -> NodeView: get latest snapshot for any node without blocking
  - watch(node: Node | int, as_of_seq: int = 0, *, timeout: Optional[float] = None) -> Optional[NodeView]: block until newer snapshot available; if timeout elapses, returns None (read more about NodeViews below).
  - To prevent race conditions, consumers should use Runtime to query state via NodeViews and do top-level invocations.
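The registration step described above (a BFS over `uses`, with duplicate names rejected) can be sketched as follows. `Spec` and `register` are hypothetical stand-ins used only to show the traversal; they are not the Runtime's real internals.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, Iterable, List


@dataclass(eq=False)
class Spec:
    """Hypothetical stand-in for a Function spec."""
    name: str
    uses: List["Spec"] = field(default_factory=list)


def register(seeds: Iterable[Spec]) -> Dict[str, Spec]:
    """Breadth-first walk from the seeds, registering every reachable spec by name."""
    registry: Dict[str, Spec] = {}
    queue = deque(seeds)
    while queue:
        spec = queue.popleft()
        known = registry.get(spec.name)
        if known is spec:
            continue                      # already registered via another path
        if known is not None:
            raise ValueError(f"duplicate Function name: {spec.name!r}")
        registry[spec.name] = spec
        queue.extend(spec.uses)
    return registry


text_editor = Spec("text_editor")
apply_patch = Spec("apply_diff_patch", uses=[text_editor])
fixer = Spec("bugfixer_agent", uses=[text_editor, apply_patch])

registry = register([fixer])          # seed with only the top-level spec
print(sorted(registry))               # ['apply_diff_patch', 'bugfixer_agent', 'text_editor']

try:
    register([fixer, Spec("text_editor")])   # same name, different instance
except ValueError as exc:
    print(exc)                        # duplicate Function name: 'text_editor'
```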
RunContext
RunContext is a common framework interface used by both framework consumers and framework internal impl to invoke Functions.
- serves at least as the interface for:
- top-level task invocation; called by an app that is consuming the framework and a collection of
Functions (the app or someone else may define these); access viaRuntime.get_ctx(). - python code for user-defined or framework-builtin
CodeFunctions that invoke otherFunctions. - when some framework component needs to handle agents doing tool calls, that component delegates invocation to the
RunContext.
- e.g. they all use:
ctx.invoke(fn: Function, args: Dict[str, Any], provider: Optional[Provider] = None) -> Node
- top-level task invocation; called by an app that is consuming the framework and a collection of
- every `Function` invocation has a `RunContext` given to it, providing the interface, but also tracking the particular `Function` using it.
  - when a `Function` invokes another `Function` (including when framework handles `AgentFunction` invoking any `Function` via tool call), the `RunContext` knows its associated invoking `Node` (identity of the caller) and causes creation of the invoked `Node`.
    - this information is used to construct the directed edges relationships of the `Node` tree. A single top-level task invocation is the parent `Node` of a tree.
- Each top-level `ctx.invoke()` (by consuming app) initiates one tree of `Node`s where the parent `Node` of the tree is the top-level Task.
  - Each top-level Task is an independent tree with `Node`s disjoint from those originating from other top-level tasks.
  - Each top-level Task may originate from the invocation of any `Function` that was registered with the `Runtime` constructor, thus being coarse-grained tasks or fine-grained tasks at the top level.
    - General idea: Fine-grained top-level Tasks would appear as shallow trees, that may be comparable to the deepest subtrees of a coarse-grained top-level Task that decomposes into the former -- the latter being a broader-scope task that needs to solve the former's scope of problem perhaps as a mere sub-sub-Task.
- Fields:
  - `node: Optional[Node]`: a reference to the particular `Node` identifying this specific `Function` invocation. `None` for top-level contexts.
  - `runtime: Runtime`: a reference to the shared `Runtime`.
  - `object_bags: Dict[SessionScope, SessionBag]`: references to session bags accessible at different scopes.
  - `cancel_event: Optional[Event]`: cooperative cancellation token inherited from the caller unless explicitly overridden by the caller.
- Methods:
  - `invoke(fn: Function, args: Dict[str, Any], provider: Optional[Provider] = None, cancel_event: Optional[Event] = None) -> Node`: invoke a `Function`, optionally overriding the cancellation scope, and return the created `Node`.
  - `post_status_update(state: NodeState)`: update the current node's status.
  - `post_success(outputs: Any)`: mark the current node as successful with given outputs.
  - `post_exception(exception: Exception)`: mark the current node as failed with given exception.
  - `post_cancel()`: mark the current node as terminally canceled.
  - `cancel_requested() -> bool`: helper to check whether the associated cancellation token has been triggered. This does not mean that the `Node` is already canceled and in canceled state -- it means there is active signaled intention to cancel.
- Narrow Scope: `RunContext` is just a mechanism to pass on `Function` invocation directives to the `Runtime` to act on them.
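To make the caller-tracking idea concrete, here is a toy model of how a context that knows its own node can turn each `invoke()` into a parent-to-child edge. `MiniRuntime`, `MiniContext` and `MiniNode` are invented for this illustration and take a function name instead of a `Function` object; the real classes carry far more state.

```python
from dataclasses import dataclass, field
from itertools import count
from typing import Any, Dict, List, Optional


@dataclass
class MiniNode:
    id: int
    fn_name: str
    inputs: Dict[str, Any]
    parent: Optional["MiniNode"] = None
    children: List["MiniNode"] = field(default_factory=list)


class MiniRuntime:
    """Toy runtime: owns node creation and the id -> node index."""

    def __init__(self) -> None:
        self._ids = count(1)
        self.nodes: Dict[int, MiniNode] = {}

    def get_ctx(self) -> "MiniContext":
        return MiniContext(runtime=self, node=None)  # top-level context has no node

    def create_node(self, fn_name: str, inputs: Dict[str, Any],
                    caller: Optional[MiniNode]) -> MiniNode:
        node = MiniNode(next(self._ids), fn_name, inputs, parent=caller)
        if caller is not None:
            caller.children.append(node)  # edge caller -> child, kept in call order
        self.nodes[node.id] = node
        return node


@dataclass
class MiniContext:
    runtime: MiniRuntime
    node: Optional[MiniNode]

    def invoke(self, fn_name: str, args: Dict[str, Any]) -> MiniNode:
        # The context only forwards the request; the runtime does the work.
        return self.runtime.create_node(fn_name, args, caller=self.node)


runtime = MiniRuntime()
root = runtime.get_ctx().invoke("plan_trip", {"city": "Lisbon"})
child_ctx = MiniContext(runtime, root)  # the context handed to the running root
a = child_ctx.invoke("find_flights", {"to": "LIS"})
b = child_ctx.invoke("find_hotels", {"city": "Lisbon"})
```

The top-level call produces a root with no parent, and both later calls land in `root.children` in the order they were made.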
Node
Node is a core abstract object that represents the invocation of a Function (which we also call a "Task").
- `AgentNode`: represents and manages the state and running of an `AgentFunction` invocation.
  - `AnthropicAgentNode`
    - particular implementation when the `AgentFunction` is invoked with Anthropic LLM (e.g. Opus 4.1).
  - `GeminiAgentNode`
    - particular implementation when the `AgentFunction` is invoked with Gemini LLM (e.g. Gemini Pro 2.5).
  - Tracks history of LLM session thus far (which it also uses in tool cycle when doing follow-up request)
    - Subtypes `AnthropicAgentNode` and `GeminiAgentNode` store and use the SDK-specific types in their internal impl.
  - `node.get_transcript() -> List[TranscriptPart]`
    - Subtypes must implement; they must convert the SDK-specific types in the transcription they are tracking to the framework-common `TranscriptPart`s. They never convert types in the reverse direction.
    - For external observers (UIs, tools), prefer `NodeView.transcript: tuple[TranscriptPart, ...]` which is an immutable snapshot captured at publish time. `node.get_transcript()` returns a copy of the live list and should not be used concurrently from outside the node’s thread.
  - Child `Function` invocations are tracked in `node.children: List[Node]` property.
    - Always ordered to reflect the sequence in which `Function`s were invoked.
    - For consumers outside the framework, use `NodeView.children: tuple[NodeView, ...]` instead to access child information safely.
  - Has states (Waiting, Running, Success, Error, Canceled) but also sub-state including tool use (`Function` invocation) that it is waiting on.
  - `AgentNode` is completed once it returns final assistant text or the model decides to `RaiseException` (if it has been given as an option).
  - `TokenUsage` cumulative accounting must be reportable by every `AgentNode` and kept up to date throughout the agent loop (updated on every request/response iteration).
    - Subtype implementations must use the provider SDK's token usage meta to track the accumulation.
- `CodeNode`: represents and manages the state and running of a `CodeFunction` invocation.
  - Simpler than `AgentFunction` because it is just a function call (unlike LLM session complexity). Authors invoke `Function`s directly from within the `Callable`.
  - `CodeNode` is completed once it either returns or raises.
- Fields / Properties:
  - `id: int`: monotonically increasing unique identifier when Node is to be used as key in any lookup. This is one and the same as "task id".
  - `fn: Function`: which `Function` the `Node` is an instance of.
  - `inputs: Dict[str, Any]`: What the inputs were for the invocation.
  - `outputs: Optional[Any]`: What the output(s) were from the run (if finished). Usually just an unstructured string.
  - `exception: Optional[Exception]`: the exception, if there was an exception.
  - `state: NodeState`: (Waiting, Running, Success, Error, Canceled) enum
  - `children: List[Node]`: ordered list of child `Function` invocations made by this `Node`.
  - Note: External consumers should access this information through `NodeView` instead of `Node` directly to avoid race conditions.
  - For agents, `NodeView.usage` is a deep-copied snapshot and `NodeView.transcript` is an immutable tuple (empty tuple for `CodeNode`).
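As a rough picture of the `CodeNode` lifecycle (completed once the callable returns or raises), the following sketch runs a callable on its own thread and records `state`, `outputs` and `exception`. `MiniCodeNode` is a hypothetical stand-in; only the state names and field names are taken from the text above.

```python
import threading
from enum import Enum
from typing import Any, Callable, Dict, Optional


class NodeState(Enum):
    Waiting = "Waiting"
    Running = "Running"
    Success = "Success"
    Error = "Error"
    Canceled = "Canceled"


class MiniCodeNode:
    """Toy node: one callable, one thread, a terminal state at the end."""

    def __init__(self, fn: Callable[..., Any], inputs: Dict[str, Any]) -> None:
        self.fn, self.inputs = fn, inputs
        self.state = NodeState.Waiting
        self.outputs: Optional[Any] = None
        self.exception: Optional[Exception] = None
        self._done = threading.Event()

    def start(self) -> None:
        threading.Thread(target=self._run).start()

    def _run(self) -> None:
        self.state = NodeState.Running
        try:
            self.outputs = self.fn(**self.inputs)
            self.state = NodeState.Success
        except Exception as exc:
            self.exception = exc
            self.state = NodeState.Error
        finally:
            self._done.set()  # wake anyone blocked in result()

    def result(self) -> Any:
        """Block until finished, then return the output or re-raise the failure."""
        self._done.wait()
        if self.exception is not None:
            raise self.exception
        return self.outputs


good = MiniCodeNode(lambda text: text.upper(), {"text": "done"})
bad = MiniCodeNode(lambda text: int(text), {"text": "not a number"})
good.start()
bad.start()
```

Calling `good.result()` returns the output, while `bad.result()` re-raises the `ValueError` from the callable, much like a future.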
TranscriptPart
TranscriptPart represents each of the parts that are common to the model-specific SDKs in concept. The subtypes:
- `UserTextPart`
- `ModelTextPart`
- `ToolUsePart`
- `ToolResultPart`
- `ThinkingBlockPart`
  - including both redacted and non-redacted
  - includes `signature` field for thinking block signatures
- On every follow-up call, replay the full history in original order.
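One way to picture the one-way conversion from SDK-specific blocks to common parts is sketched below. The part class names come from the list above; the field names (`text`, `tool_name`, `args`) and the dictionary-shaped "SDK blocks" are assumptions made for this example and do not mirror any real provider SDK.

```python
from dataclasses import dataclass
from types import MappingProxyType
from typing import Any, List, Mapping, Union


@dataclass(frozen=True)
class UserTextPart:
    text: str


@dataclass(frozen=True)
class ModelTextPart:
    text: str


@dataclass(frozen=True)
class ToolUsePart:
    tool_name: str
    args: Mapping[str, Any]  # wrapped read-only below


Part = Union[UserTextPart, ModelTextPart, ToolUsePart]


def to_parts(sdk_blocks: List[dict]) -> List[Part]:
    """One-way conversion from (made-up) SDK blocks to common parts, order preserved."""
    parts: List[Part] = []
    for block in sdk_blocks:
        kind = block["type"]
        if kind == "user_text":
            parts.append(UserTextPart(block["text"]))
        elif kind == "model_text":
            parts.append(ModelTextPart(block["text"]))
        elif kind == "tool_use":
            read_only_args = MappingProxyType(dict(block["input"]))
            parts.append(ToolUsePart(block["name"], read_only_args))
        else:
            raise ValueError(f"unknown block type: {kind!r}")
    return parts


parts = to_parts([
    {"type": "user_text", "text": "List the files."},
    {"type": "tool_use", "name": "ls", "input": {"path": "."}},
    {"type": "model_text", "text": "There are 3 files."},
])
```

There is deliberately no function going the other way: the provider keeps its native history for replay and only exports the common form for observers.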
Ensemble
- Is a `CodeFunction` that decorates any `AgentFunction` to do parallel independent invocations followed by reconciliation.
  - First phase: each `AgentFunction` call proceeds as normal, with args forwarded for normal user prompt substitution.
  - Second phase: same system prompt and substituted user prompt; append to the user prompt each of the completions along with a reconciliation instruction.
- Given any `AgentFunction`, mostly users will construct one from built-in factory facility (ctor directly): `Ensemble(agent: AgentFunction, instances: Dict[Provider, int], name: Optional[str] = None, reconcile_by: Optional[Provider] = None)`
  - `instances`: how many parallel invocations of `AgentFunction` to do with each model.
  - User uses this when defining their `Function`s.
- Automatically has a valid inner Callable like any `CodeFunction` that does the ensembling phases.
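The two phases can be sketched without any LLM by treating a model as a plain callable from prompt text to completion text. Everything here (`ensemble`, `model_a`, `model_b`, the prompt wording) is a hypothetical illustration of the pattern, not the `Ensemble` implementation.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

# Hypothetical stand-in: a "model" is any callable from prompt text to completion text.
Model = Callable[[str], str]


def ensemble(prompt: str, models: Dict[str, Model], instances: Dict[str, int],
             reconcile_by: str) -> str:
    # Phase 1: independent parallel runs of the same prompt.
    jobs: List[str] = [name for name, n in instances.items() for _ in range(n)]
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as pool:
        drafts = list(pool.map(lambda name: models[name](prompt), jobs))

    # Phase 2: same prompt, plus every draft and a reconciliation instruction.
    numbered = "\n".join(f"Candidate {i}: {d}" for i, d in enumerate(drafts, 1))
    follow_up = f"{prompt}\n\n{numbered}\n\nReconcile the candidates into one answer."
    return models[reconcile_by](follow_up)


def model_a(prompt: str) -> str:
    if "Reconcile" in prompt:
        return f"merged {prompt.count('Candidate ')} candidates"
    return "4"


def model_b(prompt: str) -> str:
    return "four"


answer = ensemble("What is 2 + 2?", {"a": model_a, "b": model_b},
                  instances={"a": 2, "b": 1}, reconcile_by="a")
```

With two instances of `model_a` and one of `model_b`, the reconciling call receives three candidates appended to the original prompt.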
SessionBag
- Collection of arbitrary objects that may be read, mutated, and persisted by `Function`s.
- Each `Node` created introduces a `SessionBag` with its lifetime. The `Node` and its children can access the bag.
  - Thus, the `Node` can also access its parent's `SessionBag`, if it has a parent.
- Each `Node` can also access the `SessionBag` of the root `Node`.
- `SessionScope`: enum of lifetime scopes, each of which would refer to a different `SessionBag` that a `Node` can access:
  - `TopLevel`: Lifetime envelopes all Nodes in a top-level tree. This would give the root `Node`'s bag.
  - `Parent`: Lifetime of the `Node`'s parent `Node`. This would give the parent's bag.
  - `Self`: Lifetime of the `Node` itself. This gives the `Node` access to its own bag.
    - The main application of this is for a `Function` to receive results from its children and to act as a scratchpad.
- Mechanism to do object-oriented programming
  - `Function` operates on an object and thus can behave like a method.
  - `Function` can accept arguments that refer to objects; pass data between `Node`s by in-memory strong types instead of requiring ser/des or free-form text.
- Mechanism for `Function` to own its own objects and invoke `Function`s that read/create/mutate them.
  - Example: an `AgentFunction` needs its own persistent Bash session (e.g. process tree, env vars, vars, cwd)
    - To be used at random points over its lifetime
    - Example: launch executables asynchronously (in terminal background; running locally on client); retrieve results later after doing other steps.
- `RunContext` of a `Node` carries the references to the three scopes of `SessionBag`s.
  - For a root `Node`, the `TopLevel` bag is the same as the `Self` bag. Trying to access the non-existent `Parent` bag raises `NoParentSessionError`.
  - For children of the root, the `Parent` bag is the same as the `TopLevel` bag.
  - Any deeper `Node`s will find the bags of the three `SessionScope`s to be different.
  - `RunContext.get_or_put(scope: SessionScope, namespace: str, key: str, factory: Callable[[], Any]) -> Any`
    - To simplify, this is the only mechanism to be used by `Function`s for access. Concurrency-safe in case of parallel `Function` invocations. Simplify by invoking `factory` under the lock since not high-frequency.
    - `Function` implementations should cooperate to use descriptive namespaces and keys, composed of static string constants and instance numbers if multiplicity is possible.
  - `Runtime` is responsible for creating `SessionBag` with each `Node` and propagating references to new descendants.
- Framework will currently rely on ref counting, garbage collection, and self-disposing object behavior (author responsibility).
  - `Runtime` destruction, or explicit user request to delete a finished tree, will induce disposal of all `SessionBag`-referenced objects and their resources.
  - This keeps objects alive long past their usable scope (potential resource leak), but is very worth the debuggability for finished subtrees. We can make this more configurable in the future (e.g. mandatory finalizers and dispose on `Node` completion).
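The scope rules and the "factory under the lock" choice can be illustrated with a small model. `Scope`, `Bag`, `Ctx` and `NoParentError` are hypothetical stand-ins for `SessionScope`, `SessionBag`, `RunContext` and `NoParentSessionError`; the real signatures may differ.

```python
import threading
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple


class Scope(Enum):
    TOP_LEVEL = "TopLevel"
    PARENT = "Parent"
    SELF = "Self"


class NoParentError(LookupError):
    """Stand-in for the NoParentSessionError mentioned above."""


class Bag:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: Dict[Tuple[str, str], Any] = {}

    def get_or_put(self, namespace: str, key: str, factory: Callable[[], Any]) -> Any:
        # The factory runs under the lock, so it is called at most once per slot.
        with self._lock:
            slot = (namespace, key)
            if slot not in self._items:
                self._items[slot] = factory()
            return self._items[slot]


class Ctx:
    def __init__(self, parent: Optional["Ctx"] = None) -> None:
        own = Bag()
        top = parent.bags[Scope.TOP_LEVEL] if parent else own
        self.bags: Dict[Scope, Bag] = {Scope.SELF: own, Scope.TOP_LEVEL: top}
        if parent:
            self.bags[Scope.PARENT] = parent.bags[Scope.SELF]

    def get_or_put(self, scope: Scope, namespace: str, key: str,
                   factory: Callable[[], Any]) -> Any:
        if scope not in self.bags:
            raise NoParentError("root node has no parent bag")
        return self.bags[scope].get_or_put(namespace, key, factory)


root = Ctx()
child = Ctx(parent=root)
grandchild = Ctx(parent=child)

# The child leaves a result in its parent's bag; the root reads it from its own bag.
notes = child.get_or_put(Scope.PARENT, "demo", "notes", list)
notes.append("written by child")
```

Because this sketch uses a non-reentrant lock, a factory that calls back into the same bag would deadlock; that is a property of the sketch, and the text above does not say how netflux handles that case.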
Exception Model
- Any `Function` can raise or bubble up an `Exception` at any point while running.
  - For `CodeFunction`, this is just for the vanilla reasons:
    - `raise TException(..)` in the `Callable`, e.g. due to contract breakage, bad args, business logic, assertion failure, etc.
    - Bubble-up: its `Callable` invokes a regular function that in turn raises and the `Callable` is unable to handle it or recover.
  - For `AgentFunction`, this is the agent making a proactive intelligent decision that it wants to raise an `Exception`.
    - All the reasons in classical programming, but also:
      - Agent is unable to do as directed because:
        - lacks context or key knowledge
        - lacks the sub-`Function`s it needs (leaf tools or sub-agents) due to author error
        - sub-agent (invoked `AgentFunction`) is not behaving as expected on a sub-task
        - an invoked child `Function` has raised, and it's unclear how to handle or it's recurring, and there are no alternatives or the alternatives have already been tried.
    - An agent may be given guidance on:
      - when and which `Exception`s from children `Function`s to recover from, versus when to bubble them up.
      - when to decide the given task is unsolvable and give up by raising.
    - Encourage the agent to declare failure to reduce the rate of hallucination.
Framework support mechanisms:
- Framework built-in `RaiseException` (is-a `Function` subtype) intended to be provided to agents in their `AgentFunction.uses` definition, by voluntary opt-in from the agent author.
  - The spec instructs usage directives like:
    - Bubbling up an `Exception` it can't solve? Include an inner exception type and inner msg inside of the `msg` arg.
    - No alternatives worked? Very briefly describe what was tried.
    - Missing information or context, don't know how to solve, etc? Describe this very briefly for the caller in case a follow-up attempt could address this.
  - Implementation of the `RaiseException` callable is a one-liner: raise the `AgentException`. (Assume `CodeFunction` never invokes it).
- Differentiation of agent vs. service/infra faults:
  - `AgentException`: used when an agent decides to invoke `raise_exception(msg)` by its own volition, for any reason.
    - includes: faulting agent's name and instance id (`Node.id`).
  - `ModelProviderException`: used when an `AgentNode` implementation (providers/) fails for any reason.
    - Always unrelated to the agent's task and never caused by an agent.
    - includes: provider class name, name of agent being processed when provider faulted, instance id (`Node.id`), inner exception object.
    - Examples:
      - provider `AgentNode` malimplementation (not following protocol; not using SDK correctly)
      - connection socket broken or can't open
      - authentication / configuration
      - provider is overloaded, client is being rate-limited, any kind of load shedding or quota issue
      - core framework bug (developer regression) e.g. during `ctx.invoke(..)`.
      - other faults by the remote provider service
- Accessing `Node.result()` will either return the `Function` output (usually a string) if it was successful, or will raise the `Exception` from that invocation if there is one (similar to Futures in many languages), regardless of the invoked `Function`'s subtype.
AgentNodesubtypes shall implement this contract:- When collecting
Functioninvocation results fromctx.invoke(..).result(), expect the possibility of anExceptionbeing raised and always catch it.- Pass on a string representation of the
Exception's type and message (with details but never too verbose and never with stacktrace) back to the LLM in the regular follow-up tool cycle, and flag the fault if the provider's SDK has an explicit field for that. Some LLMs are fine-tuned to pay attention to the error flag but most will understand theExceptionstring properly anyway especially if the detail is present. - Includes
ValueErrorfor built-in argument type checking (LLM can respond by re-trying).
- Pass on a string representation of the
- Implement backoff-retry around SDK
Exceptions that are known to be transient only. - Intercept
RaiseExceptioncalls and usectx.post_exception(e)whereeis an instance ofAgentException. Then exit the run loop. - Allow any other unexpected
Exceptionto bubble pastrun(). The supertypeAgentNodewill wrap it in aModelProviderExceptionwith context. - A batch of parallel tool calls may result in 0, 1, or more of them succeeding or excepting and this is normal.
- When a model issues a batch of tool calls and one of them is RaiseException (unusual), honor the model's intent and propagate
AgentExceptionto end the agent loop after the whole batch is attempted.
- When collecting
- `CodeFunction` authors guidance:
  - Be aware that `Node.result()` from invoked functions may raise.
  - Ensure raisable `Exception`s from the Callable have descriptive type names and sufficient detail. If bubbling, sometimes this requires try-catch interception just to augment details (e.g. is the error pertaining to an input or output) and then re-raising.
  - Consider `AgentException`: may be difficult to handle statically; consider: retry, change provider. If repeatable, wrap attempts, augment context, and bubble up.
  - Consider `ModelProviderException`: Avoid backoff-retry to prevent multi-layer retry. Log in case of bug. Augment context and re-raise.
- `AgentFunction` authors guidance:
  - Add the built-in `RaiseException` to the `AgentFunction.uses` property to enlist in `AgentException`s.
  - Provide additional guidance on when to raise, when to bubble up, (or how hard to retry alternatives first) in the system and user prompt. Iterate through trial and error. This will be very specific to the agent's purpose and scope.
  - Strongly consider instructing the model to invoke `human_in_loop()` before considering `raise_exception()`.
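The provider-side part of this contract (catch every tool fault, hand the model a short type-plus-message string, and keep infrastructure faults in a separate wrapper) might look roughly like the sketch below. The exception class names come from the text; their constructor arguments and the `run_tool` helper are assumptions for illustration only.

```python
from typing import Any, Callable, Dict, Tuple


class AgentException(Exception):
    """Raised on the agent's own decision; carries who raised it."""

    def __init__(self, agent_name: str, node_id: int, msg: str) -> None:
        super().__init__(f"{agent_name}#{node_id}: {msg}")
        self.agent_name, self.node_id = agent_name, node_id


class ModelProviderException(Exception):
    """Wraps a provider or infrastructure fault together with its context."""

    def __init__(self, provider: str, agent_name: str, node_id: int,
                 inner: BaseException) -> None:
        super().__init__(f"{provider} failed while running {agent_name}#{node_id}: {inner!r}")
        self.inner = inner


def run_tool(tool: Callable[..., Any], args: Dict[str, Any]) -> Tuple[str, bool]:
    """Return (text_for_the_model, is_error); a tool fault never escapes."""
    try:
        return str(tool(**args)), False
    except Exception as exc:
        # Type name plus message, no stack trace.
        return f"{type(exc).__name__}: {exc}", True


def divide(a: float, b: float) -> float:
    return a / b


ok = run_tool(divide, {"a": 6, "b": 3})
failed = run_tool(divide, {"a": 1, "b": 0})
bad_args = run_tool(divide, {"a": 1})  # missing argument surfaces as a TypeError string

try:
    raise ConnectionError("socket closed")
except Exception as exc:
    wrapped = ModelProviderException("AnthropicAgentNode", "researcher", 7, exc)
```

The boolean in the returned pair is what a provider would map onto its SDK's error flag, if the SDK has one.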
NodeView
NodeView is an immutable, consistent snapshot of a Node and, through reference, its subtree. Use it for observation; do not read live Node fields from UIs or other threads.
- Fields
  - `children: tuple[NodeView, ...]`: immutable ordered children
  - `usage: Optional[TokenUsage]`: deep-copied snapshot for agents
  - `transcript: tuple[TranscriptPart, ...]`: immutable transcript snapshot for agents (empty for `CodeNode`)
  - `update_seqnum: int`: global sequence when this view was produced
  - Plus core fields: `id`, `fn`, `inputs`, `state`, `outputs`, `exception`, `started_at`, `ended_at`
- What triggers a new `NodeView`
  - Node creation and linking into the tree
  - Status changes (`post_status_update`), success/exception/cancel
  - Transcript appends (agents call `post_transcript_update()` after each append)
- Consistency model (origin-only live rebuild)
  - On each change, the origin node’s `NodeView` is rebuilt from the live `Node` under a global lock. Each ancestor gets a fresh `NodeView` by reusing its previous snapshot fields and recomputing only `children` from current child `NodeView`s. No live ancestor fields are read.
  - Implication: an ancestor’s `usage`/`transcript` reflect the last time that ancestor itself published; child changes do not refresh them. Only `children` changes propagate up.
- Immutability guarantees
  - `children` and `transcript` are tuples in the snapshot. Provider code appends immutable `TranscriptPart`s; `ToolUsePart.args` are stored as immutable mappings; `TokenUsage` is deep-copied at snapshot time.
- Watching for updates
  - Use `node.watch(as_of_seq=prev_seq)` to block until a newer snapshot is available, then set `prev_seq = view.update_seqnum` for the next iteration.
  - `runtime.list_toplevel_views()` returns a consistent snapshot of all root `NodeView`s; `runtime.get_view(node_id)` fetches the latest view without blocking.
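The watch loop described above follows a common "sequence number plus condition variable" pattern. The sketch below shows that pattern in isolation; `View` and `Publisher` are hypothetical, and the sequence counter here is per publisher rather than the global sequence netflux describes.

```python
import threading
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class View:
    state: str
    transcript: Tuple[str, ...]
    update_seqnum: int


class Publisher:
    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._view = View("Waiting", (), 1)

    def publish(self, state: str, *new_lines: str) -> None:
        with self._cond:
            old = self._view
            # Build a fresh immutable snapshot instead of mutating the old one.
            self._view = View(state, old.transcript + new_lines, old.update_seqnum + 1)
            self._cond.notify_all()

    def watch(self, as_of_seq: int = 0, timeout: Optional[float] = None) -> Optional[View]:
        with self._cond:
            newer = self._cond.wait_for(
                lambda: self._view.update_seqnum > as_of_seq, timeout)
            return self._view if newer else None


pub = Publisher()
seen: List[View] = []


def observer() -> None:
    prev_seq = 0
    while True:
        view = pub.watch(as_of_seq=prev_seq, timeout=2.0)
        if view is None:
            break  # timed out with nothing newer
        seen.append(view)
        prev_seq = view.update_seqnum
        if view.state == "Success":
            break


t = threading.Thread(target=observer)
t.start()
pub.publish("Running", "user: hi")
pub.publish("Success", "model: hello")
t.join()
```

An observer may skip intermediate snapshots when several updates land between two `watch` calls, but it never sees an older snapshot after a newer one, and the last view it receives holds the full transcript.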
Deferred Features
This is a bucket list of nice-to-haves.
- Concurrency control
  - Limit the number of `AgentNode`s in the agent loop concurrently, keyed by `Provider`.
- Replayability, Pausability, Interruptibility, cancellation
  - Pre-requisites:
    - Restart-ability of tree from the state where any `Node` was just created.
    - Serializability of `Node` tree state.
    - cancellation, Pause `NodeState`.
- `NodeState.WaitingOnFunction`
- Async and Futures
  - `RunContext.invoke()` -> return Future and generally use async chaining.
- Streaming SDK usage in model provider `AgentNode` run loops.
  - For observability, debuggability.
  - Compress results into the SDK's full native type blocks after each block is done streaming.
- Fully migrate to Event-Driven Architecture.
  - Single event loop per Runtime; remove the per-`Node` thread.
- Smarter caching based on past `Function` instance statistics.
- Cycle & Recursion Prohibition
  - Enforce at `Runtime` construction.
  - Ensure each `Function` has legal references to other `Function`s.
  - Reject if not a DAG.
  - Reject if function type annotations do not match the spec in `CodeFunction`.
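Since cycle prohibition is listed as deferred, the following is only a sketch of how a "reject if not a DAG" check could work: a depth-first search with three colors over a name-to-callees mapping. `find_cycle` and the mapping shape are assumptions, not part of netflux.

```python
from typing import Dict, List


def find_cycle(uses: Dict[str, List[str]]) -> List[str]:
    """Return one cycle as a list of names, or [] if the graph is a DAG."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    color = {name: WHITE for name in uses}
    path: List[str] = []

    def visit(name: str) -> List[str]:
        color[name] = GREY
        path.append(name)
        for callee in uses.get(name, []):
            seen = color.get(callee, WHITE)
            if seen == GREY:
                # Back edge: the callee is already on the current path.
                return path[path.index(callee):] + [callee]
            if seen == WHITE:
                found = visit(callee)
                if found:
                    return found
        path.pop()
        color[name] = BLACK
        return []

    for name in list(uses):
        if color[name] == WHITE:
            found = visit(name)
            if found:
                return found
    return []


acyclic = find_cycle({"report": ["summarize"], "summarize": ["read_file"], "read_file": []})
cyclic = find_cycle({"a": ["b"], "b": ["c"], "c": ["a"]})
```

Returning the offending path rather than a bare boolean would let a constructor-time error message name the Functions involved. The search is recursive, so extremely deep graphs would need an iterative rewrite.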