Skip to main content

A robust, asynchronous multi-agent development framework

Project description

AgentSys

A robust, asynchronous multi-agent development framework designed for building scalable agent-based systems.

Overview

AgentSys is a Python framework that provides a flexible and powerful infrastructure for developing, testing, and deploying multi-agent systems. It focuses on:

  • Asynchronous Execution: Built from the ground up with async/await support
  • Robust Error Handling: Comprehensive error management and recovery
  • Flexible Routing: Multiple routing strategies for agent communication
  • Persistent Storage: Both in-memory and file-based storage options
  • Comprehensive Testing: Full test coverage with async support

Core Concepts

1. Agents

An agent in AgentSys is a self-contained unit that can:

  • Process tasks asynchronously
  • Maintain internal state
  • Communicate with other agents
  • Handle errors gracefully
  • Store and retrieve data

Example of a basic agent:

from agentsys.plugins.router import Agent, Task, ExecutionResult, AgentState

class DataProcessingAgent(Agent):
    def __init__(self, name: str, storage):
        super().__init__(name)
        self.storage = storage
        self.processed_count = 0

    async def process(self, task: Task) -> ExecutionResult:
        try:
            # Process the task
            result = await self._transform_data(task.input)
            
            # Store the result
            await self.storage.put(f"result_{task.id}", result)
            
            # Update internal state
            self.processed_count += 1
            
            return ExecutionResult(
                status=AgentState.COMPLETED,
                result=result
            )
        except Exception as e:
            return ExecutionResult(
                status=AgentState.FAILED,
                error=str(e)
            )

    async def _transform_data(self, data):
        # Custom data transformation logic
        return f"Processed: {data}"

2. Tasks

Tasks are the fundamental unit of work in AgentSys:

from agentsys.plugins.router import Task, TaskPriority

# Basic task
task = Task(
    input="raw_data",
    id="unique_id",  # Optional, auto-generated if not provided
    metadata={"source": "sensor_1"}  # Optional metadata
)

# Priority task
priority_task = Task(
    input="urgent_data",
    priority=TaskPriority.HIGH
)

# Task with routing hints
routed_task = Task(
    input="specialized_data",
    routing_key="data_processing"
)

3. Router

The router orchestrates communication between agents:

from agentsys.plugins.router import AgentRouter, RoutingConfig, RoutingStrategy

# Create router with custom configuration
router = AgentRouter(
    routing_config=RoutingConfig(
        strategy=RoutingStrategy.ROUND_ROBIN,
        timeout=60,
        max_retries=3
    )
)

# Register agents
data_agent = DataProcessingAgent("data_processor", storage)
await router.register_agent(data_agent)

# Register routes with decorators
@router.route("process_data")
async def process_data(task: Task):
    return await data_agent.process(task)

# Or register routes directly
router.add_route(
    route_key="backup_data",
    handler=data_agent.process
)

4. Storage

AgentSys provides flexible storage options:

from agentsys.plugins.storage import FileStorage, MemoryStorage, StorageConfig

# File storage with backup
file_storage = FileStorage(
    config=StorageConfig(
        base_path="./data",
        backup_enabled=True,
        backup_interval=3600
    )
)

# Memory storage for temporary data
memory_storage = MemoryStorage()

# Store and retrieve data
await storage.put("key", {"data": "value"})
data = await storage.get("key")
await storage.delete("key")

# List keys with prefix
keys = await storage.list(prefix="result_")

# Backup data
await storage.backup()

Building a Complete Agent System

Here's how to build a complete agent system:

import asyncio
from agentsys.plugins.router import (
    AgentRouter, Agent, Task, ExecutionResult,
    AgentState, RoutingConfig, RoutingStrategy
)
from agentsys.plugins.storage import FileStorage, StorageConfig

# 1. Define your agent
class AnalyticsAgent(Agent):
    def __init__(self, name: str, storage):
        super().__init__(name)
        self.storage = storage
    
    async def process(self, task: Task) -> ExecutionResult:
        try:
            # Process analytics
            result = await self._analyze(task.input)
            
            # Store results
            await self.storage.put(
                f"analytics_{task.id}",
                result
            )
            
            return ExecutionResult(
                status=AgentState.COMPLETED,
                result=result
            )
        except Exception as e:
            return ExecutionResult(
                status=AgentState.FAILED,
                error=str(e)
            )
    
    async def _analyze(self, data):
        # Your analytics logic here
        return {"analyzed": data}

# 2. Set up storage
storage = FileStorage(
    config=StorageConfig(
        base_path="./data",
        backup_enabled=True
    )
)

# 3. Configure and create router
router = AgentRouter(
    routing_config=RoutingConfig(
        strategy=RoutingStrategy.ROUND_ROBIN,
        timeout=60
    )
)

# 4. Create and register agent
analytics_agent = AnalyticsAgent("analytics_1", storage)
await router.register_agent(analytics_agent)

# 5. Define routes
@router.route("analyze_data")
async def analyze_data(task: Task):
    return await analytics_agent.process(task)

