Skip to main content

A lightweight framework for building AI agent systems

Project description

LiteSwarm

LiteSwarm is a lightweight, extensible framework for building AI agent systems. It provides a minimal yet powerful foundation for creating both simple chatbots and complex agent teams, with customization possible at every level.

The framework is LLM-agnostic and supports 100+ language models through litellm, including:

  • OpenAI
  • Anthropic (Claude)
  • Google (Gemini)
  • Azure OpenAI
  • AWS Bedrock
  • And many more

Quick Navigation

Installation

Choose your preferred installation method:

Using pip:

pip install liteswarm

Using uv (recommended for faster installation):

uv pip install liteswarm

Using poetry:

poetry add liteswarm

Using pipx (for CLI tools):

pipx install liteswarm

Requirements

  • Python 3.11 or higher
  • Async support (asyncio)
  • A valid API key for your chosen LLM provider

API Keys

You can provide your API key in two ways:

  1. Through environment variables:

    # For OpenAI
    export OPENAI_API_KEY=sk-...
    # For Anthropic
    export ANTHROPIC_API_KEY=sk-ant-...
    # For Google
    export GOOGLE_API_KEY=...
    

    or using os.environ:

    import os
    
    # For OpenAI
    os.environ["OPENAI_API_KEY"] = "sk-..."
    # For Anthropic
    os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
    # For Google
    os.environ["GOOGLE_API_KEY"] = "..."
    
  2. Using a .env file:

    OPENAI_API_KEY=sk-...
    ANTHROPIC_API_KEY=sk-ant-...
    GOOGLE_API_KEY=...
    
  3. Using the LLM class:

    from liteswarm.types import LLM
    
    llm = LLM(
        model="gpt-4o",
        api_key="sk-...", # or api_base, api_version, etc.
    )
    

See litellm's documentation for a complete list of supported providers and their environment variables.

Key Features

  • Lightweight Core: Minimal base implementation that's easy to understand and extend
  • LLM Agnostic: Support for 100+ language models through litellm
  • Flexible Agent System: Create agents with custom instructions and capabilities
  • Tool Integration: Easy integration of Python functions as agent tools
  • Structured Outputs: Built-in support for validating and parsing agent responses
  • Multi-Agent Teams: Coordinate multiple specialized agents for complex tasks
  • Event Streaming: Real-time streaming of agent responses, tool calls, and other events
  • Context Management: Smart handling of conversation history and context
  • Cost Tracking: Optional tracking of token usage and API costs

Core Components

Message Store

The Message Store is responsible for managing conversation history and message persistence. It provides:

  • Message Storage: Efficient storage and retrieval of conversation messages
  • History Management: Methods for adding, updating, and removing messages
  • State Preservation: Maintains conversation state between interactions
  • Memory Optimization: Support for different memory strategies
  • Format Validation: Ensures messages follow the required schema

Example usage:

import asyncio

from liteswarm.core.message_store import LiteMessageStore
from liteswarm.types import Message


async def main() -> None:
    # Create a message store
    message_store = LiteMessageStore()

    # Add messages to the message store
    await message_store.add_messages(
        [
            Message(role="user", content="Hello!"),
            Message(role="assistant", content="Hello! How can I help you today?"),
        ]
    )

    # Get all messages in the message store
    messages = await message_store.get_messages()

    # Display results
    print("Messages:")
    for message in messages:
        print(f"- {message.content}")


if __name__ == "__main__":
    asyncio.run(main())

See MessageStore for more details.

Message Index

The Message Index provides semantic search capabilities over conversation history:

  • Semantic Search: Find messages based on meaning, not just keywords
  • Relevance Scoring: Rank messages by semantic similarity
  • Embedding Support: Multiple embedding models (OpenAI, HuggingFace, etc.)
  • Efficient Retrieval: Fast lookup of relevant context
  • Customizable Search: Configurable search parameters and strategies

Example usage:

import asyncio

from liteswarm.core import LiteMessageIndex
from liteswarm.types import Message, MessageRecord


async def main() -> None:
    # Create an index
    index = LiteMessageIndex()

    # Prepare chat messages for indexing
    # fmt: off
    messages = [
        Message(role="user", content="Can you help me with setting up a development environment?"),
        Message(role="assistant", content="Sure! What kind of development environment are you working on?"),
        Message(role="user", content="I want to set up a Flutter development environment."),
        Message(role="assistant", content="To set up Flutter, you’ll need to install Flutter SDK, an IDE like VS Code or Android Studio, and ensure you have the required tools for your platform."),
        Message(role="user", content="What are the system requirements for running Flutter?"),
        Message(role="assistant", content="The system requirements depend on your operating system. For example, on macOS, you need macOS 10.14 or later and Xcode installed."),
    ]
    # fmt: on

    # Convert messages to MessageRecord
    chat_messages = [MessageRecord.from_message(message) for message in messages]

    # Add messages to index
    await index.index(chat_messages)

    # Find relevant messages
    relevant_messages = await index.search(
        query="system requirements for Flutter",
        max_results=10,
        score_threshold=0.6,
    )

    # Display the results
    print("Relevant messages:")
    for message, score in relevant_messages:
        print(f"- {message.content} (score: {score:.2f})")


if __name__ == "__main__":
    asyncio.run(main())

See MessageIndex for more details.

Context Manager

The Context Manager optimizes conversation context to prevent token limits and improve relevance:

  • Context Optimization: Smart selection of relevant messages
  • Window Management: Sliding window over conversation history
  • RAG Integration: Retrieval-augmented generation support
  • Strategy Selection: Multiple optimization strategies
  • Token Management: Automatic handling of context length

Example usage:

import asyncio

from liteswarm.core import LiteContextManager, LiteMessageStore
from liteswarm.types import Message
from liteswarm.types.context_manager import RAGStrategyConfig


