A robust, asynchronous multi-agent development framework

AgentSys

A robust, asynchronous multi-agent development framework designed for building scalable agent-based systems.

Overview

AgentSys is a Python framework that provides a flexible and powerful infrastructure for developing, testing, and deploying multi-agent systems. It focuses on:

  • Asynchronous Execution: Built from the ground up with async/await support
  • Robust Error Handling: Comprehensive error management and recovery
  • Flexible Routing: Multiple routing strategies for agent communication
  • Persistent Storage: Both in-memory and file-based storage options
  • Comprehensive Testing: Full test coverage with async support

Core Concepts

1. Agents

An agent in AgentSys is a self-contained unit that can:

  • Process tasks asynchronously
  • Maintain internal state
  • Communicate with other agents
  • Handle errors gracefully
  • Store and retrieve data

Example of a basic agent:

from agentsys.plugins.router import Agent, Task, ExecutionResult, AgentState

class DataProcessingAgent(Agent):
    def __init__(self, name: str, storage):
        super().__init__(name)
        self.storage = storage
        self.processed_count = 0

    async def process(self, task: Task) -> ExecutionResult:
        try:
            # Process the task
            result = await self._transform_data(task.input)
            
            # Store the result
            await self.storage.put(f"result_{task.id}", result)
            
            # Update internal state
            self.processed_count += 1
            
            return ExecutionResult(
                status=AgentState.COMPLETED,
                result=result
            )
        except Exception as e:
            return ExecutionResult(
                status=AgentState.FAILED,
                error=str(e)
            )

    async def _transform_data(self, data):
        # Custom data transformation logic
        return f"Processed: {data}"

2. Tasks

Tasks are the fundamental unit of work in AgentSys:

from agentsys.plugins.router import Task, TaskPriority

# Basic task
task = Task(
    input="raw_data",
    id="unique_id",  # Optional, auto-generated if not provided
    metadata={"source": "sensor_1"}  # Optional metadata
)

# Priority task
priority_task = Task(
    input="urgent_data",
    priority=TaskPriority.HIGH
)

# Task with routing hints
routed_task = Task(
    input="specialized_data",
    routing_key="data_processing"
)
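
The optional `id` and `priority` fields can be pictured with a small stand-in. The sketch below is not the AgentSys `Task` class: `SketchTask`, `SketchPriority`, and `drain_by_priority` are hypothetical names, and it assumes (without confirmation from the library) that a missing id is filled with a random UUID and that higher-priority tasks are served first.

```python
import heapq
import itertools
import uuid
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any


class SketchPriority(IntEnum):
    """Hypothetical priority levels; a lower value is served first."""
    HIGH = 0
    NORMAL = 1
    LOW = 2


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task, not the AgentSys Task class."""
    input: Any
    # Assumption: a missing id is replaced by a random UUID hex string.
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    priority: SketchPriority = SketchPriority.NORMAL
    metadata: dict = field(default_factory=dict)


def drain_by_priority(tasks):
    """Return tasks ordered by priority, keeping submission order within a level."""
    counter = itertools.count()
    # The counter breaks ties so that task objects are never compared directly.
    heap = [(task.priority, next(counter), task) for task in tasks]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


tasks = [
    SketchTask(input="routine"),
    SketchTask(input="urgent", priority=SketchPriority.HIGH),
    SketchTask(input="later", priority=SketchPriority.LOW),
]
ordered_inputs = [task.input for task in drain_by_priority(tasks)]
print(ordered_inputs)
```

How AgentSys itself orders tasks depends on the configured routing strategy; the heap here only illustrates one common way to honor priorities.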

3. Router

The router orchestrates communication between agents:

from agentsys.plugins.router import AgentRouter, RoutingConfig, RoutingStrategy, Task

# Create router with custom configuration
router = AgentRouter(
    routing_config=RoutingConfig(
        strategy=RoutingStrategy.ROUND_ROBIN,
        timeout=60,
        max_retries=3
    )
)

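# Register agents (DataProcessingAgent is defined in the Agents section;
# `storage` is any storage instance, e.g. MemoryStorage() from the Storage section)
data_agent = DataProcessingAgent("data_processor", storage)
await router.register_agent(data_agent)  # call from inside an async function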

# Register routes with decorators
@router.route("process_data")
async def process_data(task: Task):
    return await data_agent.process(task)

# Or register routes directly
router.add_route(
    route_key="backup_data",
    handler=data_agent.process
)
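
To make the two registration styles concrete, here is a minimal sketch of a route table that supports both a decorator and a direct `add_route` call. `MiniRouter` and its `dispatch` method are hypothetical and only mimic the shape of the calls above; this is not how `AgentRouter` is implemented.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[str], Awaitable[str]]


class MiniRouter:
    """Hypothetical route table; not the AgentSys AgentRouter."""

    def __init__(self) -> None:
        self._routes: Dict[str, Handler] = {}

    def add_route(self, route_key: str, handler: Handler) -> None:
        # Direct registration: map the key to the handler.
        self._routes[route_key] = handler

    def route(self, route_key: str):
        # Decorator registration: reuse add_route and return the function unchanged.
        def decorator(handler: Handler) -> Handler:
            self.add_route(route_key, handler)
            return handler
        return decorator

    async def dispatch(self, route_key: str, payload: str) -> str:
        # Look up the handler and await it; an unknown key raises KeyError.
        return await self._routes[route_key](payload)


mini_router = MiniRouter()


@mini_router.route("process_data")
async def process_data(payload: str) -> str:
    return f"Processed: {payload}"


async def backup(payload: str) -> str:
    return f"Backed up: {payload}"


mini_router.add_route("backup_data", backup)

processed = asyncio.run(mini_router.dispatch("process_data", "raw"))
backed_up = asyncio.run(mini_router.dispatch("backup_data", "raw"))
print(processed, "|", backed_up)
```

Both styles end up in the same dictionary, so the choice between them is a matter of where the handler is defined.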

4. Storage

AgentSys provides flexible storage options:

from agentsys.plugins.storage import FileStorage, MemoryStorage, StorageConfig

# File storage with backup
file_storage = FileStorage(
    config=StorageConfig(
        base_path="./data",
        backup_enabled=True,
        backup_interval=3600
    )
)

# Memory storage for temporary data
memory_storage = MemoryStorage()

# Store and retrieve data (using the file-backed store created above)
storage = file_storage
await storage.put("key", {"data": "value"})
data = await storage.get("key")
await storage.delete("key")

# List keys with prefix
keys = await storage.list(prefix="result_")

# Backup data
await storage.backup()
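
For unit tests of your own agents, it can help to have a tiny store with the same method names. The sketch below is a hypothetical `DictStorage`, not the AgentSys `MemoryStorage`; it mirrors only the `put`, `get`, `delete`, and `list(prefix=...)` calls shown above, and it assumes that reading a missing key returns `None`.

```python
import asyncio
from typing import Any, Dict, List, Optional


class DictStorage:
    """Hypothetical in-memory store mirroring the calls shown above."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    async def put(self, key: str, value: Any) -> None:
        self._data[key] = value

    async def get(self, key: str) -> Optional[Any]:
        # Assumption: a missing key yields None rather than raising.
        return self._data.get(key)

    async def delete(self, key: str) -> None:
        # Deleting a missing key is a no-op in this sketch.
        self._data.pop(key, None)

    async def list(self, prefix: str = "") -> List[str]:
        # Return matching keys in sorted order for predictable results.
        return sorted(k for k in self._data if k.startswith(prefix))


async def demo() -> dict:
    storage = DictStorage()
    await storage.put("result_1", {"data": "a"})
    await storage.put("result_2", {"data": "b"})
    await storage.put("other", 1)
    keys = await storage.list(prefix="result_")
    await storage.delete("result_1")
    return {"keys": keys, "deleted": await storage.get("result_1")}


summary = asyncio.run(demo())
print(summary)
```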

Building a Complete Agent System

Here's how to build a complete agent system:

import asyncio
from agentsys.plugins.router import (
    AgentRouter, Agent, Task, ExecutionResult,
    AgentState, RoutingConfig, RoutingStrategy
)
from agentsys.plugins.storage import FileStorage, StorageConfig

# 1. Define your agent
class AnalyticsAgent(Agent):
    def __init__(self, name: str, storage):
        super().__init__(name)
        self.storage = storage
    
    async def process(self, task: Task) -> ExecutionResult:
        try:
            # Process analytics
            result = await self._analyze(task.input)
            
            # Store results
            await self.storage.put(
                f"analytics_{task.id}",
                result
            )
            
            return ExecutionResult(
                status=AgentState.COMPLETED,
                result=result
            )
        except Exception as e:
            return ExecutionResult(
                status=AgentState.FAILED,
                error=str(e)
            )
    
    async def _analyze(self, data):
        # Your analytics logic here
        return {"analyzed": data}

# 2. Set up storage
storage = FileStorage(
    config=StorageConfig(
        base_path="./data",
        backup_enabled=True
    )
)

# 3. Configure and create router
router = AgentRouter(
    routing_config=RoutingConfig(
        strategy=RoutingStrategy.ROUND_ROBIN,
        timeout=60
    )
)

# 4. Create the agent (it is registered inside main(), where `await` is valid)
analytics_agent = AnalyticsAgent("analytics_1", storage)

# 5. Define routes
@router.route("analyze_data")
async def analyze_data(task: Task):
    return await analytics_agent.process(task)

# 6. Register the agent and submit tasks
async def main():
    try:
        # Register agent with the router
        await router.register_agent(analytics_agent)

        # Create task
        task = Task(
            input={"data": "sample"},
            metadata={"priority": "high"}
        )

        # Submit and wait for result
        result = await router.submit_task(task)

        # Check result
        if result.status == AgentState.COMPLETED:
            print(f"Analysis complete: {result.result}")
        else:
            print(f"Analysis failed: {result.error}")
    finally:
        # Clean up
        await router.stop()

# 7. Run the system
if __name__ == "__main__":
    asyncio.run(main())

Testing Your Agents

The following example tests an agent through the router, using pytest with pytest-asyncio:

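import pytest
import pytest_asyncio
from agentsys.plugins.router import AgentRouter, Task, AgentState
from agentsys.plugins.storage import MemoryStorage

# AnalyticsAgent is the class defined in the previous section;
# import it from wherever your project defines it.

# Async fixtures use pytest_asyncio.fixture, which pytest-asyncio
# requires in its default strict mode.
@pytest_asyncio.fixture
async def router_instance():
    router = AgentRouter()
    try:
        yield router
    finally:
        await router.stop()

@pytest_asyncio.fixture
async def analytics_agent(router_instance):
    agent = AnalyticsAgent("test_agent", MemoryStorage())
    await router_instance.register_agent(agent)
    return agent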

@pytest.mark.asyncio
async def test_analytics_agent(router_instance, analytics_agent):
    # Test data
    test_data = {"test": "data"}
    
    # Create and submit task
    task = Task(input=test_data)
    result = await router_instance.submit_task(task)
    
    # Verify result
    assert result.status == AgentState.COMPLETED
    assert "analyzed" in result.result
    assert result.error is None
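
The test above covers only the success path. A failure-path check might look like the sketch below. It uses hypothetical stand-ins (`SketchResult`, `FailingAgent`) instead of AgentSys classes so that it runs on its own, and it relies only on the try/except pattern from the agent examples.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for an execution result."""
    status: str
    result: Optional[Any] = None
    error: Optional[str] = None


class FailingAgent:
    """Hypothetical agent whose processing step always raises."""

    async def process(self, task_input: Any) -> SketchResult:
        try:
            raise ValueError(f"cannot analyze {task_input!r}")
        except Exception as exc:
            # Same pattern as the agents above: turn the exception into a failed result.
            return SketchResult(status="failed", error=str(exc))


def test_failure_is_reported() -> SketchResult:
    result = asyncio.run(FailingAgent().process({"test": "data"}))
    assert result.status == "failed"
    assert result.result is None
    assert "cannot analyze" in result.error
    return result


failed = test_failure_is_reported()
print(failed.error)
```

With the real library, the equivalent test would presumably assert on `AgentState.FAILED` and on the `error` field of the returned `ExecutionResult`.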

Installation

Requires Python 3.11+

pip install agentsys

Core Components

Router

The router is the central component that manages agent communication and task execution:

from agentsys.plugins.router import (
    AgentRouter, AgentState, ExecutionResult, RoutingConfig, Task
)

# Initialize router
router = AgentRouter(routing_config=RoutingConfig())

# Register routes
@router.route("process_data")
async def process_data(task: Task):
    # process() is a placeholder for your own async processing function
    result = await process(task.input)
    return ExecutionResult(
        status=AgentState.COMPLETED,
        result=result
    )

# Submit tasks
task = Task(input="test_input")
result = await router.submit_task(task)

Storage

Flexible storage options for persisting agent state and data:

from agentsys.plugins.storage import FileStorage, MemoryStorage, StorageConfig

# File-based storage
storage = FileStorage(config=StorageConfig(
    base_path="./data",
    backup_enabled=True
))

# Memory storage
memory_storage = MemoryStorage()

# Store and retrieve data
await storage.put("key", "value")
value = await storage.get("key")

Routing Strategies

Multiple routing strategies available:

  • RoundRobin: Distributes tasks evenly across agents
  • LeastLoaded: Routes to agents with lowest workload
  • Priority: Routes based on task priority
  • Custom: Implement your own routing logic

from agentsys.plugins.router import RoutingConfig, RoutingStrategy

config = RoutingConfig(
    strategy=RoutingStrategy.ROUND_ROBIN
)
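
As a rough illustration of how the first two strategies differ, the sketch below selects agents by name in plain Python. The `round_robin` and `least_loaded` helpers are hypothetical and do not reflect the internals of AgentSys; the load figures are made up for the example.

```python
import itertools
from typing import Dict, List


def round_robin(agent_names: List[str]):
    """Return an iterator that yields agent names in a repeating cycle."""
    return itertools.cycle(agent_names)


def least_loaded(load_by_agent: Dict[str, int]) -> str:
    """Pick the agent with the fewest in-flight tasks (ties go to the first listed)."""
    return min(load_by_agent, key=load_by_agent.get)


picker = round_robin(["a", "b", "c"])
rr_choices = [next(picker) for _ in range(5)]

loads = {"a": 3, "b": 1, "c": 2}
ll_choice = least_loaded(loads)
print(rr_choices, ll_choice)
```

Round robin ignores how busy each agent is, which keeps it cheap and predictable; least-loaded selection needs up-to-date load figures but adapts when tasks vary in cost.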

Best Practices

  1. Agent Design

    • Keep agents focused on a single responsibility
    • Implement proper error handling
    • Use storage for persistence
    • Clean up resources properly
  2. Task Management

    • Include relevant metadata
    • Set appropriate priorities
    • Use routing keys for specific agents
    • Handle task timeouts
  3. Resource Management

    • Always use try/finally blocks
    • Stop routers explicitly
    • Clean up storage resources
    • Use async context managers
  4. Error Handling

    • Return proper ExecutionResult objects
    • Log errors with context
    • Implement retry mechanisms
    • Use timeouts for long operations
  5. Testing

    • Test both success and failure cases
    • Use proper async fixtures
    • Clean up resources in tests
    • Test with different routing strategies
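
The retry and timeout advice under Error Handling can be sketched with the standard library alone. `run_with_retries` and `flaky_call` below are hypothetical helpers, not part of AgentSys; whether the router's own `timeout` and `max_retries` settings behave this way is not confirmed here.

```python
import asyncio


async def run_with_retries(make_call, *, timeout: float, max_attempts: int):
    """Await make_call() with a per-attempt timeout, retrying on timeout or RuntimeError."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except (asyncio.TimeoutError, RuntimeError) as exc:
            last_error = exc
            if attempt < max_attempts:
                # Short exponential backoff before the next attempt.
                await asyncio.sleep(0.01 * 2 ** (attempt - 1))
    raise RuntimeError(f"gave up after {max_attempts} attempts") from last_error


attempts = {"count": 0}


async def flaky_call() -> str:
    # Fails twice, then succeeds, to exercise the retry path.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


outcome = asyncio.run(run_with_retries(flaky_call, timeout=1.0, max_attempts=3))
print(outcome, attempts["count"])
```

Catching only specific exception types keeps programming errors visible instead of silently retrying them.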

Configuration

Example configuration for production:

from agentsys.plugins.router import RoutingConfig, RoutingStrategy
from agentsys.plugins.storage import StorageConfig

router_config = RoutingConfig(
    strategy=RoutingStrategy.ROUND_ROBIN,
    timeout=60,
    max_retries=3,
    max_queue_size=1000,
    logging_enabled=True
)

storage_config = StorageConfig(
    base_path="./data",
    backup_enabled=True,
    backup_interval=3600,
    compression_enabled=True,
    max_size_mb=1024
)

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Submit a pull request

License

MIT License

Requirements

  • Python 3.11+
  • pytest-asyncio
  • pydantic

Download files

Source Distribution

agentsys-0.2.0.tar.gz (18.7 kB)

Uploaded Source

Built Distribution

agentsys-0.2.0-py3-none-any.whl (25.5 kB)

Uploaded Python 3

File details

Details for the file agentsys-0.2.0.tar.gz.

File metadata

  • Download URL: agentsys-0.2.0.tar.gz
  • Upload date:
  • Size: 18.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.5

File hashes

Hashes for agentsys-0.2.0.tar.gz
Algorithm Hash digest
SHA256 6274bb4b083a8a4f00b7581f20331c97a29b3ef384ae0c508e549bc7d7a95eae
MD5 9b52377de74bdfa9a23328c27dbd18cc
BLAKE2b-256 ccb30e94e9d477f3d4bde244f81e965f9c4313f1da7d8f77b376bc80ada291b3

File details

Details for the file agentsys-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: agentsys-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 25.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.5

File hashes

Hashes for agentsys-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 33cd1db5066a034c271f939cc0d88f73abe97ab52f72872563c4ddd1d0c3aa3e
MD5 c1b0d6e03069a5198721520a4e995240
BLAKE2b-256 33cc6ed4fe3eb05fadd9e6ecb67f3030ea7e9622b1404c8b9d00e61a450703ca