# 6. Submit tasks
async def main():
    try:
        # Create task
        task = Task(
            input={"data": "sample"},
            metadata={"priority": "high"}
        )
        
        # Submit and wait for result
        result = await router.submit_task(task)
        
        # Check result
        if result.status == AgentState.COMPLETED:
            print(f"Analysis complete: {result.result}")
        else:
            print(f"Analysis failed: {result.error}")
    finally:
        # Clean up
        await router.stop()

# 7. Run the system
if __name__ == "__main__":
    asyncio.run(main())

Testing Your Agents

Comprehensive testing example:

import pytest
from agentsys.plugins.router import AgentRouter, Task, AgentState

@pytest.fixture
async def router_instance():
    router = AgentRouter()
    try:
        yield router
    finally:
        await router.stop()

@pytest.fixture
async def analytics_agent(router_instance):
    agent = AnalyticsAgent("test_agent", MemoryStorage())
    await router_instance.register_agent(agent)
    return agent

@pytest.mark.asyncio
async def test_analytics_agent(router_instance, analytics_agent):
    # Test data
    test_data = {"test": "data"}
    
    # Create and submit task
    task = Task(input=test_data)
    result = await router_instance.submit_task(task)
    
    # Verify result
    assert result.status == AgentState.COMPLETED
    assert "analyzed" in result.result
    assert result.error is None

Installation

Requires Python 3.11+

pip install agentsys

Core Components

Router

The router is the central component that manages agent communication and task execution:

from agentsys.plugins.router import AgentRouter, Route, Task, RoutingConfig

# Initialize router
router = AgentRouter(routing_config=RoutingConfig())

# Register routes
@router.route("process_data")
async def process_data(task: Task):
    result = await process(task.input)
    return ExecutionResult(
        status=AgentState.COMPLETED,
        result=result
    )

# Submit tasks
task = Task(input="test_input")
result = await router.submit_task(task)

Storage

Flexible storage options for persisting agent state and data:

from agentsys.plugins.storage import FileStorage, MemoryStorage, StorageConfig

# File-based storage
storage = FileStorage(config=StorageConfig(
    base_path="./data",
    backup_enabled=True
))

# Memory storage
memory_storage = MemoryStorage()

# Store and retrieve data
await storage.put("key", "value")
value = await storage.get("key")

Routing Strategies

Multiple routing strategies available:

  • RoundRobin: Distributes tasks evenly across agents
  • LeastLoaded: Routes to agents with lowest workload
  • Priority: Routes based on task priority
  • Custom: Implement your own routing logic
from agentsys.plugins.router import RoutingStrategy

config = RoutingConfig(
    strategy=RoutingStrategy.ROUND_ROBIN
)

Best Practices

  1. Agent Design

    • Keep agents focused on a single responsibility
    • Implement proper error handling
    • Use storage for persistence
    • Clean up resources properly
  2. Task Management

    • Include relevant metadata
    • Set appropriate priorities
    • Use routing keys for specific agents
    • Handle task timeouts
  3. Resource Management

    • Always use try/finally blocks
    • Stop routers explicitly
    • Clean up storage resources
    • Use async context managers
  4. Error Handling

    • Return proper ExecutionResult objects
    • Log errors with context
    • Implement retry mechanisms
    • Use timeouts for long operations
  5. Testing

    • Test both success and failure cases
    • Use proper async fixtures
    • Clean up resources in tests
    • Test with different routing strategies

Configuration

Example configuration for production:

router_config = RoutingConfig(
    strategy=RoutingStrategy.ROUND_ROBIN,
    timeout=60,
    max_retries=3,
    max_queue_size=1000,
    logging_enabled=True
)

storage_config = StorageConfig(
    base_path="./data",
    backup_enabled=True,
    backup_interval=3600,
    compression_enabled=True,
    max_size_mb=1024
)

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Submit a pull request

License

MIT License

Requirements

  • Python 3.11+
  • pytest-asyncio
  • pydantic

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agentsys-0.3.0.tar.gz (564.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agentsys-0.3.0-py3-none-any.whl (700.1 kB view details)

Uploaded Python 3

File details

Details for the file agentsys-0.3.0.tar.gz.

File metadata

  • Download URL: agentsys-0.3.0.tar.gz
  • Upload date:
  • Size: 564.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.8

File hashes

Hashes for agentsys-0.3.0.tar.gz
Algorithm Hash digest
SHA256 3a357bc8b911a1b848598d7e15a02cfc9cc8816068324484c2652a5b58fc3314
MD5 c5d61c7e93fbbb999f041490a8b42c6b
BLAKE2b-256 cff9adbd5391f764396f80515cf7df77a11103dca985377e6ffcab75c70d9587

See more details on using hashes here.

File details

Details for the file agentsys-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: agentsys-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 700.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.8

File hashes

Hashes for agentsys-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e827102780c94b216f1c728015a1611db146734057afae1532cfc05bd264f75a
MD5 474eee618feb10654aa1c4b4b0c12d26
BLAKE2b-256 25a5026e5bb3e9633ebd798da25a779f93aa076d57b7802e156a1143db61a313

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page