async def main() -> None:
    # Create message store that the context manager will use
    message_store = LiteMessageStore()

    # Add messages to the message store
    # fmt: off
    await message_store.add_messages(
        [
            Message(role="user", content="Hi there!"),
            Message(role="assistant", content="Hello! How can I assist you today?"),
            Message(role="user", content="Can you tell me the weather in London?"),
            Message(role="assistant", content="Sure! The weather in London is currently sunny with a high of 20°C."),
            Message(role="user", content="Thanks! How about Paris?"),
            Message(role="assistant", content="You're welcome! The weather in Paris is cloudy with occasional rain showers and a high of 15°C."),
            Message(role="user", content="What should I pack for a trip to both cities?"),
            Message(role="assistant", content="For London, pack light layers and sunglasses. For Paris, consider bringing an umbrella and a warm jacket."),
            Message(role="user", content="Got it. What are some must-see attractions in both cities?"),
            Message(role="assistant", content="In London, visit the Tower of London and Buckingham Palace. In Paris, don't miss the Eiffel Tower and the Louvre Museum."),
        ]
    )
    # fmt: on

    # Create context manager
    context_manager = LiteContextManager(message_store=message_store)

    # Optimize context using RAG strategy
    optimized_context = await context_manager.optimize_context(
        model="gpt-4o",
        strategy="rag",
        rag_config=RAGStrategyConfig(
            query="weather in London",
            max_messages=10,
            score_threshold=0.6,
        ),
    )

    # Display optimized context
    print("Optimized context:")
    for message in optimized_context:
        print(f"{message.role}: {message.content}")


if __name__ == "__main__":
    asyncio.run(main())

The Context Manager supports several optimization strategies:

  1. Summarize: Creates concise summaries of older messages
  2. Window: Keeps a sliding window of recent messages
  3. RAG: Uses semantic search to find relevant messages
  4. Trim: Simple truncation to fit token limits

Each strategy can be configured through the Context Manager's settings:

context_manager = LiteContextManager(
    window_size=50,  # Maximum messages in sliding window
    preserve_recent=25,  # Messages to keep when summarizing
    relevant_window_size=10,  # Maximum relevant messages to return
    chunk_size=10,  # Messages per summary chunk
    default_strategy="trim",  # Default optimization strategy
    default_embedding_model="text-embedding-3-small",  # Default model for embeddings
)

See ContextManager for more details.

Basic Usage

Simple Agent

import asyncio

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent


async def main() -> None:
    # Create an agent
    agent = Agent(
        id="assistant",
        instructions="You are a helpful AI assistant.",
        llm=LLM(
            model="claude-3-5-sonnet-20241022",
            temperature=0.7,
        ),
    )

    # Create swarm and execute
    swarm = Swarm()
    result = await swarm.execute(
        agent=agent,
        prompt="Hello!",
    )

    print(result.content)


if __name__ == "__main__":
    asyncio.run(main())

Agent with Tools

import asyncio

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent


async def main() -> None:
    def calculate_sum(a: int, b: int) -> int:
        """Calculate the sum of two numbers."""
        return a + b

    # Create a math agent with tools
    agent = Agent(
        id="math_agent",
        instructions="Use tools for calculations. Never calculate yourself.",
        llm=LLM(
            model="claude-3-5-sonnet-20241022",
            tools=[calculate_sum],
            tool_choice="auto",
        ),
    )

    # Create swarm and execute
    swarm = Swarm()
    result = await swarm.execute(
        agent=agent,
        prompt="What is 2 + 2?",
    )

    print(result.content)


if __name__ == "__main__":
    asyncio.run(main())

Event Streaming

LiteSwarm provides a powerful event streaming API that allows real-time monitoring of agent interactions:

import asyncio

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent


async def main() -> None:
    agent = Agent(
        id="assistant",
        instructions="You are a helpful assistant.",
        llm=LLM(model="gpt-4o"),
    )

    swarm = Swarm()
    stream = swarm.stream(agent, prompt="Hello!")

    # Method 1: Process events with if/else branching
    async for event in stream:
        if event.type == "agent_response_chunk":
            # Handle content updates
            if content := event.chunk.completion.delta.content:
                print(content, end="", flush=True)
            # Handle completion
            if event.chunk.completion.finish_reason:
                print("\nFinished:", event.chunk.completion.finish_reason)
        elif event.type == "agent_switch":
            # Handle agent switching
            prev_id = event.previous.id if event.previous else "None"
            print(f"\nSwitching from {prev_id} to {event.current.id}")
        elif event.type == "tool_call_result":
            # Handle tool execution results
            print(f"\nTool result: {event.tool_call_result.result}")
        elif event.type == "error":
            # Handle errors
            print(f"\nError: {event.error}")

    # Get final result after streaming
    result = await stream.get_result()
    print(f"\nFinal result: {result.content}")


if __name__ == "__main__":
    asyncio.run(main())

Event Types

LiteSwarm emits several types of events during execution:

  • AgentResponseChunkEvent:

    • Content updates from the agent
    • Completion status and reason
    • Usage statistics and cost info
  • AgentSwitchEvent:

    • Previous and current agent IDs
    • Switching context and reason
  • ToolCallResultEvent:

    • Tool execution results
    • Tool call metadata
    • Execution status
  • ErrorEvent:

    • Error details and type
    • Agent context
    • Stack trace if available
  • CompleteEvent:

    • Final conversation state
    • Complete message history
    • Execution metadata

See SwarmEvent page for more details.

Custom Event Handlers

You can create custom event handlers for more sophisticated event processing. There are two main ways to handle events:

  1. Using the streaming API directly (recommended for most cases):
stream = swarm.stream(agent, prompt="Hello!")
async for event in stream:
    if event.type == "agent_response_chunk":
        print(event.chunk.completion.delta.content, end="", flush=True)
    elif event.type == "tool_call_result":
        print(f"\nTool: {event.tool_call_result.tool_call.function.name}")
    elif event.type == "error":
        print(f"\nError: {event.error}")
  1. Using a custom event handler with execute() (useful for advanced event handling or non-async contexts):
import asyncio

from typing_extensions import override

from liteswarm.core import Swarm, SwarmEventHandler
from liteswarm.types import LLM, Agent, SwarmEvent


