
quartermaster-engine

Execution engine for AI agent graphs with pluggable storage, dispatching, and memory.


Features

  • FlowRunner orchestrates graph execution: traversal, branching, merging, and error handling
  • Pluggable dispatchers: SyncDispatcher, ThreadDispatcher, AsyncDispatcher
  • Pluggable storage: InMemoryStore, SQLiteStore, or implement your own ExecutionStore
  • Two memory layers: FlowMemory (per-execution) and PersistentMemory (cross-execution)
  • Real-time event streaming: NodeStarted, TokenGenerated, NodeFinished, FlowError, UserInputRequired
  • Per-node error strategies: Stop, Retry (with configurable max retries), Skip
  • Flow pause/resume for user interaction nodes
  • Sync and async execution modes with run() and run_async()

Installation

pip install quartermaster-engine

# With SQLite persistent store
pip install quartermaster-engine[sqlite]

Quick Start

High-level path — provider_registry

The simplest way to run a graph is to hand FlowRunner a provider registry and let it build the default node registry, which covers every node type the bundled DSL emits, including AgentExecutor for agent() nodes with the canonical Quartermaster tool loop:

from quartermaster_engine import FlowRunner
from quartermaster_graph import Graph
from quartermaster_providers import register_local

provider_registry = register_local(
    "ollama",
    base_url="http://localhost:11434",   # or set $OLLAMA_HOST
    default_model="gemma4:26b",
)

graph = Graph("chat").start().user().agent().end().build()
runner = FlowRunner(graph=graph, provider_registry=provider_registry)
result = runner.run("Hello!")
print(result.success, result.final_output)

If you have a quartermaster_tools.ToolRegistry, pass it as tool_registry= so AgentExecutor can actually execute the tools your graph's .agent(tools=[...]) nodes request.

Low-level path — bring your own node_registry

For full control over which executor handles each node type (custom executors, alternative storage, etc.), hand FlowRunner a SimpleNodeRegistry directly. If you only want to swap a single executor, use the helper build_default_registry(provider_registry) and pass the result back into FlowRunner(node_registry=...).

from uuid import uuid4
from quartermaster_engine import FlowRunner, InMemoryStore
from quartermaster_engine.nodes import SimpleNodeRegistry, NodeResult
from quartermaster_graph import GraphSpec, GraphNode, GraphEdge, NodeType

# 1. Define the graph
start_id, process_id, end_id = uuid4(), uuid4(), uuid4()

graph = GraphSpec(
    id=uuid4(),
    agent_id=uuid4(),
    start_node_id=start_id,
    nodes=[
        GraphNode(id=start_id, type=NodeType.START, name="Start"),
        GraphNode(
            id=process_id,
            type=NodeType.INSTRUCTION,
            name="Process",
            metadata={"llm_system_instruction": "Summarize the input.", "llm_model": "gpt-4o"},
        ),
        GraphNode(id=end_id, type=NodeType.END, name="End"),
    ],
    edges=[
        GraphEdge(source_id=start_id, target_id=process_id),
        GraphEdge(source_id=process_id, target_id=end_id),
    ],
)

# 2. Register node executors (the flow needs one for each node type it visits)
registry = SimpleNodeRegistry()
# registry.register("Instruction1", my_instruction_executor)

# 3. Run the flow
runner = FlowRunner(graph=graph, node_registry=registry)
result = runner.run("Please summarize this article about AI safety.")

print(result.success)          # True/False
print(result.final_output)     # The final text output
print(result.duration_seconds)

Using GraphBuilder from quartermaster-graph

from quartermaster_graph import GraphBuilder
from quartermaster_engine import FlowRunner
from quartermaster_engine.nodes import SimpleNodeRegistry

graph = (
    GraphBuilder("Support Agent")
    .start()
    .instruction("Classify", model="gpt-4o")
    .decision("Route", options=["billing", "technical"])
    .on("billing").instruction("Handle billing").end()
    .on("technical").instruction("Handle technical").end()
    .build()
)

registry = SimpleNodeRegistry()
runner = FlowRunner(graph=graph, node_registry=registry)
result = runner.run("I need help with my invoice")

Stream Events in Real Time

from quartermaster_engine import FlowRunner, FlowEvent, NodeStarted, NodeFinished, TokenGenerated, FlowError

def handle_event(event: FlowEvent):
    if isinstance(event, NodeStarted):
        print(f"[START] {event.node_name} ({event.node_type})")
    elif isinstance(event, TokenGenerated):
        print(event.token, end="", flush=True)
    elif isinstance(event, NodeFinished):
        print(f"\n[DONE] Node finished: {event.result[:50]}...")
    elif isinstance(event, FlowError):
        print(f"[ERROR] {event.error} (recoverable={event.recoverable})")

runner = FlowRunner(graph=graph, node_registry=registry, on_event=handle_event)
result = runner.run("Hello!")

Async Execution

import asyncio
from quartermaster_engine import FlowRunner, TokenGenerated

async def run_flow():
    runner = FlowRunner(graph=graph, node_registry=registry)
    async for event in runner.run_async("Hello!"):
        if isinstance(event, TokenGenerated):
            print(event.token, end="", flush=True)

asyncio.run(run_flow())

Quick Execution with run_graph()

For rapid prototyping, use the convenience function:

from quartermaster_engine import run_graph

# Non-interactive (provide input)
run_graph(agent, user_input="Explain quantum computing")

# Interactive (prompts on stdin at User nodes)
run_graph(agent)

# Force provider
run_graph(agent, user_input="Hello", provider="openai")

run_graph() handles provider detection, node registration, streaming output, and the pause/resume cycle for interactive User nodes.

API Reference

FlowRunner

The core orchestration class. Accepts a GraphSpec from quartermaster-graph (AgentGraph still works as a deprecated alias).

from quartermaster_engine import FlowRunner
from quartermaster_engine.dispatchers.sync_dispatcher import SyncDispatcher
from quartermaster_engine.messaging.context_manager import ContextManager

runner = FlowRunner(
    graph=spec,                      # GraphSpec from quartermaster-graph
    node_registry=registry,          # Maps node types to executors
    store=InMemoryStore(),           # Execution state storage
    dispatcher=SyncDispatcher(),     # How branches are dispatched
    context_manager=ContextManager(),  # LLM context window management
    on_event=handle_event,           # Real-time event callback
)
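The context_manager argument is responsible for keeping the conversation within the model's context window. The engine's actual trimming policy isn't documented here; purely as an illustration, a simple character-budget trimmer (all names in this sketch are hypothetical) might look like:

```python
def trim_context(messages: list[str], max_chars: int = 8000) -> list[str]:
    """Illustrative sketch only: keep the most recent messages whose
    combined length fits within a character budget, dropping the oldest
    messages first. A real context manager would count tokens, not chars."""
    kept: list[str] = []
    total = 0
    for msg in reversed(messages):   # walk newest to oldest
        total += len(msg)
        if total > max_chars:
            break                    # budget exceeded; drop this and older
        kept.append(msg)
    return list(reversed(kept))      # restore chronological order
```

A real implementation would also need to preserve system messages and respect message boundaries, which this sketch ignores.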

Methods:

  • run(input_message, flow_id=None) -> FlowResult: Execute synchronously
  • run_async(input_message, flow_id=None) -> AsyncIterator[FlowEvent]: Execute asynchronously, yielding events
  • resume(flow_id, user_input) -> FlowResult: Resume a paused flow with user input
  • stop(flow_id): Stop a running flow

FlowResult

  • flow_id (UUID): Unique execution identifier
  • success (bool): Whether all nodes completed successfully
  • final_output (str): Text output from the End node
  • output_data (dict): Structured output data
  • error (str | None): Error message if failed
  • node_results (dict[UUID, NodeResult]): Per-node results
  • duration_seconds (float): Total execution time

Dispatchers

Control how successor nodes are executed.

  • SyncDispatcher: Execute nodes sequentially in the calling thread. Simple and predictable.
  • ThreadDispatcher(max_workers=4): Execute branches in parallel using a thread pool. Good for I/O-bound nodes.
  • AsyncDispatcher: Execute branches concurrently using asyncio tasks. For async web applications.

All dispatchers implement the TaskDispatcher protocol:

class TaskDispatcher(Protocol):
    def dispatch(self, flow_id, node_id, execute_fn) -> None: ...
    def wait_all(self) -> None: ...
    def shutdown(self) -> None: ...
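Any object with these three methods can be passed as dispatcher=. As a self-contained sketch (the class name and behavior here are hypothetical, not part of the library), a dispatcher that runs tasks on a small thread pool and records what it dispatched:

```python
from concurrent.futures import ThreadPoolExecutor, Future


class LoggingDispatcher:
    """Hypothetical TaskDispatcher: submits each task to a thread pool
    and records which (flow_id, node_id) pairs were dispatched."""

    def __init__(self, max_workers: int = 2):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures: list[Future] = []
        self.dispatched: list[tuple] = []

    def dispatch(self, flow_id, node_id, execute_fn) -> None:
        self.dispatched.append((flow_id, node_id))
        self._futures.append(self._pool.submit(execute_fn))

    def wait_all(self) -> None:
        for f in self._futures:
            f.result()               # blocks; re-raises any task exception
        self._futures.clear()

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)
```

Because TaskDispatcher is a protocol, no inheritance is needed; matching the three method signatures is enough.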

Execution Stores

Pluggable storage for flow state, memory, and messages.

  • InMemoryStore: Dict-backed, no persistence. Great for testing.
  • SQLiteStore(db_path): SQLite-backed with WAL mode. For local development.

Implement ExecutionStore for custom backends (Redis, PostgreSQL, etc.):

from quartermaster_engine import ExecutionStore

class RedisStore:
    def save_node_execution(self, flow_id, node_id, execution) -> None: ...
    def get_node_execution(self, flow_id, node_id) -> NodeExecution | None: ...
    def get_all_node_executions(self, flow_id) -> dict[UUID, NodeExecution]: ...
    def save_memory(self, flow_id, key, value) -> None: ...
    def get_memory(self, flow_id, key) -> Any: ...
    def get_all_memory(self, flow_id) -> dict[str, Any]: ...
    def delete_memory(self, flow_id, key) -> None: ...
    def save_messages(self, flow_id, node_id, messages) -> None: ...
    def get_messages(self, flow_id, node_id) -> list[Message]: ...
    def append_message(self, flow_id, node_id, message) -> None: ...
    def clear_flow(self, flow_id) -> None: ...
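To make the expected semantics concrete, here is a hypothetical dict-backed sketch of the memory-related methods only (per-flow key/value maps); a real backend would implement the full protocol above:

```python
from collections import defaultdict
from typing import Any


class DictStore:
    """Illustrative sketch (not the library's InMemoryStore): each flow
    gets its own key/value map for the memory-related store methods."""

    def __init__(self):
        self._memory: dict[Any, dict[str, Any]] = defaultdict(dict)

    def save_memory(self, flow_id, key, value) -> None:
        self._memory[flow_id][key] = value

    def get_memory(self, flow_id, key) -> Any:
        return self._memory[flow_id].get(key)   # None if absent

    def get_all_memory(self, flow_id) -> dict[str, Any]:
        return dict(self._memory[flow_id])      # copy, not a live view

    def delete_memory(self, flow_id, key) -> None:
        self._memory[flow_id].pop(key, None)    # deleting a missing key is a no-op

    def clear_flow(self, flow_id) -> None:
        self._memory.pop(flow_id, None)
```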

Memory System

FlowMemory -- scoped to a single flow execution:

from quartermaster_engine import FlowMemory, InMemoryStore

store = InMemoryStore()
memory = FlowMemory(flow_id=my_flow_id, store=store)

memory.set("user_name", "Alice")
memory.set("preferences", {"language": "en"})

name = memory.get("user_name")            # "Alice"
all_data = memory.get_all()                # {"user_name": "Alice", ...}
keys = memory.list_keys()                  # ["user_name", "preferences"]
memory.delete("preferences")
memory.clear()

PersistentMemory -- cross-flow memory that survives between executions:

from quartermaster_engine import PersistentMemory, InMemoryPersistence

persistence = InMemoryPersistence()

persistence.write(agent_id, "user_pref", "dark_mode")
value = persistence.read(agent_id, "user_pref")     # "dark_mode"
persistence.update(agent_id, "user_pref", "light_mode")

results = persistence.search(agent_id, "pref")      # Substring search
keys = persistence.list_keys(agent_id)               # ["user_pref"]
persistence.delete(agent_id, "user_pref")

Error Handling

Per-node error strategies configured via GraphNode.error_handling:

  • ErrorStrategy.STOP: Halt the entire flow on error (default)
  • ErrorStrategy.RETRY: Retry up to max_retries times with retry_delay backoff
  • ErrorStrategy.SKIP: Skip the failed node and continue to successors

from quartermaster_engine.types import GraphNode, NodeType, ErrorStrategy

node = GraphNode(
    type=NodeType.INSTRUCTION,
    name="Unreliable API",
    error_handling=ErrorStrategy.RETRY,
    max_retries=3,
    retry_delay=2.0,
    timeout=30.0,
)
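The RETRY strategy's semantics can be sketched in plain Python. This illustrates the behavior described above (initial attempt, then up to max_retries retries with a fixed delay between attempts); it is not the engine's actual implementation:

```python
import time


def run_with_retry(execute_fn, max_retries: int = 3, retry_delay: float = 2.0):
    """Sketch of RETRY semantics: try once, then retry up to max_retries
    times, sleeping retry_delay seconds between attempts. The final
    failure is re-raised to the caller."""
    attempts = 0
    while True:
        try:
            return execute_fn()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise                # retries exhausted
            time.sleep(retry_delay)  # backoff before the next attempt
```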

Events

Real-time events emitted during flow execution:

  • NodeStarted (flow_id, node_id, node_type, node_name): Node begins execution
  • TokenGenerated (flow_id, node_id, token): Streaming token from LLM
  • NodeFinished (flow_id, node_id, result, output_data): Node completed
  • FlowFinished (flow_id, final_output, output_data): Entire flow completed
  • UserInputRequired (flow_id, node_id, prompt, options): Flow paused for user input
  • FlowError (flow_id, node_id, error, recoverable): Node failed

run_graph() uses streaming by default -- TokenGenerated events are printed as they arrive, giving real-time output from LLM nodes without extra setup.

ExecutionContext

The runtime context passed to each node executor:

  • flow_id (UUID): Flow execution identifier
  • node_id (UUID): Current node identifier
  • graph (GraphSpec): Full graph definition
  • current_node (GraphNode): Current node definition
  • messages (list[Message]): Conversation history
  • memory (dict[str, Any]): Flow-scoped memory snapshot
  • metadata (dict[str, Any]): Node metadata
  • on_token (Callable[[str], None] | None): Callback for streaming tokens

Helper methods:

  • get_meta(key, default=None): Get a value from node metadata, falling back to context metadata
  • set_meta(key, value): Set a metadata value on this context
  • emit_token(token): Emit a streaming token via callback
  • emit_message(content): Emit a complete message via callback
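The get_meta lookup order (node metadata first, then context metadata, then the default) can be illustrated with a standalone sketch; the real method lives on ExecutionContext, and the free-function form here is only for demonstration:

```python
from typing import Any


def get_meta(node_metadata: dict[str, Any],
             context_metadata: dict[str, Any],
             key: str,
             default: Any = None) -> Any:
    """Sketch of the documented lookup order: the node's own metadata
    wins, then the context's metadata, then the caller's default."""
    if key in node_metadata:
        return node_metadata[key]
    return context_metadata.get(key, default)
```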

Node Execution Protocol

Implement NodeExecutor to add custom node types:

from quartermaster_engine.nodes import NodeExecutor, NodeResult
from quartermaster_engine.context.execution_context import ExecutionContext

class MyInstructionExecutor:
    async def execute(self, context: ExecutionContext) -> NodeResult:
        system_instruction = context.get_meta("llm_system_instruction", "")
        # Call your LLM here...
        response_text = "Generated response"

        # Stream tokens in real time
        for token in response_text.split():
            context.emit_token(token + " ")

        return NodeResult(
            success=True,
            data={"model": "gpt-4o"},
            output_text=response_text,
        )

Register executors with SimpleNodeRegistry:

from quartermaster_engine.nodes import SimpleNodeRegistry

registry = SimpleNodeRegistry()
registry.register("Instruction1", MyInstructionExecutor())
registry.register("Decision1", MyDecisionExecutor())

# List registered types
registry.list_types()  # ["Instruction1", "Decision1"]

NodeResult

Returned by node executors after execution:

  • success (bool): Whether execution succeeded
  • data (dict[str, Any]): Structured output data
  • error (str | None): Error message if failed
  • picked_node (str | None): For decision nodes, which successor to trigger
  • output_text (str | None): Main text output
  • wait_for_user (bool): If True, the flow pauses for user input
  • user_prompt (str | None): Prompt to show the user
  • user_options (list[str] | None): Options for user selection

Configuration

SQLiteStore

from quartermaster_engine.stores.sqlite_store import SQLiteStore

store = SQLiteStore(db_path="my_agent.db")
runner = FlowRunner(graph=graph, node_registry=registry, store=store)

Tables are created automatically on first use. Uses WAL mode for concurrent read access.

ThreadDispatcher

from quartermaster_engine.dispatchers.thread_dispatcher import ThreadDispatcher

dispatcher = ThreadDispatcher(max_workers=8)
runner = FlowRunner(graph=graph, node_registry=registry, dispatcher=dispatcher)
result = runner.run("Process this in parallel")
dispatcher.shutdown()  # Clean up thread pool

Contributing

See CONTRIBUTING.md for guidelines.

License

Apache License 2.0 -- see LICENSE for details.
