FastMind 🧠

A lightweight, event-driven multi-agent framework for embodied AI systems.

Features

  • FastAPI-like Decorators: Familiar @app.agent, @app.tool, @app.perception syntax, easy to learn
  • State Graph Architecture: Build agent workflows like flowcharts, not nested loops
  • Event-Driven: Asyncio-based, zero polling, high-performance async execution
  • Built-in Streaming: Real-time streaming output with backpressure control
  • Human-in-the-Loop: Interrupt and resume sessions for human approval
  • Perception Loops: Native support for sensors, timers, and external triggers
  • Tool Calling: ReAct-style agent-tool-agent loops
  • Session Isolation: Multi-user support with isolated session state
  • Lightweight: about 8,000 lines of code, no heavy dependencies

Installation

From PyPI (Recommended)

pip install fastmind

From GitHub

pip install git+https://github.com/kandada/fastmind.git

With examples:

pip install "fastmind[examples] @ git+https://github.com/kandada/fastmind.git"

For development:

git clone https://github.com/kandada/fastmind.git
cd fastmind
pip install -e ".[all]"

Quick Start

import asyncio

from fastmind import FastMind, Graph, Event
from fastmind.contrib import FastMindAPI

app = FastMind()

@app.agent(name="chat_agent")
async def chat_agent(state: dict, event: Event) -> dict:
    state.setdefault("messages", [])
    state["messages"].append({"role": "user", "content": event.payload.get("text", "")})
    # Your LLM call here
    state["messages"].append({"role": "assistant", "content": "Hello!"})
    return state

# A single-node graph: "chat" is both the entry point and the only step.
graph = Graph()
graph.add_node("chat", chat_agent)
graph.set_entry_point("chat")
app.register_graph("main", graph)

async def main():
    api = FastMindAPI(app)
    await api.start()
    await api.push_event("user_001", Event("user.message", {"text": "Hello!"}, "user_001"))
    await api.stop()

asyncio.run(main())

Core Concepts

State

A dict-like container for session data shared across nodes:

state.setdefault("messages", []).append({"role": "user", "content": "Hello"})

Node

An async function that processes events and returns updated state:

async def my_node(state: dict, event: Event) -> dict:
    state["processed"] = True
    return state
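
Because a node is an async function over a dict, it can be exercised without starting the engine. The sketch below assumes nothing about FastMind internals: FakeEvent is a hypothetical stand-in for fastmind's Event that carries the three fields used in this README (type, payload, session_id).

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeEvent:
    """Hypothetical stand-in for fastmind.Event (type, payload, session_id)."""
    type: str
    payload: dict = field(default_factory=dict)
    session_id: str = ""


async def my_node(state: dict, event: FakeEvent) -> dict:
    # Record the incoming text, then mark the state as processed.
    state.setdefault("messages", []).append(
        {"role": "user", "content": event.payload.get("text", "")}
    )
    state["processed"] = True
    return state


# Call the node directly, the way a unit test would.
result = asyncio.run(
    my_node({}, FakeEvent("user.message", {"text": "Hello"}, "user_001"))
)
print(result)
```

Swapping FakeEvent for the real Event should leave the node body unchanged, provided the field names match.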

Graph

A collection of nodes and edges defining your workflow:

graph = Graph()
graph.add_node("agent", chat_agent)
graph.add_edge("agent", "tool_node")
graph.set_entry_point("agent")
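
To make the flowchart idea concrete, here is a toy walker that follows edges from an entry point. It is an illustration only: ToyGraph is a hypothetical class that borrows the method names shown above and says nothing about how FastMind's Graph is actually implemented.

```python
import asyncio


class ToyGraph:
    """Illustrative only: not FastMind's Graph implementation."""

    def __init__(self):
        self.nodes = {}
        self.edges = {}
        self.entry = None

    def add_node(self, name, fn):
        self.nodes[name] = fn

    def add_edge(self, src, dst):
        self.edges[src] = dst

    def set_entry_point(self, name):
        self.entry = name

    async def run(self, state, event):
        # Follow edges from the entry point until a node has no successor.
        current = self.entry
        while current is not None:
            state = await self.nodes[current](state, event)
            current = self.edges.get(current)
        return state


async def agent(state, event):
    state.setdefault("trace", []).append("agent")
    return state


async def tool_node(state, event):
    state["trace"].append("tool_node")
    return state


graph = ToyGraph()
graph.add_node("agent", agent)
graph.add_node("tool_node", tool_node)
graph.add_edge("agent", "tool_node")
graph.set_entry_point("agent")

final_state = asyncio.run(graph.run({}, {"text": "Hello"}))
print(final_state["trace"])  # ['agent', 'tool_node']
```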

Event

External or internal triggers that drive graph execution:

event = Event(type="user.message", payload={"text": "Hello"}, session_id="user_001")

Streaming Output

Real-time streaming with zero polling:

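import asyncio

# Keep strong references so pending tasks are not garbage-collected mid-stream.
_background_tasks = set()

@app.agent(name="chat_agent", stream=True)
async def chat_agent(state: dict, event: Event) -> dict:
    output_queue = state["_output_queue"]
    session_id = state["_session_id"]

    async def stream_llm():
        # llm_stream() is a placeholder for your LLM client's chunk iterator.
        for chunk in llm_stream():
            for char in chunk:
                output_queue.put_nowait(Event(
                    type="stream.chunk",
                    payload={"delta": char},
                    session_id=session_id
                ))
                await asyncio.sleep(0.03)  # pace the output one character at a time
        output_queue.put_nowait(Event(type="stream.end", payload={}, session_id=session_id))

    # Start streaming in the background and return immediately.
    task = asyncio.create_task(stream_llm())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return state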
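
The "zero polling" claim can be illustrated with plain asyncio, independent of FastMind. In this sketch the consumer suspends on `await queue.get()` until a chunk arrives instead of checking in a loop. The event dicts and the producer are assumptions made for the demo; they only mirror the stream.chunk and stream.end shapes used above.

```python
import asyncio


async def producer(queue: asyncio.Queue) -> None:
    # Emit one chunk event per character, then an end marker.
    for char in "Hi!":
        queue.put_nowait({"type": "stream.chunk", "payload": {"delta": char}})
        await asyncio.sleep(0)  # yield control so the consumer can run
    queue.put_nowait({"type": "stream.end", "payload": {}})


async def consume(queue: asyncio.Queue) -> str:
    # `await queue.get()` suspends until an item arrives: no polling loop.
    parts = []
    while True:
        event = await queue.get()
        if event["type"] == "stream.end":
            break
        parts.append(event["payload"]["delta"])
    return "".join(parts)


async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(producer(queue))  # keep a reference to the task
    text = await consume(queue)
    await task
    return text


streamed_text = asyncio.run(demo())
print(streamed_text)  # Hi!
```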

Human-in-the-Loop

Interrupt and resume for human approval:

@app.agent(name="order_agent")
async def order_agent(state: dict, event: Event) -> dict:
    state.setdefault("orders", [])
    amount = event.payload.get("amount", 0)
    state["orders"].append({"amount": amount, "status": "pending"})
    if amount > 1000:
        state["need_approval"] = True
    return state

async def approve_node(state: dict, event: Event) -> tuple[dict, list[Event]]:
    return state, [Event(
        type="interrupt",
        payload={"prompt": "Approve this transaction?", "resume_node": "confirm"},
        session_id=event.session_id
    )]

async def confirm_node(state: dict, event: Event) -> dict:
    if state.get("orders"):
        state["orders"][-1]["status"] = "confirmed"
    return state

async def reject_node(state: dict, event: Event) -> dict:
    if state.get("orders"):
        state["orders"][-1]["status"] = "rejected"
    return state

graph = Graph()
graph.add_node("order", order_agent)
graph.add_node("approve", approve_node)
graph.add_node("confirm", confirm_node)
graph.add_node("reject", reject_node)

graph.add_edge("order", "approve", condition=lambda s: s.get("need_approval"))
graph.add_edge("approve", "confirm")
graph.add_edge("approve", "reject")
graph.set_entry_point("order")

app.register_graph("main", graph)

Handle the interrupt in your application:

import asyncio

async def main():
    api = FastMindAPI(app)
    await api.start()

    event = Event("user.message", {"amount": 2000}, "user_001")
    await api.push_event("user_001", event)

    async for ev in api.stream_events("user_001"):
        if ev.type == "interrupt":
            print(f"Interrupt: {ev.payload['prompt']}")
            await api.resume_session("user_001", "confirm")  # or "reject"

    await api.stop()  # shut the engine down once the stream ends

asyncio.run(main())
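
The interrupt-and-resume pattern itself can be sketched without FastMind: a workflow suspends at an approval point and continues only when a decision is delivered. Everything here (run_order, the decisions queue) is hypothetical and stands in for the engine's interrupt event and resume_session call; small orders are left pending, mirroring the graph above, which only routes large orders to approval.

```python
import asyncio


async def run_order(amount: int, decisions: asyncio.Queue) -> dict:
    order = {"amount": amount, "status": "pending"}
    if amount > 1000:
        # "Interrupt": suspend here until a human decision arrives.
        decision = await decisions.get()
        order["status"] = "confirmed" if decision == "confirm" else "rejected"
    return order


async def demo() -> list:
    decisions: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(run_order(2000, decisions))
    await asyncio.sleep(0)          # let the order reach the approval point
    paused = not task.done()        # the workflow is waiting for a human
    decisions.put_nowait("reject")  # the "resume" call
    large = await task
    small = await run_order(50, decisions)
    return [paused, large["status"], small["status"]]


hitl_result = asyncio.run(demo())
print(hitl_result)  # [True, 'rejected', 'pending']
```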

Perception Loop

React to sensors, timers, and external events:

@app.perception(interval=5.0, name="sensor_monitor")
async def sensor_monitor(app: FastMind):
    while True:
        data = await read_sensor()  # read_sensor() is your own coroutine
        yield Event(type="sensor.data", payload=data, session_id="system")
        await asyncio.sleep(5.0)
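
A perception loop of this shape is an async generator, so it can be prototyped and tested on its own. The sketch below is a standalone approximation: read_sensor returns a fixed, made-up reading, events are plain dicts, and max_events is a hypothetical parameter added only so the demo terminates.

```python
import asyncio
from typing import AsyncIterator


async def read_sensor() -> dict:
    # Hypothetical sensor: returns a fixed reading for the demo.
    return {"temperature": 21.5}


async def sensor_monitor(interval: float, max_events: int) -> AsyncIterator[dict]:
    # Yield one event per tick; a bounded count keeps the demo finite.
    for _ in range(max_events):
        data = await read_sensor()
        yield {"type": "sensor.data", "payload": data, "session_id": "system"}
        await asyncio.sleep(interval)


async def demo() -> list:
    events = []
    async for event in sensor_monitor(interval=0.01, max_events=3):
        events.append(event)
    return events


sensor_events = asyncio.run(demo())
print(len(sensor_events))  # 3
```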

Tool Calling (ReAct)

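from typing import Optional

from fastmind import ToolNode

@app.tool(name="get_weather", description="Get weather")
async def get_weather(city: str) -> str:
    return f"{city} is sunny"

tool_node = ToolNode(app.get_tools())
# The edges below reference a "tools" node, so register it first.
graph.add_node("tools", tool_node)

def has_tool_calls(state: dict, event: Event) -> Optional[str]:
    # Route to the tool node when the agent requested tools; otherwise end.
    return "tools" if state.get("tool_calls") else None

graph.add_conditional_edges("agent", has_tool_calls, {None: "__end__"})
graph.add_edge("tools", "agent")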
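
The agent-tool-agent cycle that these edges describe can be traced with a small standalone loop. This is a sketch under stated assumptions, not FastMind's ToolNode: the TOOLS registry, the tool decorator, and the fake "LLM" logic in agent are all hypothetical, and the loop condition reuses the same test as has_tool_calls.

```python
import asyncio

TOOLS = {}


def tool(name):
    """Register a coroutine in a plain dict (hypothetical, not @app.tool)."""
    def decorator(fn):
        TOOLS[name] = fn
        return fn
    return decorator


@tool("get_weather")
async def get_weather(city: str) -> str:
    return f"{city} is sunny"


async def agent(state: dict) -> dict:
    # Fake "LLM": request a tool once, then answer from the tool result.
    if "tool_results" not in state:
        state["tool_calls"] = [{"name": "get_weather", "args": {"city": "Paris"}}]
    else:
        state["tool_calls"] = []
        state["answer"] = state["tool_results"][-1]
    return state


async def tools(state: dict) -> dict:
    # Execute every pending tool call and store the results.
    results = state.setdefault("tool_results", [])
    for call in state["tool_calls"]:
        results.append(await TOOLS[call["name"]](**call["args"]))
    return state


async def react_loop(state: dict) -> dict:
    state = await agent(state)
    while state.get("tool_calls"):  # same test as has_tool_calls
        state = await tools(state)
        state = await agent(state)
    return state


react_state = asyncio.run(react_loop({}))
print(react_state["answer"])  # Paris is sunny
```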

Examples

Example                       Description
simple_chat.py                Basic chat agent
simple_chat_with_tool.py      Agent with tool calling (ReAct)
streaming_chat.py             Real-time streaming output
human_in_loop.py              Human approval workflow
perception_loop.py            Sensor data processing
drone.py                      Timer-based perception
companion_bot.py              Multi-agent conversation
humanoid_robot.py             Multi-tool collaboration
sleep_assessment.py           Multi-state HITL flow
comprehensive_assistant.py    Full-featured assistant

Run an example:

python -m fastmind.examples.simple_chat

API Reference

FastMindAPI

api = FastMindAPI(app)

await api.start()                    # Start engine and perception loops
await api.push_event(session_id, event)  # Push event to session
async for ev in api.stream_events(session_id):  # Stream output events
    print(ev)
await api.stop()                     # Stop engine

Session

session = api.get_session(session_id)
state = session.state                 # Get session state
await session.wait_for_output(timeout=5.0)  # Wait for output event

Architecture

┌──────────────────────────────────────────────────────────┐
│                      FastMindAPI                         │
│  ┌──────────────────┐    ┌────────────────────────────┐  │
│  │ PerceptionLoop   │───▶│        Engine              │  │
│  │ Scheduler        │    │  ┌──────────────────────┐  │  │
│  └──────────────────┘    │  │ Session (per user)   │  │  │
│                          │  │  ├─ State            │  │  │
│                          │  │  ├─ Event Queue      │  │  │
│                          │  │  └─ Output Queue     │  │  │
│                          │  └──────────────────────┘  │  │
│                          └────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘
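
The per-user Session box in the diagram (state, event queue, output queue) can be approximated in a few lines to show why sessions do not interfere with each other. ToySession and ToyEngine are hypothetical names invented for this sketch; the real engine's classes and scheduling are not shown in this README.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToySession:
    """Illustrative per-user container: state plus two queues."""
    state: dict = field(default_factory=dict)
    events: asyncio.Queue = field(default_factory=asyncio.Queue)
    outputs: asyncio.Queue = field(default_factory=asyncio.Queue)


class ToyEngine:
    def __init__(self):
        self.sessions = {}

    def get_session(self, session_id: str) -> ToySession:
        # Create the session lazily on first use.
        if session_id not in self.sessions:
            self.sessions[session_id] = ToySession()
        return self.sessions[session_id]

    async def push_event(self, session_id: str, text: str) -> None:
        await self.get_session(session_id).events.put(text)

    async def step(self, session_id: str) -> None:
        # Handle one queued event for one session only.
        session = self.get_session(session_id)
        text = await session.events.get()
        session.state.setdefault("messages", []).append(text)
        await session.outputs.put(f"echo: {text}")


async def demo() -> dict:
    engine = ToyEngine()
    await engine.push_event("user_001", "Hello")
    await engine.push_event("user_002", "Bonjour")
    await engine.step("user_001")
    await engine.step("user_002")
    return {sid: s.state["messages"] for sid, s in engine.sessions.items()}


isolated = asyncio.run(demo())
print(isolated)  # {'user_001': ['Hello'], 'user_002': ['Bonjour']}
```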

Testing

pip install "fastmind[dev]"
pytest tests/ -v

Changelog

v0.1.3

  • Bug Fix: Fixed stream_events timeout issue when agent returns no output events
  • Improvement: Enhanced debug logging in engine for better observability
  • Improvement: Added _has_conditional_edges() helper method to Graph class
  • Tests: Added comprehensive test suite for ReAct loops and node execution protection

v0.1.2

  • Initial release

License

GPL-3.0 License - see LICENSE for details.

Acknowledgments

Inspired by LangGraph for the state graph architecture design.

Author

xiefujin (email: 490021684@qq.com)