class CustomHandler(SwarmEventHandler):
    @override
    async def on_event(self, event: SwarmEvent) -> None:
        if event.type == "agent_response_chunk":
            # Process content updates
            if content := event.chunk.completion.delta.content:
                print(content, end="", flush=True)

            # Track completion
            if event.chunk.completion.finish_reason:
                print(f"\nFinished: {event.chunk.completion.finish_reason}")

        elif event.type == "agent_switch":
            # Log agent switches
            prev_id = event.previous.id if event.previous else "None"
            print(f"\nSwitching agents: {prev_id} -> {event.current.id}")

        elif event.type == "tool_call_result":
            # Process tool results
            print(f"\nTool executed: {event.tool_call_result.tool_call.function.name}")
            print(f"Result: {event.tool_call_result.result}")

        elif event.type == "error":
            # Handle errors
            print(f"\nError occurred: {event.error}")

        elif event.type == "complete":
            # Process completion
            print("\nExecution completed")
            print(f"Messages: {len(event.messages)}")


async def main() -> None:
    agent = Agent(
        id="assistant",
        instructions="You are a helpful assistant.",
        llm=LLM(model="gpt-4o"),
    )

    swarm = Swarm()
    result = await swarm.execute(
        agent=agent,
        prompt="Hello!",
        event_handler=CustomHandler(),
    )

    print(f"\n\nResult: {result.model_dump_json(indent=2, exclude_none=True)}")


if __name__ == "__main__":
    asyncio.run(main())

Streaming with Result Collection

The stream() method returns an async generator that supports both event streaming and result collection:

import asyncio

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent


async def main() -> None:
    agent = Agent(
        id="assistant",
        instructions="You are a helpful assistant.",
        llm=LLM(model="gpt-4o"),
    )

    swarm = Swarm(
        include_usage=True,
        include_cost=True,
    )

    # Stream events and collect result
    stream = swarm.stream(agent, prompt="Hello!")

    # Process events during execution
    async for event in stream:
        if event.type == "agent_response_chunk":
            print(event.chunk.completion.delta.content, end="", flush=True)

    # Get final result after completion
    result = await stream.get_result()
    print(f"\nFinal content: {result.content}")

    # Access metadata
    if result.usage:
        print(f"Tokens used: {result.usage.total_tokens}")

    if result.response_cost:
        prompt_cost = result.response_cost.prompt_tokens_cost
        completion_cost = result.response_cost.completion_tokens_cost
        total_cost = prompt_cost + completion_cost
        print(f"Cost: ${total_cost}")


if __name__ == "__main__":
    asyncio.run(main())

Event-Driven Architecture

LiteSwarm's event system enables:

  1. Real-time Updates:

    • Stream content as it's generated
    • Monitor agent state changes
    • Track tool execution progress
  2. Custom Processing:

    • Filter and transform events
    • Implement custom logging
    • Build interactive UIs
  3. Error Handling:

    • Catch and process errors
    • Implement recovery strategies
    • Monitor execution health
  4. Progress Tracking:

    • Monitor token usage
    • Track execution costs
    • Measure response times

Advanced Features

Agent Switching

Agents can dynamically switch to other agents during execution:

import asyncio
import json

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent, ToolResult
from liteswarm.utils import dump_messages


async def main() -> None:
    def multiply(a: int, b: int) -> int:
        """Multiply two numbers."""
        return a * b

    # Create a math agent with tools
    math_agent = Agent(
        id="math",
        instructions="You are a math expert.",
        llm=LLM(
            model="gpt-4o",
            tools=[multiply],
            tool_choice="auto",
        ),
    )

    def switch_to_math() -> ToolResult:
        """Switch to math agent for calculations."""
        return ToolResult(
            content="Switching to math expert",
            agent=math_agent,
        )

    # Create the main agent with switch tool
    main_agent = Agent(
        id="assistant",
        instructions="Help users and switch to math agent for calculations.",
        llm=LLM(
            model="gpt-4o",
            tools=[switch_to_math],
            tool_choice="auto",
        ),
    )

    # Agent will automatically switch when needed
    swarm = Swarm()
    await swarm.execute(
        agent=main_agent,
        prompt="What is 234 * 567?",
    )

    # Print the full conversation history
    messages = await swarm.message_store.get_messages()
    print(json.dumps(dump_messages(messages), indent=2))


if __name__ == "__main__":
    asyncio.run(main())

Note: LiteSwarm requires explicitly passing the agent you want to use in each execute() or stream() call. To maintain a conversation with the same agent, pass that agent in subsequent calls. The conversation history is preserved, but the active agent is determined by what you pass to these methods:

# Start with agent1
result1 = await swarm.execute(agent1, prompt="Start task")

# Continue with agent1
result2 = await swarm.execute(agent1, prompt="Continue task")

# Switch to agent2 (history is preserved)
result3 = await swarm.execute(agent2, prompt="Review work")

Agent Teams

The SwarmTeam class (from liteswarm.experimental) provides an experimental framework for orchestrating complex agent workflows with automated planning. It follows a two-phase process:

  1. Planning Phase:

    • Analyzes the prompt to create a structured plan
    • Breaks down work into specific tasks with dependencies
    • Supports interactive feedback loop for plan refinement
    • Validates task types and team capabilities
  2. Execution Phase:

    • Executes tasks in dependency order
    • Assigns tasks to capable team members
    • Tracks progress and maintains execution state
    • Produces an artifact with results and updates

Here's a complete example:

import asyncio
import json
from typing import Literal

from pydantic import BaseModel
from typing_extensions import override

from liteswarm.core import ConsoleEventHandler, Swarm
from liteswarm.experimental import SwarmTeam
from liteswarm.types import (
    LLM,
    Agent,
    ArtifactStatus,
    ContextVariables,
    Plan,
    PlanFeedbackHandler,
    Task,
    TaskDefinition,
    TeamMember,
)


# 1. Define task types and outputs
class WriteDocTask(Task):
    type: Literal["write_documentation"]
    topic: str
    target_audience: Literal["beginner", "intermediate", "advanced"]


