# FastMind

A lightweight, event-driven multi-agent framework for embodied AI systems.
## Features

- **FastAPI-like Decorators**: Familiar `@app.agent`, `@app.tool`, `@app.perception` syntax, easy to learn
- **State Graph Architecture**: Build agent workflows like flowcharts, not nested loops
- **Event-Driven**: Asyncio-based, zero polling, high-performance async execution
- **Built-in Streaming**: Real-time streaming output with backpressure control
- **Human-in-the-Loop**: Interrupt and resume sessions for human approval
- **Perception Loops**: Native support for sensors, timers, and external triggers
- **Tool Calling**: ReAct-style agent-tool-agent loops
- **Session Isolation**: Multi-user support with isolated per-session state
- **Lightweight**: ~8,000 lines of code, no heavy dependencies
## Installation

### From PyPI (Recommended)

```bash
pip install fastmind
```

### From GitHub

```bash
pip install git+https://github.com/kandada/fastmind.git
```

With examples (quoted so the brackets survive shell globbing):

```bash
pip install "git+https://github.com/kandada/fastmind.git#egg=fastmind[examples]"
```

For development:

```bash
git clone https://github.com/kandada/fastmind.git
cd fastmind
pip install -e ".[all]"
```
## Quick Start

```python
import asyncio

from fastmind import FastMind, Graph, Event
from fastmind.contrib import FastMindAPI

app = FastMind()

@app.agent(name="chat_agent")
async def chat_agent(state: dict, event: Event) -> dict:
    state.setdefault("messages", [])
    state["messages"].append({"role": "user", "content": event.payload.get("text", "")})
    # Your LLM call here
    state["messages"].append({"role": "assistant", "content": "Hello!"})
    return state

graph = Graph()
graph.add_node("chat", chat_agent)
graph.set_entry_point("chat")
app.register_graph("main", graph)

async def main():
    api = FastMindAPI(app)
    await api.start()
    await api.push_event("user_001", Event("user.message", {"text": "Hello!"}, "user_001"))
    await api.stop()

asyncio.run(main())
```
## Core Concepts

### State

A dict-like container for session data, shared across nodes:

```python
state["messages"].append({"role": "user", "content": "Hello"})
```

### Node

An async function that processes events and returns updated state:

```python
async def my_node(state: dict, event: Event) -> dict:
    state["processed"] = True
    return state
```

### Graph

A collection of nodes and edges defining your workflow:

```python
graph = Graph()
graph.add_node("agent", chat_agent)
graph.add_edge("agent", "tool_node")
graph.set_entry_point("agent")
```

### Event

An external or internal trigger that drives graph execution:

```python
event = Event(type="user.message", payload={"text": "Hello"}, session_id="user_001")
```
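These pieces compose naturally: the engine walks the graph, calling each node with the current state and the triggering event. As a rough, self-contained illustration (plain Python with dicts standing in for `Event`, and `run_chain` as a hypothetical stand-in for what `Graph` plus the engine actually do), nodes are just async functions threaded over shared state:

```python
import asyncio

# Two illustrative nodes in the (state, event) -> state shape described above.
async def ingest(state: dict, event: dict) -> dict:
    state.setdefault("messages", [])
    state["messages"].append({"role": "user", "content": event["payload"]["text"]})
    return state

async def respond(state: dict, event: dict) -> dict:
    state["messages"].append({"role": "assistant", "content": "Hi!"})
    return state

async def run_chain(nodes, state: dict, event: dict) -> dict:
    # Toy stand-in for graph execution: run nodes in order, threading state through.
    for node in nodes:
        state = await node(state, event)
    return state

event = {"type": "user.message", "payload": {"text": "Hello"}}
state = asyncio.run(run_chain([ingest, respond], {}, event))
print(state["messages"])
```

The point of the shape is that every node sees the accumulated state of the nodes before it, which is what makes multi-step workflows composable.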
## Streaming Output

Real-time streaming with zero polling:

```python
@app.agent(name="chat_agent", stream=True)
async def chat_agent(state: dict, event: Event) -> dict:
    output_queue = state["_output_queue"]
    session_id = state["_session_id"]

    async def stream_llm():
        for chunk in llm_stream():  # your LLM streaming call
            for char in chunk:
                output_queue.put_nowait(Event(
                    type="stream.chunk",
                    payload={"delta": char},
                    session_id=session_id,
                ))
                await asyncio.sleep(0.03)
        output_queue.put_nowait(Event(type="stream.end", payload={}, session_id=session_id))

    asyncio.create_task(stream_llm())
    return state
```
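The zero-polling claim comes down to `asyncio.Queue`: a consumer awaiting `get()` suspends until a chunk arrives, and a bounded queue makes a fast producer await `put()` when the consumer falls behind (backpressure). A self-contained sketch of that chunk/end protocol, using only the standard library and plain dicts in place of `Event` objects:

```python
import asyncio

async def produce(queue: asyncio.Queue) -> None:
    # Emit one stream.chunk per character, then a stream.end sentinel,
    # mirroring the protocol shown above.
    for char in "Hello":
        await queue.put({"type": "stream.chunk", "delta": char})
    await queue.put({"type": "stream.end"})

async def consume(queue: asyncio.Queue) -> str:
    # Drain chunks until the end sentinel; awaiting an empty queue
    # suspends the task instead of polling.
    parts = []
    while True:
        ev = await queue.get()
        if ev["type"] == "stream.end":
            return "".join(parts)
        parts.append(ev["delta"])

async def main() -> str:
    queue = asyncio.Queue(maxsize=2)  # bounded: producer blocks when full (backpressure)
    producer = asyncio.create_task(produce(queue))
    text = await consume(queue)
    await producer
    return text

text = asyncio.run(main())
print(text)  # Hello
```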
## Human-in-the-Loop

Interrupt and resume for human approval:

```python
@app.agent(name="order_agent")
async def order_agent(state: dict, event: Event) -> dict:
    state.setdefault("orders", [])
    amount = event.payload.get("amount", 0)
    state["orders"].append({"amount": amount, "status": "pending"})
    if amount > 1000:
        state["need_approval"] = True
    return state

async def approve_node(state: dict, event: Event) -> tuple[dict, list[Event]]:
    return state, [Event(
        type="interrupt",
        payload={"prompt": "Approve this transaction?", "resume_node": "confirm"},
        session_id=event.session_id,
    )]

async def confirm_node(state: dict, event: Event) -> dict:
    if state.get("orders"):
        state["orders"][-1]["status"] = "confirmed"
    return state

async def reject_node(state: dict, event: Event) -> dict:
    if state.get("orders"):
        state["orders"][-1]["status"] = "rejected"
    return state

graph = Graph()
graph.add_node("order", order_agent)
graph.add_node("approve", approve_node)
graph.add_node("confirm", confirm_node)
graph.add_node("reject", reject_node)
graph.add_edge("order", "approve", condition=lambda s: s.get("need_approval"))
graph.add_edge("approve", "confirm")
graph.add_edge("approve", "reject")
graph.set_entry_point("order")
app.register_graph("main", graph)
```
Handle the interrupt in your application:

```python
async def main():
    api = FastMindAPI(app)
    await api.start()
    event = Event("user.message", {"amount": 2000}, "user_001")
    await api.push_event("user_001", event)
    async for ev in api.stream_events("user_001"):
        if ev.type == "interrupt":
            print(f"Interrupt: {ev.payload['prompt']}")
            await api.resume_session("user_001", "confirm")  # or "reject"
```
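The whole flow hinges on the edge condition: only flagged orders reach the approval branch. That routing logic can be checked in isolation (plain Python; `route` is a hypothetical stand-in for the engine's conditional-edge evaluation, not FastMind API):

```python
import asyncio

async def order_agent(state: dict, event: dict) -> dict:
    # Same logic as above: orders over 1000 are flagged for approval.
    state.setdefault("orders", [])
    amount = event["payload"].get("amount", 0)
    state["orders"].append({"amount": amount, "status": "pending"})
    if amount > 1000:
        state["need_approval"] = True
    return state

def route(state: dict) -> str:
    # Stand-in for the condition on the "order" -> "approve" edge.
    return "approve" if state.get("need_approval") else "__end__"

async def demo(amount: int) -> str:
    state = await order_agent({}, {"payload": {"amount": amount}})
    return route(state)

big = asyncio.run(demo(2000))
small = asyncio.run(demo(500))
print(big, small)  # approve __end__
```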
## Perception Loop

React to sensors, timers, and external events:

```python
@app.perception(interval=5.0, name="sensor_monitor")
async def sensor_monitor(app: FastMind):
    while True:
        data = await read_sensor()  # your sensor read
        yield Event(type="sensor.data", payload=data, session_id="system")
        await asyncio.sleep(5.0)
```
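Structurally, a perception source like the one above is an async generator: each `yield` hands one event to the consumer, and `async for` suspends between ticks instead of polling. A self-contained timer sketch (stdlib only, with the interval shortened so it runs quickly, and a plain dict standing in for `Event`):

```python
import asyncio

async def timer_source(interval: float, ticks: int):
    # Async generator yielding one event per interval, like a perception loop.
    for i in range(ticks):
        await asyncio.sleep(interval)
        yield {"type": "timer.tick", "payload": {"tick": i}}

async def main():
    events = []
    async for ev in timer_source(interval=0.01, ticks=3):
        events.append(ev)
    return events

events = asyncio.run(main())
print([ev["payload"]["tick"] for ev in events])  # [0, 1, 2]
```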
## Tool Calling (ReAct)

```python
@app.tool(name="get_weather", description="Get weather")
async def get_weather(city: str) -> str:
    return f"{city} is sunny"

from fastmind import ToolNode

tool_node = ToolNode(app.get_tools())
graph.add_node("tools", tool_node)

def has_tool_calls(state: dict, event: Event) -> str:
    return "tools" if state.get("tool_calls") else None

graph.add_conditional_edges("agent", has_tool_calls, {None: "__end__"})
graph.add_edge("tools", "agent")
```
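The ReAct shape is agent → tools → agent, repeated until the agent stops requesting tools. A self-contained simulation of that loop (plain Python; the hard-coded agent decisions and `react_loop` driver are illustrative stand-ins for an LLM and the graph engine, not FastMind API):

```python
import asyncio

async def get_weather(city: str) -> str:
    # Fake tool, mirroring the example above.
    return f"{city} is sunny"

async def agent(state: dict) -> dict:
    # First pass: request a tool call. Second pass: use the tool result.
    if "tool_result" not in state:
        state["tool_calls"] = [{"name": "get_weather", "args": {"city": "Tokyo"}}]
    else:
        state["answer"] = f"Weather report: {state['tool_result']}"
        state["tool_calls"] = []
    return state

async def tool_node(state: dict) -> dict:
    # Execute each requested tool and stash the result in state.
    for call in state.pop("tool_calls", []):
        state["tool_result"] = await get_weather(**call["args"])
    return state

async def react_loop() -> dict:
    # agent -> tools -> agent, until the agent stops requesting tools.
    state: dict = {}
    while True:
        state = await agent(state)
        if not state.get("tool_calls"):
            return state
        state = await tool_node(state)

state = asyncio.run(react_loop())
print(state["answer"])  # Weather report: Tokyo is sunny
```

In FastMind, the `has_tool_calls` conditional edge plays the role of the loop's exit check.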
## Examples

| Example | Description |
|---|---|
| `simple_chat.py` | Basic chat agent |
| `simple_chat_with_tool.py` | Agent with tool calling (ReAct) |
| `streaming_chat.py` | Real-time streaming output |
| `human_in_loop.py` | Human approval workflow |
| `perception_loop.py` | Sensor data processing |
| `drone.py` | Timer-based perception |
| `companion_bot.py` | Multi-agent conversation |
| `humanoid_robot.py` | Multi-tool collaboration |
| `sleep_assessment.py` | Multi-state HITL flow |
| `comprehensive_assistant.py` | Full-featured assistant |

Run an example:

```bash
python -m fastmind.examples.simple_chat
```
## API Reference

### FastMindAPI

```python
api = FastMindAPI(app)
await api.start()                                # start engine and perception loops
await api.push_event(session_id, event)          # push an event to a session
async for ev in api.stream_events(session_id):   # stream output events
    print(ev)
await api.stop()                                 # stop engine
```

### Session

```python
session = api.get_session(session_id)
state = session.state                            # get session state
await session.wait_for_output(timeout=5.0)       # wait for an output event
```
## Architecture

```
┌───────────────────────────────────────────────────────────┐
│                        FastMindAPI                        │
│  ┌─────────────────┐      ┌────────────────────────────┐  │
│  │ PerceptionLoop  │─────▶│           Engine           │  │
│  │   Scheduler     │      │  ┌──────────────────────┐  │  │
│  └─────────────────┘      │  │ Session (per user)   │  │  │
│                           │  │  ├─ State            │  │  │
│                           │  │  ├─ Event Queue      │  │  │
│                           │  │  └─ Output Queue     │  │  │
│                           │  └──────────────────────┘  │  │
│                           └────────────────────────────┘  │
└───────────────────────────────────────────────────────────┘
```
## Testing

```bash
pip install "fastmind[dev]"
pytest tests/ -v
```
## Changelog

### v0.1.3

- Bug fix: Fixed `stream_events` timeout issue when an agent returns no output events
- Improvement: Enhanced debug logging in the engine for better observability
- Improvement: Added `_has_conditional_edges()` helper method to the `Graph` class
- Tests: Added comprehensive test suite for ReAct loops and node execution protection

### v0.1.2

- Initial release
## License

GPL-3.0 License - see LICENSE for details.

## Acknowledgments

Inspired by LangGraph's state graph architecture design.