A framework for building LLM-powered applications with intelligent agents, task decomposition, and RAG
MAS - Multi-Agent System Framework for LLM Applications
A robust, modular, and extensible framework for building LLM-powered applications with intelligent agents, tool integration, task decomposition, and dynamic workflows.
Features
- 🤖 Intelligent Agents - Create autonomous agents with think-act-observe cycle and built-in state management
- 🛠️ Tool System - Flexible registry for tools with automatic parameter discovery and documentation
- 🧩 Task Decomposition - Break down complex tasks into manageable subtasks
- 📚 Retrieval Augmented Generation (RAG) - Enhance LLM responses with relevant context and document chunking
- 🔄 Dynamic Flows - Create and modify workflows with visualization and dependency management
- 🔌 Provider Architecture - Modular support for OpenAI, HuggingFace, Ollama, and custom providers
- 💾 Vector Store Integration - FAISS and Chroma support with consistent interfaces
- 🧠 Memory and Middleware - Built-in memory middleware and extensible middleware architecture
- ⚡ Async First - Fully asynchronous architecture with proper error handling
- ✅ Formally Verified - Core flow execution logic validated through formal verification
Installation
Basic Installation
# Install using poetry
poetry install
# Or using pip
pip install .
Installing with Specific Features
# Install with OpenAI support
poetry install --extras openai
# Install with HuggingFace support
poetry install --extras huggingface
# Install with Vector Store support
poetry install --extras vector-stores
# Install all features
poetry install --extras all
Vector Store Dependencies
To use specific vector stores, install the corresponding extras:
# For FAISS
poetry install --extras faiss
# For Chroma
poetry install --extras chroma
# For all vector stores
poetry install --extras vector-stores
Quick Start
Here's a simple example of a tool-using agent, with an LLM node configured for a local Ollama server:
import asyncio

from mas.core.agent import Agent, AgentRegistry
from mas.core.llm import LLMNode, LLMConfig

# Define a custom agent
@AgentRegistry.register
class ResearchAgent(Agent):
    async def think(self, context):
        # Process input and plan next steps
        query = context.get("query", "")
        return {"plan": f"Research information about {query}"}

    async def act(self, decision):
        # Execute the research plan with the "search" tool
        plan = decision.get("plan", "")
        result = await self.tools["search"](plan)
        return {"search_results": result}

    async def observe(self, result):
        # Process and summarize the results
        search_results = result.get("search_results", "")
        return {"summary": f"Based on research: {search_results}"}

async def main():
    # Create LLM configuration for a local Ollama server
    config = LLMConfig(
        provider_name="ollama",
        provider_config={
            "model": "llama3",
            "base_url": "http://localhost:11434/v1"
        },
        temperature=0.7
    )
    # Create an LLM node (this minimal agent only calls a tool and does not use it)
    llm_node = LLMNode("llm", config)
    # Create agent
    agent = ResearchAgent("researcher")
    # Register tools
    @AgentRegistry.register_tool(name="search", description="Search for information")
    async def search(query):
        # In a real application, this would perform an actual search
        return f"Search results for {query}"
    # Add tool to agent
    agent.add_capability("search")
    # Process a query
    result = await agent.process({"query": "quantum computing"})
    print(result["observation"]["summary"])

if __name__ == "__main__":
    asyncio.run(main())
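The Quick Start reads `result["observation"]["summary"]`, which implies that `process()` chains the three phases and collects their outputs. The standalone sketch below shows one way such a cycle could work in plain asyncio. `MiniAgent`, `fake_search`, and the `decision`/`result`/`observation` result layout are hypothetical illustrations, not the MAS `Agent` API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class MiniAgent:
    """Hypothetical stand-in for an agent base class; not the MAS Agent API."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tools: Dict[str, Callable[[str], Awaitable[str]]] = {}

    async def think(self, context: Dict[str, Any]) -> Dict[str, Any]:
        return {"plan": f"Research information about {context.get('query', '')}"}

    async def act(self, decision: Dict[str, Any]) -> Dict[str, Any]:
        return {"search_results": await self.tools["search"](decision["plan"])}

    async def observe(self, result: Dict[str, Any]) -> Dict[str, Any]:
        return {"summary": f"Based on research: {result['search_results']}"}

    async def process(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # One pass of the cycle: each phase consumes the previous phase's output
        decision = await self.think(context)
        result = await self.act(decision)
        observation = await self.observe(result)
        return {"decision": decision, "result": result, "observation": observation}


async def fake_search(query: str) -> str:
    # Stand-in for a real search tool
    return f"Search results for {query}"


async def main() -> Dict[str, Any]:
    agent = MiniAgent("researcher")
    agent.tools["search"] = fake_search
    return await agent.process({"query": "quantum computing"})


if __name__ == "__main__":
    print(asyncio.run(main())["observation"]["summary"])
```

Keeping each phase a separate coroutine makes it easy to override one step (for example, only `observe`) without touching the others.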
Core Components
Agent System
Agents follow a think-act-observe cycle with built-in state management:
from mas.core.agent import Agent, AgentRegistry

@AgentRegistry.register
class AnalysisAgent(Agent):
    async def think(self, context):
        # Process information and make decisions; carry the data forward,
        # because act() only receives the decision, not the original context
        return {"decision": "analyze_data", "data": context.get("data")}

    async def act(self, decision):
        # Execute actions based on decisions
        if decision["decision"] == "analyze_data":
            # Use registered tools
            result = await self.tools["data_analysis"](decision["data"])
            return {"result": result}
        return {"result": None}

    async def observe(self, result):
        # Process results and update state
        self.set_state({"last_analysis": result["result"]})
        return {"observation": "Analysis complete"}

# Register a tool
@AgentRegistry.register_tool(name="data_analysis", description="Analyze data")
async def analyze_data(data):
    # Analysis implementation
    return {"insights": "Data analysis results"}
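The feature list mentions a tool registry with automatic parameter discovery, but the README does not show how `register_tool` discovers parameters. A minimal sketch, assuming discovery is based on the function signature via Python's `inspect` module; `TOOLS`, `register_tool`, and the `top_k` parameter here are hypothetical and independent of MAS's `AgentRegistry.register_tool`.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict

# Hypothetical module-level registry: tool name -> function, description, parameters
TOOLS: Dict[str, Dict[str, Any]] = {}


def register_tool(name: str, description: str) -> Callable[[Callable], Callable]:
    """Record a tool and discover its parameters from the function signature."""
    def decorator(fn: Callable) -> Callable:
        params = {}
        for pname, param in inspect.signature(fn).parameters.items():
            has_default = param.default is not inspect.Parameter.empty
            params[pname] = {
                "required": not has_default,
                "default": param.default if has_default else None,
            }
        TOOLS[name] = {"fn": fn, "description": description, "parameters": params}
        return fn
    return decorator


@register_tool(name="data_analysis", description="Analyze data")
async def analyze_data(data, top_k=3):
    # Toy analysis: report how many records were seen
    return {"insights": f"Analyzed {len(data)} records (top {top_k})"}


if __name__ == "__main__":
    print(TOOLS["data_analysis"]["parameters"])
    print(asyncio.run(TOOLS["data_analysis"]["fn"]([1, 2, 3])))
```

Recording required versus optional parameters is enough to generate tool documentation or to validate a call before dispatching it.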
RAG Integration
RAG with document chunking and result reranking:
import asyncio

from mas.core.rag import RAGNode, RAGConfig, DocumentLoader, chunk_document
from mas.core.llm import LLMNode, LLMConfig

async def main():
    # Create embedding model
    embedding_config = LLMConfig(
        provider_name="openai",
        provider_config={"model": "text-embedding-ada-002"},
    )
    embedding_node = LLMNode("embeddings", embedding_config)
    # Create LLM for generation
    llm_config = LLMConfig(
        provider_name="openai",
        provider_config={"model": "gpt-4o-mini"},
    )
    llm_node = LLMNode("llm", llm_config)
    # Create document loader with chunking preprocessor
    document_loader = DocumentLoader(
        name="loader",
        preprocessors=[chunk_document(max_length=500, overlap=50)]
    )
    # Initialize RAG node; "dimension" must match the embedding size
    # (text-embedding-ada-002 produces 1536-dimensional vectors)
    rag = RAGNode(
        name="research_rag",
        config=RAGConfig(
            vectorstore_type="faiss",
            vectorstore_config={
                "dimension": 1536,
                "similarity_metric": "cosine"
            },
            num_results=5,
            rerank_results=True
        ),
        embedding_node=embedding_node,
        llm_node=llm_node
    )
    # Load and add documents
    documents = await document_loader.process({
        "documents": ["Document 1 content", "Document 2 content"]
    })
    await rag.add_documents(documents["documents"])
    # Query with RAG
    result = await rag.process({
        "query": "What are the key concepts?"
    })
    print(result["response"])

if __name__ == "__main__":
    asyncio.run(main())
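The loader above applies `chunk_document(max_length=500, overlap=50)`. The README does not say whether lengths are counted in characters or tokens, so the sketch below assumes characters and a fixed sliding window; `chunk_text` is a hypothetical helper, not the MAS implementation.

```python
from typing import List


def chunk_text(text: str, max_length: int = 500, overlap: int = 50) -> List[str]:
    """Split text into windows of at most max_length characters.

    Consecutive windows share `overlap` characters, so each new window
    starts (max_length - overlap) characters after the previous one.
    """
    if max_length <= 0:
        raise ValueError("max_length must be positive")
    if not 0 <= overlap < max_length:
        raise ValueError("overlap must satisfy 0 <= overlap < max_length")
    step = max_length - overlap
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + max_length])
        if start + max_length >= len(text):
            break  # this window already reaches the end of the text
    return chunks


if __name__ == "__main__":
    print(chunk_text("abcdefghij", max_length=4, overlap=1))
```

The overlap keeps a sentence that straddles a chunk boundary retrievable from at least one chunk, at the cost of storing roughly `overlap / (max_length - overlap)` extra text.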
Flow Orchestration
Create complex, dynamic workflows with dependency management and validation:
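import asyncio

from mas.core.flow import Flow
from mas.core.llm import LLMNode, LLMConfig
from mas.core.rag import RAGNode, RAGConfig, DocumentLoader

# Configurations (same settings as in the RAG example above)
embedding_config = LLMConfig(
    provider_name="openai",
    provider_config={"model": "text-embedding-ada-002"},
)
llm_config = LLMConfig(
    provider_name="openai",
    provider_config={"model": "gpt-4o-mini"},
)
rag_config = RAGConfig(
    vectorstore_type="faiss",
    vectorstore_config={"dimension": 1536, "similarity_metric": "cosine"},
    num_results=5,
)
# Create a flow
flow = Flow(name="research_flow", description="Research and analysis flow")
# Add nodes
reader = DocumentLoader(name="reader")
flow.add_node(reader)
embedder = LLMNode("embedder", embedding_config)
flow.add_node(embedder)
rag = RAGNode(name="rag", config=rag_config, embedding_node=embedder)
flow.add_node(rag)
analyzer = LLMNode("analyzer", llm_config)
flow.add_node(analyzer)
# Connect nodes by name (simplified API)
flow.connect_nodes("reader", "rag", "docs_to_rag")
flow.connect_nodes("rag", "analyzer", "context_to_analyzer")

async def main():
    # Process flow
    result = await flow.process({
        "documents": ["Document 1", "Document 2"],
        "query": "Analyze the trend in these documents"
    })
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
    # Visualize flow
    flow_viz = flow.visualize()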
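Connecting nodes with `connect_nodes` defines a dependency graph, and an engine has to derive a run order from it. The sketch below uses Kahn's topological sort, which also rejects cycles; this is a standard technique chosen for illustration, since the README does not say which algorithm MAS uses. `execution_order` is hypothetical.

```python
from collections import deque
from typing import Dict, List, Tuple


def execution_order(nodes: List[str], edges: List[Tuple[str, str]]) -> List[str]:
    """Return a topological order of `nodes` (Kahn's algorithm).

    Raises ValueError if `edges` contain a cycle.
    """
    indegree: Dict[str, int] = {n: 0 for n in nodes}
    successors: Dict[str, List[str]] = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
    # Seed with nodes that have no incoming edges, in declaration order,
    # so the same graph always yields the same order
    ready = deque(n for n in nodes if indegree[n] == 0)
    order: List[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(nodes):
        raise ValueError("flow contains a cycle")
    return order


if __name__ == "__main__":
    nodes = ["reader", "embedder", "rag", "analyzer"]
    edges = [("reader", "rag"), ("rag", "analyzer")]
    print(execution_order(nodes, edges))
```

Nodes that become ready at the same time (here `reader` and `embedder`) have no dependency on each other, so an engine could run them concurrently.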
Advanced Features
Middleware System
Use middleware to extend provider behavior, for example to inject stored memory into prompts:
from mas.core.chat import MemoryMiddleware, SimpleMemory
from mas.core.providers.middleware import MiddlewareProvider
# Assumes `provider` is an existing provider instance that supports middleware,
# and that `Message` (the framework's chat message class) has been imported;
# neither is defined in this snippet
# Create memory and middleware
memory = SimpleMemory()
middleware = MemoryMiddleware(memory)
# Add to provider
provider.add_middleware(middleware)
# Configure memory operations in message metadata
message = Message(
    role="user",
    content="Recall what I told you about preferences",
    metadata={
        "requires_memory": ["user_preferences"]
    }
)
# Memory will be automatically injected into the prompt
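How the memory middleware uses `requires_memory` is not spelled out above. As a rough sketch, assuming the middleware looks up each named key and prepends the stored values to the prompt: `ChatMessage`, `KeyValueMemory`, `MemoryInjectingMiddleware`, and the prompt layout are all hypothetical and do not reproduce MAS's `MemoryMiddleware` or `SimpleMemory`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ChatMessage:
    """Hypothetical message type mirroring role/content/metadata above."""
    role: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class KeyValueMemory:
    """Hypothetical key-value memory store."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def store(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)


class MemoryInjectingMiddleware:
    """Prepend the memory entries named in metadata["requires_memory"] to the prompt."""

    def __init__(self, memory: KeyValueMemory) -> None:
        self.memory = memory

    def build_prompt(self, message: ChatMessage) -> str:
        keys: List[str] = message.metadata.get("requires_memory", [])
        entries = [(k, self.memory.get(k)) for k in keys]
        lines = [f"[{k}] {v}" for k, v in entries if v is not None]  # skip unknown keys
        if not lines:
            return message.content
        return "Relevant memory:\n" + "\n".join(lines) + "\n\n" + message.content


if __name__ == "__main__":
    memory = KeyValueMemory()
    memory.store("user_preferences", "Prefers concise answers")
    msg = ChatMessage(role="user", content="Recall what I told you about preferences",
                      metadata={"requires_memory": ["user_preferences"]})
    print(MemoryInjectingMiddleware(memory).build_prompt(msg))
```

Driving injection from message metadata means the provider itself stays unaware of memory; messages without `requires_memory` pass through unchanged.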
Provider Registration
Create and register custom LLM providers:
from mas.core.providers.factory import register_provider
from mas.core.providers.base import BaseLLMProvider

@register_provider("custom")
class CustomProvider(BaseLLMProvider):
    """Custom provider implementation."""
    provider_name = "custom"
    supports_streaming = True
    supports_embeddings = True
    default_embedding_dimension = 768

    async def initialize(self):
        # Initialize resources; setup_client stands in for your own
        # async client factory (it is not defined in this example)
        self._client = await setup_client(self.config)
        await super().initialize()  # Set _initialized flag

    async def cleanup(self):
        # Clean up resources
        await self._client.close()
        await super().cleanup()  # Clear _initialized flag

    async def generate(self, prompt, temperature=0.7, **kwargs):
        await self._ensure_initialized()
        # Generate completion
        response = await self._client.complete(prompt, temperature)
        return response.text

    async def stream_generate(self, prompt, **kwargs):
        await self._ensure_initialized()
        # Stream completion chunks as they arrive
        async for chunk in self._client.stream(prompt):
            yield chunk

    async def embed(self, text):
        await self._ensure_initialized()
        # Generate embeddings
        embedding = await self._client.embed(text)
        return embedding
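`@register_provider("custom")` suggests a name-to-class registry that a factory consults when `LLMConfig(provider_name=...)` is used. A minimal sketch of that pattern, under that assumption; `register_provider_class`, `create_provider`, and `EchoProvider` are hypothetical names, not MAS's factory functions.

```python
import asyncio
from typing import Any, Callable, Dict, Type

# Hypothetical registry mapping provider names to classes
_PROVIDERS: Dict[str, Type[Any]] = {}


def register_provider_class(name: str) -> Callable[[Type[Any]], Type[Any]]:
    """Class decorator that records `cls` under `name`."""
    def decorator(cls: Type[Any]) -> Type[Any]:
        if name in _PROVIDERS:
            raise ValueError(f"provider {name!r} is already registered")
        _PROVIDERS[name] = cls
        return cls
    return decorator


def create_provider(name: str, config: Dict[str, Any]) -> Any:
    """Look up a registered provider class by name and instantiate it."""
    try:
        cls = _PROVIDERS[name]
    except KeyError:
        raise ValueError(f"unknown provider {name!r}; registered: {sorted(_PROVIDERS)}") from None
    return cls(config)


@register_provider_class("echo")
class EchoProvider:
    """Toy provider that echoes the prompt with a configurable prefix."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config

    async def generate(self, prompt: str, temperature: float = 0.7, **kwargs: Any) -> str:
        return f"{self.config.get('prefix', '')}{prompt}"


if __name__ == "__main__":
    provider = create_provider("echo", {"prefix": ">> "})
    print(asyncio.run(provider.generate("hello")))
```

Rejecting duplicate names at registration time surfaces accidental overrides early instead of silently replacing a built-in provider.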
Vector Store Registration
Create and register custom vector stores:
from typing import Any, Dict, List, Optional

from mas.core.vectorstores.factory import register_vectorstore
from mas.core.vectorstores.base import VectorStoreProvider
from mas.core.models import Document
from mas.core.llm import LLMNode

@register_vectorstore("custom_store")
class CustomVectorStore(VectorStoreProvider):
    """Custom vector store implementation."""

    def __init__(self, config: Dict[str, Any], embedding_node: LLMNode):
        super().__init__(config, embedding_node)
        # Initialize your vector store-specific attributes

    async def initialize(self):
        # Initialize the vector store
        # Your initialization logic here
        await super().initialize()

    async def cleanup(self):
        # Cleanup resources
        # Your cleanup logic here
        await super().cleanup()

    async def add_documents(self, documents: List[Document]) -> int:
        await self._ensure_initialized()
        # Add documents to the vector store
        # Return number of documents added
        return len(documents)

    async def similarity_search(self, query: str, k: int = 4,
                                filter: Optional[Dict[str, Any]] = None) -> List[Document]:
        await self._ensure_initialized()
        # Perform similarity search and return up to k matching documents
        # (placeholder: replace with your store's query)
        return []

    async def delete(self, ids_or_filter: Any) -> int:
        await self._ensure_initialized()
        # Delete documents and return how many were removed
        # (placeholder: replace with your store's delete)
        return 0
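To see what `similarity_search` and `delete` are expected to do, here is a brute-force, in-memory sketch using cosine similarity. It takes vectors directly instead of calling an embedding node, and it assumes `filter` means an exact match on metadata fields; `InMemoryVectorIndex` and `cosine` are hypothetical. FAISS and Chroma use optimized indexes rather than a linear scan.

```python
import math
from typing import Dict, List, Optional, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    if na == 0 or nb == 0:
        return 0.0
    return dot / (na * nb)


class InMemoryVectorIndex:
    """Hypothetical linear-scan index of (id, vector, metadata) rows."""

    def __init__(self) -> None:
        self._rows: List[Tuple[str, List[float], Dict[str, str]]] = []

    def add(self, doc_id: str, vector: Sequence[float],
            metadata: Optional[Dict[str, str]] = None) -> None:
        self._rows.append((doc_id, list(vector), dict(metadata or {})))

    def search(self, query_vector: Sequence[float], k: int = 4,
               filter: Optional[Dict[str, str]] = None) -> List[Tuple[str, float]]:
        # Score every row that matches the metadata filter, then keep the top k
        candidates = [
            (doc_id, cosine(query_vector, vec))
            for doc_id, vec, meta in self._rows
            if not filter or all(meta.get(key) == val for key, val in filter.items())
        ]
        candidates.sort(key=lambda item: item[1], reverse=True)
        return candidates[:k]

    def delete(self, ids: Sequence[str]) -> int:
        wanted = set(ids)
        before = len(self._rows)
        self._rows = [row for row in self._rows if row[0] not in wanted]
        return before - len(self._rows)  # number of rows actually removed


if __name__ == "__main__":
    index = InMemoryVectorIndex()
    index.add("a", [1.0, 0.0], {"lang": "en"})
    index.add("b", [0.0, 1.0], {"lang": "en"})
    index.add("c", [1.0, 1.0], {"lang": "de"})
    print(index.search([1.0, 0.0], k=2))
```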
Batch and Stream Processing
Process multiple inputs in batches, or stream output as it is generated:
# Run inside an async function; `flow` and `llm_node` are the nodes built in the examples above
# Batch processing
results = await flow.batch_process([
    {"query": "Question 1", "documents": ["Doc A", "Doc B"]},
    {"query": "Question 2", "documents": ["Doc C", "Doc D"]},
], batch_size=5)
# Streaming
async for chunk in llm_node.stream_generate("Explain quantum computing"):
    print(chunk, end="", flush=True)
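A sketch of what `batch_size` could mean: inputs are split into groups, each group runs concurrently with `asyncio.gather`, and results come back in input order. The streaming half uses an async generator, which is the shape `async for` expects. `batch_process` and `fake_stream` here are hypothetical standalone functions, not the MAS `Flow.batch_process` or `LLMNode.stream_generate` implementations.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List


async def batch_process(
    process: Callable[[Dict[str, Any]], Awaitable[Any]],
    inputs: List[Dict[str, Any]],
    batch_size: int = 5,
) -> List[Any]:
    """Run `process` over `inputs`, at most `batch_size` calls at a time."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    results: List[Any] = []
    for start in range(0, len(inputs), batch_size):
        batch = inputs[start:start + batch_size]
        # Calls within a batch run concurrently; gather keeps input order
        results.extend(await asyncio.gather(*(process(item) for item in batch)))
    return results


async def fake_stream(text: str) -> AsyncIterator[str]:
    """Toy token stream: yields one word at a time."""
    for word in text.split():
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield word + " "


async def demo() -> None:
    async def answer(item: Dict[str, Any]) -> str:
        return f"answer to {item['query']}"

    print(await batch_process(answer, [{"query": "Question 1"}, {"query": "Question 2"}]))
    async for chunk in fake_stream("Explain quantum computing"):
        print(chunk, end="", flush=True)
    print()


if __name__ == "__main__":
    asyncio.run(demo())
```

Waiting for each batch before starting the next caps concurrency (useful for provider rate limits) at the cost of idle slots while the slowest call in a batch finishes.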
Formal Verification
The flow execution engine has been formally verified to guarantee several critical properties:
- ✓ acyclic: All flows are guaranteed to be acyclic, preventing infinite execution loops
- ✓ reachability: All nodes in the flow can be reached from input nodes
- ✓ deterministic_execution: Flow execution is deterministic given the same inputs
- ✓ resource_safety: All resources are properly initialized and cleaned up
- ✓ parallel_safety: Parallel node execution is safe and properly synchronized
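The reachability property can also be checked at runtime with a breadth-first search from the input nodes. This sketch only illustrates what the property means; it does not reproduce the formal verification, and `unreachable_nodes` is a hypothetical helper rather than part of MAS.

```python
from collections import deque
from typing import Dict, List, Sequence, Set, Tuple


def unreachable_nodes(nodes: Sequence[str], edges: Sequence[Tuple[str, str]],
                      inputs: Sequence[str]) -> Set[str]:
    """Return the nodes that no path from any input node reaches."""
    successors: Dict[str, List[str]] = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
    seen: Set[str] = set(inputs)
    queue = deque(inputs)
    while queue:
        node = queue.popleft()
        for nxt in successors[node]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return set(nodes) - seen


if __name__ == "__main__":
    nodes = ["reader", "rag", "analyzer", "orphan"]
    edges = [("reader", "rag"), ("rag", "analyzer")]
    print(unreachable_nodes(nodes, edges, inputs=["reader"]))
```

An empty result means every node can receive data from some input, which is the condition the reachability bullet describes.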
Error Handling
Catch the framework's custom exceptions to handle failures by subsystem:
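# Run inside an async function; `flow` and `inputs` come from your application,
# and the exception classes are provided by MAS (their import path is not shown here)
try:
    result = await flow.process(inputs)
except FlowExecutionError as e:
    print(f"Error in flow: {e}")
    print(f"Failed node: {e.node_name}")
    print(f"Details: {e.details}")
except RAGError as e:
    print(f"RAG error: {e}")
except AgentError as e:
    print(f"Agent error: {e}")
except ProviderError as e:
    print(f"Provider error: {e}")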
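The handler above reads `e.node_name` and `e.details`. A minimal sketch of how an exception could carry those fields, with unexpected errors wrapped and chained via `raise ... from`. The class names mirror the ones caught above but are defined locally here; MAS's real hierarchy, constructor signatures, and import path are not shown in this README, and `FrameworkError` and `run_node` are hypothetical.

```python
from typing import Any, Callable, Dict, Optional


class FrameworkError(Exception):
    """Hypothetical common base class for the framework's errors."""


class ProviderError(FrameworkError):
    """Raised when an LLM provider call fails."""


class FlowExecutionError(FrameworkError):
    """Raised when a node fails while a flow is running."""

    def __init__(self, message: str, node_name: Optional[str] = None,
                 details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.node_name = node_name
        self.details = details or {}


def run_node(name: str, fn: Callable[[Any], Any], payload: Any) -> Any:
    """Run one node, wrapping unexpected errors with the failing node's name."""
    try:
        return fn(payload)
    except FrameworkError:
        raise  # already a framework error: let callers catch it by type
    except Exception as exc:
        raise FlowExecutionError(
            f"node {name!r} failed", node_name=name, details={"cause": repr(exc)}
        ) from exc


if __name__ == "__main__":
    try:
        run_node("analyzer", lambda data: 1 / 0, {})
    except FlowExecutionError as e:
        print(f"Error in flow: {e}")
        print(f"Failed node: {e.node_name}")
        print(f"Details: {e.details}")
```

Re-raising framework errors unchanged keeps the more specific `except` clauses (such as `ProviderError`) meaningful, while `from exc` preserves the original traceback as `__cause__`.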
Contributing
Contributions are welcome! Please read our contributing guidelines for details.
License
This project is licensed under the MIT License - see the LICENSE file for details.
File details
Details for the file manas_ai-0.1.0.tar.gz.
File metadata
- Download URL: manas_ai-0.1.0.tar.gz
- Upload date:
- Size: 42.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.167.4-microsoft-standard-WSL2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4b86745a4299e7229458deb333e70d23e963bcb18f10e6afad120e32c30ba065 |
| MD5 | 285096a9904734bbffde5d9e654fa328 |
| BLAKE2b-256 | 68126ef6eff61739aff5adb4f8b7907c616c4839ae598d8394c10282d890c9d1 |
File details
Details for the file manas_ai-0.1.0-py3-none-any.whl.
File metadata
- Download URL: manas_ai-0.1.0-py3-none-any.whl
- Upload date:
- Size: 54.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.167.4-microsoft-standard-WSL2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e7f1429194473daa9234c569eb3a5132d083172cd4be093bdaf0f878bcd363f8 |
| MD5 | 3af6fcbdca8856165d53b05297995073 |
| BLAKE2b-256 | a9d59569e2af769255fc0b365e1096faae5042aeed3c04398019815e7d76f076 |