class ReviewDocTask(Task):
    type: Literal["review_documentation"]
    content: str
    criteria: list[str]


class Documentation(BaseModel):
    content: str
    examples: list[str]
    see_also: list[str]


class ReviewFeedback(BaseModel):
    approved: bool
    issues: list[str]
    suggestions: list[str]


# 2. (Optional) Create interactive feedback handler
class InteractiveFeedback(PlanFeedbackHandler):
    @override
    async def handle(
        self,
        plan: Plan,
        prompt: str,
        context: ContextVariables | None,
    ) -> tuple[str, ContextVariables | None] | None:
        print("\nProposed plan:")
        for task in plan.tasks:
            print(f"- {task.title}")

        if input("\nApprove? [y/N]: ").lower() != "y":
            return "Please revise the plan", context
        else:
            return None


async def main() -> None:
    # 3. Create task definitions
    def build_write_doc_instructions(
        task: WriteDocTask,
        context: ContextVariables,
    ) -> str:
        return f"""
        Write a {task.target_audience}-level documentation about {task.topic}.

        Style Guide from context:
        {context.style_guide}

        You must return a JSON object that matches the following schema:
        {json.dumps(Documentation.model_json_schema())}
        """

    write_doc = TaskDefinition(
        task_type=WriteDocTask,
        instructions=build_write_doc_instructions,
        response_format=Documentation,
    )

    def build_review_doc_instructions(
        task: ReviewDocTask,
        context: ContextVariables,
    ) -> str:
        return f"""
        Review the following documentation:
        {task.content}

        Review criteria:
        {task.criteria}

        Style Guide to check against:
        {context.style_guide}

        You must return a JSON object that matches the following schema:
        {json.dumps(ReviewFeedback.model_json_schema())}
        """

    review_doc = TaskDefinition(
        task_type=ReviewDocTask,
        instructions=build_review_doc_instructions,
        response_format=ReviewFeedback,
    )

    # 4. Create specialized agents
    writer = Agent(
        id="tech_writer",
        instructions="""You are an expert technical writer who creates clear,
        concise documentation with practical examples.""",
        llm=LLM(
            model="gpt-4o",
            temperature=0.7,
        ),
    )

    reviewer = Agent(
        id="doc_reviewer",
        instructions="""You are a documentation reviewer who ensures accuracy,
        clarity, and completeness of technical documentation.""",
        llm=LLM(
            model="gpt-4o",
            temperature=0.3,  # Lower temperature for more consistent reviews
        ),
    )

    # 5. Create team members
    writer_member = TeamMember(
        id="writer",
        agent=writer,
        task_types=[WriteDocTask],
    )

    reviewer_member = TeamMember(
        id="reviewer",
        agent=reviewer,
        task_types=[ReviewDocTask],
    )

    # 6. Create swarm team
    event_handler = ConsoleEventHandler()
    swarm = Swarm(event_handler=event_handler)
    team = SwarmTeam(
        swarm=swarm,
        members=[writer_member, reviewer_member],
        task_definitions=[write_doc, review_doc],
        event_handler=event_handler,
    )

    # 7. Execute the user request
    artifact = await team.execute(
        prompt="Create beginner-friendly documentation about Python list comprehensions",
        context=ContextVariables(
            style_guide="""
            - Use simple language
            - Include practical examples
            - Link to related topics
            - Start with basic concepts
            - Show common patterns
            """
        ),
        feedback_handler=InteractiveFeedback(),
    )

    # 8. Inspect and print the results
    if artifact.status == ArtifactStatus.COMPLETED:
        print("\nDocumentation Team Results:")
        for result in artifact.task_results:
            print(f"\nTask: {result.task.type}")

            if not result.output:
                continue

            match result.output:
                case Documentation() as doc:
                    print("\nContent:")
                    print(doc.content)
                    print("\nExamples:")
                    for example in doc.examples:
                        print(f"• {example}")
                    print("\nSee Also:")
                    for ref in doc.see_also:
                        print(f"• {ref}")

                case ReviewFeedback() as review:
                    print("\nReview Feedback:")
                    print(f"Approved: {review.approved}")
                    if review.issues:
                        print("\nIssues:")
                        for issue in review.issues:
                            print(f"• {issue}")
                    if review.suggestions:
                        print("\nSuggestions:")
                        for suggestion in review.suggestions:
                            print(f"• {suggestion}")


if __name__ == "__main__":
    asyncio.run(main())

The SwarmTeam will:

  1. Create a plan with appropriate tasks and dependencies
  2. Allow plan review/modification through feedback handler
  3. Execute tasks in correct order using capable team members
  4. Produce an artifact containing all results and updates

See the software_team example for a complete implementation of a development team workflow.

Streaming with Structured Outputs

LiteSwarm provides two layers of structured output handling with real-time streaming support:

import asyncio

from pydantic import BaseModel

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent


class EntitiesModel(BaseModel):
    attributes: list[str]
    colors: list[str]
    animals: list[str]


async def main() -> None:
    # Create an agent for entity extraction
    agent = Agent(
        id="extract-entities-agent",
        instructions="You're an extraction agent. Extract the entities from the input text.",
        llm=LLM(
            model="gpt-4o",
            response_format=EntitiesModel,
        ),
    )

    # Create swarm and get stream
    swarm = Swarm()
    stream = swarm.stream(
        agent,
        prompt="The quick brown fox jumps over the lazy dog with piercing blue eyes",
    )

    # Method 1: Stream responses and handle parsed content
    print("Streaming responses:")
    async for event in stream:
        if event.type == "agent_response_chunk":
            response_chunk = event.chunk
            completion = response_chunk.completion

            # Handle raw content
            if completion.delta.content is not None:
                print(completion.delta.content, end="", flush=True)

            # Handle structured outputs
            if isinstance(response_chunk.parsed_content, EntitiesModel):
                print("\nParsed content:")
                print(response_chunk.parsed_content.model_dump_json(indent=2))

    # Method 2: Get final result with parsed content
    result = await stream.get_result()
    if isinstance(result.parsed_content, EntitiesModel):
        print("\n\nFinal parsed result:")
        print(result.parsed_content.model_dump_json(indent=2))

    # Method 3: Direct execution with structured output
    print("\n\n")
    print("=" * 80)
    print("Direct execution:")
    print("=" * 80)

    result = await swarm.execute(
        agent,
        prompt="The quick brown fox jumps over the lazy dog with piercing blue eyes",
    )

    if isinstance(result.parsed_content, EntitiesModel):
        print("\nParsed result:")
        print(result.parsed_content.model_dump_json(indent=2))


