xAgent - Multi-Modal AI Agent System
A powerful multi-modal AI Agent system with modern architecture
xAgent provides a complete AI assistant experience with text and image processing capabilities, intelligent vocabulary management, and high-performance concurrent tool execution. Built on FastAPI, Streamlit, and Redis for production-ready scalability.
Table of Contents
- Key Features
- Architecture
- Quick Start
- Usage Examples
- HTTP Agent Server
- Development Guide
- API Reference
- Monitoring & Observability
- Contributing
- License
Key Features
Core AI Capabilities
- Multi-Modal Conversations: Engage in rich conversations with support for both text (via models like GPT-4o) and image inputs.
- Persistent Sessions: Leverages Redis to maintain conversation history, ensuring seamless and stateful interactions across sessions.
- Extensible Tool System: Easily integrate custom synchronous or asynchronous functions as tools. The system automatically handles sync-to-async conversion for non-blocking execution.
- Concurrent Tool Execution: Capable of running multiple tools in parallel, significantly improving response times for complex queries.
- Structured Outputs: Define response structure using Pydantic models to get reliable, typed data from the agent.
- Agent as a Tool: A powerful pattern where specialized agents can be converted into tools, allowing a coordinator agent to delegate complex tasks.
- MCP Integration: Dynamically loads and refreshes tools from external sources using the Model Context Protocol (MCP).
Developer-Focused Design
- Modern Async Architecture: Built from the ground up with asyncio for high-performance, non-blocking operations.
- Standalone HTTP Server: Expose agent functionality via a REST API, complete with streaming support for real-time responses. See the HTTP Agent Server section for details.
- Modular and Pluggable: The clear separation of components like Agent, Session, and MessageDB makes the system easy to extend and maintain.
- Ready-to-Use Frontend: Includes a Streamlit-based chat application for immediate interaction and testing.
- Observability: Integrated with Langfuse for detailed tracing and monitoring of agent interactions.
Architecture
Modern Design for High Performance
```
xAgent/
├── xagent/                   # Core async agent framework
│   ├── core/                 # Agent and session management
│   │   ├── agent.py          # Main Agent class with chat
│   │   ├── session.py        # Session management with operations
│   │   └── server.py         # Standalone HTTP Agent Server
│   ├── db/                   # Database layer (Redis)
│   │   └── message.py        # Message persistence
│   ├── schemas/              # Data models and types (Pydantic)
│   │   └── message.py        # Message and ToolCall models
│   ├── tools/                # Tool ecosystem
│   │   ├── __init__.py       # Tool registry (web_search, draw_image)
│   │   ├── openai_tool.py    # OpenAI tool integrations
│   │   └── mcp_demo/         # MCP demo server and client
│   └── utils/                # Utility functions
│       ├── tool_decorator.py # Tool decorators
│       ├── mcp_convertor.py  # MCP client
│       └── image_upload.py   # AWS S3 image upload utility
├── toolkit/                  # Custom tool ecosystem
│   ├── __init__.py           # Toolkit registry
│   ├── tools.py              # Custom tools (char_count)
│   ├── mcp_server.py         # Main MCP server
│   └── vocabulary/           # Vocabulary learning system
├── config/                   # Configuration files
│   └── agent.yaml            # Agent server configuration
├── frontend/                 # Streamlit web interface
│   └── chat_app.py           # Main chat application
├── examples/                 # Usage examples and demos
└── tests/                    # Comprehensive test suite
```
Core Components
| Component | Purpose | Technology |
|---|---|---|
| Agent | Core conversation handler | OpenAI API + AsyncIO |
| Session | Message history management | Redis + Operations |
| MessageDB | Scalable persistence layer | Redis with client |
| Tools | Extensible function ecosystem | Auto sync-to-async conversion |
| MCP | Dynamic tool loading protocol | HTTP client |
Quick Start
Prerequisites
| Requirement | Version | Purpose |
|---|---|---|
| Python | 3.12+ | Core runtime |
| Redis | 7.0+ | Message persistence |
| OpenAI API Key | - | AI model access |
Installation
Clone and Setup
```bash
git clone https://github.com/ZJCODE/xAgent.git
cd xAgent
pip install -r requirements.txt
```
Or install from PyPI:
```bash
pip install myxagent
```
Environment Configuration
```bash
# Copy and edit environment file
cp .env.example .env
```
Required variables:
```bash
OPENAI_API_KEY=your_openai_api_key
```
Optional variables:
```bash
REDIS_URL=your_redis_url_with_password
LANGFUSE_SECRET_KEY=your_langfuse_key
LANGFUSE_PUBLIC_KEY=your_langfuse_public_key
LANGFUSE_HOST=https://cloud.langfuse.com
AWS_ACCESS_KEY_ID=your_aws_access_key_id
AWS_SECRET_ACCESS_KEY=your_aws_secret_access_key
AWS_REGION=us-east-1
BUCKET_NAME=your_bucket_name
```
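A small startup check along these lines can fail fast when the required key is missing and fall back to defaults for optional settings. This is a minimal sketch: `check_env` and its default values are illustrative, not part of xAgent's API.

```python
import os

def check_env() -> dict:
    """Collect xAgent-related environment variables, failing fast if the
    required OpenAI key is missing (illustrative helper, not part of xAgent)."""
    if not os.getenv("OPENAI_API_KEY"):
        raise RuntimeError("OPENAI_API_KEY is required")
    return {
        "openai_api_key": os.environ["OPENAI_API_KEY"],
        # Optional settings with fallbacks (defaults chosen for illustration)
        "redis_url": os.getenv("REDIS_URL", "redis://localhost:6379"),
        "langfuse_host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        "aws_region": os.getenv("AWS_REGION", "us-east-1"),
    }
```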
Running the Application
Quick Start (All Services)
```bash
chmod +x run.sh
./run.sh
```
Manual Start (Individual Services)
```bash
# Terminal 1: Standalone HTTP Agent Server
python xagent/core/server.py --config config/agent.yaml --toolkit toolkit

# Terminal 2: MCP Server
python toolkit/mcp_server.py

# Terminal 3: Frontend
streamlit run frontend/chat_app.py --server.port 8501
```
Access Points
| Service | URL | Description |
|---|---|---|
| Chat Interface | http://localhost:8501 | Main user interface |
| API Docs | http://localhost:8000/docs | Interactive API documentation |
| Health Check | http://localhost:8000/health | Service status monitoring |
| HTTP Agent Server | http://localhost:8010/chat | Standalone agent HTTP API |
Usage Examples
Basic Chat
```python
import asyncio

from xagent.core import Agent, Session
from xagent.tools import web_search

async def main():
    # Create agent with modern architecture
    agent = Agent(
        name="my_assistant",
        system_prompt="You are a helpful AI assistant.",
        model="gpt-4.1-mini",
        tools=[web_search],  # Add web search tool
        stream=False  # Set to True for streaming responses
    )

    # Create session for conversation management
    session = Session(session_id="session456")

    # Chat interaction
    response = await agent.chat("Hello, how are you?", session)
    print(response)

    # Continue conversation with context
    response = await agent.chat("What's the weather like in Hangzhou?", session)
    print(response)

    # Streaming response example
    response = await agent.chat("Hello, how are you?", session, stream=True)
    async for event in response:
        print(event)

asyncio.run(main())
```
Advanced Chat with Redis Persistence
```python
import asyncio

from xagent.core import Agent, Session
from xagent.db import MessageDB

async def chat_with_persistence():
    # Initialize Redis-backed message storage
    message_db = MessageDB()

    # Create agent
    agent = Agent(
        name="persistent_agent",
        model="gpt-4.1-mini",
        tools=[]
    )

    # Create session with Redis persistence
    session = Session(
        user_id="user123",
        session_id="persistent_session",
        message_db=message_db
    )

    # Chat with automatic message persistence
    response = await agent.chat("Remember this: my favorite color is blue", session)
    print(response)

    # Later conversation - context is preserved in Redis
    response = await agent.chat("What's my favorite color?", session)
    print(response)

asyncio.run(chat_with_persistence())
```
Custom Tools (Sync and Async)
```python
import asyncio
import time

import httpx
from xagent.utils.tool_decorator import function_tool
from xagent.core import Agent, Session

# Sync tools - automatically converted to async
@function_tool()
def calculate_square(n: int) -> int:
    """Calculate square of a number (CPU-intensive)."""
    time.sleep(0.1)  # Simulate CPU work
    return n * n

@function_tool()
def format_text(text: str, style: str) -> str:
    """Format text with various styles."""
    if style == "upper":
        return text.upper()
    elif style == "title":
        return text.title()
    return text

# Async tools - used directly for I/O operations
@function_tool()
async def fetch_weather(city: str) -> str:
    """Fetch weather data from API."""
    async with httpx.AsyncClient() as client:
        # Simulate weather API call
        await asyncio.sleep(0.5)
        return f"Weather in {city}: 22°C, Sunny"

async def main():
    # Mix of sync and async tools
    agent = Agent(
        tools=[calculate_square, format_text, fetch_weather],
        model="gpt-4.1-mini"
    )
    session = Session(user_id="user123")

    # Agent handles all tools automatically - sync tools run in thread pool
    response = await agent.chat(
        "Calculate the square of 15, format 'hello world' in title case, and get weather for Tokyo",
        session
    )
    print(response)

asyncio.run(main())
```
MCP Protocol Integration
```python
import asyncio

from xagent.core import Agent, Session

async def mcp_integration_example():
    # Create agent with MCP tools
    agent = Agent(
        tools=[],
        mcp_servers=["http://localhost:8001/mcp/"],  # Auto-refresh MCP tools
        model="gpt-4.1-mini"
    )
    session = Session(user_id="user123")

    # Use MCP tools automatically
    response = await agent.chat("Use the available MCP tools to help me", session)
    print(response)

asyncio.run(mcp_integration_example())
```
Structured Output with Pydantic
```python
import asyncio

from pydantic import BaseModel
from xagent.core import Agent, Session
from xagent.tools import web_search

class WeatherReport(BaseModel):
    location: str
    temperature: int
    condition: str
    humidity: int

class Step(BaseModel):
    explanation: str
    output: str

class MathReasoning(BaseModel):
    steps: list[Step]
    final_answer: str

async def get_structured_response():
    agent = Agent(model="gpt-4.1-mini", tools=[web_search])
    session = Session(user_id="user123")

    # Request structured output for weather
    weather_data = await agent.chat(
        "what's the weather like in Hangzhou?",
        session,
        output_type=WeatherReport
    )
    print(f"Location: {weather_data.location}")
    print(f"Temperature: {weather_data.temperature}°F")
    print(f"Condition: {weather_data.condition}")
    print(f"Humidity: {weather_data.humidity}%")

    # Request structured output for mathematical reasoning
    reply = await agent.chat(
        "how can I solve 8x + 7 = -23",
        session,
        output_type=MathReasoning
    )
    for index, step in enumerate(reply.steps):
        print(f"Step {index + 1}: {step.explanation} => Output: {step.output}")
    print("Final Answer:", reply.final_answer)

asyncio.run(get_structured_response())
```
Agent as Tool Pattern
```python
import asyncio

from xagent.core import Agent, Session
from xagent.db import MessageDB
from xagent.tools import web_search

async def agent_as_tool_example():
    # Create specialized agents
    researcher_agent = Agent(
        name="research_specialist",
        system_prompt="Research expert. Gather information and provide insights.",
        model="gpt-4.1-mini",
        tools=[web_search]
    )
    writing_agent = Agent(
        name="writing_specialist",
        system_prompt="Professional writer. Create engaging content.",
        model="gpt-4.1-mini"
    )

    # Convert agents to tools
    message_db = MessageDB()
    research_tool = researcher_agent.as_tool(
        name="researcher",
        description="Research topics and provide detailed analysis",
        message_db=message_db
    )
    writing_tool = writing_agent.as_tool(
        name="content_writer",
        description="Write and edit content",
        message_db=message_db
    )

    # Main coordinator agent with specialist tools
    coordinator = Agent(
        name="coordinator",
        tools=[research_tool, writing_tool],
        system_prompt="Coordination agent that delegates to specialists.",
        model="gpt-4.1"
    )
    session = Session(user_id="user123")

    # Complex multi-step task
    response = await coordinator.chat(
        "Research renewable energy benefits and write a brief summary",
        session
    )
    print(response)

asyncio.run(agent_as_tool_example())
```
HTTP Agent Server
xAgent provides a standalone HTTP server that exposes the Agent functionality through REST API endpoints. This allows integration with other systems and services through simple HTTP calls.
Starting the HTTP Server
```bash
# Start with default config
python xagent/core/server.py --config config/agent.yaml --toolkit toolkit

# Server will start on http://localhost:8010 by default
```
After installing the package, you can use the xagent-server command:
```bash
# Start the server using the installed command
xagent-server --config /path/to/your/config.yaml --toolkit /path/to/your/toolkit
```
Programmatic Usage
You can also start the HTTP Agent Server directly from Python:
```python
from xagent.core.server import HTTPAgentServer

# Create the HTTP Agent Server
server = HTTPAgentServer(config_path="config/agent.yaml", toolkit_path="toolkit")

# Run the server
server.run(host="0.0.0.0", port=8010)
```
Configuration
The HTTP server is configured through a YAML file (e.g., config/agent.yaml):
```yaml
agent:
  name: "Agent"
  system_prompt: |
    You are a helpful assistant. Your task is to assist users with their queries and tasks.
  model: "gpt-4.1-mini"
  mcp_servers:
    - "http://localhost:8001/mcp/"
  tools:
    - "web_search"  # built-in web search tool
    - "draw_image"  # built-in image drawing tool
    - "char_count"  # custom tool for counting characters
  use_local_session: false
server:
  host: "0.0.0.0"
  port: 8010
```
API Endpoints
POST /chat
Main chat endpoint for interacting with the AI agent.
Request Body:
```json
{
  "user_id": "string",
  "session_id": "string",
  "user_message": "string",
  "image_source": "string",
  "stream": false
}
```
- image_source: Image URL or base64-encoded image (optional)
- stream: Set to true to enable a streaming response (optional, defaults to false)
Standard Response (stream: false):
```json
{
  "reply": "string"
}
```
Streaming Response (stream: true):
The server streams Server-Sent Events (SSE). Each event is a JSON object.
- Data event: data: {"delta": "some text"}
- Completion event: data: [DONE]
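On the client side, each streamed line can be decoded as follows. This is a minimal sketch assuming the event format above; parse_sse_line is an illustrative helper, not part of xAgent's API.

```python
import json
from typing import Optional

def parse_sse_line(line: str) -> Optional[str]:
    """Extract the text delta from one SSE line, or return None when the
    stream signals completion ([DONE]) or the line carries no data."""
    if not line.startswith("data: "):
        return None
    payload = line[len("data: "):].strip()
    if payload == "[DONE]":
        return None
    return json.loads(payload).get("delta")

# Reassemble a streamed reply from raw SSE lines
lines = ['data: {"delta": "Hel"}', 'data: {"delta": "lo"}', "data: [DONE]"]
reply = "".join(d for d in map(parse_sse_line, lines) if d is not None)
# reply == "Hello"
```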
Usage Examples
Basic Chat Request
```bash
curl -X POST "http://localhost:8010/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user123",
    "session_id": "session456",
    "user_message": "Hello, how are you?"
  }'
```
Streaming Chat Request
```bash
curl -X POST "http://localhost:8010/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user123",
    "session_id": "session456",
    "user_message": "Hello, how are you?",
    "stream": true
  }'
```
Chat with Image
```bash
curl -X POST "http://localhost:8010/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user123",
    "session_id": "session456",
    "user_message": "What do you see in this image?",
    "image_source": "https://example.com/image.jpg"
  }'
```
๐ง Development Guide
๐ ๏ธ Creating Tools
Both sync and async functions work seamlessly:
```python
import asyncio
import time

from xagent.utils.tool_decorator import function_tool

# ✅ Sync tool - perfect for CPU-bound operations
@function_tool()
def my_sync_tool(input_text: str) -> str:
    """Process text synchronously (runs in thread pool)."""
    time.sleep(0.1)  # Simulate CPU-intensive work
    return f"Sync processed: {input_text}"

# ✅ Async tool - ideal for I/O-bound operations
@function_tool()
async def my_async_tool(input_text: str) -> str:
    """Process text asynchronously."""
    await asyncio.sleep(0.1)  # Simulate async I/O operation
    return f"Async processed: {input_text}"
```
Tool Development Guidelines
| Use Case | Tool Type | Example |
|---|---|---|
| CPU-bound | Sync functions | Math calculations, data processing |
| I/O-bound | Async functions | API calls, database queries |
| Simple operations | Sync functions | String manipulation, file operations |
| Network requests | Async functions | HTTP requests, WebSocket connections |
⚠️ Note: Recursive functions are not supported as tools due to potential stack overflow issues in async environments.
Automatic Conversion
xAgent's @function_tool() decorator automatically handles sync-to-async conversion:
- Sync functions → run in a thread pool (non-blocking)
- Async functions → run directly on the event loop
- Concurrent execution → all tools execute in parallel when called
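The conversion pattern itself can be sketched in a few lines. This is a simplified illustration of the idea, not xAgent's actual decorator: async functions pass through untouched, while sync functions are offloaded to the default thread pool so the event loop never blocks.

```python
import asyncio
import functools
import inspect
import time

def to_async(func):
    """Wrap a callable so it can always be awaited (illustrative sketch)."""
    if inspect.iscoroutinefunction(func):
        return func  # async functions run directly on the event loop

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # sync functions are offloaded to the default thread pool
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper

@to_async
def slow_square(n: int) -> int:
    time.sleep(0.05)  # blocking work runs in a thread, not the event loop
    return n * n

async def main():
    # Both calls run concurrently because neither blocks the event loop
    return await asyncio.gather(slow_square(3), slow_square(4))

# asyncio.run(main())  ->  [9, 16]
```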
Override Defaults
You can override the default tool name and description using the function_tool decorator:
```python
@function_tool(name="custom_square", description="Calculate the square of a number")
def calculate_square(n: int) -> int:
    return n * n
```
API Reference
Core Classes
Agent
Main AI agent class for handling conversations and tool execution.
```python
Agent(
    name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    model: Optional[str] = None,
    client: Optional[AsyncOpenAI] = None,
    tools: Optional[list] = None,
    mcp_servers: Optional[str | list] = None
)
```
Key Methods:
- async chat(user_message, session, **kwargs) -> str | BaseModel: Main chat interface
- async __call__(user_message, session, **kwargs) -> str | BaseModel: Shorthand for chat
- as_tool(name, description, message_db) -> Callable: Convert agent to tool
Parameters:
- name: Agent identifier (default: "default_agent")
- system_prompt: Instructions for the agent's behavior
- model: OpenAI model to use (default: "gpt-4.1-mini")
- client: Custom AsyncOpenAI client instance
- tools: List of function tools
- mcp_servers: MCP server URLs for dynamic tool loading
Session
Manages conversation history and persistence with operations.
```python
Session(
    user_id: str,
    session_id: Optional[str] = None,
    message_db: Optional[MessageDB] = None
)
```
Key Methods:
- async add_messages(messages: Message | List[Message]) -> None: Store messages
- async get_messages(count: int = 20) -> List[Message]: Retrieve message history
- async clear_session() -> None: Clear conversation history
- async pop_message() -> Optional[Message]: Remove last non-tool message
Features:
- Automatic fallback to in-memory storage if no MessageDB provided
- Redis-backed persistence for production use
- Thread-safe operations
- Efficient message batching
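To make the in-memory fallback concrete, here is a toy class with the same method shapes. It is purely illustrative: the real Session adds Redis persistence, thread safety, and batching, none of which this sketch implements.

```python
import asyncio
from typing import List, Optional

class InMemorySession:
    """Toy stand-in for Session's in-memory fallback (illustrative only)."""
    def __init__(self, user_id: str, session_id: Optional[str] = None):
        self.user_id = user_id
        self.session_id = session_id or "default"
        self._messages: List[dict] = []

    async def add_messages(self, messages) -> None:
        # Accept a single message or a batch, mirroring the documented API
        if isinstance(messages, dict):
            messages = [messages]
        self._messages.extend(messages)

    async def get_messages(self, count: int = 20) -> List[dict]:
        return self._messages[-count:]

    async def clear_session(self) -> None:
        self._messages.clear()

async def demo():
    s = InMemorySession(user_id="user123")
    await s.add_messages({"role": "user", "content": "hi"})
    await s.add_messages([{"role": "assistant", "content": "hello"}])
    return await s.get_messages()

# asyncio.run(demo()) returns both messages in order
```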
MessageDB
Redis-backed message persistence layer.
```python
# Initialize with environment variables or defaults
message_db = MessageDB()

# Usage with session
session = Session(
    user_id="user123",
    message_db=message_db
)
```
Important Considerations
| Aspect | Details |
|---|---|
| Tool functions | Can be sync or async (automatic conversion) |
| Agent interactions | Always use await |
| Context | Run inside an async context, e.g. via asyncio.run() |
| Concurrency | All tools execute in parallel automatically |
Monitoring & Observability
xAgent includes comprehensive observability features:
- Langfuse Integration - Track AI interactions and performance
- Structured Logging - Throughout the entire system
- Health Checks - API monitoring endpoints
- Performance Metrics - Tool execution time and success rates
Contributing
We welcome contributions! Here's how to get started:
Development Workflow
- Fork the repository
- Create a feature branch: git checkout -b feature/amazing-feature
- Commit your changes: git commit -m 'Add amazing feature'
- Push to the branch: git push origin feature/amazing-feature
- Open a Pull Request
Development Guidelines
| Area | Requirements |
|---|---|
| Code Style | Follow PEP 8 standards |
| Testing | Add tests for new features |
| Documentation | Update docs as needed |
| Type Safety | Use type hints throughout |
| Commits | Follow conventional commit messages |
Package Upload
First-time upload:
```bash
pip install build twine
python -m build
twine upload dist/*
```
Subsequent uploads:
```bash
rm -rf dist/ build/ *.egg-info/
python -m build
twine upload dist/*
```
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
Special thanks to the amazing open source projects that make xAgent possible:
- OpenAI - GPT models powering our AI
- FastAPI - Robust async API framework
- Streamlit - Intuitive web interface
- Redis - High-performance data storage
- Langfuse - Observability and monitoring
Support & Community
| Resource | Link | Purpose |
|---|---|---|
| Issues | GitHub Issues | Bug reports & feature requests |
| Discussions | GitHub Discussions | Community chat & Q&A |
| Email | zhangjun310@live.com | Direct support |