Skip to main content

Framework for building multi-agent LLM orchestrator workflows with streaming and cost tracking

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

Project description

Multi-Agent Orchestration

Comprehensive framework for building multi-agent LLM orchestrator workflows with standardized patterns, streaming support, and cost tracking.

Features

  • Abstract Base Class: Extend BaseOrchestrator for custom workflows
  • 5 Built-in Patterns: Dream Cascade, Dream Swarm, Sequential, Conditional, Iterative
  • Streaming Progress: Real-time progress callbacks via SSE/WebSocket
  • Cost Tracking: Automatic token and cost tracking
  • Parallel Execution: Configurable concurrent agent execution
  • Retry Logic: Automatic retries with timeout handling
  • Document Generation: Optional PDF/Markdown report generation

Installation

pip install multi-agent-orchestration

Quick Start

Using Built-in Orchestrators

import asyncio
from orchestration import DreamCascadeOrchestrator, DreamCascadeConfig

# Configure the orchestrator
config = DreamCascadeConfig(
    belter_count=3,      # Tier 1: Quick searches
    drummer_count=2,     # Tier 2: Analysis
    camina_count=1,      # Tier 3: Synthesis
    primary_model='gpt-4'
)

# Create with your LLM provider
orchestrator = DreamCascadeOrchestrator(config, provider=your_llm_provider)

# Execute workflow
result = asyncio.run(orchestrator.execute_workflow(
    task="Research quantum computing applications"
))

print(result.final_synthesis)
print(f"Cost: ${result.total_cost:.4f}")

Building Custom Orchestrators

from orchestration import BaseOrchestrator, OrchestratorConfig, SubTask, AgentResult, AgentType

class MyOrchestrator(BaseOrchestrator):
    async def decompose_task(self, task, context=None):
        """Break task into subtasks"""
        return [
            SubTask(
                id='research',
                description=f'Research: {task}',
                agent_type=AgentType.RESEARCHER
            ),
            SubTask(
                id='analyze',
                description='Analyze findings',
                agent_type=AgentType.ANALYST
            )
        ]

    async def execute_subtask(self, subtask, context=None):
        """Execute a single subtask"""
        response = self.provider.complete(
            messages=[{'role': 'user', 'content': subtask.description}]
        )
        return AgentResult(
            agent_id=f'agent_{subtask.id}',
            agent_type=subtask.agent_type,
            subtask_id=subtask.id,
            content=response.content,
            cost=response.usage.get('total_tokens', 0) * 0.00001
        )

    async def synthesize_results(self, agent_results, context=None):
        """Combine results into final output"""
        combined = '\n\n'.join([r.content for r in agent_results])
        return f"## Summary\n\n{combined}"

# Use it
config = OrchestratorConfig(num_agents=2, parallel_execution=True)
orchestrator = MyOrchestrator(config, provider)
result = await orchestrator.execute_workflow("My task")

Built-in Orchestrators

DreamCascadeOrchestrator

Hierarchical research with 3 agent tiers (Belter → Drummer → Camina):

from orchestration import DreamCascadeOrchestrator, DreamCascadeConfig

config = DreamCascadeConfig(
    belter_count=5,      # Tier 1: Quick parallel searches
    drummer_count=3,     # Tier 2: Deep analysis
    camina_count=1       # Tier 3: Final synthesis
)

orchestrator = DreamCascadeOrchestrator(config, provider)

Use for: Deep research, hierarchical analysis, multi-stage workflows

DreamSwarmOrchestrator

Parallel multi-domain search:

from orchestration import DreamSwarmOrchestrator, DreamSwarmConfig

config = DreamSwarmConfig(
    num_agents=5,
    domains=['arxiv', 'github', 'news', 'wikipedia'],
    max_parallel=3
)

orchestrator = DreamSwarmOrchestrator(config, provider)

Use for: Broad information gathering, parallel searches

SequentialOrchestrator

Step-by-step execution:

from orchestration import SequentialOrchestrator, OrchestratorConfig

config = OrchestratorConfig(num_agents=3, parallel_execution=False)
orchestrator = SequentialOrchestrator(config, provider)

Use for: Pipelines, staged workflows, sequential dependencies

ConditionalOrchestrator

Runtime branching:

from orchestration import ConditionalOrchestrator

def should_deep_analyze(context):
    return context.get('complexity') > 0.7

orchestrator = ConditionalOrchestrator(
    config, provider,
    condition=should_deep_analyze,
    true_branch=deep_analysis,
    false_branch=quick_analysis
)

Use for: Adaptive workflows, decision trees

IterativeOrchestrator

Looped refinement:

from orchestration import IterativeOrchestrator

orchestrator = IterativeOrchestrator(
    config, provider,
    max_iterations=5,
    convergence_fn=lambda r: r.score > 0.9
)

Use for: Optimization, iterative improvement

Configuration

from orchestration import OrchestratorConfig

config = OrchestratorConfig(
    num_agents=5,
    primary_model='gpt-4',
    fallback_model='gpt-3.5-turbo',
    max_retries=3,
    timeout_seconds=300,
    parallel_execution=True,
    max_concurrent_agents=3,
    enable_cost_tracking=True,
    generate_documents=False,
    document_formats=['markdown', 'pdf']
)