if __name__ == "__main__":
    asyncio.run(main())

The example demonstrates:

  1. Real-time Content Streaming:

    • Stream raw content as it's generated
    • Access structured output when parsing completes
    • Handle both raw and parsed content in same stream
  2. Structured Output Handling:

    • Define output schema with Pydantic
    • Configure agent with response format
    • Access parsed content through events
  3. Multiple Access Methods:

    • Stream events for real-time updates
    • Get final result for complete response
    • Access both raw and parsed content

The streaming API provides:

  • event.chunk.completion.delta.content: Raw content updates
  • event.chunk.parsed_content: Parsed structured output
  • event.chunk.completion.finish_reason: Completion status
  • event.chunk.completion.usage: Token usage statistics
  • event.chunk.completion.response_cost: Cost tracking (if enabled)

See examples/structured_outputs/run.py for more examples of different structured output strategies.

Error Handling

LiteSwarm provides comprehensive error handling with built-in retry mechanisms. The framework automatically retries failed operations with exponential backoff:

import asyncio

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent
from liteswarm.types.exceptions import RetryError, SwarmError


async def main() -> None:
    # Create swarm with custom retry settings
    swarm = Swarm(
        # Retry Configuration
        max_retries=3,  # Maximum number of retry attempts
        initial_retry_delay=1.0,  # Initial delay between retries (seconds)
        max_retry_delay=10.0,  # Maximum delay between retries (seconds)
        backoff_factor=2.0,  # Exponential backoff multiplier
    )

    agent = Agent(
        id="assistant",
        instructions="You are a helpful assistant.",
        llm=LLM(model="gpt-4o"),
    )

    # The framework will automatically:
    # 1. Retry failed API calls with exponential backoff
    # 2. Handle transient errors (network issues, rate limits)
    # 3. Preserve conversation state between retries
    # 4. Track and expose retry statistics

    try:
        stream = swarm.stream(agent, prompt="Hello!")
        async for response in stream:
            print(response.delta.content, end="", flush=True)

        result = await stream.get_result()
        print("\nFinal:", result.content)

    except RetryError as e:
        # RetryError includes:
        # - Original error that caused retries
        # - Number of retry attempts made
        # - Total retry duration
        # - Backoff strategy details
        print(f"Retry mechanism failed after {e.attempts} attempts: {e}")
        print(f"Original error: {e.original_error}")
        print(f"Total retry duration: {e.total_duration}s")

    except SwarmError as e:
        print(f"Other swarm error: {e}")


if __name__ == "__main__":
    asyncio.run(main())

The framework provides automatic retry with exponential backoff for:

  1. API Calls: Handles transient API issues

    • Network connectivity problems
    • Rate limit errors
    • Temporary service outages
    • Timeout errors
  2. Response Generation: Manages streaming issues

    • Incomplete or malformed responses
    • Connection drops during streaming
    • Token limit exceeded errors
    • Model-specific failures
  3. Agent Switching: Handles transition errors

    • Failed agent initialization
    • Context transfer issues
    • Tool execution failures
    • State management errors

The retry mechanism features:

  1. Exponential Backoff: Gradually increases delay between retries

    # Example retry delays with default settings:
    # Attempt 1: 1.0 seconds
    # Attempt 2: 2.0 seconds
    # Attempt 3: 4.0 seconds
    # Attempt 4: 8.0 seconds (capped at max_retry_delay)
    swarm = Swarm(
        max_retries=3,
        initial_retry_delay=1.0,
        max_retry_delay=10.0,
        backoff_factor=2.0,
    )
    
  2. State Preservation: Maintains conversation context

    • Preserves message history
    • Retains agent state
    • Keeps tool execution results
    • Maintains parsed content
  3. Detailed Error Information: Provides comprehensive error data

    try:
        result = await swarm.execute(agent, prompt)
    except RetryError as e:
        print(f"Attempts: {e.attempts}")
        print(f"Duration: {e.total_duration}s")
        print(f"Original error: {e.original_error}")
        print(f"Backoff strategy: {e.backoff_strategy}")
    
  4. Customizable Behavior: Configure retry settings

    swarm = Swarm(
        # Retry settings
        max_retries=5,              # More retry attempts
        initial_retry_delay=0.5,    # Start with shorter delays
        max_retry_delay=30.0,      # Allow longer maximum delay
        backoff_factor=3.0,        # More aggressive backoff
    )
    

The framework also provides specific error types for different failure scenarios:

  • SwarmError: Base class for all swarm-specific errors

    • Provides context about what went wrong
    • Contains the original exception if applicable
    • Includes agent information when relevant
  • CompletionError: Raised when LLM completion fails permanently

    • Indicates API call failure after all retries
    • Contains the original API error
    • Provides completion attempt details
  • ContextLengthError: Raised when context becomes too large

    • Indicates when message history exceeds limits
    • Provides details about context size
    • Suggests using memory management
  • SwarmTeamError: Base class for team-related errors

    • Provides unified error handling for team operations
    • Contains original error and team context
    • Used for planning and execution failures
  • PlanValidationError: Raised when plan validation fails

    • Indicates invalid task types or dependencies
    • Lists specific validation failures
    • Helps identify plan structure issues
  • TaskExecutionError: Raised when task execution fails

    • Contains task and assignee information
    • Provides execution failure details
    • Helps track which task and member failed
  • ResponseParsingError: Raised when response parsing fails

    • Contains raw response and expected format
    • Helps debug format mismatches
    • Used for structured output validation
  • ResponseRepairError: Raised when response repair fails

    • Indicates failed attempts to fix invalid responses
    • Contains repair attempt details
    • Used when response cannot be salvaged
  • MaxAgentSwitchesError: Raised when too many agent switches occur

    • Indicates potential infinite switching loops
    • Shows switch count and limit
    • Includes agent switch history
  • MaxResponseContinuationsError: Raised when response needs too many continuations

    • Indicates when response exceeds length limits
    • Shows continuation count and limit
    • Suggests breaking task into smaller parts
  • RetryError: Raised when retry mechanism fails

    • Contains original error that caused retries
    • Shows retry count and settings
    • Includes backoff strategy details

