
FastMind 🧠

A lightweight, event-driven multi-agent framework for embodied AI systems.


Features

  • FastAPI-like Decorators: Familiar @app.agent, @app.tool, and @app.perception syntax that is easy to learn
  • State Graph Architecture: Build agent workflows as flowcharts, not nested loops
  • Event-Driven: Built on asyncio with zero polling for high-performance execution
  • Built-in Streaming: Real-time streaming output with backpressure control
  • Human-in-the-Loop: Interrupt and resume sessions for human approval
  • Perception Loops: Native support for sensors, timers, and external triggers
  • Tool Calling: ReAct-style agent-tool-agent loops
  • Session Isolation: Multi-user support with isolated session state
  • Lightweight: ~8,000 lines of code with no heavy dependencies

Installation

From PyPI (Recommended)

pip install fastmind

From GitHub

pip install git+https://github.com/kandada/fastmind.git

With examples:

pip install git+https://github.com/kandada/fastmind.git#egg=fastmind[examples]

For development:

git clone https://github.com/kandada/fastmind.git
cd fastmind
pip install -e ".[all]"

Quick Start

from fastmind import FastMind, Graph, Event
from fastmind.contrib import FastMindAPI

app = FastMind()

@app.agent(name="chat_agent")
async def chat_agent(state: dict, event: Event) -> dict:
    state.setdefault("messages", [])
    state["messages"].append({"role": "user", "content": event.payload.get("text", "")})
    # Your LLM call here
    state["messages"].append({"role": "assistant", "content": "Hello!"})
    return state

graph = Graph()
graph.add_node("chat", chat_agent)
graph.set_entry_point("chat")
app.register_graph("main", graph)

async def main():
    api = FastMindAPI(app)
    await api.start()
    await api.push_event("user_001", Event("user.message", {"text": "Hello!"}, "user_001"))
    await api.stop()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Core Concepts

State

A dict-like container for session data shared across nodes:

state["messages"].append({"role": "user", "content": "Hello"})

Node

An async function that processes events and returns updated state:

async def my_node(state: dict, event: Event) -> dict:
    state["processed"] = True
    return state
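Because nodes are plain async functions, they can be exercised without the engine. A minimal sketch, where `StubEvent` is a simplified stand-in for `fastmind.Event` (illustration only, not the real class):

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class StubEvent:
    # Simplified stand-in for fastmind.Event, for testing only
    type: str
    payload: dict = field(default_factory=dict)
    session_id: str = "test"

async def my_node(state: dict, event: StubEvent) -> dict:
    state["processed"] = True
    return state

state = asyncio.run(my_node({}, StubEvent("user.message", {"text": "hi"})))
print(state["processed"])  # True
```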

Graph

A collection of nodes and edges defining your workflow:

graph = Graph()
graph.add_node("agent", chat_agent)
graph.add_edge("agent", "tool_node")
graph.set_entry_point("agent")
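The flowchart idea is easy to see in miniature. The toy class below mimics the node/edge/entry-point shape of the snippet above; it is not FastMind's `Graph`, just a sketch of walking nodes along edges:

```python
import asyncio

class TinyGraph:
    # Toy illustration of the node/edge idea only -- not fastmind.Graph
    def __init__(self):
        self.nodes, self.edges, self.entry = {}, {}, None

    def add_node(self, name, fn):
        self.nodes[name] = fn

    def add_edge(self, src, dst):
        self.edges[src] = dst

    def set_entry_point(self, name):
        self.entry = name

    async def run(self, state):
        node = self.entry
        while node is not None:           # follow edges until no successor
            state = await self.nodes[node](state)
            node = self.edges.get(node)
        return state

async def hello(state):
    state["trace"] = state.get("trace", []) + ["hello"]
    return state

async def world(state):
    state["trace"] = state.get("trace", []) + ["world"]
    return state

g = TinyGraph()
g.add_node("a", hello)
g.add_node("b", world)
g.add_edge("a", "b")
g.set_entry_point("a")
final = asyncio.run(g.run({}))
print(final["trace"])  # ['hello', 'world']
```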

Event

External or internal triggers that drive graph execution:

event = Event(type="user.message", payload={"text": "Hello"}, session_id="user_001")
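Since execution is driven by `event.type`, a handler table keyed on the type is a common consuming-side pattern. A sketch, where `EventSketch` is a simplified stand-in for `fastmind.Event` (the real class may carry extra fields):

```python
from dataclasses import dataclass, field

@dataclass
class EventSketch:
    # Simplified stand-in for fastmind.Event, illustration only
    type: str
    payload: dict = field(default_factory=dict)
    session_id: str = ""

def dispatch(event: EventSketch) -> str:
    # Route an event to a handler based on its type
    handlers = {
        "user.message": lambda ev: f"chat: {ev.payload.get('text', '')}",
        "sensor.data": lambda ev: f"sensor: {ev.payload}",
    }
    handler = handlers.get(event.type, lambda ev: "ignored")
    return handler(event)

result = dispatch(EventSketch("user.message", {"text": "Hello"}, "user_001"))
print(result)  # chat: Hello
```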

Streaming Output

Real-time streaming with zero polling:

import asyncio

@app.agent(name="chat_agent", stream=True)
async def chat_agent(state: dict, event: Event) -> dict:
    output_queue = state["_output_queue"]
    session_id = state["_session_id"]

    async def stream_llm():
        # llm_stream() stands in for your model's streaming generator
        for chunk in llm_stream():
            for char in chunk:
                output_queue.put_nowait(Event(
                    type="stream.chunk",
                    payload={"delta": char},
                    session_id=session_id
                ))
                await asyncio.sleep(0.03)
        output_queue.put_nowait(Event(type="stream.end", payload={}, session_id=session_id))

    asyncio.create_task(stream_llm())
    return state
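On the consuming side, the pattern is to accumulate `stream.chunk` deltas until a `stream.end` marker arrives. A self-contained sketch using a plain `asyncio.Queue` and a stub event class (not FastMind's own queue or `Event`):

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class StubEvent:
    # Simplified stand-in for fastmind.Event
    type: str
    payload: dict = field(default_factory=dict)
    session_id: str = "user_001"

async def produce(queue: asyncio.Queue):
    # Emit one chunk per character, then an end-of-stream marker
    for ch in "Hi!":
        queue.put_nowait(StubEvent("stream.chunk", {"delta": ch}))
    queue.put_nowait(StubEvent("stream.end"))

async def consume(queue: asyncio.Queue) -> str:
    # Accumulate deltas until the end-of-stream marker arrives
    text = []
    while True:
        ev = await queue.get()
        if ev.type == "stream.end":
            break
        text.append(ev.payload["delta"])
    return "".join(text)

async def main():
    q = asyncio.Queue()
    await produce(q)
    return await consume(q)

result = asyncio.run(main())
print(result)  # Hi!
```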

Human-in-the-Loop

Interrupt and resume for human approval:

@app.agent(name="order_agent")
async def order_agent(state: dict, event: Event) -> dict:
    state.setdefault("orders", [])
    amount = event.payload.get("amount", 0)
    state["orders"].append({"amount": amount, "status": "pending"})
    if amount > 1000:
        state["need_approval"] = True
    return state

async def approve_node(state: dict, event: Event) -> tuple[dict, list[Event]]:
    return state, [Event(
        type="interrupt",
        payload={"prompt": "Approve this transaction?", "resume_node": "confirm"},
        session_id=event.session_id
    )]

async def confirm_node(state: dict, event: Event) -> dict:
    if state.get("orders"):
        state["orders"][-1]["status"] = "confirmed"
    return state

async def reject_node(state: dict, event: Event) -> dict:
    if state.get("orders"):
        state["orders"][-1]["status"] = "rejected"
    return state

graph = Graph()
graph.add_node("order", order_agent)
graph.add_node("approve", approve_node)
graph.add_node("confirm", confirm_node)
graph.add_node("reject", reject_node)

graph.add_edge("order", "approve", condition=lambda s: s.get("need_approval"))
graph.add_edge("approve", "confirm")
graph.add_edge("approve", "reject")
graph.set_entry_point("order")

app.register_graph("main", graph)

Handle the interrupt in your application:

async def main():
    api = FastMindAPI(app)
    await api.start()
    
    event = Event("user.message", {"amount": 2000}, "user_001")
    await api.push_event("user_001", event)
    
    async for ev in api.stream_events("user_001"):
        if ev.type == "interrupt":
            print(f"Interrupt: {ev.payload['prompt']}")
            await api.resume_session("user_001", "confirm")  # or "reject"

Perception Loop

React to sensors, timers, and external events:

@app.perception(interval=5.0, name="sensor_monitor")
async def sensor_monitor(app: FastMind):
    while True:
        data = await read_sensor()  # read_sensor() stands in for your own sensor I/O
        yield Event(type="sensor.data", payload=data, session_id="system")
        await asyncio.sleep(5.0)
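The perception shape is an async generator that interleaves I/O and yields. A self-contained sketch with a hypothetical `read_sensor` returning canned readings (zero interval so it runs instantly; plain dicts stand in for events):

```python
import asyncio

async def read_sensor(i: int) -> dict:
    # Hypothetical sensor I/O; returns a canned reading
    await asyncio.sleep(0)  # pretend to await hardware
    return {"temp": 20 + i}

async def sensor_events(n: int, interval: float = 0.0):
    # Async generator in the style of a perception loop
    for i in range(n):
        yield {"type": "sensor.data", "payload": await read_sensor(i)}
        await asyncio.sleep(interval)

async def main():
    return [ev async for ev in sensor_events(3)]

events = asyncio.run(main())
temps = [e["payload"]["temp"] for e in events]
print(temps)  # [20, 21, 22]
```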

Tool Calling (ReAct)

@app.tool(name="get_weather", description="Get weather")
async def get_weather(city: str) -> str:
    return f"{city} is sunny"

from fastmind import ToolNode

tool_node = ToolNode(app.get_tools())
graph.add_node("tools", tool_node)

def has_tool_calls(state: dict, event: Event) -> str | None:
    return "tools" if state.get("tool_calls") else None

graph.add_conditional_edges("agent", has_tool_calls, {"tools": "tools", None: "__end__"})
graph.add_edge("tools", "agent")

Examples

Example                       Description
simple_chat.py                Basic chat agent
simple_chat_with_tool.py      Agent with tool calling (ReAct)
streaming_chat.py             Real-time streaming output
human_in_loop.py              Human approval workflow
perception_loop.py            Sensor data processing
drone.py                      Timer-based perception
companion_bot.py              Multi-agent conversation
humanoid_robot.py             Multi-tool collaboration
sleep_assessment.py           Multi-state HITL flow
comprehensive_assistant.py    Full-featured assistant

Run an example:

python -m fastmind.examples.simple_chat

API Reference

FastMindAPI

api = FastMindAPI(app)

await api.start()                    # Start engine and perception loops
await api.push_event(session_id, event)  # Push event to session
async for ev in api.stream_events(session_id):  # Stream output events
    print(ev)
await api.stop()                     # Stop engine

Session

session = api.get_session(session_id)
state = session.state                 # Get session state
await session.wait_for_output(timeout=5.0)  # Wait for output event

Architecture

┌──────────────────────────────────────────────────────────┐
│                       FastMindAPI                        │
│  ┌──────────────────┐    ┌────────────────────────────┐  │
│  │ PerceptionLoop   │───▶│           Engine           │  │
│  │ Scheduler        │    │  ┌──────────────────────┐  │  │
│  └──────────────────┘    │  │ Session (per user)   │  │  │
│                          │  │  ├─ State            │  │  │
│                          │  │  ├─ Event Queue      │  │  │
│                          │  │  └─ Output Queue     │  │  │
│                          │  └──────────────────────┘  │  │
│                          └────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘

Testing

pip install fastmind[dev]
pytest tests/ -v

Changelog

v0.1.3

  • Bug Fix: Fixed stream_events timeout issue when agent returns no output events
  • Improvement: Enhanced debug logging in engine for better observability
  • Improvement: Added _has_conditional_edges() helper method to Graph class
  • Tests: Added comprehensive test suite for ReAct loops and node execution protection

v0.1.2

  • Initial release

License

GPL-3.0 License - see LICENSE for details.

Acknowledgments

Inspired by LangGraph for the state graph architecture design.

Author

xiefujin (490021684@qq.com)