# Validate configuration
errors = config.validate()
if errors:
    print(f"Config errors: {errors}")

Streaming Progress

async def progress_handler(event):
    print(f"[{event.event_type}] {event.data}")
    if event.progress:
        print(f"Progress: {event.progress:.1f}%")

result = await orchestrator.execute_workflow(
    task="Research task",
    stream_callback=progress_handler
)

Event types:

  • WORKFLOW_START / WORKFLOW_COMPLETE / WORKFLOW_ERROR
  • DECOMPOSITION_START / DECOMPOSITION_COMPLETE
  • AGENT_START / AGENT_COMPLETE / AGENT_ERROR
  • SYNTHESIS_START / SYNTHESIS_COMPLETE
  • DOCUMENT_GENERATION_START / DOCUMENT_GENERATION_COMPLETE

Data Models

from orchestration import (
    SubTask,
    AgentResult,
    OrchestratorResult,
    TaskStatus,
    AgentType,
    StreamEvent,
    EventType
)

# Create a subtask
subtask = SubTask(
    id='task-1',
    description='Analyze data',
    agent_type=AgentType.ANALYST,
    priority=1,
    dependencies=['task-0']
)

# Agent result
result = AgentResult(
    agent_id='agent-1',
    agent_type=AgentType.ANALYST,
    subtask_id='task-1',
    content='Analysis results...',
    status=TaskStatus.COMPLETED,
    execution_time=5.2,
    cost=0.003
)

# Full orchestrator result
orchestrator_result = OrchestratorResult(
    task_id='workflow-1',
    title='Research Task',
    status=TaskStatus.COMPLETED,
    agent_results=[result],
    final_synthesis='Summary...',
    execution_time=45.2,
    total_cost=0.05
)

Utilities

from orchestration import (
    ProgressTracker,
    CostTracker,
    calculate_progress,
    format_duration,
    format_cost,
    retry_async,
    chunk_list,
    deduplicate_by_key
)

# Progress tracking
tracker = ProgressTracker(total_tasks=10)
tracker.update(completed=3)
print(f"{tracker.percentage:.1f}% complete")

# Cost tracking
cost_tracker = CostTracker()
cost_tracker.add_cost(0.05, model='gpt-4')
print(f"Total: {format_cost(cost_tracker.total_cost)}")

# Retry decorator
@retry_async(max_retries=3, base_delay=1.0)
async def unstable_api_call():
    return await api.fetch()

# Helper functions
chunks = chunk_list([1, 2, 3, 4, 5], size=2)  # [[1, 2], [3, 4], [5]]
unique = deduplicate_by_key(items, key='id')

Streaming Helpers

from orchestration import (
    create_sse_callback,
    create_websocket_callback,
    create_progress_bar_callback
)

# For Server-Sent Events
sse_callback = create_sse_callback(response_stream)

# For WebSockets
ws_callback = create_websocket_callback(websocket)

# For CLI progress bar
pb_callback = create_progress_bar_callback()

LLM Provider Integration

Works with any LLM provider that has complete() method:

# OpenAI
from openai import OpenAI
client = OpenAI()

class OpenAIProvider:
    def complete(self, messages, **kwargs):
        response = client.chat.completions.create(
            model=kwargs.get('model', 'gpt-4'),
            messages=messages
        )
        return response.choices[0].message

# Anthropic
from anthropic import Anthropic
client = Anthropic()

class AnthropicProvider:
    def complete(self, messages, **kwargs):
        response = client.messages.create(
            model=kwargs.get('model', 'claude-3-sonnet-20240229'),
            messages=messages
        )
        return response.content[0]

Error Handling

from orchestration import TaskStatus

result = await orchestrator.execute_workflow(task)

if result.status == TaskStatus.FAILED:
    print(f"Workflow failed: {result.error}")
elif result.status == TaskStatus.COMPLETED:
    # Check individual agent results
    for agent_result in result.agent_results:
        if agent_result.status == TaskStatus.FAILED:
            print(f"Agent {agent_result.agent_id} failed: {agent_result.error}")

License

MIT License - see LICENSE file

Author

Luke Steuber

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dreamwalker-1.0.1.tar.gz (35.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dreamwalker-1.0.1-py3-none-any.whl (44.0 kB view details)

Uploaded Python 3

File details

Details for the file dreamwalker-1.0.1.tar.gz.

File metadata

  • Download URL: dreamwalker-1.0.1.tar.gz
  • Upload date:
  • Size: 35.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for dreamwalker-1.0.1.tar.gz
Algorithm Hash digest
SHA256 1e19b7a6a9da45dc3d2a51b0adc84ce77e9f654b95b155305b4884f4058c4845
MD5 bf5d8f17b6cf1128dfae745bb5a0ca9d
BLAKE2b-256 2e6e62731a336e9d5a1bdb554025a946343c626b32749d2b331fcea831bf9ac0

See more details on using hashes here.

File details

Details for the file dreamwalker-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: dreamwalker-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 44.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for dreamwalker-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 84ae3c44eb782d0197eefe18f83915cfbe7d100a1ba835dd716e991f795bb1f7
MD5 cf6c0cf691f2d410fee2522f14474eae
BLAKE2b-256 2ffa505334e8b048e58469e19f5627e48d06c21c1e0775f82b05379c2995f4cf

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page