Execution engine for AI agent graphs — traversal, branching, merging, memory, message passing, and error handling

quartermaster-engine

Execution engine for AI agent graphs with pluggable storage, dispatching, and memory.

Requires Python 3.11+ · License: Apache 2.0

Features

  • FlowRunner orchestrates graph execution: traversal, branching, merging, and error handling
  • Pluggable dispatchers: SyncDispatcher, ThreadDispatcher, AsyncDispatcher
  • Pluggable storage: InMemoryStore, SQLiteStore, or implement your own ExecutionStore
  • Two memory layers: FlowMemory (per-execution) and PersistentMemory (cross-execution)
  • Real-time event streaming: NodeStarted, TokenGenerated, NodeFinished, FlowError, UserInputRequired
  • Per-node error strategies: Stop, Retry (with configurable max retries), Skip
  • Flow pause/resume for user interaction nodes
  • Sync and async execution modes with run() and run_async()

Installation

pip install quartermaster-engine

# With SQLite persistent store
pip install quartermaster-engine[sqlite]

Quick Start

Run a Simple Graph

from uuid import uuid4
from quartermaster_engine import FlowRunner, InMemoryStore
from quartermaster_engine.nodes import SimpleNodeRegistry, NodeResult
from quartermaster_graph import GraphSpec, GraphNode, GraphEdge, NodeType

# 1. Define the graph
start_id, process_id, end_id = uuid4(), uuid4(), uuid4()

graph = GraphSpec(
    id=uuid4(),
    agent_id=uuid4(),
    start_node_id=start_id,
    nodes=[
        GraphNode(id=start_id, type=NodeType.START, name="Start"),
        GraphNode(
            id=process_id,
            type=NodeType.INSTRUCTION,
            name="Process",
            metadata={"llm_system_instruction": "Summarize the input.", "llm_model": "gpt-4o"},
        ),
        GraphNode(id=end_id, type=NodeType.END, name="End"),
    ],
    edges=[
        GraphEdge(source_id=start_id, target_id=process_id),
        GraphEdge(source_id=process_id, target_id=end_id),
    ],
)

# 2. Register node executors
registry = SimpleNodeRegistry()
# registry.register("Instruction1", my_instruction_executor)

# 3. Run the flow
runner = FlowRunner(graph=graph, node_registry=registry)
result = runner.run("Please summarize this article about AI safety.")

print(result.success)          # True/False
print(result.final_output)     # The final text output
print(result.duration_seconds)

Using GraphBuilder from quartermaster-graph

from quartermaster_graph import GraphBuilder
from quartermaster_engine import FlowRunner
from quartermaster_engine.nodes import SimpleNodeRegistry

graph = (
    GraphBuilder("Support Agent")
    .start()
    .instruction("Classify", model="gpt-4o")
    .decision("Route", options=["billing", "technical"])
    .on("billing").instruction("Handle billing").end()
    .on("technical").instruction("Handle technical").end()
    .build()
)

registry = SimpleNodeRegistry()
runner = FlowRunner(graph=graph, node_registry=registry)
result = runner.run("I need help with my invoice")

Stream Events in Real Time

from quartermaster_engine import FlowRunner, FlowEvent, NodeStarted, NodeFinished, TokenGenerated, FlowError

def handle_event(event: FlowEvent):
    if isinstance(event, NodeStarted):
        print(f"[START] {event.node_name} ({event.node_type})")
    elif isinstance(event, TokenGenerated):
        print(event.token, end="", flush=True)
    elif isinstance(event, NodeFinished):
        print(f"\n[DONE] Node finished: {event.result[:50]}...")
    elif isinstance(event, FlowError):
        print(f"[ERROR] {event.error} (recoverable={event.recoverable})")

runner = FlowRunner(graph=graph, node_registry=registry, on_event=handle_event)
result = runner.run("Hello!")

Async Execution

import asyncio
from quartermaster_engine import FlowRunner, TokenGenerated

async def run_flow():
    runner = FlowRunner(graph=graph, node_registry=registry)
    async for event in runner.run_async("Hello!"):
        if isinstance(event, TokenGenerated):
            print(event.token, end="", flush=True)

asyncio.run(run_flow())

Quick Execution with run_graph()

For rapid prototyping, use the convenience function:

from quartermaster_engine import run_graph

# 'agent' is your agent graph (a GraphSpec, e.g. built with GraphBuilder)

# Non-interactive (provide input)
run_graph(agent, user_input="Explain quantum computing")

# Interactive (prompts on stdin at User nodes)
run_graph(agent)

# Force a specific provider
run_graph(agent, user_input="Hello", provider="openai")

run_graph() handles provider detection, node registration, streaming output, and the pause/resume cycle for interactive User nodes.

API Reference

FlowRunner

The core orchestration class. Accepts a GraphSpec from quartermaster-graph (AgentGraph still works as a deprecated alias).

from quartermaster_engine import FlowRunner
from quartermaster_engine.dispatchers.sync_dispatcher import SyncDispatcher
from quartermaster_engine.messaging.context_manager import ContextManager

runner = FlowRunner(
    graph=spec,                      # GraphSpec from quartermaster-graph
    node_registry=registry,          # Maps node types to executors
    store=InMemoryStore(),           # Execution state storage
    dispatcher=SyncDispatcher(),     # How branches are dispatched
    context_manager=ContextManager(),  # LLM context window management
    on_event=handle_event,           # Real-time event callback
)

| Method | Description |
|---|---|
| run(input_message, flow_id=None) -> FlowResult | Execute synchronously |
| run_async(input_message, flow_id=None) -> AsyncIterator[FlowEvent] | Execute asynchronously, yielding events |
| resume(flow_id, user_input) -> FlowResult | Resume a paused flow with user input |
| stop(flow_id) | Stop a running flow |

FlowResult

| Field | Type | Description |
|---|---|---|
| flow_id | UUID | Unique execution identifier |
| success | bool | Whether all nodes completed successfully |
| final_output | str | Text output from the End node |
| output_data | dict | Structured output data |
| error | str \| None | Error message if failed |
| node_results | dict[UUID, NodeResult] | Per-node results |
| duration_seconds | float | Total execution time |

Dispatchers

Control how successor nodes are executed.

| Dispatcher | Description |
|---|---|
| SyncDispatcher | Execute nodes sequentially in the calling thread. Simple and predictable. |
| ThreadDispatcher(max_workers=4) | Execute branches in parallel using a thread pool. Good for I/O-bound nodes. |
| AsyncDispatcher | Execute branches concurrently using asyncio tasks. For async web applications. |

All dispatchers implement the TaskDispatcher protocol:

class TaskDispatcher(Protocol):
    def dispatch(self, flow_id, node_id, execute_fn) -> None: ...
    def wait_all(self) -> None: ...
    def shutdown(self) -> None: ...
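
Because the protocol is structural, any object with these three methods works; no base class from quartermaster-engine is required. As a minimal sketch (standard library only, not the built-in ThreadDispatcher), a thread-pool dispatcher satisfying the protocol could look like:

```python
from concurrent.futures import Future, ThreadPoolExecutor


class PooledDispatcher:
    """Illustrative TaskDispatcher: runs each dispatched branch on a thread pool."""

    def __init__(self, max_workers: int = 4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures: list[Future] = []

    def dispatch(self, flow_id, node_id, execute_fn) -> None:
        # Submit the node's execute function without blocking the caller.
        self._futures.append(self._pool.submit(execute_fn))

    def wait_all(self) -> None:
        # Block until every dispatched branch has finished;
        # .result() re-raises any exception from the branch.
        for future in self._futures:
            future.result()
        self._futures.clear()

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)
```

Pass an instance as `dispatcher=` to FlowRunner just like the built-in dispatchers.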

Execution Stores

Pluggable storage for flow state, memory, and messages.

| Store | Description |
|---|---|
| InMemoryStore | Dict-backed, no persistence. Great for testing. |
| SQLiteStore(db_path) | SQLite-backed with WAL mode. For local development. |

Implement ExecutionStore for custom backends (Redis, PostgreSQL, etc.):

from quartermaster_engine import ExecutionStore

class RedisStore:
    def save_node_execution(self, flow_id, node_id, execution) -> None: ...
    def get_node_execution(self, flow_id, node_id) -> NodeExecution | None: ...
    def get_all_node_executions(self, flow_id) -> dict[UUID, NodeExecution]: ...
    def save_memory(self, flow_id, key, value) -> None: ...
    def get_memory(self, flow_id, key) -> Any: ...
    def get_all_memory(self, flow_id) -> dict[str, Any]: ...
    def delete_memory(self, flow_id, key) -> None: ...
    def save_messages(self, flow_id, node_id, messages) -> None: ...
    def get_messages(self, flow_id, node_id) -> list[Message]: ...
    def append_message(self, flow_id, node_id, message) -> None: ...
    def clear_flow(self, flow_id) -> None: ...
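
To make the shape concrete, here is a dict-backed sketch of the memory-related subset of the protocol (standard library only; illustrative, not the built-in InMemoryStore):

```python
from typing import Any
from uuid import UUID


class DictMemoryStore:
    """Illustrative partial ExecutionStore: the memory methods only."""

    def __init__(self):
        # {flow_id: {key: value}}
        self._memory: dict[UUID, dict[str, Any]] = {}

    def save_memory(self, flow_id: UUID, key: str, value: Any) -> None:
        self._memory.setdefault(flow_id, {})[key] = value

    def get_memory(self, flow_id: UUID, key: str) -> Any:
        return self._memory.get(flow_id, {}).get(key)

    def get_all_memory(self, flow_id: UUID) -> dict[str, Any]:
        return dict(self._memory.get(flow_id, {}))

    def delete_memory(self, flow_id: UUID, key: str) -> None:
        self._memory.get(flow_id, {}).pop(key, None)

    def clear_flow(self, flow_id: UUID) -> None:
        self._memory.pop(flow_id, None)
```

A real backend would replace the dict operations with Redis or SQL calls and implement the node-execution and message methods the same way.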

Memory System

FlowMemory -- scoped to a single flow execution:

from quartermaster_engine import FlowMemory, InMemoryStore

store = InMemoryStore()
memory = FlowMemory(flow_id=my_flow_id, store=store)

memory.set("user_name", "Alice")
memory.set("preferences", {"language": "en"})

name = memory.get("user_name")            # "Alice"
all_data = memory.get_all()                # {"user_name": "Alice", ...}
keys = memory.list_keys()                  # ["user_name", "preferences"]
memory.delete("preferences")
memory.clear()

PersistentMemory -- cross-flow memory that survives between executions:

from quartermaster_engine import PersistentMemory, InMemoryPersistence

persistence = InMemoryPersistence()

persistence.write(agent_id, "user_pref", "dark_mode")
value = persistence.read(agent_id, "user_pref")     # "dark_mode"
persistence.update(agent_id, "user_pref", "light_mode")

results = persistence.search(agent_id, "pref")      # Substring search
keys = persistence.list_keys(agent_id)               # ["user_pref"]
persistence.delete(agent_id, "user_pref")

Error Handling

Per-node error strategies configured via GraphNode.error_handling:

| Strategy | Behavior |
|---|---|
| ErrorStrategy.STOP | Halt entire flow on error (default) |
| ErrorStrategy.RETRY | Retry up to max_retries times with retry_delay backoff |
| ErrorStrategy.SKIP | Skip failed node, continue to successors |

from quartermaster_engine.types import GraphNode, NodeType, ErrorStrategy

node = GraphNode(
    type=NodeType.INSTRUCTION,
    name="Unreliable API",
    error_handling=ErrorStrategy.RETRY,
    max_retries=3,
    retry_delay=2.0,
    timeout=30.0,
)
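
Conceptually, the RETRY strategy amounts to a loop like the following. This is a standard-library sketch of the documented behavior, not the engine's actual implementation (which may also apply backoff rather than a fixed delay):

```python
import time


def run_with_retry(execute_fn, max_retries: int = 3, retry_delay: float = 2.0):
    """Call execute_fn; on exception, retry up to max_retries more times."""
    last_error = None
    for attempt in range(max_retries + 1):  # 1 initial attempt + max_retries retries
        try:
            return execute_fn()
        except Exception as exc:
            last_error = exc
            if attempt < max_retries:
                time.sleep(retry_delay)  # fixed delay here; the engine may back off
    raise last_error  # all attempts exhausted: surface the last failure
```

With STOP the loop body runs once and any exception propagates; with SKIP the exception is swallowed and traversal continues to the node's successors.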

Events

Real-time events emitted during flow execution:

| Event | Fields | Description |
|---|---|---|
| NodeStarted | flow_id, node_id, node_type, node_name | Node begins execution |
| TokenGenerated | flow_id, node_id, token | Streaming token from LLM |
| NodeFinished | flow_id, node_id, result, output_data | Node completed |
| FlowFinished | flow_id, final_output, output_data | Entire flow completed |
| UserInputRequired | flow_id, node_id, prompt, options | Flow paused for user input |
| FlowError | flow_id, node_id, error, recoverable | Node failed |

run_graph() uses streaming by default -- TokenGenerated events are printed as they arrive, giving real-time output from LLM nodes without extra setup.

ExecutionContext

The runtime context passed to each node executor:

| Field | Type | Description |
|---|---|---|
| flow_id | UUID | Flow execution identifier |
| node_id | UUID | Current node identifier |
| graph | GraphSpec | Full graph definition |
| current_node | GraphNode | Current node definition |
| messages | list[Message] | Conversation history |
| memory | dict[str, Any] | Flow-scoped memory snapshot |
| metadata | dict[str, Any] | Node metadata |
| on_token | Callable[[str], None] \| None | Callback for streaming tokens |

Helper methods:

| Method | Description |
|---|---|
| get_meta(key, default=None) | Get value from node metadata, falling back to context metadata |
| set_meta(key, value) | Set a metadata value on this context |
| emit_token(token) | Emit a streaming token via callback |
| emit_message(content) | Emit a complete message via callback |
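
The lookup order of get_meta can be illustrated with plain dicts (a sketch of the documented fallback, using hypothetical parameter names):

```python
def get_meta(node_metadata: dict, context_metadata: dict, key: str, default=None):
    """Look in the node's metadata first, then the context's, then the default."""
    if key in node_metadata:
        return node_metadata[key]
    return context_metadata.get(key, default)
```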

Node Execution Protocol

Implement NodeExecutor to add custom node types:

from quartermaster_engine.nodes import NodeExecutor, NodeResult
from quartermaster_engine.context.execution_context import ExecutionContext

class MyInstructionExecutor:
    async def execute(self, context: ExecutionContext) -> NodeResult:
        system_instruction = context.get_meta("llm_system_instruction", "")
        # Call your LLM here...
        response_text = "Generated response"

        # Stream tokens in real time
        for token in response_text.split():
            context.emit_token(token + " ")

        return NodeResult(
            success=True,
            data={"model": "gpt-4o"},
            output_text=response_text,
        )

Register executors with SimpleNodeRegistry:

from quartermaster_engine.nodes import SimpleNodeRegistry

registry = SimpleNodeRegistry()
registry.register("Instruction1", MyInstructionExecutor())
registry.register("Decision1", MyDecisionExecutor())

# List registered types
registry.list_types()  # ["Instruction1", "Decision1"]

NodeResult

Returned by node executors after execution:

| Field | Type | Description |
|---|---|---|
| success | bool | Whether execution succeeded |
| data | dict[str, Any] | Structured output data |
| error | str \| None | Error message if failed |
| picked_node | str \| None | For decision nodes: which successor to trigger |
| output_text | str \| None | Main text output |
| wait_for_user | bool | If True, flow pauses for user input |
| user_prompt | str \| None | Prompt to show the user |
| user_options | list[str] \| None | Options for user selection |

Configuration

SQLiteStore

from quartermaster_engine.stores.sqlite_store import SQLiteStore

store = SQLiteStore(db_path="my_agent.db")
runner = FlowRunner(graph=graph, node_registry=registry, store=store)

Tables are created automatically on first use. Uses WAL mode for concurrent read access.

ThreadDispatcher

from quartermaster_engine.dispatchers.thread_dispatcher import ThreadDispatcher

dispatcher = ThreadDispatcher(max_workers=8)
runner = FlowRunner(graph=graph, node_registry=registry, dispatcher=dispatcher)
result = runner.run("Process this in parallel")
dispatcher.shutdown()  # Clean up thread pool

Contributing

See CONTRIBUTING.md for guidelines.

License

Apache License 2.0 -- see LICENSE for details.