Best practices for error handling:

  1. Use Specific Handlers: Catch specific errors for targeted handling

    try:
        result = await swarm.execute(agent, prompt)
    except CompletionError as e:
        # Handle API failures
    except ContextLengthError as e:
        # Handle context length issues
    except SwarmError as e:
        # Handle other swarm errors
    
  2. Team Error Recovery: Handle team-specific errors

    try:
        artifact = await team.execute_plan(plan)
    except PlanValidationError as e:
        # Handle invalid plan structure
    except TaskExecutionError as e:
        # Handle task execution failures
    except ResponseParsingError as e:
        # Handle response format issues
    except SwarmTeamError as e:
        # Handle other team errors
    
  3. Response Format Recovery: Handle parsing and repair

    try:
        result = await team.execute_task(task)
    except ResponseParsingError as e:
        # Try to repair the response
        repaired = repair_json(e.response)
        result = parse_response(repaired, e.response_format)
    except ResponseRepairError as e:
        # Handle unrecoverable format issues
    
  4. Retry Configuration: Customize retry behavior

    swarm = Swarm(
        max_retries=3,
        initial_retry_delay=1.0,
        max_retry_delay=10.0,
        backoff_factor=2.0,
    )
    

Context Variables

Context variables let you pass data between interactions. Here's a simple example:

import asyncio
import json

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent, ContextVariables, ToolResult

mock_database = {
    "alice": {
        "language": "Python",
        "experience": "intermediate",
        "interests": ["web", "data science"],
    }
}


async def main() -> None:
    def get_user_preferences(user_id: str) -> ToolResult:
        """Get user preferences from a simulated database."""
        user_preferences = mock_database.get(user_id, {})
        return ToolResult(
            content=f"Found preferences for {user_id}: {user_preferences}",
            context_variables=ContextVariables(
                user_preferences=user_preferences,
                learning_path=[],  # Initialize empty learning path
            ),
        )

    def update_learning_path(topic: str, completed: bool = False) -> ToolResult:
        """Update the user's learning path with a new topic or mark as completed."""
        return ToolResult(
            content=f"{'Completed' if completed else 'Added'} topic: {topic}",
            context_variables=ContextVariables(
                topic=topic,
                completed=completed,
            ),
        )

    # Create an agent with tools
    agent = Agent(
        id="tutor",
        instructions=lambda context_variables: f"""
        You are a programming tutor tracking a student's learning journey.

        Current Context:
        - User ID: {json.dumps(context_variables.get('user_id', 'unknown'))}
        - User Preferences: {json.dumps(context_variables.get('user_preferences', {}))}
        - Learning Path: {json.dumps(context_variables.get('learning_path', []))}
        - Last Topic: {json.dumps(context_variables.get('topic', None))}
        - Last Topic Completed: {json.dumps(context_variables.get('completed', False))}

        Track their progress and suggest next steps based on their preferences and current progress.
        """,
        llm=LLM(
            model="gpt-4o",
            tools=[get_user_preferences, update_learning_path],
            tool_choice="auto",
            temperature=0.3,
        ),
    )

    # Create swarm and execute with initial context
    swarm = Swarm()

    # First interaction - get user preferences
    result = await swarm.execute(
        agent=agent,
        prompt="Start Alice's learning journey",
        context_variables=ContextVariables(user_id="alice"),
    )
    print("\nInitial Setup:", result.content)

    # Second interaction - suggest first topic
    result = await swarm.execute(
        agent=agent,
        prompt="What should Alice learn first?",
        # Context variables are preserved from the previous execution
    )
    print("\nFirst Topic Suggestion:", result.content)

    # Third interaction - mark progress and get next topic
    result = await swarm.execute(
        agent=agent,
        prompt="Alice completed the first topic. What's next?",
        # Context variables are preserved from the previous execution
    )
    print("\nProgress Update:", result.content)


if __name__ == "__main__":
    asyncio.run(main())

Structured Outputs

LiteSwarm provides two layers of structured output handling:

  1. LLM-level Response Format:

    • Set via response_format in LLM class
    • Provider-specific structured output support
    • For OpenAI/Anthropic: Direct JSON schema enforcement
    • For other providers: Manual prompt engineering
  2. Framework-level Response Format:

    • Set in TaskDefinition and PlanningAgent
    • Provider-agnostic parsing and validation
    • Supports both Pydantic models and custom parsers
    • Handles response repair and validation

Using Swarm directly with LLM-level response format:

import asyncio

from pydantic import BaseModel

from liteswarm.core import Swarm
from liteswarm.types import LLM, Agent

CODE_TO_REVIEW = """
def calculate_sum(a: int, b: int) -> int:
    \"\"\"Calculate the sum of two numbers.\"\"\"
    return a - b
"""


class ReviewOutput(BaseModel):
    issues: list[str]
    approved: bool


async def main() -> None:
    agent = Agent(
        id="reviewer",
        instructions="Review code and provide structured feedback",
        llm=LLM(
            model="gpt-4o",
            response_format=ReviewOutput,  # Direct OpenAI JSON schema support
        ),
    )

    swarm = Swarm()
    result = await swarm.execute(
        agent=agent,
        prompt=f"Review the code and provide structured feedback:\n{CODE_TO_REVIEW}",
    )

    if not isinstance(result.parsed_content, ReviewOutput):
        print("Agent failed to produce a response of type ReviewOutput")
        return

    if result.parsed_content.issues:
        print("Issues:")
        for issue in result.parsed_content.issues:
            print(f"- {issue}")

    print(f"\nApproved: {result.parsed_content.approved}")


