xAgent - Multi-Modal AI Agent System


🚀 A powerful multi-modal AI Agent system with modern architecture

xAgent provides a complete AI assistant experience with text and image processing capabilities, intelligent vocabulary management, and high-performance concurrent tool execution. Built on FastAPI, Streamlit, and Redis for production-ready scalability.

✨ Key Features

🤖 Core AI Capabilities

  • Multi-Modal Conversations: Engage in rich conversations with support for both text (via models like GPT-4o) and image inputs.
  • Persistent Sessions: Leverages Redis to maintain conversation history, ensuring seamless and stateful interactions across sessions.
  • Extensible Tool System: Easily integrate custom synchronous or asynchronous functions as tools. The system automatically handles sync-to-async conversion for non-blocking execution.
  • Concurrent Tool Execution: Capable of running multiple tools in parallel, significantly improving response times for complex queries.
  • Structured Outputs: Define response structure using Pydantic models to get reliable, typed data from the agent.
  • Agent as a Tool: A powerful pattern where specialized agents can be converted into tools, allowing a coordinator agent to delegate complex tasks.
  • MCP Integration: Dynamically loads and refreshes tools from external sources using the Model Context Protocol (MCP).

🔧 Developer-Focused Design

  • Modern Async Architecture: Built from the ground up with asyncio for high-performance, non-blocking operations.
  • Standalone HTTP Server: Expose agent functionality via a REST API, complete with streaming support for real-time responses. See the HTTP Agent Server section for details.
  • Modular and Pluggable: The clear separation of components like Agent, Session, and MessageDB makes the system easy to extend and maintain.
  • Ready-to-Use Frontend: Includes a Streamlit-based chat application for immediate interaction and testing.
  • Observability: Integrated with Langfuse for detailed tracing and monitoring of agent interactions.

๐Ÿ—๏ธ Architecture

Modern Design for High Performance

xAgent/
├── 🤖 xagent/                # Core async agent framework
│   ├── core/                 # Agent and session management
│   │   ├── agent.py          # Main Agent class with chat
│   │   ├── session.py        # Session management with operations
│   │   └── server.py         # Standalone HTTP Agent Server
│   ├── db/                   # Database layer (Redis)
│   │   └── message.py        # Message persistence
│   ├── schemas/              # Data models and types (Pydantic)
│   │   └── message.py        # Message and ToolCall models
│   ├── tools/                # Tool ecosystem
│   │   ├── __init__.py       # Tool registry (web_search, draw_image)
│   │   ├── openai_tool.py    # OpenAI tool integrations
│   │   └── mcp_demo/         # MCP demo server and client
│   └── utils/                # Utility functions
│       ├── tool_decorator.py # Tool decorators
│       ├── mcp_convertor.py  # MCP client
│       └── image_upload.py   # AWS S3 image upload utility
├── 🛠️ toolkit/               # Custom tool ecosystem
│   ├── __init__.py           # Toolkit registry
│   ├── tools.py              # Custom tools (char_count)
│   ├── mcp_server.py         # Main MCP server
│   └── vocabulary/           # Vocabulary learning system
├── ⚙️ config/                # Configuration files
│   └── agent.yaml            # Agent server configuration
├── 🎨 frontend/              # Streamlit web interface
│   └── chat_app.py           # Main chat application
├── 📁 examples/              # Usage examples and demos
└── 🧪 tests/                 # Comprehensive test suite

🔄 Core Components

| Component | Purpose                       | Technology                         |
|-----------|-------------------------------|------------------------------------|
| Agent     | Core conversation handler     | OpenAI API + asyncio               |
| Session   | Message history management    | Redis-backed operations            |
| MessageDB | Scalable persistence layer    | Redis client                       |
| Tools     | Extensible function ecosystem | Automatic sync-to-async conversion |
| MCP       | Dynamic tool loading protocol | HTTP client                        |

🚀 Quick Start

Prerequisites

| Requirement    | Version | Purpose             |
|----------------|---------|---------------------|
| Python         | 3.12+   | Core runtime        |
| Redis          | 7.0+    | Message persistence |
| OpenAI API key | -       | AI model access     |

Installation

Clone and Setup

git clone https://github.com/ZJCODE/xAgent.git
cd xAgent
pip install -r requirements.txt

Or install from PyPI with pip:

pip install myxagent

Environment Configuration

# Copy and edit environment file
cp .env.example .env

Required variables

OPENAI_API_KEY=your_openai_api_key

Optional variables

REDIS_URL=your_redis_url_with_password

LANGFUSE_SECRET_KEY=your_langfuse_key
LANGFUSE_PUBLIC_KEY=your_langfuse_public_key
LANGFUSE_HOST=https://cloud.langfuse.com

AWS_ACCESS_KEY_ID=your_aws_access_key_id
AWS_SECRET_ACCESS_KEY=your_aws_secret_access_key
AWS_REGION=us-east-1
BUCKET_NAME=your_bucket_name

Running the Application

🚀 Quick Start (All Services)

chmod +x run.sh
./run.sh

โš™๏ธ Manual Start (Individual Services)

# Terminal 1: Standalone HTTP Agent Server
python xagent/core/server.py --config config/agent.yaml --toolkit toolkit

# Terminal 2: MCP Server
python toolkit/mcp_server.py

# Terminal 3: Frontend
streamlit run frontend/chat_app.py --server.port 8501

๐ŸŒ Access Points

Service URL Description
Chat Interface http://localhost:8501 Main user interface
API Docs http://localhost:8000/docs Interactive API documentation
Health Check http://localhost:8000/health Service status monitoring
HTTP Agent Server http://localhost:8010/chat Standalone agent HTTP API

💡 Usage Examples

📘 Basic Chat

import asyncio
from xagent.core import Agent, Session
from xagent.tools import web_search

async def main():
    # Create agent with modern architecture
    agent = Agent(
        name="my_assistant",
        system_prompt="You are a helpful AI assistant.",
        model="gpt-4.1-mini",
        tools=[web_search],  # Add web search tool
        stream=False  # Set to True for streaming responses
    )

    # Create session for conversation management
    session = Session(session_id="session456")

    # Chat interaction
    response = await agent.chat("Hello, how are you?", session)
    print(response)

    # Continue conversation with context
    response = await agent.chat("What's the weather like in Hangzhou?", session)
    print(response)

    # Streaming response example
    response = await agent.chat("Hello, how are you?", session, stream=True)
    async for event in response:
        print(event)


asyncio.run(main())

๐Ÿ—„๏ธ Advanced Chat with Redis Persistence

import asyncio
from xagent.core import Agent, Session
from xagent.db import MessageDB

async def chat_with_persistence():
    # Initialize Redis-backed message storage
    message_db = MessageDB()
    
    # Create agent
    agent = Agent(
        name="persistent_agent",
        model="gpt-4.1-mini",
        tools=[]
    )

    # Create session with Redis persistence
    session = Session(
        user_id="user123", 
        session_id="persistent_session",
        message_db=message_db
    )

    # Chat with automatic message persistence
    response = await agent.chat("Remember this: my favorite color is blue", session)
    print(response)
    
    # Later conversation - context is preserved in Redis
    response = await agent.chat("What's my favorite color?", session)
    print(response)

asyncio.run(chat_with_persistence())

🔧 Custom Tools (Sync and Async)

import asyncio
import time
import httpx
from xagent.utils.tool_decorator import function_tool
from xagent.core import Agent, Session

# Sync tools - automatically converted to async
@function_tool()
def calculate_square(n: int) -> int:
    """Calculate square of a number (CPU-intensive)."""
    time.sleep(0.1)  # Simulate CPU work
    return n * n

@function_tool()
def format_text(text: str, style: str) -> str:
    """Format text with various styles."""
    if style == "upper":
        return text.upper()
    elif style == "title":
        return text.title()
    return text

# Async tools - used directly for I/O operations
@function_tool()
async def fetch_weather(city: str) -> str:
    """Fetch weather data from API."""
    async with httpx.AsyncClient() as client:
        # Simulate weather API call
        await asyncio.sleep(0.5)
        return f"Weather in {city}: 22°C, Sunny"

async def main():
    # Mix of sync and async tools
    agent = Agent(
        tools=[calculate_square, format_text, fetch_weather],
        model="gpt-4.1-mini"
    )
    
    session = Session(user_id="user123")
    
    # Agent handles all tools automatically - sync tools run in thread pool
    response = await agent.chat(
        "Calculate the square of 15, format 'hello world' in title case, and get weather for Tokyo",
        session
    )
    print(response)

asyncio.run(main())

🔧 MCP Protocol Integration

import asyncio
from xagent.core import Agent, Session

async def mcp_integration_example():
    # Create agent with MCP tools
    agent = Agent(
        tools=[],
        mcp_servers=["http://localhost:8001/mcp/"],  # Auto-refresh MCP tools
        model="gpt-4.1-mini"
    )
    
    session = Session(user_id="user123")
    
    # Use MCP tools automatically
    response = await agent.chat("Use the available MCP tools to help me", session)
    print(response)

asyncio.run(mcp_integration_example())

📊 Structured Output with Pydantic

import asyncio
from pydantic import BaseModel
from xagent.core import Agent, Session
from xagent.tools import web_search

class WeatherReport(BaseModel):
    location: str
    temperature: int
    condition: str
    humidity: int

class Step(BaseModel):
    explanation: str
    output: str

class MathReasoning(BaseModel):
    steps: list[Step]
    final_answer: str

async def get_structured_response():
    agent = Agent(model="gpt-4.1-mini", tools=[web_search])
    session = Session(user_id="user123")
    
    # Request structured output for weather
    weather_data = await agent.chat(
        "what's the weather like in Hangzhou?",
        session,
        output_type=WeatherReport
    )
    
    print(f"Location: {weather_data.location}")
    print(f"Temperature: {weather_data.temperature}°F")
    print(f"Condition: {weather_data.condition}")
    print(f"Humidity: {weather_data.humidity}%")

    # Request structured output for mathematical reasoning
    reply = await agent.chat(
        "how can I solve 8x + 7 = -23", 
        session, 
        output_type=MathReasoning
    )
    for index, step in enumerate(reply.steps):
        print(f"Step {index + 1}: {step.explanation} => Output: {step.output}")
    print("Final Answer:", reply.final_answer)

asyncio.run(get_structured_response())

🤖 Agent as Tool Pattern

import asyncio
from xagent.core import Agent, Session
from xagent.db import MessageDB
from xagent.tools import web_search

async def agent_as_tool_example():
    # Create specialized agents
    researcher_agent = Agent(
        name="research_specialist",
        system_prompt="Research expert. Gather information and provide insights.",
        model="gpt-4.1-mini",
        tools=[web_search]
    )
    
    writing_agent = Agent(
        name="writing_specialist", 
        system_prompt="Professional writer. Create engaging content.",
        model="gpt-4.1-mini"
    )
    
    # Convert agents to tools
    message_db = MessageDB()
    research_tool = researcher_agent.as_tool(
        name="researcher",
        description="Research topics and provide detailed analysis",
        message_db=message_db
    )
    
    writing_tool = writing_agent.as_tool(
        name="content_writer",
        description="Write and edit content",
        message_db=message_db
    )
    
    # Main coordinator agent with specialist tools
    coordinator = Agent(
        name="coordinator",
        tools=[research_tool, writing_tool],
        system_prompt="Coordination agent that delegates to specialists.",
        model="gpt-4.1"
    )
    
    session = Session(user_id="user123")
    
    # Complex multi-step task
    response = await coordinator.chat(
        "Research renewable energy benefits and write a brief summary",
        session
    )
    print(response)

asyncio.run(agent_as_tool_example())

๐ŸŒ HTTP Agent Server

xAgent provides a standalone HTTP server that exposes the Agent functionality through REST API endpoints. This allows integration with other systems and services through simple HTTP calls.

🚀 Starting the HTTP Server

# Start with default config
python xagent/core/server.py --config config/agent.yaml --toolkit toolkit

# Server will start on http://localhost:8010 by default

After installing the package, you can use the xagent-server command:

# Start the server using the installed command
xagent-server --config /path/to/your/config.yaml --toolkit /path/to/your/toolkit

๐Ÿƒ Programmatic Usage

You can also start the HTTP Agent Server directly from Python:

from xagent.core.server import HTTPAgentServer

# Create the HTTP Agent Server
server = HTTPAgentServer(config_path="config/agent.yaml", toolkit_path="toolkit")

# Run the server
server.run(host="0.0.0.0", port=8010)

โš™๏ธ Configuration

The HTTP server is configured through a YAML file (e.g., config/agent.yaml):

agent:
  name: "Agent"
  system_prompt: |
    You are a helpful assistant. Your task is to assist users with their queries and tasks.
  model: "gpt-4.1-mini"
  mcp_servers:
    - "http://localhost:8001/mcp/"
  tools:
    - "web_search" # built-in web search tool
    - "draw_image" # built-in image drawing tool
    - "char_count" # custom tool for counting characters
  use_local_session: false

server:
  host: "0.0.0.0"
  port: 8010

📡 API Endpoints

POST /chat

Main chat endpoint for interacting with the AI agent.

Request Body:

{
  "user_id": "string",      
  "session_id": "string",   
  "user_message": "string", 
  "image_source": "string",
  "stream": false
}
  • image_source: Image URL or base64 encoded image (Optional)
  • stream: Set to true to enable streaming response (Optional, defaults to false)

Standard Response (stream: false):

{
  "reply": "string"
}

Streaming Response (stream: true):

The server will stream Server-Sent Events (SSE). Each event is a JSON object.

  • Data Event: data: {"delta": "some text"}
  • Completion Event: data: [DONE]

💡 Usage Examples

Basic Chat Request

curl -X POST "http://localhost:8010/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user123",
    "session_id": "session456",
    "user_message": "Hello, how are you?"
  }'

Streaming Chat Request

curl -X POST "http://localhost:8010/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user123",
    "session_id": "session456",
    "user_message": "Hello, how are you?",
    "stream": true
  }'
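The streaming response can also be consumed from Python. The sketch below uses only the standard library and assumes the SSE format documented above (data: {"delta": ...} events followed by data: [DONE]); the parse_sse_line and stream_chat names are illustrative, not part of xAgent.

```python
import json
import urllib.request
from typing import Optional

def parse_sse_line(line: str) -> Optional[str]:
    """Return the text delta carried by one SSE line, or None for
    non-data lines and the [DONE] sentinel."""
    if not line.startswith("data: "):
        return None
    payload = line[len("data: "):]
    if payload == "[DONE]":
        return None
    return json.loads(payload).get("delta")

def stream_chat(user_message: str, base_url: str = "http://localhost:8010") -> str:
    """POST a streaming /chat request and accumulate the deltas."""
    body = json.dumps({
        "user_id": "user123",
        "session_id": "session456",
        "user_message": user_message,
        "stream": True,
    }).encode()
    req = urllib.request.Request(
        f"{base_url}/chat",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    parts = []
    with urllib.request.urlopen(req) as resp:
        for raw in resp:  # the response body arrives line by line
            delta = parse_sse_line(raw.decode("utf-8").strip())
            if delta:
                parts.append(delta)
    return "".join(parts)
```

With the server from the previous section running, stream_chat("Hello, how are you?") returns the full assembled reply.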

Chat with Image

curl -X POST "http://localhost:8010/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user123",
    "session_id": "session456", 
    "user_message": "What do you see in this image?",
    "image_source": "https://example.com/image.jpg"
  }'

🔧 Development Guide

🛠️ Creating Tools

Both sync and async functions work seamlessly:

from xagent.utils.tool_decorator import function_tool
import asyncio
import time

# ✅ Sync tool - perfect for CPU-bound operations
@function_tool()
def my_sync_tool(input_text: str) -> str:
    """Process text synchronously (runs in thread pool)."""
    time.sleep(0.1)  # Simulate CPU-intensive work
    return f"Sync processed: {input_text}"

# ✅ Async tool - ideal for I/O-bound operations
@function_tool()
async def my_async_tool(input_text: str) -> str:
    """Process text asynchronously."""
    await asyncio.sleep(0.1)  # Simulate async I/O operation
    return f"Async processed: {input_text}"

📋 Tool Development Guidelines

| Use Case          | Tool Type       | Example                               |
|-------------------|-----------------|---------------------------------------|
| CPU-bound         | Sync functions  | Math calculations, data processing    |
| I/O-bound         | Async functions | API calls, database queries           |
| Simple operations | Sync functions  | String manipulation, file operations  |
| Network requests  | Async functions | HTTP requests, WebSocket connections  |

โš ๏ธ Note: Recursive functions are not supported as tools due to potential stack overflow issues in async environments.

🔄 Automatic Conversion

xAgent's @function_tool() decorator automatically handles sync-to-async conversion:

  • Sync functions → Run in a thread pool (non-blocking)
  • Async functions → Run directly on the event loop
  • Concurrent execution → All tools execute in parallel when called
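The conversion idea can be sketched in a few lines of plain asyncio. This is an illustrative helper (the `to_async` name and structure are assumptions, not xAgent's actual decorator internals): async functions pass through unchanged, and sync functions are dispatched to the default thread pool so they never block the event loop.

```python
import asyncio
import functools
import inspect

def to_async(func):
    """Sketch of sync-to-async conversion: return coroutine functions
    as-is; wrap plain functions so they run in the thread pool."""
    if inspect.iscoroutinefunction(func):
        return func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # asyncio.to_thread offloads the blocking call to a worker thread
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper
```

Once every tool is awaitable like this, running several of them in parallel is a single asyncio.gather call over the pending tool invocations.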

๐Ÿ“ Override Defaults

You can override the default tool name and description using the function_tool decorator:

@function_tool(name="custom_square", description="Calculate the square of a number")
def calculate_square(n: int) -> int:
    return n * n

🤖 API Reference

Core Classes

🤖 Agent

Main AI agent class for handling conversations and tool execution.

Agent(
    name: Optional[str] = None,
    system_prompt: Optional[str] = None, 
    model: Optional[str] = None,
    client: Optional[AsyncOpenAI] = None,
    tools: Optional[list] = None,
    mcp_servers: Optional[str | list] = None
)

Key Methods:

  • async chat(user_message, session, **kwargs) -> str | BaseModel: Main chat interface
  • async __call__(user_message, session, **kwargs) -> str | BaseModel: Shorthand for chat
  • as_tool(name, description, message_db) -> Callable: Convert agent to tool

Parameters:

  • name: Agent identifier (default: "default_agent")
  • system_prompt: Instructions for the agent behavior
  • model: OpenAI model to use (default: "gpt-4.1-mini")
  • client: Custom AsyncOpenAI client instance
  • tools: List of function tools
  • mcp_servers: MCP server URLs for dynamic tool loading

💬 Session

Manages conversation history and persistence with operations.

Session(
    user_id: str,
    session_id: Optional[str] = None,
    message_db: Optional[MessageDB] = None
)

Key Methods:

  • async add_messages(messages: Message | List[Message]) -> None: Store messages
  • async get_messages(count: int = 20) -> List[Message]: Retrieve message history
  • async clear_session() -> None: Clear conversation history
  • async pop_message() -> Optional[Message]: Remove last non-tool message

Features:

  • Automatic fallback to in-memory storage if no MessageDB provided
  • Redis-backed persistence for production use
  • Thread-safe operations
  • Efficient message batching

๐Ÿ—„๏ธ MessageDB

Redis-backed message persistence layer.

# Initialize with environment variables or defaults
message_db = MessageDB()

# Usage with session
session = Session(
    user_id="user123",
    message_db=message_db
)

Important Considerations

| Aspect             | Details                                      |
|--------------------|----------------------------------------------|
| Tool functions     | Can be sync or async (automatic conversion)  |
| Agent interactions | Always use await                             |
| Context            | Run inside an event loop, e.g. asyncio.run() |
| Concurrency        | All tools execute in parallel automatically  |
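The parallel-execution claim above can be demonstrated with plain asyncio, independent of xAgent: three stand-in tools that each take 0.2 s finish together in roughly 0.2 s when gathered, not 0.6 s.

```python
import asyncio
import time

async def slow_tool(name: str, delay: float = 0.2) -> str:
    """Stand-in for an I/O-bound tool call."""
    await asyncio.sleep(delay)
    return f"{name} done"

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # asyncio.gather schedules all three coroutines concurrently
    results = await asyncio.gather(
        slow_tool("weather"),
        slow_tool("search"),
        slow_tool("calc"),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # total wall time is ~0.2 s, not 0.6 s
```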

📊 Monitoring & Observability

xAgent includes comprehensive observability features:

  • ๐Ÿ” Langfuse Integration - Track AI interactions and performance
  • ๐Ÿ“ Structured Logging - Throughout the entire system
  • โค๏ธ Health Checks - API monitoring endpoints
  • โšก Performance Metrics - Tool execution time and success rates

๐Ÿค Contributing

We welcome contributions! Here's how to get started:

Development Workflow

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Commit your changes: git commit -m 'Add amazing feature'
  4. Push to the branch: git push origin feature/amazing-feature
  5. Open a Pull Request

Development Guidelines

| Area          | Requirements                        |
|---------------|-------------------------------------|
| Code Style    | Follow PEP 8 standards              |
| Testing       | Add tests for new features          |
| Documentation | Update docs as needed               |
| Type Safety   | Use type hints throughout           |
| Commits       | Follow conventional commit messages |

Package Upload

First time upload

pip install build twine
python -m build
twine upload dist/*

Subsequent uploads

rm -rf dist/ build/ *.egg-info/
python -m build
twine upload dist/*

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ™ Acknowledgments

Special thanks to the amazing open source projects that make xAgent possible:

  • OpenAI - GPT models powering our AI
  • FastAPI - Robust async API framework
  • Streamlit - Intuitive web interface
  • Redis - High-performance data storage
  • Langfuse - Observability and monitoring

📞 Support & Community

| Resource       | Link                  | Purpose                        |
|----------------|-----------------------|--------------------------------|
| 🐛 Issues      | GitHub Issues         | Bug reports & feature requests |
| 💬 Discussions | GitHub Discussions    | Community chat & Q&A           |
| 📧 Email       | zhangjun310@live.com  | Direct support                 |

xAgent - Empowering conversations with AI 🚀


Built with ❤️ for the AI community
