A robust, asynchronous multi-agent development framework
Project description
AgentSys
A robust, asynchronous multi-agent development framework designed for building scalable agent-based systems.
Overview
AgentSys is a Python framework that provides a flexible and powerful infrastructure for developing, testing, and deploying multi-agent systems. It focuses on:
- Asynchronous Execution: Built from the ground up with async/await support
- Robust Error Handling: Comprehensive error management and recovery
- Flexible Routing: Multiple routing strategies for agent communication
- Persistent Storage: Both in-memory and file-based storage options
- Comprehensive Testing: Full test coverage with async support
Core Concepts
1. Agents
An agent in AgentSys is a self-contained unit that can:
- Process tasks asynchronously
- Maintain internal state
- Communicate with other agents
- Handle errors gracefully
- Store and retrieve data
Example of a basic agent:
from agentsys.plugins.router import Agent, Task, ExecutionResult, AgentState
class DataProcessingAgent(Agent):
def __init__(self, name: str, storage):
super().__init__(name)
self.storage = storage
self.processed_count = 0
async def process(self, task: Task) -> ExecutionResult:
try:
# Process the task
result = await self._transform_data(task.input)
# Store the result
await self.storage.put(f"result_{task.id}", result)
# Update internal state
self.processed_count += 1
return ExecutionResult(
status=AgentState.COMPLETED,
result=result
)
except Exception as e:
return ExecutionResult(
status=AgentState.FAILED,
error=str(e)
)
async def _transform_data(self, data):
# Custom data transformation logic
return f"Processed: {data}"
2. Tasks
Tasks are the fundamental unit of work in AgentSys:
from agentsys.plugins.router import Task, TaskPriority
# Basic task
task = Task(
input="raw_data",
id="unique_id", # Optional, auto-generated if not provided
metadata={"source": "sensor_1"} # Optional metadata
)
# Priority task
priority_task = Task(
input="urgent_data",
priority=TaskPriority.HIGH
)
# Task with routing hints
routed_task = Task(
input="specialized_data",
routing_key="data_processing"
)
3. Router
The router orchestrates communication between agents:
from agentsys.plugins.router import AgentRouter, RoutingConfig, RoutingStrategy
# Create router with custom configuration
router = AgentRouter(
routing_config=RoutingConfig(
strategy=RoutingStrategy.ROUND_ROBIN,
timeout=60,
max_retries=3
)
)
# Register agents
data_agent = DataProcessingAgent("data_processor", storage)
await router.register_agent(data_agent)
# Register routes with decorators
@router.route("process_data")
async def process_data(task: Task):
return await data_agent.process(task)
# Or register routes directly
router.add_route(
route_key="backup_data",
handler=data_agent.process
)
4. Storage
AgentSys provides flexible storage options:
from agentsys.plugins.storage import FileStorage, MemoryStorage, StorageConfig
# File storage with backup
file_storage = FileStorage(
config=StorageConfig(
base_path="./data",
backup_enabled=True,
backup_interval=3600
)
)
# Memory storage for temporary data
memory_storage = MemoryStorage()
# Store and retrieve data
await storage.put("key", {"data": "value"})
data = await storage.get("key")
await storage.delete("key")
# List keys with prefix
keys = await storage.list(prefix="result_")
# Backup data
await storage.backup()
Building a Complete Agent System
Here's how to build a complete agent system:
import asyncio
from agentsys.plugins.router import (
AgentRouter, Agent, Task, ExecutionResult,
AgentState, RoutingConfig, RoutingStrategy
)
from agentsys.plugins.storage import FileStorage, StorageConfig
# 1. Define your agent
class AnalyticsAgent(Agent):
def __init__(self, name: str, storage):
super().__init__(name)
self.storage = storage
async def process(self, task: Task) -> ExecutionResult:
try:
# Process analytics
result = await self._analyze(task.input)
# Store results
await self.storage.put(
f"analytics_{task.id}",
result
)
return ExecutionResult(
status=AgentState.COMPLETED,
result=result
)
except Exception as e:
return ExecutionResult(
status=AgentState.FAILED,
error=str(e)
)
async def _analyze(self, data):
# Your analytics logic here
return {"analyzed": data}
# 2. Set up storage
storage = FileStorage(
config=StorageConfig(
base_path="./data",
backup_enabled=True
)
)
# 3. Configure and create router
router = AgentRouter(
routing_config=RoutingConfig(
strategy=RoutingStrategy.ROUND_ROBIN,
timeout=60
)
)
# 4. Create and register agent
analytics_agent = AnalyticsAgent("analytics_1", storage)
await router.register_agent(analytics_agent)
# 5. Define routes
@router.route("analyze_data")
async def analyze_data(task: Task):
return await analytics_agent.process(task)
# 6. Submit tasks
async def main():
try:
# Create task
task = Task(
input={"data": "sample"},
metadata={"priority": "high"}
)
# Submit and wait for result
result = await router.submit_task(task)
# Check result
if result.status == AgentState.COMPLETED:
print(f"Analysis complete: {result.result}")
else:
print(f"Analysis failed: {result.error}")
finally:
# Clean up
await router.stop()
# 7. Run the system
if __name__ == "__main__":
asyncio.run(main())
Testing Your Agents
Comprehensive testing example:
import pytest
from agentsys.plugins.router import AgentRouter, Task, AgentState
@pytest.fixture
async def router_instance():
router = AgentRouter()
try:
yield router
finally:
await router.stop()
@pytest.fixture
async def analytics_agent(router_instance):
agent = AnalyticsAgent("test_agent", MemoryStorage())
await router_instance.register_agent(agent)
return agent
@pytest.mark.asyncio
async def test_analytics_agent(router_instance, analytics_agent):
# Test data
test_data = {"test": "data"}
# Create and submit task
task = Task(input=test_data)
result = await router_instance.submit_task(task)
# Verify result
assert result.status == AgentState.COMPLETED
assert "analyzed" in result.result
assert result.error is None
Installation
Requires Python 3.11+
pip install agentsys
Core Components
Router
The router is the central component that manages agent communication and task execution:
from agentsys.plugins.router import AgentRouter, Route, Task, RoutingConfig
# Initialize router
router = AgentRouter(routing_config=RoutingConfig())
# Register routes
@router.route("process_data")
async def process_data(task: Task):
result = await process(task.input)
return ExecutionResult(
status=AgentState.COMPLETED,
result=result
)
# Submit tasks
task = Task(input="test_input")
result = await router.submit_task(task)
Storage
Flexible storage options for persisting agent state and data:
from agentsys.plugins.storage import FileStorage, MemoryStorage, StorageConfig
# File-based storage
storage = FileStorage(config=StorageConfig(
base_path="./data",
backup_enabled=True
))
# Memory storage
memory_storage = MemoryStorage()
# Store and retrieve data
await storage.put("key", "value")
value = await storage.get("key")
Routing Strategies
Multiple routing strategies available:
- RoundRobin: Distributes tasks evenly across agents
- LeastLoaded: Routes to agents with lowest workload
- Priority: Routes based on task priority
- Custom: Implement your own routing logic
from agentsys.plugins.router import RoutingStrategy
config = RoutingConfig(
strategy=RoutingStrategy.ROUND_ROBIN
)
Best Practices
-
Agent Design
- Keep agents focused on a single responsibility
- Implement proper error handling
- Use storage for persistence
- Clean up resources properly
-
Task Management
- Include relevant metadata
- Set appropriate priorities
- Use routing keys for specific agents
- Handle task timeouts
-
Resource Management
- Always use try/finally blocks
- Stop routers explicitly
- Clean up storage resources
- Use async context managers
-
Error Handling
- Return proper ExecutionResult objects
- Log errors with context
- Implement retry mechanisms
- Use timeouts for long operations
-
Testing
- Test both success and failure cases
- Use proper async fixtures
- Clean up resources in tests
- Test with different routing strategies
Configuration
Example configuration for production:
router_config = RoutingConfig(
strategy=RoutingStrategy.ROUND_ROBIN,
timeout=60,
max_retries=3,
max_queue_size=1000,
logging_enabled=True
)
storage_config = StorageConfig(
base_path="./data",
backup_enabled=True,
backup_interval=3600,
compression_enabled=True,
max_size_mb=1024
)
Contributing
- Fork the repository
- Create a feature branch
- Submit a pull request
License
MIT License
Requirements
- Python 3.11+
- pytest-asyncio
- pydantic
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file agentsys-0.2.0.tar.gz.
File metadata
- Download URL: agentsys-0.2.0.tar.gz
- Upload date:
- Size: 18.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.11.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6274bb4b083a8a4f00b7581f20331c97a29b3ef384ae0c508e549bc7d7a95eae
|
|
| MD5 |
9b52377de74bdfa9a23328c27dbd18cc
|
|
| BLAKE2b-256 |
ccb30e94e9d477f3d4bde244f81e965f9c4313f1da7d8f77b376bc80ada291b3
|
File details
Details for the file agentsys-0.2.0-py3-none-any.whl.
File metadata
- Download URL: agentsys-0.2.0-py3-none-any.whl
- Upload date:
- Size: 25.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.11.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
33cd1db5066a034c271f939cc0d88f73abe97ab52f72872563c4ddd1d0c3aa3e
|
|
| MD5 |
c1b0d6e03069a5198721520a4e995240
|
|
| BLAKE2b-256 |
33cc6ed4fe3eb05fadd9e6ecb67f3030ea7e9622b1404c8b9d00e61a450703ca
|