if __name__ == "__main__":
    asyncio.run(main())

Using SwarmTeam with both layers (recommended for complex workflows):

import asyncio
from typing import Literal

from pydantic import BaseModel

from liteswarm.core import ConsoleEventHandler, Swarm
from liteswarm.experimental import LitePlanningAgent, SwarmTeam
from liteswarm.types import (
    LLM,
    Agent,
    ArtifactStatus,
    ContextVariables,
    Plan,
    Task,
    TaskDefinition,
    TeamMember,
)

CODE_TO_REVIEW = """
def calculate_sum(a: int, b: int) -> int:
    \"\"\"Calculate the sum of two numbers.\"\"\"
    return a - bs  # Bug: Typo in variable name and wrong operator
"""


# 1. Define data structures for the review process
class ReviewTask(Task):
    type: Literal["code-review"]
    code: str
    language: str
    review_type: Literal["general", "security", "performance"]


class CodeReviewOutput(BaseModel):
    issues: list[str]
    approved: bool
    suggested_fixes: list[str]


class CodeReviewPlan(Plan):
    tasks: list[ReviewTask]


# 2. Create prompt builders
def build_review_prompt(prompt: str, context: ContextVariables) -> str:
    return f"""
    You're given the following user request:
    <request>
    {prompt}
    </request>

    Here is the code to review:
    <code language="{context.get('language', '')}" review_type="{context.get('review_type', '')}">
    {context.get('code', '')}
    </code>

    Please create a review plan consisting of 1 task.
    """.strip()


async def main() -> None:
    # 3. Create task definitions
    review_def = TaskDefinition(
        task_type=ReviewTask,
        instructions=lambda task, _: f"""
        Review the provided code focusing on {task.review_type} aspects.
        <code language="{task.language}">{task.code}</code>
        """,
        response_format=CodeReviewOutput,
    )

    # 4. Create agents
    planning_agent = Agent(
        id="planning-agent",
        instructions="You are a planning agent that creates plans for code review tasks.",
        llm=LLM(model="gpt-4o", response_format=CodeReviewPlan),
    )

    review_agent = Agent(
        id="code-reviewer",
        instructions="You are an expert code reviewer.",
        llm=LLM(model="gpt-4o", response_format=CodeReviewOutput),
    )

    # 5. Create team members
    review_member = TeamMember(
        id="senior-reviewer",
        agent=review_agent,
        task_types=[ReviewTask],
    )

    # 6. Set up swarm team
    event_handler = ConsoleEventHandler()
    swarm = Swarm()
    team = SwarmTeam(
        swarm=swarm,
        members=[review_member],
        task_definitions=[review_def],
        event_handler=event_handler,
        planning_agent=LitePlanningAgent(
            swarm=swarm,
            agent=planning_agent,
            prompt_template=build_review_prompt,
            task_definitions=[review_def],
            response_format=CodeReviewPlan,
            event_handler=event_handler,
        ),
    )

    # 7. Execute review request
    artifact = await team.execute(
        prompt="Review this Python code",
        context=ContextVariables(
            code=CODE_TO_REVIEW,
            language="python",
            review_type="general",
        ),
    )

    # 8. Show results
    if artifact.status == ArtifactStatus.COMPLETED:
        for result in artifact.task_results:
            if isinstance(result.output, CodeReviewOutput):
                assert result.assignee is not None
                print(f"\nReview by: {result.assignee.id}")
                print("\nIssues found:")
                for issue in result.output.issues:
                    print(f"- {issue}")
                print("\nSuggested fixes:")
                for fix in result.output.suggested_fixes:
                    print(f"- {fix}")
                print(f"\nApproved: {result.output.approved}")


if __name__ == "__main__":
    asyncio.run(main())

This example demonstrates:

  1. LLM-level Format (Provider-specific):

    • response_format=CodeReviewOutput in review agent's LLM
    • response_format=CodeReviewPlan in planning agent's LLM
    • OpenAI will enforce JSON schema at generation time
  2. Framework-level Format (Provider-agnostic):

    • response_format=CodeReviewOutput in task definition
    • response_format=CodeReviewPlan in planning agent
    • Framework handles parsing, validation, and repair

The two-layer approach ensures:

  • Structured outputs work with any LLM provider
  • Automatic parsing and validation
  • Consistent interface across providers
  • Fallback to prompt-based formatting
  • Response repair capabilities

See examples/structured_outputs/run.py for more examples of different structured output strategies.

Note about OpenAI Structured Outputs

OpenAI's JSON schema support has certain limitations:

  • No default values in Pydantic models
  • No oneOf in union types (must use discriminated unions)
  • Some advanced Pydantic features may not be supported

While LiteSwarm's base Task and Plan types are designed to be OpenAI-compatible, this compatibility must be maintained by users when subclassing these types. For example:

# OpenAI-compatible task type
class ReviewTask(Task):
    type: Literal["code-review"]  # Discriminator field
    code: str                     # Required field, no default
    language: str                 # Required field, no default
    
    # Not OpenAI-compatible - has default value
    review_type: str = "general"  # Will work with other providers

We provide utilities to help maintain compatibility:

  • liteswarm.utils.pydantic module contains helpers for:
    • Converting Pydantic schemas to OpenAI format
    • Restoring objects from OpenAI responses
    • Handling schema transformations

See examples/structured_outputs/strategies/openai_pydantic.py for practical examples of using these utilities.

Remember: Base Task and Plan are OpenAI-compatible, but maintaining compatibility in subclasses is the user's responsibility if OpenAI structured outputs are needed.

Key Concepts

  1. Agent: An AI entity with specific instructions and capabilities
  2. Tool: A Python function that an agent can call
  3. Swarm: Orchestrator for agent interactions and conversations
  4. SwarmTeam: Coordinator for multiple specialized agents
  5. Context Variables: Dynamic data passed to agents and tools
  6. Event Handler: Interface for processing and responding to swarm events

Best Practices

  1. Use ToolResult for wrapping tool return values:

    def my_tool() -> ToolResult:
        return ToolResult(
            content="Result",
            context_variables=ContextVariables(...)
        )
    
  2. Implement proper error handling:

    try:
        result = await team.execute(agent, prompt)
    except TaskExecutionError as e:
        logger.error(f"Task failed: {e}")
    
  3. Use context variables for dynamic behavior:

    def build_instructions(context: ContextVariables) -> str:
        return f"Help {context['user_name']} with {context['task']}"
    
  4. Leverage event handlers for real-time feedback:

    from typing_extensions import override
    
    from liteswarm.core import Swarm, SwarmEventHandler
    from liteswarm.types import LLM, Agent, SwarmEvent
    
    # Method 1: Custom event handler class
    class MyEventHandler(SwarmEventHandler):
        @override
        async def on_event(self, event: SwarmEvent) -> None:
            if event.type == "agent_response_chunk":
                # Handle content updates
                if content := event.chunk.completion.delta.content:
                    print(content, end="", flush=True)
                # Track completion
                if event.chunk.completion.finish_reason:
                    print(f"\nFinished: {event.chunk.completion.finish_reason}")
            elif event.type == "tool_call_result":
                print(f"\nTool executed: {event.tool_call_result.tool_call.function.name}")
            elif event.type == "error":
                print(f"\nError: {event.error}")
    
    # Use with execute() method
    result = await swarm.execute(
        agent=agent,
        prompt="Hello!",
        event_handler=MyEventHandler(),
    )
    
    # Method 2: Direct stream processing
    stream = swarm.stream(agent, prompt="Hello!")
    async for event in stream:
        if event.type == "agent_response_chunk":
            print(event.chunk.completion.delta.content, end="", flush=True)
        elif event.type == "tool_call_result":
            print(f"\nTool: {event.tool_call_result.tool_call.function.name}")
    
    # Get final result after streaming
    result = await stream.get_result()
    

Examples

The framework includes several example applications in the examples/ directory:

Each example demonstrates different aspects of the framework:

# Run the REPL example
python -m examples.repl.run

# Run the calculator example
python -m examples.calculator.run

# Try the mobile app team
python -m examples.mobile_app.run

# Run the parallel research example
python -m examples.parallel_research.run

# Experiment with structured outputs
python -m examples.structured_outputs.run

# Run the software team example
python -m examples.software_team.run

Contributing

We welcome contributions to LiteSwarm! We're particularly interested in:

  1. Adding Tests: We currently have minimal test coverage and welcome contributions to:

    • Add unit tests for core functionality
    • Add integration tests for agent interactions
    • Add example-based tests for common use cases
    • Set up testing infrastructure and CI
  2. Bug Reports: Open an issue describing:

    • Steps to reproduce the bug
    • Expected vs actual behavior
    • Your environment details
    • Any relevant code snippets
  3. Feature Requests: Open an issue describing:

    • The use case for the feature
    • Expected behavior
    • Example code showing how it might work
  4. Code Contributions:

    • Fork the repository
    • Create a new branch for your feature
    • Include tests for new functionality
    • Submit a pull request with a clear description
    • Ensure CI passes and code follows our style guide

Development setup:

# Clone the repository
git clone https://github.com/your-org/liteswarm.git
cd liteswarm

# Create virtual environment (choose one)
python -m venv .venv
# or
poetry install
# or
uv venv

# Install development dependencies
uv pip install -e ".[dev]"
# or
poetry install --with dev

# Run existing tests (if any)
pytest

# Run type checking
mypy .

# Run linting
ruff check .

Code Style

  • We use ruff for linting and formatting
  • Type hints are required for all functions
  • Docstrings should follow Google style
  • New features should include tests

Testing Guidelines

We're building our test suite and welcome contributions that:

  • Add pytest-based tests
  • Include both unit and integration tests
  • Cover core functionality
  • Demonstrate real-world usage
  • Help improve test coverage
  • Set up testing infrastructure

Commit Messages

Follow the Conventional Commits specification:

  • feat: New features
  • fix: Bug fixes
  • docs: Documentation changes
  • test: Adding or updating tests
  • refactor: Code changes that neither fix bugs nor add features

Citation

If you use LiteSwarm in your research or project, please cite our work:

@software{mozharovskii_2024_liteswarm,
    title = {{LiteSwarm: A Lightweight Framework for Building AI Agent Systems}},
    author = {Mozharovskii, Evgenii and {GlyphyAI}},
    year = {2024},
    url = {https://github.com/glyphyai/liteswarm},
    license = {MIT},
    version = {0.4.0}
}

License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

liteswarm-0.4.0.tar.gz (149.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

liteswarm-0.4.0-py3-none-any.whl (142.4 kB view details)

Uploaded Python 3

File details

Details for the file liteswarm-0.4.0.tar.gz.

File metadata

  • Download URL: liteswarm-0.4.0.tar.gz
  • Upload date:
  • Size: 149.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.11

File hashes

Hashes for liteswarm-0.4.0.tar.gz
Algorithm Hash digest
SHA256 caa115df30bcbf93da2c0c60a30e0b761e69388231e44b6cb2cba60f0b50d7de
MD5 169e25678b0247643869d3d9a7ed2ad3
BLAKE2b-256 102d5661ce2dab2080edaa7ad7ff25eb9ecb98cd1c1085895ad8ed35343bd373

See more details on using hashes here.

File details

Details for the file liteswarm-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: liteswarm-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 142.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.11

File hashes

Hashes for liteswarm-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6891d63013ad258bc2b5209d3bcffd1659e60c82dba509605eb69067866aaed4
MD5 bd25256877873cef6af3b3d9b56fad2a
BLAKE2b-256 0bf03afb6247140cf853c31843c39e302b203d020c5010bd291a6ada76f697ec

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page