
Multi-Agent AI System for automation


Ambivo Agents - Multi-Agent AI System

A toolkit for AI-powered automation including data analytics with DuckDB, media processing, knowledge base operations, web scraping, YouTube downloads, HTTP/REST API integration, and more.

Alpha Release Disclaimer

This library is currently in alpha stage. While functional, it may contain bugs, undergo breaking changes, and lack complete documentation. Developers should thoroughly evaluate and test the library before considering it for production use.

For production scenarios, we recommend:

  • Extensive testing in your specific environment
  • Implementing proper error handling and monitoring
  • Having rollback plans in place
  • Staying updated with releases for critical fixes


Quick Start

ModeratorAgent Example

The ModeratorAgent automatically routes queries to specialized agents:

from ambivo_agents import ModeratorAgent
import asyncio

async def main():
    # Create the moderator
    moderator, context = ModeratorAgent.create(user_id="john")
    
    print(f"Session: {context.session_id}")
    
    # Auto-routing examples
    response1 = await moderator.chat("Download audio from https://youtube.com/watch?v=example")
    response2 = await moderator.chat("Search for latest AI trends")
    response3 = await moderator.chat("Extract audio from video.mp4 as MP3")
    response4 = await moderator.chat("GET https://api.github.com/users/octocat")
    response5 = await moderator.chat("Load data from sales.csv and analyze it")
    response6 = await moderator.chat("What is machine learning?")
    
    # Check available agents
    status = await moderator.get_agent_status()
    print(f"Available agents: {list(status['active_agents'].keys())}")
    
    # Cleanup
    await moderator.cleanup_session()

asyncio.run(main())

KnowledgeSynthesisAgent Example

The KnowledgeSynthesisAgent combines multiple knowledge sources with quality assessment:

from ambivo_agents.agents.knowledge_synthesis import KnowledgeSynthesisAgent
from ambivo_agents.agents.response_quality_assessor import QualityLevel
import asyncio

async def main():
    # Create synthesis agent directly (NOT through ModeratorAgent)
    synthesis_agent, context = KnowledgeSynthesisAgent.create(
        user_id="researcher",
        quality_threshold=QualityLevel.GOOD,
        max_iterations=3,
        enable_auto_scraping=True
    )
    
    print(f"Session: {context.session_id}")
    
    # Multi-source synthesis examples using process_with_quality_assessment
    response1 = await synthesis_agent.process_with_quality_assessment(
        "search across knowledge bases: kb1, kb2 for: AI trends"
    )
    
    response2 = await synthesis_agent.process_with_quality_assessment(
        "prioritize web search - latest developments in robotics 2025"
    )
    
    response3 = await synthesis_agent.process_with_quality_assessment(
        "comprehensive search - research blockchain technology"
    )
    
    # Check quality assessment
    print(f"Quality: {response1.get('quality_assessment', {}).get('quality_level', 'unknown')}")
    print(f"Sources: {response1.get('quality_assessment', {}).get('sources_used', [])}")
    
    # Cleanup
    await synthesis_agent.cleanup_session()

asyncio.run(main())

Simple KnowledgeSynthesisAgent with Specific Knowledge Bases

For targeted knowledge base queries with automatic collection detection:

from ambivo_agents.agents.knowledge_synthesis import KnowledgeSynthesisAgent
import asyncio

async def simple_example():
    # Setup - Create agent and configure knowledge bases
    agent, context = KnowledgeSynthesisAgent.create(user_id="user123")
    agent.update_context_metadata(available_knowledge_bases=[
        'research_trends_in_cryptocurrency_20250816_193439',
        'research_trends_in_robotics_tech_2025_20250812_172007'
    ])
    agent._load_available_collections() # Required after setting knowledge bases
    
    try:
        # Method 1: Full quality assessment (recommended)
        result = await agent.process_with_quality_assessment(
            "What are cryptocurrency trends?"
        )
        print(f"Response: {result['response']}")
        print(f"Quality: {result['quality_assessment']['quality_level']}")
        print(f"Sources: {result['quality_assessment']['sources_used']}")
        
        # Method 2: Simple chat interface
        response = await agent.chat("Tell me about robotics developments")
        print(f"Chat response: {response}")
        
    finally:
        # Always cleanup
        await agent.cleanup_session()

asyncio.run(simple_example())

Command Line Usage

# Install and run
pip install ambivo-agents

# Interactive mode
ambivo-agents

# Single commands
ambivo-agents -q "Download audio from https://youtube.com/watch?v=example"
ambivo-agents -q "Search for Python tutorials"
ambivo-agents -q "Load data from sales.csv and analyze it"
ambivo-agents -q "GET https://jsonplaceholder.typicode.com/posts/1"

Agent Creation

ModeratorAgent (Recommended)

from ambivo_agents import ModeratorAgent

# Create moderator with auto-routing
moderator, context = ModeratorAgent.create(user_id="john")

# Chat with automatic agent selection
result = await moderator.chat("Download audio from https://youtube.com/watch?v=example")

# Cleanup
await moderator.cleanup_session()

Use ModeratorAgent for:

  • Multi-purpose applications
  • Intelligent routing between capabilities
  • Context-aware conversations
  • Simplified development

Direct Agent Creation

from ambivo_agents import YouTubeDownloadAgent

# Create specific agent
agent, context = YouTubeDownloadAgent.create(user_id="john")

# Use agent directly
result = await agent._download_youtube_audio("https://youtube.com/watch?v=example")

# Cleanup
await agent.cleanup_session()

Use Direct Creation for:

  • Single-purpose applications
  • Specific workflows with known requirements
  • Performance-critical applications
  • Custom integrations

Features

Core Capabilities

  • ModeratorAgent: Intelligent multi-agent orchestrator with automatic routing
  • Smart Routing: Automatically routes queries to appropriate specialized agents
  • Data Analytics: In-memory DuckDB integration with CSV/XLS ingestion and text-based visualizations
  • File Ingestion & Processing: All agents can read/parse JSON, CSV, XML, YAML files and insert into databases
  • Context Memory: Maintains conversation history across interactions
  • Docker Integration: Secure, isolated execution environment
  • Redis Memory: Persistent conversation memory with compression
  • Multi-Provider LLM: Automatic failover between OpenAI, Anthropic, and AWS Bedrock
  • Configuration-Driven: All features controlled via agent_config.yaml
  • Workflow System: Multi-agent workflows with parallel and sequential execution
  • System Messages: Customizable system prompts for agent behavior control

Available Agents

ModeratorAgent

Enhanced with Skill Assignment System

  • Intelligent orchestrator that routes to specialized agents
  • Context-aware multi-turn conversations
  • Automatic agent selection based on query analysis
  • Session management and cleanup
  • Workflow execution and coordination
  • NEW: Skill Assignment - Assign external capabilities that take priority over agent routing
  • Smart Skill Routing - Assigned skills are checked first, with fallback to normal agent routing
  • Unified Interface - Single agent that can handle both assigned skills and general orchestration
  • Use Cases: Custom integrations with existing agent orchestration, priority skill handling

KnowledgeSynthesisAgent

Advanced Multi-Source Orchestrator with Quality Assessment

**IMPORTANT**: KnowledgeSynthesisAgent is NOT a specialized agent that gets routed to by ModeratorAgent. It IS an enhanced ModeratorAgent that you use directly as your primary orchestrator.

Key Capabilities:

  • Multi-Source Synthesis - Intelligently combines knowledge bases, web search, and web scraping
  • Quality Assessment - Built-in ResponseQualityAssessor evaluates and improves responses
  • Adaptive Strategy - Automatically selects optimal search strategy based on query analysis
  • Multiple Knowledge Base Support - Query across multiple knowledge bases simultaneously
  • Web Search Integration - Supplements knowledge with real-time web search results
  • Smart Web Scraping - Automatically scrapes relevant URLs for deeper information
  • Iterative Improvement - Refines responses until quality threshold is met
  • Intelligent Routing - Inherits all ModeratorAgent capabilities plus synthesis features

Architecture:

KnowledgeSynthesisAgent extends ModeratorAgent
 Inherits: Agent routing, session management, workflows
 Adds: Multi-source synthesis, quality assessment, intelligent orchestration

Key Differences:

| Feature | ModeratorAgent | KnowledgeSynthesisAgent |
|---|---|---|
| Purpose | Routes queries to specialized agents | Multi-source information synthesis |
| Response | Simple agent routing | Quality-assessed, synthesized responses |
| Sources | Single agent per query | Multiple sources per query |
| Quality Control | Basic agent responses | Built-in quality assessment & iteration |
| Use Case | General orchestration | Research & information synthesis |
| Usage Pattern | Router/orchestrator | Direct primary agent |

**Wrong Usage Pattern:**

# DON'T DO THIS - Won't work!
moderator, context = ModeratorAgent.create(user_id="user123")
# Expecting to route to KnowledgeSynthesisAgent - this fails
response = await moderator.chat("synthesize information...") # Fails

**Correct Usage Pattern:**

# DO THIS - Works correctly!
from ambivo_agents.agents.knowledge_synthesis import KnowledgeSynthesisAgent
from ambivo_agents.agents.response_quality_assessor import QualityLevel

# Use KnowledgeSynthesisAgent directly as your orchestrator
synthesis_agent, context = KnowledgeSynthesisAgent.create(
    user_id="user123",
    quality_threshold=QualityLevel.GOOD,
    max_iterations=3,
    enable_auto_scraping=True
)

# Multi-knowledge base query
response = await synthesis_agent.process_with_quality_assessment(
    "search across knowledge bases: kb1, kb2 for: AI trends"
)

# Web-supplemented query 
response = await synthesis_agent.process_with_quality_assessment(
    "prioritize web search - latest developments in robotics 2025"
)

# Comprehensive research
response = await synthesis_agent.process_with_quality_assessment(
    "comprehensive search - research blockchain technology trends"
)

await synthesis_agent.cleanup_session()

Quality-Assessed Response Format:

{
    'success': True,
    'response': 'Synthesized answer from multiple sources...',
    'quality_assessment': {
        'quality_level': 'good', # excellent|good|fair|poor|unacceptable
        'confidence_score': 0.85, # 0.0 - 1.0
        'sources_used': ['knowledge_base', 'web_search'],
        'strengths': ['Comprehensive coverage', 'Current information'],
        'weaknesses': ['Could use more specific examples']
    },
    'query_analysis': {
        'query_type': 'current_events',
        'strategy_used': 'web_first',
        'sources_consulted': 2
    },
    'metadata': {
        'iterations': 1,
        'total_sources_consulted': 2,
        'user_preferences': {}
    }
}

Query Patterns:

  • "search across knowledge bases: kb1, kb2, kb3 for: [query]" - Multi-KB search
  • "prioritize web search - [query]" - Web-first strategy
  • "check knowledge base first - [query]" - KB-first strategy
  • "comprehensive search - [query]" - Use all sources
  • "synthesize information about [topic]" - Full synthesis mode

**Use Cases:**

  • Research across multiple knowledge bases
  • Web-supplemented knowledge queries
  • Quality-assured information synthesis
  • Multi-source fact checking
  • Comprehensive topic research

**Configuration:**

  • Enabled by default (inherits from ModeratorAgent)
  • Environment Control: AMBIVO_AGENTS_MODERATOR_ENABLED=true/false
  • No separate enablement needed - uses existing ModeratorAgent configuration

Multi-Knowledge Base Processing Logic

Understanding how multiple knowledge bases are handled across different agents:

ModeratorAgent - Simple Routing

User Query → Intent Analysis → Route to KnowledgeBaseAgent → Return Result
  • No multi-KB logic - simply routes to KnowledgeBaseAgent
  • Single hop - passes query through without modification
  • Fallback routing - falls back to AssistantAgent if KB agent fails

KnowledgeBaseAgent - Advanced Multi-KB Selection

Query → Normalize KB List → Score KBs → Select Top 2 → Query Each → Pick Best by Source Count → Return

**Detailed Flow (Single-Pass, No Iteration):**

Step 1: KB Input Normalization

# Accepts multiple input formats:
kb_names = ["kb1", "kb2"] # List of strings
kb_names = [{"kb_name": "kb1", "description": "..."}, ...] # List of objects
kb_names = '["kb1", "kb2"]' # JSON string

# Normalizes to: [{"kb_name": "kb1", "description": None}, ...]

Step 2: KB Scoring Algorithm

def score_kb_for_query(kb_entry, user_message, topics):
    # Extract meaningful keywords (>3 chars, no stopwords)
    keywords = extract_keywords(user_message + topics)
    
    score = 0.0
    kb_text = f"{kb_entry['kb_name']} {kb_entry['description'] or ''}".lower()
    
    # Base scoring: keyword overlap
    for keyword in keywords:
        if keyword in kb_text:
            score += 1.0
    
    # Domain bonuses
    if "legal" in kb_text and legal_keywords_present:
        score += 1.5
    if "finance" in kb_text and finance_keywords_present: 
        score += 1.0
    
    return score

# Example scoring:
# Query: "blockchain technology trends"
# - crypto_research_kb: score = 2.0 (matches "blockchain", "technology")
# - legal_documents_kb: score = 0.0 (no matches)
# - tech_trends_kb: score = 2.0 (matches "technology", "trends")

Step 3: KB Selection (Performance Optimization)

# Sort by score and select top 2 candidates
scored_kbs = [(kb, score) for kb, score in kb_scores]
scored_kbs.sort(key=lambda x: x[1], reverse=True)

# Select top 2 with positive scores
top_kbs = [kb for kb, score in scored_kbs if score > 0][:2]

# Fallback: use first 2 if no positive scores
if not top_kbs:
    top_kbs = original_kb_list[:2]

Step 4: Multi-KB Querying (Single Pass - No Iteration Loop)

async def query_multiple_kbs(selected_kbs, query):
    results = {}
    best_answer = None
    best_kb = None
    best_source_count = -1
    
    # Process each KB exactly once (no retries or iterations)
    for kb in selected_kbs: # Usually 2 KBs
        # Single attempt per KB
        result = await query_single_kb(kb.name, query)
        results[kb.name] = result
        
        if result.success:
            source_count = len(result.source_details)
            # Winner = KB with most document sources
            if source_count > best_source_count:
                best_answer = result.answer
                best_source_count = source_count
                best_kb = kb.name
    
    # Metadata includes every queried KB's result plus the winning KB
    return best_answer, {"results": results, "best_kb": best_kb}

Step 5: Best Answer Selection

  • Criteria: Number of source documents (not quality assessment)
  • Winner takes all: Returns single best answer (no synthesis)
  • Metadata: Includes results from all queried KBs

**Important Characteristics:**

  • No iteration loops - each KB queried exactly once
  • No quality assessment - purely source count based
  • No answer synthesis - returns single best answer
  • Performance optimized - maximum 2 KBs processed
  • Deterministic - same query always produces same KB selection
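
Putting steps 1 through 3 together, the selection logic can be sketched end to end as one small function. This is an illustrative reconstruction, not the library's code: `extract_keywords`, `normalize_kbs`, the stopword list, and the plain substring scorer are assumptions based on the description above (the domain bonuses are omitted for brevity).

```python
STOPWORDS = {"the", "and", "for", "with", "from", "what", "are"}

def extract_keywords(text):
    """Keep meaningful words: longer than 3 chars, not stopwords."""
    return {w for w in text.lower().split() if len(w) > 3 and w not in STOPWORDS}

def normalize_kbs(kb_names):
    """Normalize strings or dicts to [{'kb_name': ..., 'description': ...}]."""
    normalized = []
    for entry in kb_names:
        if isinstance(entry, str):
            normalized.append({"kb_name": entry, "description": None})
        else:
            normalized.append({"kb_name": entry["kb_name"],
                               "description": entry.get("description")})
    return normalized

def select_top_kbs(kb_names, query, limit=2):
    """Score each KB by keyword overlap and return the top `limit` names."""
    kbs = normalize_kbs(kb_names)
    keywords = extract_keywords(query)
    scored = []
    for kb in kbs:
        kb_text = f"{kb['kb_name']} {kb['description'] or ''}".lower()
        score = sum(1.0 for kw in keywords if kw in kb_text)
        scored.append((kb["kb_name"], score))
    scored.sort(key=lambda x: x[1], reverse=True)
    top = [name for name, score in scored if score > 0][:limit]
    # Fallback: first `limit` KBs when nothing scores positively
    return top or [kb["kb_name"] for kb in kbs[:limit]]

print(select_top_kbs(
    [{"kb_name": "crypto_research_kb", "description": "blockchain and crypto technology research"},
     {"kb_name": "legal_documents_kb", "description": "contracts and compliance"},
     "tech_trends_kb"],
    "blockchain technology trends"))
# -> ['crypto_research_kb', 'tech_trends_kb']
```

Because scoring is a pure function of the KB names, descriptions, and query keywords, the same inputs always select the same KBs, which is the deterministic behavior noted above.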

KnowledgeSynthesisAgent - Multi-Source with Quality Iteration

Query → Analyze Strategy → Gather from All Sources → Quality Assessment → [Iterate if Needed] → Synthesize Final Answer

**Detailed Flow (Iterative with Quality Control):**

Step 1: Query Analysis & Strategy

analysis = await analyze_query(query)
# Determines: knowledge_first, web_first, parallel, or adaptive strategy
# Based on: query type, time sensitivity, complexity

Step 2: Multi-Source Gathering

# Parallel execution across all source types
sources = await asyncio.gather(
    gather_from_knowledge_base(query), # Routes to KnowledgeBaseAgent
    gather_from_web_search(query), # Routes to WebSearchAgent 
    gather_from_web_scraping(query) # Routes to WebScraperAgent
)

Step 3: Quality Assessment & Iteration Loop

for iteration in range(max_iterations): # Usually 3 iterations max
    assessment = await quality_assessor.assess_response(sources, query)
    
    # Check if quality meets threshold
    if assessment.quality_level >= quality_threshold:
        break # Success - exit iteration loop
        
    # If quality insufficient, gather additional sources
    if assessment.needs_additional_sources:
        for suggested_source in assessment.suggested_sources:
            additional_response = await gather_from_source(suggested_source, query)
            sources.append(additional_response)

# Final synthesis of all gathered information
final_answer = await synthesize_responses(sources, query)

Step 4: Quality-Driven Source Integration

  • Quality metrics: accuracy, completeness, relevance, currency
  • Iterative refinement: adds sources until quality threshold met
  • Intelligent synthesis: combines information from all sources
  • Source attribution: tracks which sources contributed to answer

Comparison Summary

| Aspect | ModeratorAgent | KnowledgeBaseAgent | KnowledgeSynthesisAgent |
|---|---|---|---|
| Multi-KB Handling | None (routes only) | Advanced selection logic | Routes to KB + other sources |
| KB Selection | No selection | Score-based top 2 selection | Uses KB agent's selection |
| Iteration | No | No (single-pass) | Yes (quality-driven) |
| Answer Selection | First response | Most sources wins | Quality assessment |
| Synthesis | None | None (single answer) | Multi-source synthesis |
| Performance | Fast (single route) | Fast (max 2 KBs) | Slower (comprehensive) |
| Quality Control | Basic | Source count based | Advanced quality assessment |
| Use Case | General routing | Multi-KB queries | Research & comprehensive answers |

When to Use Which Agent

Use ModeratorAgent when:

  • Simple routing to appropriate agents
  • General-purpose applications
  • Fast response times needed

Use KnowledgeBaseAgent directly when:

  • Specifically need to query multiple knowledge bases
  • Want deterministic KB selection logic
  • Need metadata about which KBs were used

Use KnowledgeSynthesisAgent when:

  • Need comprehensive, high-quality answers
  • Want information from multiple source types
  • Quality assessment and iteration are important
  • Research-oriented tasks requiring synthesis

Database Agent (Optional)

Best for: Database connections, data exploration, and basic queries

  • Multi-Database Support: MongoDB, MySQL, and PostgreSQL connections
  • Schema Analysis: Automatic database structure discovery and exploration
  • Natural Language Queries: Convert conversational requests to SQL/MongoDB queries
  • File Ingestion: Direct JSON/CSV import into database tables
  • Safety-First Design: Read-only mode by default, simple SELECT queries only
  • Export Integration: Seamless handoff to AnalyticsAgent for complex analysis
  • Intentionally Limited: Simple queries only (no JOINs, window functions, CTEs)
  • Use Cases: Data exploration, basic queries, file imports, database connections
  • Note: Requires database extra: pip install ambivo-agents[database]

Analytics Agent

Best for: Complex data analysis, advanced SQL, and statistical operations

  • Advanced SQL Engine: Full DuckDB support with complex operations
  • Complex JOINs: INNER, LEFT, RIGHT, OUTER joins across multiple datasets
  • Window Functions: ROW_NUMBER(), RANK(), SUM() OVER(), statistical analysis
  • Advanced Aggregations: GROUP BY with HAVING, complex statistical functions
  • CTEs & Subqueries: WITH clauses, correlated subqueries, complex logic
  • UNION Operations: Combine result sets with UNION/UNION ALL
  • Multi-File Analysis: Load and join multiple CSV/XLSX files simultaneously
  • Statistical Functions: Percentiles, correlations, trend analysis, outlier detection
  • Visualization: Text-based charts and intelligent chart recommendations
  • Docker Security: All operations run in isolated containers
  • Use Cases: Business intelligence, complex analytics, statistical modeling, data science

When to Use Which Agent:

  • DatabaseAgent: Simple queries, database connections, data exploration
  • AnalyticsAgent: Complex analysis, joins, statistical operations, advanced SQL

Assistant Agent

Enhanced with Skill Assignment System

  • General purpose conversational AI with intelligent skill routing
  • Context-aware responses and multi-turn conversations
  • Customizable system messages

Skill Assignment

Instead of manually routing to specialized agents, you assign skills to AssistantAgent or ModeratorAgent. They automatically detect when to use these skills and handle the technical details internally.

from ambivo_agents import AssistantAgent

assistant = AssistantAgent.create_simple(user_id="user123")

# Assign an API skill from an OpenAPI spec
await assistant.assign_api_skill(
    api_spec_path="/path/to/openapi.yaml",
    base_url="https://api.example.com/v1",
    api_token="your-token"
)

# Natural language requests now route to the API automatically
response = await assistant.chat("create a lead for John Doe")
# Agent: detects intent -> spawns APIAgent -> makes API call -> returns natural response

await assistant.cleanup_session()

Available skill types:

# 1. API Skills -- triggers on: "create lead", "call api", "list contacts", etc.
await agent.assign_api_skill(
    api_spec_path="path/to/openapi.yaml",
    base_url="https://api.example.com",
    api_token="your-token",
    skill_name="my_api"                    # optional
)

# 2. Database Skills -- triggers on: "query database", "show data", "recent sales", etc.
await agent.assign_database_skill(
    connection_string="postgresql://user:pass@host:5432/db",
    skill_name="main_db",
    description="Customer database"
)

# 3. Knowledge Base Skills -- triggers on: "search docs", "company policy", etc.
await agent.assign_kb_skill(
    documents_path="/path/to/docs/",
    collection_name="company_docs",
    skill_name="knowledge",
    temporary=True,              # use temp KB with TTL lifecycle (default: True)
    ttl_hours=24,                # TTL for temporary KBs
    vectordb_api_url=None,       # override vectordb-api URL (default: from config)
    vectordb_api_token=None,     # override auth token for vectordb-api
)

Priority: Skills are checked before normal routing. In AssistantAgent, skills run before conversation processing. In ModeratorAgent, skills take priority over agent routing. If no skills match, normal behavior continues.

# List assigned skills
skills = agent.list_assigned_skills()
# {'api_skills': ['my_api'], 'database_skills': ['main_db'], 'kb_skills': [], 'total_skills': 2}
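
The skills-first priority order can be illustrated with a minimal dispatcher. This is a generic sketch of the idea, not the library's routing code: the trigger keywords and the lambda handlers are assumptions made up for the example.

```python
def build_dispatcher(skills, fallback):
    """skills: list of (trigger_keywords, handler) pairs; fallback: default handler."""
    def dispatch(message):
        text = message.lower()
        # Assigned skills are checked first...
        for keywords, handler in skills:
            if any(kw in text for kw in keywords):
                return handler(message)
        # ...then normal conversation/agent routing continues
        return fallback(message)
    return dispatch

dispatch = build_dispatcher(
    skills=[
        ({"create lead", "list contacts"}, lambda m: "api_skill handled: " + m),
        ({"query database", "recent sales"}, lambda m: "db_skill handled: " + m),
    ],
    fallback=lambda m: "normal routing: " + m,
)

print(dispatch("Create lead for John Doe"))   # matched by the API skill
print(dispatch("What is machine learning?"))  # no skill matches, falls through
```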

Code Executor Agent

  • Secure Python and Bash execution in Docker
  • Isolated environment with resource limits
  • Real-time output streaming

Web Search Agent

  • Multi-provider search (Brave, AVES APIs)
  • Academic search capabilities
  • Automatic provider failover

Web Scraper Agent

  • Proxy-enabled scraping (ScraperAPI compatible)
  • Playwright and requests-based scraping
  • Batch URL processing with rate limiting

Knowledge Base Agent

  • Document ingestion (PDF, DOCX, TXT, web URLs)
  • Vector similarity search with Qdrant
  • Semantic question answering

Media Editor Agent

  • Audio/video processing with FFmpeg
  • Format conversion, resizing, trimming
  • Audio extraction and volume adjustment

YouTube Download Agent

  • Download videos and audio from YouTube using yt-dlp
  • Docker-based or local execution (configurable via use_docker)
  • Automatic title sanitization and metadata extraction
  • Bot detection handling — graceful fallback with user-friendly messages
  • Proxy support — plug in residential/SOCKS5 proxies via env vars
  • Cookie auth — support for cookies file or base64-encoded cookies

Gather Agent

Intelligent conversational form-filling with natural language understanding

  • Conversational Interface: Ask questions one at a time with natural flow
  • Multiple Question Types: free-text, yes-no, single-select, multi-select
  • Smart Questionnaire Loading: JSON/YAML from chat, files, or URLs
  • Conditional Logic: Advanced dependent question workflows
  • Natural Language Parsing (NEW): Understand conversational responses
    • "Absolutely!" → "Yes" for yes/no questions
    • "I'd prefer email" → maps to email option in single-select
    • "Both AWS and Azure" → maps to multiple selections
    • "We have about 4 people" → maps to "3-5 people" range
  • Graceful Fallback: Standard exact matching when NLP is disabled
  • Session Persistence: Remember answers across conversation (~1 hour)
  • API Submission: Configurable endpoint with collection status tracking
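
The answer-matching behavior described above can be illustrated with a toy resolver. This is not GatherAgent's implementation: the phrase-to-option map merely stands in for the LLM-based parsing, while exact matching models the documented fallback when NLP is disabled.

```python
def match_option(reply, options, nlp_map=None):
    """Resolve a conversational reply to one of the allowed options."""
    reply_norm = reply.strip().lower()
    # Standard exact matching -- the fallback used when NLP is disabled
    for opt in options:
        if reply_norm == opt.lower():
            return opt
    # Illustrative stand-in for LLM parsing: phrase -> option lookup
    if nlp_map:
        for phrase, opt in nlp_map.items():
            if phrase in reply_norm:
                return opt
    return None  # unrecognized: the agent would re-ask the question

options = ["Yes", "No"]
nlp_map = {"absolutely": "Yes", "no way": "No"}
print(match_option("yes", options))                   # -> Yes (exact match)
print(match_option("Absolutely!", options, nlp_map))  # -> Yes (conversational)
print(match_option("Absolutely!", options))           # -> None (NLP disabled)
```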

Configuration:

# Enable natural language understanding (requires LLM)
gather:
  enable_natural_language_parsing: true # Default: false
  
# Or via environment variable:
# export AMBIVO_AGENTS_GATHER_ENABLE_NATURAL_LANGUAGE_PARSING=true

Usage

# Interactive CLI demo (waits for your input after each question)
python examples/gather_cli.py

# Load questionnaire from a file path or URL
python examples/gather_cli.py --path ./questionnaire.yaml
python examples/gather_cli.py --path https://example.com/questionnaire.json

# Do a real HTTP submission (configure endpoint in agent_config.yaml)
python examples/gather_cli.py --real-submit

# Minimal scripted example
python examples/gather_simple.py

Configuration

agent_capabilities:
  enable_gather: true

gather:
  submission_endpoint: "https://your-api.example.com/submit"
  submission_method: "POST"
  submission_headers:
    Authorization: "Bearer your-api-token"
    Content-Type: "application/json"
  memory_ttl_seconds: 3600
  # Optional: Validate free-text answers with LLM before proceeding
  enable_llm_answer_validation: false
  answer_validation:
    default_min_length: 1
  # Optional: Let LLM rewrite prompts (off by default for predictability)
  enable_llm_prompt_rewrite: false

See also: examples/gather_cli.py and examples/gather_simple.py for end-to-end demos. You can also wrap GatherAgent behind a small REST API for chatbot UIs (start/reply/finish/abort pattern).

API Agent

  • Comprehensive HTTP/REST API integration
  • Multiple authentication methods (Bearer, API Key, Basic, OAuth2)
  • Pre-fetch authentication with automatic token refresh
  • Built-in retry logic with exponential backoff
  • Security features (domain filtering, SSL verification)
  • Support for all HTTP methods (GET, POST, PUT, DELETE, etc.)
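
Retry with exponential backoff, as the bullet above describes, typically looks like the following. This is a generic sketch, not the agent's exact policy: the delays, attempt count, and the retryable-error class are assumptions for illustration.

```python
import time

class TransientAPIError(Exception):
    """Stand-in for a retryable failure (e.g. HTTP 503 or a timeout)."""

def call_with_backoff(call, max_attempts=4, base_delay=0.5):
    """Retry `call` on transient errors, doubling the delay each attempt."""
    for attempt in range(max_attempts):
        try:
            return call()
        except TransientAPIError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = base_delay * (2 ** attempt)  # 0.5s, 1s, 2s, ...
            time.sleep(delay)

# Demo: a call that fails twice, then succeeds on the third attempt
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientAPIError()
    return "ok"

print(call_with_backoff(flaky, base_delay=0.01))  # -> ok
```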

API Agent Configuration

Timeout Settings:

The API Agent supports configurable timeout settings for different use cases:

# Environment variable configuration
export AMBIVO_AGENTS_API_AGENT_TIMEOUT_SECONDS=46 # Custom timeout in seconds

# Or in agent_config.yaml
api_agent:
  timeout_seconds: 46 # Request timeout
  max_safe_timeout: 46 # Requests above this use Docker for safety
  force_docker_above_timeout: false # Enable Docker for long-running requests

Localhost and Domain Access:

For testing with local services or specific domain restrictions:

# Allow localhost access (disabled by default for security)
export AMBIVO_AGENTS_API_AGENT_ALLOWED_DOMAINS="127.0.0.1,localhost,api.example.com"
export AMBIVO_AGENTS_API_AGENT_BLOCKED_DOMAINS="" # Clear default blocks

# Or in agent_config.yaml
api_agent:
  allowed_domains:
    - "127.0.0.1"
    - "localhost" 
    - "api.example.com"
    - "*.trusted-domain.com" # Wildcards supported
  blocked_domains: [] # Override default localhost blocks
  
  # Default security settings (recommended for production)
  # allowed_domains: null # Allows all except blocked
  # blocked_domains:
  # - "localhost"
  # - "127.0.0.1"
  # - "0.0.0.0"
  # - "169.254.169.254" # AWS metadata service

Usage Examples:

from ambivo_agents import APIAgent

# Test with local transcription service
async def test_local_api():
    agent = APIAgent.create_simple(user_id="tester")
    
    # Natural language API call
    response = await agent.chat("""
    Make a POST request to http://127.0.0.1:8002/kh/transcribe with:
    - Authorization: Bearer your-jwt-token
    - Content-Type: application/json 
    - Body: {"s3_url": "https://your-bucket.s3.amazonaws.com/audio.wav"}
    """)
    
    await agent.cleanup_session()

# Direct API request with custom timeout
async def direct_api_call():
    from ambivo_agents.agents.api_agent import APIRequest, HTTPMethod
    
    agent = APIAgent.create_simple(user_id="api_user")
    
    request = APIRequest(
        url="http://127.0.0.1:8002/kh/transcribe",
        method=HTTPMethod.POST,
        headers={
            "Authorization": "Bearer your-jwt-token",
            "Content-Type": "application/json"
        },
        json_data={"s3_url": "https://your-bucket.s3.amazonaws.com/audio.wav"},
        timeout=46 # Custom timeout in seconds
    )
    
    response = await agent.make_api_request(request)
    print(f"Status: {response.status_code}")
    print(f"Duration: {response.duration_ms}ms")
    
    await agent.cleanup_session()

Security Notes:

  • Localhost access is blocked by default for security
  • Always use allowed_domains in production to restrict API access
  • Set appropriate timeouts to prevent long-running requests
  • Consider using max_safe_timeout for requests that might take longer

Workflow System

The workflow system enables multi-agent orchestration with sequential and parallel execution patterns:

Basic Workflow Usage

from ambivo_agents.core.workflow import WorkflowBuilder, WorkflowPatterns
from ambivo_agents import ModeratorAgent

async def workflow_example():
    # Create moderator with agents
    moderator, context = ModeratorAgent.create(
        user_id="workflow_user",
        enabled_agents=['web_search', 'web_scraper', 'knowledge_base']
    )
    
    # Create search -> scrape -> ingest workflow
    workflow = WorkflowPatterns.create_search_scrape_ingest_workflow(
        moderator.specialized_agents['web_search'],
        moderator.specialized_agents['web_scraper'], 
        moderator.specialized_agents['knowledge_base']
    )
    
    # Execute workflow
    result = await workflow.execute(
        "Research renewable energy trends and store in knowledge base",
        context.to_execution_context()
    )
    
    if result.success:
        print(f"Workflow completed in {result.execution_time:.2f}s")
        print(f"Nodes executed: {result.nodes_executed}")
    
    await moderator.cleanup_session()

Advanced Workflow Features

from ambivo_agents.core.enhanced_workflow import (
    AdvancedWorkflowBuilder, EnhancedModeratorAgent
)

async def advanced_workflow():
    # Create enhanced moderator
    base_moderator, context = ModeratorAgent.create(user_id="advanced_user")
    enhanced_moderator = EnhancedModeratorAgent(base_moderator)
    
    # Natural language workflow triggers
    response1 = await enhanced_moderator.process_message_with_workflows(
        "I need agents to reach consensus on climate solutions"
    )
    
    response2 = await enhanced_moderator.process_message_with_workflows(
        "Create a debate between agents about AI ethics"
    )
    
    # Check workflow status
    status = await enhanced_moderator.get_workflow_status()
    print(f"Available workflows: {status['advanced_workflows']['registered']}")

Workflow Patterns

All patterns are implemented in ambivo_agents/core/workflow.py and enhanced_workflow.py:

| Pattern | Factory Method | Implementation |
|---|---|---|
| Sequential | WorkflowBuilder + execute() | Iterates nodes in order, passes each response as input to the next agent |
| Parallel | WorkflowBuilder + execute_parallel() | Groups nodes into execution levels, runs each level with asyncio.gather() |
| Consensus | AdvancedWorkflowPatterns.create_consensus_workflow() | Loops agents up to 5 iterations; checks agreement via keyword scoring (agree/yes/correct vs disagree/no/wrong). Consensus detection is keyword-based, not LLM-based. |
| Debate | AdvancedWorkflowPatterns.create_debate_workflow() | Multi-round structured debate between debater agents with a moderator agent at start and end |
| Error Recovery | AdvancedWorkflowPatterns.create_error_recovery_workflow() | Conditional routing: primary agent runs first; on error (message type or keyword), falls back through a chain of backup agents |
| Map-Reduce | AdvancedWorkflowBuilder.add_map_reduce_pattern() | Parallel mapper agents all feed into a single reducer agent |

Concurrency model: All async concurrency uses asyncio (no Celery or external task queues). Parallel workflows use asyncio.gather() for concurrent agent execution within a single process.
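The level-based parallel execution described above can be illustrated with a minimal sketch. This is not the library's actual implementation — `run_node` is a hypothetical stand-in for an agent call — it only shows how grouping nodes into levels and running each level with `asyncio.gather()` works:

```python
import asyncio

async def run_node(name: str, prompt: str) -> str:
    # Stand-in for an agent invocation.
    await asyncio.sleep(0.01)
    return f"{name}: processed '{prompt}'"

async def execute_parallel(levels: list[list[str]], prompt: str) -> list[str]:
    results = []
    for level in levels:
        # All nodes within one level run concurrently; levels run sequentially,
        # so a later level can depend on the outputs of an earlier one.
        level_results = await asyncio.gather(
            *(run_node(name, prompt) for name in level)
        )
        results.extend(level_results)
    return results

# Two nodes run in parallel, then a third consumes their combined output stage.
results = asyncio.run(execute_parallel([["search", "scrape"], ["summarize"]], "AI trends"))
print(results)
```

Because everything runs on one event loop in one process, there is no external task queue to deploy — the trade-off is that parallelism is limited to what a single process can sustain.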

System Messages

System messages control agent behavior and responses. Each agent supports custom system prompts:

Default System Messages

# Agents come with role-specific system messages
assistant_agent = AssistantAgent.create_simple(user_id="user")
# Default: "You are a helpful AI assistant. Provide accurate, thoughtful responses..."

code_agent = CodeExecutorAgent.create_simple(user_id="user") 
# Default: "You are a code execution specialist. Write clean, well-commented code..."

Custom System Messages

from ambivo_agents import AssistantAgent

# Create agent with custom system message
custom_system = """You are a technical documentation specialist. 
Always provide detailed explanations with code examples. 
Use professional terminology and structured responses."""

agent, context = AssistantAgent.create(
    user_id="doc_specialist",
    system_message=custom_system
)

response = await agent.chat("Explain REST API design principles")

Moderator System Messages

from ambivo_agents import ModeratorAgent

# Custom moderator behavior
moderator_system = """You are a project management assistant.
Route technical queries to appropriate agents and provide 
executive summaries of complex multi-agent interactions."""

moderator, context = ModeratorAgent.create(
    user_id="pm_user",
    system_message=moderator_system
)

System Message Features

  • Context Integration: System messages work with conversation history
  • Agent-Specific: Each agent type has optimized default prompts
  • Workflow Aware: System messages adapt to workflow contexts
  • Provider Compatibility: Works with all LLM providers (OpenAI, Anthropic, Bedrock)

Prerequisites

Required

  • Python 3.11+
  • Docker (for code execution, media processing, YouTube downloads)
  • Redis (Cloud Redis recommended)

API Keys (Optional - based on enabled features)

  • OpenAI API Key (for GPT models)
  • Anthropic API Key (for Claude models)
  • AWS Credentials (for Bedrock models)
  • Brave Search API Key (for web search)
  • AVES API Key (for web search)
  • ScraperAPI/Proxy credentials (for web scraping)
  • Qdrant Cloud API Key (for Knowledge Base operations)
  • Redis Cloud credentials (for memory management)

Installation

1. Install Dependencies

Core Installation (Python 3.11-3.13):

pip install ambivo-agents

The core package includes OpenAI, Anthropic, Redis, Docker, and essential utilities. All LLM calls use the provider SDKs directly (no LangChain required).

With Optional Features:

# Web capabilities (APIAgent, WebScraperAgent)
pip install ambivo-agents[web]

# Media processing (YouTubeDownloadAgent)
pip install ambivo-agents[media]

# AWS Bedrock LLM support
pip install ambivo-agents[aws]

# Database support (DatabaseAgent - MongoDB, MySQL, PostgreSQL)
pip install ambivo-agents[database]

# Document processing (PDF, DOCX, PPTX, images, unstructured)
pip install ambivo-agents[documents]

# Data analytics (AnalyticsAgent - pandas)
pip install ambivo-agents[analytics]

# Async utilities (aiohttp, aiofiles, aiosqlite)
pip install ambivo-agents[async]

# Knowledge base (KnowledgeBaseAgent - LlamaIndex, Qdrant, LangChain)
# NOTE: Python 3.11-3.12 only - LlamaIndex/LangChain do not support 3.13
pip install ambivo-agents[knowledge]

# All runtime extras (Python 3.11-3.13)
pip install ambivo-agents[full]

# Everything including dev tools (Python 3.11-3.13)
pip install ambivo-agents[all]

# Full extras + knowledge base (Python 3.11-3.12 only)
pip install ambivo-agents[all-ml]

Extras Comparison: [full] vs [all] vs [all-ml]

| Extra | Includes | Use Case |
|---|---|---|
| `[full]` | web, media, aws, database, documents, analytics, async | Production / integration — all runtime agent capabilities, no dev tooling |
| `[all]` | `[full]` + dev tools (pytest, black, isort, pre-commit) | Development — for contributors working on ambivo-agents itself |
| `[all-ml]` | `[all]` + knowledge (LlamaIndex, LangChain, Qdrant) | Development with KB — Python 3.11-3.12 only |

Recommendation: Use [full] for applications that consume ambivo-agents as a dependency. Use [all] or [all-ml] only when developing or testing the ambivo-agents package itself.

Note: None of the bundle extras include [knowledge] (LlamaIndex/LangChain/Qdrant) except [all-ml]. If your application needs KnowledgeBaseAgent, either install [knowledge] separately or use [all-ml]. Applications like vectordb-api that already list LlamaIndex and LangChain in their own requirements.txt do not need the [knowledge] extra.

Python Version Compatibility

| Extra | Python 3.11 | Python 3.12 | Python 3.13 |
|---|---|---|---|
| Core (`pip install ambivo-agents`) | Yes | Yes | Yes |
| `[web]`, `[media]`, `[aws]`, `[database]` | Yes | Yes | Yes |
| `[documents]`, `[analytics]`, `[async]` | Yes | Yes | Yes |
| `[full]`, `[all]` | Yes | Yes | Yes |
| `[knowledge]` | Yes | Yes | No |
| `[all-ml]` | Yes | Yes | No |

The [knowledge] extra depends on LangChain and LlamaIndex for vector database integration (Qdrant). These libraries do not yet support Python 3.13. All other features work across Python 3.11-3.13.

Local Development (recommended):

# Python 3.13
pip uninstall ambivo-agents -y && pip install -e ".[all]"
# pip uninstall ambivo-agents -y && pip install -e <local-path-to-ambivo-agent>

# Python 3.11-3.12 (includes knowledge base)
pip uninstall ambivo-agents -y && pip install -e ".[all-ml]"

2. Setup Docker Images

docker pull sgosain/amb-ubuntu-python-public-pod

Note: If deploying inside the Docker image itself (e.g., on Railway, Render), you can skip this step and set use_docker: false in config. See Running Without Docker below.

3. Setup Redis

Recommended: Cloud Redis

# In agent_config.yaml
redis:
  host: "your-redis-cloud-endpoint.redis.cloud"
  port: 6379
  password: "your-redis-password"

Alternative: Local Redis

# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest

Configuration

Create agent_config.yaml in your project root:

# Redis Configuration (Required)
redis:
  host: "your-redis-cloud-endpoint.redis.cloud"
  port: 6379
  db: 0
  password: "your-redis-password"

# LLM Configuration (Required - at least one provider)
llm:
  preferred_provider: "openai"
  temperature: 0.7
  openai_api_key: "your-openai-key"
  anthropic_api_key: "your-anthropic-key"
  aws_access_key_id: "your-aws-key"
  aws_secret_access_key: "your-aws-secret"
  aws_region: "us-east-1"

# Agent Capabilities
agent_capabilities:
  enable_knowledge_base: true
  enable_web_search: true
  enable_code_execution: true
  enable_file_processing: true
  enable_web_ingestion: true
  enable_api_calls: true
  enable_web_scraping: true
  enable_proxy_mode: true
  enable_media_editor: true
  enable_youtube_download: true
  enable_analytics: true

# ModeratorAgent default agents
moderator:
  default_enabled_agents:
    - knowledge_base
    - web_search
    - assistant
    - media_editor
    - youtube_download
    - code_executor
    - web_scraper
    - analytics

# Service-specific configurations
web_search:
  brave_api_key: "your-brave-api-key"
  avesapi_api_key: "your-aves-api-key"

knowledge_base:
  qdrant_url: "https://your-cluster.qdrant.tech"
  qdrant_api_key: "your-qdrant-api-key"
  chunk_size: 1024
  chunk_overlap: 20
  similarity_top_k: 5

youtube_download:
  docker_image: "sgosain/amb-ubuntu-python-public-pod"
  download_dir: "./youtube_downloads"
  timeout: 600
  memory_limit: "1g"
  default_audio_only: true

analytics:
  docker_image: "sgosain/amb-ubuntu-python-public-pod"
  input_subdir: "analytics"
  output_subdir: "analytics"
  temp_subdir: "analytics"
  handoff_subdir: "analytics"
  timeout: 120
  memory_limit: "2g"

docker:
  timeout: 60
  memory_limit: "512m"
  images: ["sgosain/amb-ubuntu-python-public-pod"]

Configuration Methods

The library supports two configuration methods:

1. Environment Variables (Recommended for Production)

Quick Start with Environment Variables:

# Download and edit the full template
curl -o set_env.sh https://github.com/ambivo-corp/ambivo-agents/raw/main/set_env_template.sh
chmod +x set_env.sh

# Edit the template with your credentials, then source it
source set_env.sh

Replace ALL placeholder values with your actual credentials:

  • Redis connection details
  • LLM API keys (OpenAI/Anthropic)
  • Web Search API keys (Brave/AVES)
  • Knowledge Base credentials (Qdrant)
  • Web Scraping proxy (ScraperAPI)

Minimal Environment Setup:

# Required - Redis
export AMBIVO_AGENTS_REDIS_HOST="your-redis-host.redis.cloud"
export AMBIVO_AGENTS_REDIS_PORT="6379"
export AMBIVO_AGENTS_REDIS_PASSWORD="your-redis-password"

# Required - At least one LLM provider
export AMBIVO_AGENTS_LLM_OPENAI_API_KEY="sk-your-openai-key"

# Optional - Enable specific agents
export AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_YOUTUBE_DOWNLOAD="true"
export AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_WEB_SEARCH="true"
export AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_ANALYTICS="true"

# Run your application
python your_app.py

2. YAML Configuration (Traditional)

Use the full YAML template:

# Download and edit the full template
curl -o agent_config_sample.yaml https://github.com/ambivo-corp/ambivo-agents/raw/main/agent_config_sample.yaml

# Copy to your config file and edit with your credentials
cp agent_config_sample.yaml agent_config.yaml

Replace ALL placeholder values with your actual credentials, then create agent_config.yaml in your project root.

Docker Deployment with Environment Variables

# docker-compose.yml
version: '3.8'
services:
  ambivo-app:
    image: your-app:latest
    environment:
      - AMBIVO_AGENTS_REDIS_HOST=redis
      - AMBIVO_AGENTS_REDIS_PORT=6379
      - AMBIVO_AGENTS_LLM_OPENAI_API_KEY=${OPENAI_API_KEY}
      - AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_YOUTUBE_DOWNLOAD=true
      - AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_ANALYTICS=true
    volumes:
      - ./downloads:/app/downloads
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - redis
  
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Note: Environment variables take precedence over YAML configuration. The agent_config.yaml file is optional when using environment variables.
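The precedence rule can be sketched with a minimal resolver. This is an illustration, not the library's actual config loader — only the `AMBIVO_AGENTS_REDIS_HOST` variable name and the YAML shape are taken from the examples above:

```python
import os

def resolve_redis_host(yaml_config: dict) -> str:
    """Environment variable wins; the YAML value is the fallback."""
    return os.environ.get(
        "AMBIVO_AGENTS_REDIS_HOST",
        yaml_config.get("redis", {}).get("host", "localhost"),
    )

yaml_config = {"redis": {"host": "yaml-redis.example.com"}}

# With the env var unset, the YAML value is used.
print(resolve_redis_host(yaml_config))

# Once the env var is set, it takes precedence over YAML.
os.environ["AMBIVO_AGENTS_REDIS_HOST"] = "env-redis.example.com"
print(resolve_redis_host(yaml_config))
```

This is why `agent_config.yaml` is optional when every required key is supplied through the environment.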

Project Structure

ambivo_agents/
├── agents/                   # Agent implementations
│   ├── analytics.py          # Analytics Agent (DuckDB data analysis)
│   ├── api_agent.py          # API Agent (HTTP/REST integration)
│   ├── assistant.py          # Assistant Agent (general conversation)
│   ├── code_executor.py      # Code Executor Agent (Docker-based execution)
│   ├── database_agent.py     # Database Agent (MongoDB, MySQL, PostgreSQL)
│   ├── knowledge_base.py     # Knowledge Base Agent (Qdrant vector search)
│   ├── media_editor.py       # Media Editor Agent (FFmpeg processing)
│   ├── moderator.py          # ModeratorAgent (main orchestrator)
│   ├── web_scraper.py        # Web Scraper Agent (Playwright-based)
│   ├── web_search.py         # Web Search Agent (Brave/AVES search)
│   └── youtube_download.py   # YouTube Download Agent (yt-dlp)
├── config/                   # Configuration management
├── core/                     # Core functionality
│   ├── base.py
│   ├── llm.py
│   ├── memory.py
│   ├── workflow.py           # Basic workflow system
│   └── enhanced_workflow.py  # Advanced workflow patterns
├── executors/                # Execution environments
├── services/                 # Service layer
├── __init__.py               # Package initialization
└── cli.py                    # Command line interface

Skill Assignment System

Overview

The Skill Assignment System allows AssistantAgent and ModeratorAgent to be "assigned" external capabilities like API specifications, database connections, and knowledge bases. The agents then intelligently detect when to use these skills and internally spawn specialized agents on-demand.

Key Features

  • Intelligent Intent Detection - Automatically detects when user requests should use assigned skills
  • Dynamic Agent Spawning - Creates APIAgent, DatabaseAgent, KnowledgeBaseAgent internally as needed
  • Natural Language Translation - Converts technical responses to conversational language
  • Graceful Fallback - Falls back to normal agent behavior when no skills match
  • Priority System - Assigned skills take precedence over normal agent routing

API Skill Assignment

from ambivo_agents import AssistantAgent

async def api_skill_example():
    # Create agent
    assistant = AssistantAgent.create_simple(user_id="developer")
    
    # Assign API skill from OpenAPI spec
    result = await assistant.assign_api_skill(
        api_spec_path="/path/to/openapi.yaml",
        base_url="https://api.example.com/v1",
        api_token="your-api-token",
        skill_name="my_api"
    )
    
    # Now natural language API requests work automatically!
    response = await assistant.chat("create a lead for John Doe")
    # Agent automatically:
    # 1. Detects API intent
    # 2. Spawns APIAgent internally 
    # 3. Makes the API call
    # 4. Returns natural language response
    
    await assistant.cleanup_session()

Database Skill Assignment

async def database_skill_example():
    assistant = AssistantAgent.create_simple(user_id="analyst")
    
    # Assign database skill
    await assistant.assign_database_skill(
        connection_string="postgresql://user:pass@localhost:5432/sales_db",
        skill_name="sales_database",
        description="Sales and customer data"
    )
    
    # Natural language database queries
    response = await assistant.chat("show me recent sales data")
    # Internally creates DatabaseAgent and executes query
    
    await assistant.cleanup_session()

Knowledge Base Skill Assignment

async def kb_skill_example():
    assistant = AssistantAgent.create_simple(user_id="support")
    
    # Assign knowledge base skill
    await assistant.assign_kb_skill(
        documents_path="/path/to/company/docs/",
        collection_name="company_knowledge",
        skill_name="company_docs"
    )
    
    # Document search requests
    response = await assistant.chat("what do our docs say about pricing?")
    # Internally creates KnowledgeBaseAgent and searches documents
    
    await assistant.cleanup_session()

Multiple Skills with ModeratorAgent

async def multiple_skills_example():
    moderator = ModeratorAgent.create_simple(
        user_id="power_user",
        enabled_agents=["assistant", "api_agent", "database_agent"]
    )
    
    # Assign multiple skills
    await moderator.assign_api_skill("/path/to/api_spec.yaml", "https://api.example.com")
    await moderator.assign_database_skill("postgresql://localhost/db", "main_db")
    await moderator.assign_kb_skill("/docs/", skill_name="knowledge")
    
    # Skills take priority over agent routing
    response1 = await moderator.chat("create a lead") # → Uses API skill
    response2 = await moderator.chat("query the database") # → Uses DB skill 
    response3 = await moderator.chat("search documentation") # → Uses KB skill
    response4 = await moderator.chat("what's the weather?") # → Normal routing
    
    await moderator.cleanup_session()

Skill Management

async def skill_management():
    assistant = AssistantAgent.create_simple(user_id="user")
    
    # Assign skills
    await assistant.assign_api_skill("/api/spec.yaml", skill_name="api1")
    await assistant.assign_database_skill("mysql://localhost/db", "db1")
    
    # Check assigned skills
    skills = assistant.list_assigned_skills()
    print(f"Assigned skills: {skills}")
    # Output: {'api_skills': ['api1'], 'database_skills': ['db1'], 'kb_skills': [], 'total_skills': 2}
    
    # Agent status includes skill information
    status = assistant.get_agent_status()
    print(f"Capabilities: {status['capabilities']}")
    print(f"Assigned skills: {status['assigned_skills']}")
    
    await assistant.cleanup_session()

Usage Examples

ModeratorAgent with Auto-Routing

from ambivo_agents import ModeratorAgent
import asyncio

async def basic_moderator():
    moderator, context = ModeratorAgent.create(user_id="demo_user")
    
    # Auto-routing examples
    examples = [
        "Download audio from https://youtube.com/watch?v=example",
        "Search for latest artificial intelligence news", 
        "Load data from sales.csv and analyze trends",
        "Extract audio from video.mp4 as high quality MP3",
        "What is machine learning and how does it work?",
    ]
    
    for query in examples:
        response = await moderator.chat(query)
        print(f"Response: {response[:100]}...")
    
    await moderator.cleanup_session()

asyncio.run(basic_moderator())

Context-Aware Conversations

async def context_conversation():
    moderator, context = ModeratorAgent.create(user_id="context_demo")
    
    # Initial request 
    response1 = await moderator.chat("Download audio from https://youtube.com/watch?v=example")
    
    # Follow-up using context
    response2 = await moderator.chat("Actually, download the video instead of just audio")
    
    # Another follow-up
    response3 = await moderator.chat("Get information about that video")
    
    await moderator.cleanup_session()

YouTube Downloads

The YouTube Download Agent uses yt-dlp for reliable video/audio downloads with built-in bot-detection handling.

from ambivo_agents import YouTubeDownloadAgent

async def download_youtube():
    agent, context = YouTubeDownloadAgent.create(user_id="media_user")
    
    # Download audio
    result = await agent._download_youtube_audio(
        "https://youtube.com/watch?v=example"
    )
    
    if result['success']:
        print(f"Audio downloaded: {result['filename']}")
        print(f"Path: {result['file_path']}")
    
    await agent.cleanup_session()

YouTube Environment Variables

YouTube may block downloads from cloud/server IPs. Configure authentication and proxy settings via environment variables:

| Variable | Description | Example |
|---|---|---|
| `YT_DLP_COOKIES_FILE` | Path to a Netscape-format cookies.txt file | `/app/cookies.txt` |
| `YT_DLP_COOKIES_BASE64` | Base64-encoded cookies.txt content (ideal for PaaS like Railway) | `IyBOZXRzY2FwZS...` |
| `YT_DLP_COOKIES_FROM_BROWSER` | Browser to extract cookies from (local dev only) | `chrome`, `firefox` |
| `YT_DLP_PROXY` | HTTP/SOCKS5 proxy URL for download requests | `socks5://user:pass@host:port` |

Priority order: YT_DLP_COOKIES_FILE > YT_DLP_COOKIES_BASE64 > YT_DLP_COOKIES_FROM_BROWSER

Generating cookies for server deployment:

# 1. On your local machine, export cookies from your browser
yt-dlp --cookies-from-browser chrome --cookies cookies.txt "https://www.youtube.com/watch?v=dQw4w9WgXcQ"

# 2. Base64-encode the cookies file
base64 -i cookies.txt | tr -d '\n'

# 3. Set the base64 string as an environment variable on your server
export YT_DLP_COOKIES_BASE64="<paste base64 string>"

Using a proxy (e.g., Bright Data, Oxylabs):

# HTTP proxy
export YT_DLP_PROXY="http://user:pass@proxy.example.com:8080"

# SOCKS5 proxy (residential IPs recommended)
export YT_DLP_PROXY="socks5://user:pass@proxy.example.com:1080"

Note: When YouTube blocks a download due to bot detection, the agent returns a user-friendly message explaining the issue and available options instead of a raw error.

Database Operations

DatabaseAgent - Basic Queries & Exploration

from ambivo_agents import DatabaseAgent

async def database_exploration_demo():
    """DatabaseAgent - Perfect for database connections and basic queries"""
    agent = DatabaseAgent.create_simple(user_id="db_user")
    
    # Connect to databases
    await agent.chat("Connect to MySQL database at localhost:3306, database: mydb, username: user, password: pass")
    # OR: await agent.chat("Connect to MongoDB using URI mongodb://localhost:27017/myapp")
    # OR: await agent.chat("Connect to PostgreSQL at localhost:5432 database mydb user postgres password secret")
    
    # Schema discovery and exploration
    schema = await agent.chat("show me the database schema")
    tables = await agent.chat("list all tables and collections")
    structure = await agent.chat("describe the users table structure")
    
    # Simple natural language queries (safety-limited)
    users = await agent.chat("show me all users") # → SELECT * FROM users LIMIT 10
    count = await agent.chat("count total orders") # → SELECT COUNT(*) FROM orders
    recent = await agent.chat("show recent sales") # → SELECT * FROM sales ORDER BY date DESC LIMIT 10
    
    # File ingestion into database
    await agent.chat("ingest users.csv into users table")
    await agent.chat("load sales.json into MongoDB sales collection")
    
    # Export data for complex analysis
    await agent.chat("export sales data for analytics") # → Hands off to AnalyticsAgent
    
    await agent.cleanup_session()

AnalyticsAgent - Advanced SQL & Complex Analysis

from ambivo_agents import AnalyticsAgent

async def advanced_analytics_demo():
    """AnalyticsAgent - Advanced SQL operations and complex analysis"""
    agent = AnalyticsAgent.create_simple(user_id="analyst")
    
    # Load multiple datasets for complex analysis
    await agent.chat("load data from sales.csv, customers.csv, and products.xlsx")
    
    # Complex JOINs and multi-table analysis
    result = await agent.chat("""
    Find top customers by revenue with their order history:
    JOIN sales with customers and calculate total revenue per customer
    """)
    # → Generates: SELECT c.name, c.email, SUM(s.amount) as total_revenue, COUNT(s.id) as order_count
    # FROM customers c JOIN sales s ON c.id = s.customer_id 
    # GROUP BY c.id, c.name, c.email ORDER BY total_revenue DESC LIMIT 10
    
    # Window functions for advanced analytics
    trends = await agent.chat("""
    Calculate monthly sales trends with running totals and growth rates
    """)
    # → Generates: SELECT month, sales, 
    # SUM(sales) OVER (ORDER BY month) as running_total,
    # LAG(sales) OVER (ORDER BY month) as prev_month,
    # (sales - LAG(sales) OVER (ORDER BY month)) / LAG(sales) OVER (ORDER BY month) * 100 as growth_rate
    # FROM monthly_sales ORDER BY month
    
    # Common Table Expressions (CTEs) for complex logic
    cohort = await agent.chat("""
    Analyze customer cohort retention using CTEs to track repeat purchases
    """)
    # → Generates complex CTE-based cohort analysis
    
    # Statistical analysis and correlations
    stats = await agent.chat("find correlations between price, quantity, and customer satisfaction")
    outliers = await agent.chat("identify outliers in sales data using statistical methods")
    seasonality = await agent.chat("analyze seasonal patterns in sales with time series functions")
    
    # Advanced aggregations with HAVING clauses
    segments = await agent.chat("""
    Group customers by purchase behavior and find high-value segments
    """)
    # → Generates: SELECT segment, COUNT(*) as customers, AVG(total_spent) as avg_spent
    # FROM customer_segments GROUP BY segment HAVING AVG(total_spent) > 1000
    
    # UNION operations for combining datasets
    combined = await agent.chat("combine Q1 and Q2 sales data and analyze trends")
    
    await agent.cleanup_session()

Database to Analytics Workflow - Best of Both Worlds

async def complete_data_workflow():
    """Combining DatabaseAgent exploration with AnalyticsAgent advanced analysis"""
    from ambivo_agents import ModeratorAgent
    
    # Use ModeratorAgent for automatic routing
    moderator = ModeratorAgent.create_simple(
        user_id="workflow_user",
        enabled_agents=["database_agent", "analytics", "assistant"]
    )
    
    # Step 1: DatabaseAgent - Connect and explore (automatic routing)
    await moderator.chat("Connect to MySQL localhost:3306 database ecommerce user admin password secret")
    schema = await moderator.chat("show me the database schema and table relationships")
    
    # Step 2: DatabaseAgent - Export data for complex analysis 
    await moderator.chat("export sales data joined with customer data for advanced analytics")
    
    # Step 3: AnalyticsAgent - Advanced analysis (automatic routing)
    analysis = await moderator.chat("""
    Analyze the exported sales data:
    1. Calculate customer lifetime value using window functions
    2. Identify seasonal trends with time series analysis 
    3. Find correlations between customer demographics and purchase behavior
    4. Create customer segmentation using statistical clustering
    """)
    
    # Step 4: AnalyticsAgent - Generate insights and recommendations
    insights = await moderator.chat("create executive summary with key insights and recommendations")
    
    await moderator.cleanup_session()

Feature Comparison Summary

| Capability | DatabaseAgent | AnalyticsAgent |
|---|---|---|
| Database Connections | MySQL, PostgreSQL, MongoDB | File-based only |
| Schema Discovery | Full database exploration | File schema analysis |
| Simple Queries | Basic SELECT, COUNT, etc. | All SQL operations |
| Complex JOINs | Safety-limited | Full JOIN support |
| Window Functions | Not supported | Complete support |
| CTEs & Subqueries | Not supported | Advanced SQL |
| Statistical Analysis | Basic only | Advanced statistics |
| Multi-File Analysis | Single connection | Load multiple files |
| File Ingestion | Direct to database | In-memory processing |
| Best Use Case | Database exploration & connection | Complex analysis & business intelligence |

File Reading and Database Ingestion

All agents have built-in file reading capabilities for JSON, CSV, XML, and YAML files. Database insertion requires the optional `[database]` extra.

from ambivo_agents import AssistantAgent

async def read_file_and_insert_to_database():
    """Reads a JSON file and attempts database insertion with graceful fallback"""
    
    # Step 1: Read and parse file (always available)
    agent = AssistantAgent.create_simple(user_id="file_user")
    
    result = await agent.read_and_parse_file("./data/users.json")
    if not result['success']:
        print(f"Failed to read file: {result.get('error', 'Unknown error')}")
        await agent.cleanup_session()
        return
    
    json_data = result['parse_result']['data']
    print(f"Successfully loaded {len(json_data)} records from users.json")
    
    # Step 2: Attempt database insertion
    try:
        from ambivo_agents import DatabaseAgent
        
        # DatabaseAgent is available - proceed with insertion
        db_agent = DatabaseAgent.create_simple(user_id="db_user")
        
        # Connect to MongoDB
        await db_agent.chat("connect to mongodb://localhost:27017 database myapp")
        
        # Insert the data
        response = await db_agent.chat(f"insert this data into users collection: {json_data}")
        print(f"Successfully inserted data into MongoDB: {response}")
        
        await db_agent.cleanup_session()
        
    except ImportError:
        # DatabaseAgent not available - provide polite warning and alternatives
        print("\nDatabase insertion not available")
        print("To enable database features: pip install ambivo-agents[database]")
        print("\nAvailable alternatives:")
        print(" • File successfully read and parsed")
        print(" • Data can be transformed to other formats")
        
        # Show what we can still do
        csv_result = await agent.convert_json_to_csv(json_data)
        if csv_result['success']:
            print(" • Converted to CSV format (available for export)")
    
    await agent.cleanup_session()

# Alternative: Natural language approach with graceful handling
async def natural_language_file_ingestion():
    """Uses natural language commands with automatic fallback"""
    
    try:
        from ambivo_agents import DatabaseAgent
        agent = DatabaseAgent.create_simple(user_id="user")
        
        # Full database workflow available
        await agent.chat("connect to mongodb://localhost:27017 database myapp")
        response = await agent.chat("read users.json file and insert all records into users collection")
        print(f"Database ingestion completed: {response}")
        
        await agent.cleanup_session()
        
    except ImportError:
        # Fallback to file reading only
        from ambivo_agents import AssistantAgent
        agent = AssistantAgent.create_simple(user_id="user")
        
        print("DatabaseAgent not installed. Reading file only...")
        response = await agent.chat("read and analyze the users.json file structure")
        print(f"File analysis: {response}")
        print("Install database support with: pip install ambivo-agents[database]")
        
        await agent.cleanup_session()

Data Analytics

from ambivo_agents import AnalyticsAgent

async def analytics_demo():
    agent, context = AnalyticsAgent.create(user_id="analyst_user")
    
    # Load and analyze CSV data
    response = await agent.chat("load data from sales.csv and analyze it")
    print(f"Analysis: {response}")
    
    # Schema exploration
    schema = await agent.chat("show me the schema of the current dataset")
    print(f"Schema: {schema}")
    
    # Natural language queries
    top_sales = await agent.chat("what are the top 5 products by sales?")
    print(f"Top Sales: {top_sales}")
    
    # SQL queries
    sql_result = await agent.chat("SELECT region, SUM(sales) as total FROM data GROUP BY region")
    print(f"SQL Result: {sql_result}")
    
    # Visualizations
    chart = await agent.chat("create a bar chart showing sales by region")
    print(f"Chart: {chart}")
    
    await agent.cleanup_session()

# Context Preservation Example
async def context_preservation_demo():
    """Demonstrates context/state preservation between chat messages"""
    agent = AnalyticsAgent.create_simple(user_id="user123")
    
    try:
        # Load data once
        await agent.chat("load data from transactions.xlsx and analyze it")
        
        # Multiple queries without reload - uses cached context
        schema = await agent.chat("show schema") # Uses cached data
        top_items = await agent.chat("what are the top 5 amounts?") # Uses cached data
        summary = await agent.chat("summary statistics") # Uses cached data
        counts = await agent.chat("count by category") # Uses cached data
        
        print("All queries executed using cached dataset - no reload needed!")
        
    finally:
        await agent.cleanup_session() # Clean up resources

Knowledge Base Operations

from ambivo_agents import KnowledgeBaseAgent

async def knowledge_base_demo():
    agent, context = KnowledgeBaseAgent.create(user_id="kb_user")
    
    # Ingest document
    result = await agent._ingest_document(
        kb_name="company_kb",
        doc_path="/path/to/document.pdf",
        custom_meta={"department": "HR", "type": "policy"}
    )
    
    if result['success']:
        # Query the knowledge base
        answer = await agent._query_knowledge_base(
            kb_name="company_kb",
            query="What is the remote work policy?"
        )
        
        if answer['success']:
            print(f"Answer: {answer['answer']}")
    
    await agent.cleanup_session()

API Integration

from ambivo_agents import APIAgent
from ambivo_agents.agents.api_agent import APIRequest, AuthConfig, HTTPMethod, AuthType

async def api_integration_demo():
    agent, context = APIAgent.create(user_id="api_user")
    
    # Basic GET request
    request = APIRequest(
        url="https://jsonplaceholder.typicode.com/posts/1",
        method=HTTPMethod.GET
    )
    
    response = await agent.make_api_request(request)
    if not response.error:
        print(f"Status: {response.status_code}")
        print(f"Data: {response.json_data}")
    
    # POST with authentication
    auth_config = AuthConfig(
        auth_type=AuthType.BEARER,
        token="your-api-token"
    )
    
    post_request = APIRequest(
        url="https://api.example.com/data",
        method=HTTPMethod.POST,
        auth_config=auth_config,
        json_data={"name": "test", "value": "123"}
    )
    
    post_response = await agent.make_api_request(post_request)
    
    # Google OAuth2 with pre-fetch
    google_auth = AuthConfig(
        auth_type=AuthType.BEARER,
        pre_auth_url="https://www.googleapis.com/oauth2/v4/token",
        pre_auth_method=HTTPMethod.POST,
        pre_auth_payload={
            "client_id": "your-client-id",
            "client_secret": "your-secret",
            "refresh_token": "your-refresh-token",
            "grant_type": "refresh_token"
        },
        token_path="access_token"
    )
    
    sheets_request = APIRequest(
        url="https://sheets.googleapis.com/v4/spreadsheets/sheet-id/values/A1:B10",
        method=HTTPMethod.GET,
        auth_config=google_auth
    )
    
    # APIAgent will automatically fetch access_token first, then make the request
    sheets_response = await agent.make_api_request(sheets_request)
    
    await agent.cleanup_session()

# Conversational API usage
async def conversational_api():
    agent = APIAgent.create_simple(user_id="chat_user")
    
    # Use natural language for API requests
    response = await agent.chat("GET https://jsonplaceholder.typicode.com/users/1")
    print(response)
    
    response = await agent.chat(
        "POST https://api.example.com/data with headers: {\"Authorization\": \"Bearer token\"} "
        "and data: {\"message\": \"Hello API\"}"
    )
    print(response)
    
    await agent.cleanup_session()

Context Manager Pattern

from ambivo_agents import ModeratorAgent, AgentSession
import asyncio

async def main():
    # Auto-cleanup with context manager
    async with AgentSession(ModeratorAgent, user_id="sarah") as moderator:
        response = await moderator.chat("Download audio from https://youtube.com/watch?v=example")
        print(response)
    # Moderator automatically cleaned up

asyncio.run(main())

Workflow Examples

from ambivo_agents.core.workflow import WorkflowBuilder

async def custom_workflow():
    # Create agents
    moderator, context = ModeratorAgent.create(user_id="workflow_demo")
    
    # Build custom workflow
    builder = WorkflowBuilder()
    builder.add_agent(moderator.specialized_agents['web_search'], "search")
    builder.add_agent(moderator.specialized_agents['assistant'], "analyze")
    builder.add_edge("search", "analyze")
    builder.set_start_node("search")
    builder.set_end_node("analyze")
    
    workflow = builder.build()
    
    # Execute workflow
    result = await workflow.execute(
        "Research AI safety and provide analysis",
        context.to_execution_context()
    )
    
    print(f"Workflow result: {result.success}")
    await moderator.cleanup_session()

Streaming System

The library uses an SSE-aligned StreamChunk system with structured event types for real-time streaming responses.

SSE Event Types

Since v1.4.1, streaming events align with the Server-Sent Events wire format:

from ambivo_agents.core.base import SSEEventType, StreamSubType

# SSE event types (used in StreamChunk.to_dict() as the "type" field):
SSEEventType.START # "stream_start" - Stream initialized
SSEEventType.CONTENT # "stream_chunk" - Text token (append to buffer)
SSEEventType.STATUS # "stream_status" - Progress info (do NOT show in chat)
SSEEventType.COMPLETE # "stream_complete" - Final answer + metadata
SSEEventType.ERROR # "stream_error" - Error event
SSEEventType.CANCELLED # "stream_cancelled" - Stream was cancelled
SSEEventType.KEEPALIVE # "keepalive" - Connection keep-alive (ignore)

# Legacy StreamSubType (still supported for backward compatibility):
StreamSubType.CONTENT # Maps to SSEEventType.CONTENT
StreamSubType.STATUS # Maps to SSEEventType.STATUS
StreamSubType.RESULT # Maps to SSEEventType.COMPLETE
StreamSubType.ERROR # Maps to SSEEventType.ERROR
StreamSubType.METADATA # Maps to SSEEventType.STATUS

Basic Streaming Usage

from ambivo_agents import ModeratorAgent
from ambivo_agents.core.base import StreamSubType

async def streaming_example():
    moderator, context = ModeratorAgent.create(user_id="stream_user")

    async for chunk in moderator.chat_stream("Search for Python tutorials"):
        if chunk.sub_type == StreamSubType.CONTENT:
            print(chunk.text, end='', flush=True)
        elif chunk.sub_type == StreamSubType.STATUS:
            print(f"\n[{chunk.text.strip()}]", end='', flush=True)
        elif chunk.sub_type == StreamSubType.ERROR:
            print(f"\n[ERROR: {chunk.text}]", end='', flush=True)

    await moderator.cleanup_session()

Streaming in Web APIs (SSE Format)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from ambivo_agents import ModeratorAgent
import json

app = FastAPI()

class ChatRequest(BaseModel):
    user_id: str
    message: str

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    moderator, context = ModeratorAgent.create(user_id=request.user_id)

    async def generate_stream():
        # Send session event
        yield f"event: session\ndata: {context.session_id}\n\n"

        async for chunk in moderator.chat_stream(request.message):
            # to_dict() returns SSE-aligned type field
            yield f"data: {json.dumps(chunk.to_dict())}\n\n"

        yield "event: done\ndata:\n\n"

    return StreamingResponse(generate_stream(), media_type="text/event-stream")

SSE Wire Format

The StreamChunk.to_dict() method produces JSON matching the SSE protocol:

{"type": "stream_start", "text": "", "sub_type": "status", "metadata": {}, "timestamp": "..."}
{"type": "stream_chunk", "text": "The answer is", "sub_type": "content", "metadata": {}, "timestamp": "..."}
{"type": "stream_status", "text": "Querying knowledge base...", "sub_type": "status", "metadata": {}, "timestamp": "..."}
{"type": "stream_complete", "text": "", "sub_type": "result", "metadata": {"complete": true}, "timestamp": "..."}
{"type": "stream_error", "text": "Error message", "sub_type": "error", "metadata": {}, "timestamp": "..."}

Critical buffering rule: buffer += data.text -- never trim, strip, or add spaces between chunks.

Frontend Integration

async function streamChat(baseUrl, message, sessionId, callbacks) {
  const response = await fetch(`${baseUrl}/chat/stream`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, session_id: sessionId }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let partial = '';
  let textBuffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    partial += decoder.decode(value, { stream: true });
    const events = partial.split('\n\n');
    partial = events.pop();

    for (const block of events) {
      const lines = block.split('\n');
      let eventName = null, dataLine = null;

      for (const line of lines) {
        if (line.startsWith('event: ')) eventName = line.slice(7).trim();
        else if (line.startsWith('data: ')) dataLine = line.slice(6);
      }

      if (eventName === 'session') { callbacks.onSession?.(dataLine.trim()); continue; }
      if (eventName === 'done') return;
      if (!dataLine) continue;

      const data = JSON.parse(dataLine);
      switch (data.type) {
        case 'stream_chunk': textBuffer += data.text; callbacks.onChunk?.(textBuffer, data.text); break;
        case 'stream_complete': callbacks.onComplete?.(data.text || textBuffer, data.metadata); break;
        case 'stream_error': callbacks.onError?.(data.text); break;
        case 'stream_status': callbacks.onStatus?.(data.text); break;
      }
    }
  }
}

Session Management

Understanding Session vs Conversation IDs

The library uses two identifiers for context management:

  • session_id: Represents a broader user session or application context
  • conversation_id: Represents a specific conversation thread within a session

# Single conversation (most common)
moderator, context = ModeratorAgent.create(
    user_id="john",
    session_id="user_john_main", 
    conversation_id="user_john_main" # Same as session_id
)

# Multiple conversations per session
session_key = "user_john_tenant_abc"

# Conversation 1: Data Analysis
moderator1, context1 = ModeratorAgent.create(
    user_id="john",
    session_id=session_key,
    conversation_id="john_data_analysis_conv"
)

# Conversation 2: YouTube Downloads 
moderator2, context2 = ModeratorAgent.create(
    user_id="john", 
    session_id=session_key,
    conversation_id="john_youtube_downloads_conv"
)

Web API Integration

from ambivo_agents import ModeratorAgent
import asyncio
import time
import uuid

class ChatAPI:
    def __init__(self):
        self.active_moderators = {}
    
    async def chat_endpoint(self, request_data):
        user_message = request_data.get('message', '')
        user_id = request_data.get('user_id', f"user_{uuid.uuid4()}")
        session_id = request_data.get('session_id', f"session_{user_id}_{int(time.time())}")
        
        try:
            if session_id not in self.active_moderators:
                moderator, context = ModeratorAgent.create(
                    user_id=user_id,
                    session_id=session_id
                )
                self.active_moderators[session_id] = moderator
            else:
                moderator = self.active_moderators[session_id]
            
            response_content = await moderator.chat(user_message)
            
            return {
                'success': True,
                'response': response_content,
                'session_id': session_id,
                'timestamp': time.time()
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'timestamp': time.time()
            }
    
    async def cleanup_session(self, session_id):
        if session_id in self.active_moderators:
            await self.active_moderators[session_id].cleanup_session()
            del self.active_moderators[session_id]

Command Line Interface

# Interactive mode with auto-routing
ambivo-agents

# Single queries
ambivo-agents -q "Download audio from https://youtube.com/watch?v=example"
ambivo-agents -q "Search for latest AI trends"
ambivo-agents -q "Load data from sales.csv and show top products"
ambivo-agents -q "Extract audio from video.mp4"

# Check agent status
ambivo-agents status

# Test all agents
ambivo-agents --test

# Debug mode
ambivo-agents --debug -q "test query"

Architecture

ModeratorAgent Architecture

The ModeratorAgent acts as an intelligent orchestrator:

[User Query] 
     ↓
[ModeratorAgent] ← Analyzes intent and context
     ↓
[Intent Analysis] ← Uses LLM + patterns + keywords
     ↓
[Route Selection] ← Chooses best agent(s)
     ↓
[Specialized Agent] ← YouTubeAgent, SearchAgent, etc.
     ↓
[Response] ← Combined and contextualized
     ↓
[User]
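As a rough illustration of the routing step, a keyword-only fallback might look like the sketch below. The ROUTES table and route() function are hypothetical; the actual ModeratorAgent combines LLM intent analysis with patterns and keywords, as the diagram shows.

```python
# Hypothetical keyword-only fallback for intent routing.
# The real ModeratorAgent also performs LLM-based intent analysis.
ROUTES = {
    "youtube.com": "youtube_download",
    "http": "api_agent",
    ".csv": "analytics",
    "search": "web_search",
}

def route(query: str) -> str:
    q = query.lower()
    for pattern, agent in ROUTES.items():  # dicts preserve insertion order
        if pattern in q:
            return agent
    return "assistant"  # default fallback

print(route("Download audio from https://youtube.com/watch?v=x"))  # youtube_download
print(route("Search for latest AI trends"))                        # web_search
print(route("What is machine learning?"))                          # assistant
```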

Workflow Architecture

[WorkflowBuilder] → [Workflow Definition]
        ↓ ↓
[Workflow Executor] → [Sequential/Parallel Execution]
        ↓ ↓
[State Management] → [Persistent Checkpoints]
        ↓ ↓
[Result Aggregation] → [Final Response]

Memory System

  • Redis-based persistence with compression and caching
  • Built-in conversation history for every agent
  • Session-aware context with automatic cleanup
  • Multi-session support with isolation

LLM Provider Management

  • Automatic failover between OpenAI, Anthropic, AWS Bedrock
  • Rate limiting and error handling
  • Provider rotation based on availability and performance
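Conceptually, failover amounts to trying providers in priority order until one succeeds. This is an illustrative sketch, not the library's internal implementation; flaky and stable are stand-in callables:

```python
def call_with_failover(providers, prompt):
    """Try each (name, callable) provider in priority order; return the
    first successful response. Illustrative sketch only."""
    last_err = None
    for name, call in providers:
        try:
            return name, call(prompt)
        except Exception as err:  # a real implementation would also rate-limit
            last_err = err
    raise RuntimeError("All providers failed") from last_err

# Stand-in providers for the demo
def flaky(prompt):
    raise ConnectionError("rate limited")

def stable(prompt):
    return f"answer to: {prompt}"

name, answer = call_with_failover([("openai", flaky), ("anthropic", stable)], "hi")
print(name)  # anthropic
```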

Docker Setup

Consolidated Docker File Sharing System

Ambivo Agents uses a consolidated Docker-shared directory structure for all file operations across agents. This provides consistent, secure, and efficient file sharing between your host filesystem and Docker containers.

Consolidated Directory Structure

All Docker-based agents (AnalyticsAgent, MediaEditorAgent, CodeExecutorAgent, WebScraperAgent, APIAgent) use the same base structure:

Your Project Directory/
└── docker_shared/                 # Consolidated base directory
    ├── input/                     # Read-only input files
    │   ├── analytics/  → /docker_shared/input/analytics   (AnalyticsAgent)
    │   ├── media/      → /docker_shared/input/media       (MediaEditorAgent)
    │   ├── code/       → /docker_shared/input/code        (CodeExecutorAgent)
    │   └── scraper/    → /docker_shared/input/scraper     (WebScraperAgent)
    ├── output/                    # Read-write output files
    │   ├── analytics/  → /docker_shared/output/analytics  (Analysis results)
    │   ├── media/      → /docker_shared/output/media      (Processed media)
    │   ├── code/       → /docker_shared/output/code       (Code execution results)
    │   └── scraper/    → /docker_shared/output/scraper    (Scraped data)
    ├── temp/                      # Read-write temporary workspace
    │   ├── analytics/  → /docker_shared/temp/analytics    (Analytics temp files)
    │   ├── media/      → /docker_shared/temp/media        (Media processing temp)
    │   └── code/       → /docker_shared/temp/code         (Code execution temp)
    ├── handoff/                   # Read-write inter-agent file sharing
    │   ├── database/   → /docker_shared/handoff/database  (Database exports)
    │   ├── analytics/  → /docker_shared/handoff/analytics (Analytics results)
    │   └── media/      → /docker_shared/handoff/media     (Media for processing)
    └── work/           → /docker_shared/work              # General workspace

How the System Works

When you request operations like "convert data.csv to xlsx" or "process video.mp4":

  1. File Detection: System detects file paths in your request
  2. Directory Setup: Auto-creates agent-specific subdirectories in ./docker_shared/
  3. File Copying: Copies your files to appropriate input directories
  4. Docker Execution: Runs containers with consistent /docker_shared/ mount points
  5. Result Retrieval: Outputs appear in agent-specific output directories
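Assuming the default ./docker_shared base, steps 2-3 boil down to a simple path mapping, sketched here with a hypothetical helper (stage_for_agent is not part of the library API):

```python
from pathlib import Path

def stage_for_agent(host_file: str, agent: str,
                    base: str = "./docker_shared") -> tuple:
    """Where a detected file is staged on the host, and the matching
    read-only path a container sees. Illustrative helper only."""
    name = Path(host_file).name
    host_dest = Path(base) / "input" / agent / name
    container_path = f"/docker_shared/input/{agent}/{name}"
    return host_dest, container_path

host_dest, container_path = stage_for_agent("data.csv", "code")
print(host_dest)       # docker_shared/input/code/data.csv
print(container_path)  # /docker_shared/input/code/data.csv
```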

Inter-Agent Workflows

The consolidated structure enables seamless workflows between agents:

Database → Analytics Workflow:

1. DatabaseAgent exports data → ./docker_shared/handoff/database/export.csv
2. AnalyticsAgent automatically → reads from /docker_shared/handoff/database/
3. AnalyticsAgent processes data → outputs to /docker_shared/output/analytics/
4. Results available at → ./docker_shared/output/analytics/results.json

Agent Handoff Mechanism

Output vs Handoff Directory Logic:

The system uses two distinct destination directories based on the agent's role in the workflow:

  • output/ directories: For final deliverables - when an agent produces end results for user consumption

    • MediaEditorAgent image/video processing → docker_shared/output/media/
    • AnalyticsAgent charts and reports → docker_shared/output/analytics/
    • CodeExecutorAgent script results → docker_shared/output/code/
  • handoff/ directories: For inter-agent communication - when one agent needs to pass data to another agent

    • DatabaseAgent exports for AnalyticsAgent → docker_shared/handoff/database/
    • WebScraperAgent data for KnowledgeBaseAgent → docker_shared/handoff/scraper/
    • MediaEditorAgent processed files for further processing → docker_shared/handoff/media/

Decision Logic:

  • Terminal operations (user-requested, final results) → output/
  • Workflow operations (agent-to-agent data passing) → handoff/
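The decision logic fits in a few lines; destination_dir below is a hypothetical helper illustrating the rule, not part of the library:

```python
def destination_dir(agent: str, for_handoff: bool,
                    base: str = "./docker_shared") -> str:
    """Route final deliverables to output/, agent-to-agent data to handoff/."""
    bucket = "handoff" if for_handoff else "output"
    return f"{base}/{bucket}/{agent}"

print(destination_dir("media", for_handoff=False))    # ./docker_shared/output/media
print(destination_dir("database", for_handoff=True))  # ./docker_shared/handoff/database
```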

The handoff system uses the handoff_subdir parameter to enable seamless file transfers between agents:

DatabaseAgent → AnalyticsAgent Handoff:

# DatabaseAgent automatically exports to handoff directory
result = await db_agent.chat("export sales data for analytics", 
                            handoff_subdir="sales_analysis_2024")
# Creates: ./docker_shared/handoff/database/sales_analysis_2024/

# AnalyticsAgent automatically detects handoff files
analytics_result = await analytics_agent.chat("analyze sales data",
                                              handoff_subdir="sales_analysis_2024")
# Reads from: ./docker_shared/handoff/database/sales_analysis_2024/

Handoff Directory Management:

  • Automatic creation: Subdirectories created based on handoff_subdir parameter
  • File naming: {agent_type}_{timestamp}_{operation}.{ext}
  • Cleanup: Handoff files older than 24 hours automatically removed
  • Thread-safe: Multiple concurrent handoffs supported
  • Cross-platform: Works consistently across Windows, macOS, and Linux
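Putting the naming convention and subdirectory layout together, a handoff path might be constructed as below. handoff_path and the exact timestamp format are assumptions for illustration; the documented pattern is {agent_type}_{timestamp}_{operation}.{ext}:

```python
from datetime import datetime
from pathlib import Path

def handoff_path(agent_type: str, operation: str, ext: str,
                 subdir: str, base: str = "./docker_shared") -> Path:
    """Build a handoff file path following the documented naming pattern.
    Timestamp format is an assumption, not the library's exact choice."""
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    name = f"{agent_type}_{ts}_{operation}.{ext}"
    return Path(base) / "handoff" / agent_type / subdir / name

p = handoff_path("database", "export", "csv", "sales_analysis_2024")
print(p.parent)  # docker_shared/handoff/database/sales_analysis_2024
```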

Configuration in agent_config.yaml:

docker:
  shared_base_dir: "./docker_shared"

  agent_subdirs:
    database: ["handoff/database"]
    analytics: ["input/analytics", "output/analytics", "temp/analytics", "handoff/analytics"]
    media: ["input/media", "output/media", "temp/media", "handoff/media"]

Enhanced Fallback (CSV→XLSX Conversion):

1. User: "convert sales.csv to xlsx"
2. ModeratorAgent detects file operation need
3. Copies sales.csv → ./docker_shared/input/code/sales.csv
4. CodeExecutorAgent processes → from /docker_shared/input/code/sales.csv
5. Outputs converted file → to /docker_shared/output/code/sales.xlsx
6. User accesses result at → ./docker_shared/output/code/sales.xlsx

Media Processing Workflow:

1. User places video → ./docker_shared/input/media/input.mp4
2. MediaEditorAgent processes → from /docker_shared/input/media/input.mp4
3. Outputs processed file → to /docker_shared/output/media/output.mp3
4. User gets result from → ./docker_shared/output/media/output.mp3

Third-Party Developer Integration

For developers building custom agents:

from ambivo_agents.core import get_shared_manager

# Get the consolidated shared manager
shared_manager = get_shared_manager()

# Prepare environment for your custom agent
input_path, output_path = shared_manager.prepare_agent_environment(
    agent="my_custom_agent",
    input_files=["./my_data.csv"]
)

# Get Docker volume configuration
volumes = shared_manager.get_docker_volumes()
# volumes = {
#     '/path/to/docker_shared/input': {'bind': '/docker_shared/input', 'mode': 'ro'},
#     '/path/to/docker_shared/output': {'bind': '/docker_shared/output', 'mode': 'rw'},
#     ...  # other mounts
# }

# In your Docker container, access files at:
# - Input: /docker_shared/input/my_custom_agent/
# - Output: /docker_shared/output/my_custom_agent/
# - Temp: /docker_shared/temp/my_custom_agent/
# - Handoff: /docker_shared/handoff/my_custom_agent/

# After processing, check results:
output_files = shared_manager.list_outputs("my_custom_agent")
latest_output = shared_manager.get_latest_output("my_custom_agent", ".xlsx")

Example Usage

import asyncio
from ambivo_agents import ModeratorAgent

async def process_files_with_consolidated_structure():
    # Create moderator with auto-routing
    moderator, context = ModeratorAgent.create(user_id="file_processor")
    
    # File operations use consolidated Docker structure
    await moderator.chat("convert sales_data.csv to xlsx format") # → ./docker_shared/output/code/
    await moderator.chat("extract audio from video.mp4 as MP3") # → ./docker_shared/output/media/
    await moderator.chat("analyze customer_data.csv and chart") # → ./docker_shared/output/analytics/
    
    # All results organized by agent type in docker_shared/output/
    await moderator.cleanup_session()

# Run the example
asyncio.run(process_files_with_consolidated_structure())

File Locations After Operations

# Directory structure after various operations
your-project/
├── sales_data.csv                 # Your original files
├── video.mp4
├── customer_data.csv
└── docker_shared/                 # Consolidated results
    └── output/
        ├── code/
        │   └── sales_data.xlsx    # CSV→XLSX conversion
        ├── media/
        │   └── video_audio.mp3    # Audio extraction
        └── analytics/
            ├── analysis_report.json  # Data analysis
            └── customer_charts.png   # Generated charts

Configuration

The consolidated structure is configured in agent_config.yaml:

docker:
  shared_base_dir: "./docker_shared"  # Host base directory
  work_dir: "/opt/ambivo/work_dir"    # Container working directory
  agent_subdirs:
    analytics: ["input/analytics", "output/analytics", "temp/analytics", "handoff/analytics"]
    media: ["input/media", "output/media", "temp/media", "handoff/media"]
    code: ["input/code", "output/code", "temp/code", "handoff/code"]
    database: ["handoff/database"]
    scraper: ["output/scraper", "temp/scraper", "handoff/scraper"]

Third-Party Developer Project Structure

When developers install ambivo-agents via pip install ambivo-agents, the Docker shared directory is created relative to their project root. Here's how the directory structure would look:

my-ai-project/                     # Third-party developer's project
├── main.py                        # Their application code
├── requirements.txt               # Including ambivo-agents
├── agent_config.yaml              # Their configuration file
├── data/                          # Their project data
│   ├── input_files.csv
│   └── documents.pdf
├── docker_shared/                 # Auto-created by ambivo-agents
│   ├── input/                     # Container read-only mounts
│   │   ├── analytics/             # For data analysis tasks
│   │   │   └── uploaded_data.csv
│   │   ├── media/                 # For media processing
│   │   │   └── video_to_process.mp4
│   │   └── code/                  # For code execution
│   │       └── user_script.py
│   ├── output/                    # Container write-enabled results
│   │   ├── analytics/             # Analysis results
│   │   │   ├── report.json
│   │   │   └── charts.png
│   │   ├── media/                 # Processed media
│   │   │   ├── audio_extracted.mp3
│   │   │   └── compressed_video.mp4
│   │   └── code/                  # Code execution results
│   │       └── execution_results.txt
│   ├── temp/                      # Temporary files during processing
│   │   ├── analytics/
│   │   ├── media/
│   │   └── code/
│   ├── handoff/                   # Cross-agent file sharing
│   │   ├── analytics/             # Database → Analytics
│   │   ├── database/              # Database exports
│   │   ├── media/                 # Media processing handoffs
│   │   └── scraper/               # Web scraper results
│   └── work/                      # Container workspace
└── venv/                          # Their virtual environment
    └── lib/python3.x/site-packages/
        └── ambivo_agents/         # Installed package

Environment Variable Configuration:

Developers can customize the shared directory location:

# In their .env or environment
export AMBIVO_AGENTS_DOCKER_SHARED_BASE_DIR="/custom/path/shared"

Example Usage in Developer's Code:

# my-ai-project/main.py
from ambivo_agents.agents.analytics import AnalyticsAgent
from ambivo_agents.agents.moderator import ModeratorAgent

# Create agents - they automatically use configured shared directory
moderator = ModeratorAgent.create_simple(user_id="developer123")

# Process data - files are managed in docker_shared/
response = await moderator.chat("analyze the sales data in my CSV file")

# The docker_shared/ directory is automatically created and managed
# Input files are accessible at docker_shared/input/analytics/
# Results appear in docker_shared/output/analytics/

Benefits for Third-Party Developers:

  • Isolated: Each project gets its own docker_shared/ directory
  • Portable: Directory structure is relative to project root
  • Configurable: Can be customized via environment variables
  • Auto-managed: Created and organized automatically
  • Secure: Container access is properly restricted

Security & Permissions

  • Input Security: All input directories mounted read-only (ro)
  • Output Isolation: Each agent has isolated output directories
  • Network Isolation: Docker containers run with network_disabled=True
  • Memory Limits: Configurable memory restrictions per agent
  • Auto-Cleanup: Temporary files cleaned based on age (configurable)
  • Permission Control: Directory permissions managed automatically

File Access Security Configuration

Restricted Directories Protection:

The system includes built-in protection against accessing sensitive system directories:

# agent_config.yaml
security:
  file_access:
    restricted_directories:
      - "/etc" # System configuration
      - "/root" # Root user directory
      - "/var/log" # System logs
      - "/proc" # Process information
      - "/sys" # System information
      - "/dev" # Device files
      - "/boot" # Boot files
      - "~/.ssh" # SSH keys
      - "~/.aws" # AWS credentials
      - "~/.config" # User configuration
      - "/usr/bin" # System binaries
      - "/usr/sbin" # System admin binaries

Environment Variable Configuration:

# Alternative to YAML configuration
export AMBIVO_AGENTS_FILE_ACCESS_RESTRICTED_DIRS="/etc,/var/log,/sys,/proc,/dev"

How Restricted Directories Work:

  • Path Resolution: Uses Path.expanduser().resolve() for proper path handling
  • Security by Default: Common sensitive directories blocked by default
  • Symbolic Link Protection: Resolves symbolic links to prevent bypass attempts
  • Cross-Platform: Works on Windows, macOS, and Linux
  • Agent Coverage: Protects both BaseAgent.read_file() and DatabaseAgent.ingest_file_to_mongodb()

Example Usage:

from ambivo_agents import DatabaseAgent

# This will be blocked by default security settings
result = await db_agent.chat("ingest data from /etc/passwd")
# Returns: {"success": False, "error": "Access denied: File path '/etc/passwd' is in a restricted directory"}

# This works normally (assuming file exists)
result = await db_agent.chat("ingest data from ./data/users.csv")
# Returns: {"success": True, ...}
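The check behind those responses can be sketched with the documented Path.expanduser().resolve() approach. is_restricted is a hypothetical standalone version, not the library's internal function, and RESTRICTED here is an abbreviated subset of the default list:

```python
from pathlib import Path

RESTRICTED = ["/etc", "/root", "/proc", "~/.ssh"]  # abbreviated default list

def is_restricted(path: str) -> bool:
    """Resolve symlinks and user home, then test directory containment."""
    resolved = Path(path).expanduser().resolve()
    for restricted in RESTRICTED:
        root = Path(restricted).expanduser().resolve()
        try:
            resolved.relative_to(root)  # raises ValueError if not contained
            return True
        except ValueError:
            continue
    return False

print(is_restricted("/etc/passwd"))  # True
```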

Security Best Practices:

  • Always use the default restricted directories in production
  • Add custom restricted paths for your specific environment
  • Test security settings before deployment
  • Monitor access attempts to restricted directories
  • Regular audits of file access patterns

Monitoring & Maintenance

from ambivo_agents.core import get_shared_manager

shared_manager = get_shared_manager()

# Monitor disk usage
usage = shared_manager.get_disk_usage()
print(f"Total usage: {usage['total_bytes'] / (1024**3):.2f} GB")

# Cleanup old temporary files
cleaned_count = shared_manager.cleanup_temp_files(max_age_hours=24)
print(f"Cleaned {cleaned_count} temporary files")

# List outputs for specific agent
output_files = shared_manager.list_outputs("analytics")
print(f"Analytics outputs: {output_files}")

Supported File Types & Detection

The system automatically detects file paths in natural language and supports:

Data Files: .csv, .xlsx, .xls, .json, .xml, .parquet
Media Files: .mp4, .avi, .mov, .mp3, .wav, .flac
Text Files: .txt, .md, .log, .py, .js, .sql
Documents: .pdf (read-only)

# These requests automatically trigger file sharing:
"convert data.csv to xlsx" → Detects: data.csv → ./docker_shared/input/code/
"extract audio from video.mp4" → Detects: video.mp4 → ./docker_shared/input/media/
"analyze quarterly_report.xlsx" → Detects: quarterly_report.xlsx → ./docker_shared/input/analytics/
"scrape data from website" → No file detected → ./docker_shared/output/scraper/
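Detection can be approximated with an extension-anchored regex over the supported types. FILE_PATTERN and detect_files are illustrative approximations, not the library's actual detector:

```python
import re

# Extensions from the supported-types list above.
EXTS = r"csv|xlsx|xls|json|xml|parquet|mp4|avi|mov|mp3|wav|flac|txt|md|log|py|js|sql|pdf"
FILE_PATTERN = re.compile(rf"\b[\w./-]+\.(?:{EXTS})\b", re.IGNORECASE)

def detect_files(request: str):
    """Return candidate file paths mentioned in a natural-language request."""
    return [m.group(0) for m in FILE_PATTERN.finditer(request)]

print(detect_files("convert data.csv to xlsx"))      # ['data.csv']
print(detect_files("extract audio from video.mp4"))  # ['video.mp4']
print(detect_files("scrape data from website"))      # []
```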

Docker Image Configuration

Default Image: sgosain/amb-ubuntu-python-public-pod

Custom Docker Image for Consolidated Structure:

FROM sgosain/amb-ubuntu-python-public-pod

# Install additional packages for your use case
RUN pip install openpyxl xlsxwriter plotly seaborn

# Create consolidated mount points
# (listed explicitly: the default shell is dash, which lacks brace expansion)
RUN mkdir -p /docker_shared/input /docker_shared/output /docker_shared/temp /docker_shared/handoff /docker_shared/work

# Add custom scripts that work with consolidated structure
COPY your-scripts/ /opt/scripts/

# Set working directory
WORKDIR /docker_shared/work

# Example script that uses consolidated paths
RUN echo '#!/bin/bash\n\
echo "Input files: $(ls -la /docker_shared/input/)"\n\
echo "Output directory: /docker_shared/output/"\n\
echo "Temp directory: /docker_shared/temp/"\n\
echo "Handoff directory: /docker_shared/handoff/"' > /opt/scripts/show_structure.sh

RUN chmod +x /opt/scripts/show_structure.sh

Troubleshooting

Directory Issues:

# Check if docker_shared structure exists
ls -la docker_shared/

# Verify agent subdirectories
ls -la docker_shared/output/

File Access Issues:

# Check permissions
chmod 755 docker_shared/
find docker_shared/ -type d -exec chmod 755 {} \;

# Verify Docker can access the directory
docker run --rm -v $(pwd)/docker_shared:/docker_shared alpine ls -la /docker_shared

Volume Mount Issues:

# Test consolidated volume mounting
docker run --rm \
  -v $(pwd)/docker_shared/input:/docker_shared/input:ro \
  -v $(pwd)/docker_shared/output:/docker_shared/output:rw \
  alpine ls -la /docker_shared/

Benefits of Consolidated Structure

  • Consistency: All agents use the same directory structure
  • Inter-Agent Workflows: Seamless file handoffs between agents
  • Security: Proper read-only/read-write permissions
  • Organization: Files organized by agent and purpose
  • Monitoring: Centralized disk usage and cleanup
  • Third-Party Integration: Easy for custom agent development
  • Auto-Management: Directories created and managed automatically

Running Without Docker (Containerized Deployment)

When deploying ambivo_agents inside the sgosain/amb-ubuntu-python-public-pod Docker image (e.g., on Railway, Render, or similar platforms that don't support Docker-in-Docker), you can disable Docker executor spawning. The image already contains all required dependencies (ffmpeg, yt-dlp, playwright, pandas, duckdb, etc.).

Configuration

Via YAML (agent_config.yaml):

docker:
  use_docker: false
  shared_base_dir: "./docker_shared"  # still used for file organization
  # ... rest of docker config remains the same

Via Environment Variable:

export AMBIVO_AGENTS_DOCKER_USE_DOCKER=false

How It Works

Mode              use_docker  What Happens
Docker (default)  true        Agents spawn Docker containers using sgosain/amb-ubuntu-python-public-pod — requires a Docker daemon
Local             false       Agents use local subprocess execution (Python, Bash, ffmpeg, yt-dlp) — no Docker daemon needed

When use_docker: false, each agent uses a local executor:

Agent                 Docker Executor        Local Executor                           Host Requirement
CodeExecutorAgent     DockerCodeExecutor     LocalCodeExecutor                        Python, Bash
MediaEditorAgent      MediaDockerExecutor    MediaLocalExecutor                       ffmpeg, ffprobe
YouTubeDownloadAgent  YouTubeDockerExecutor  YouTubeLocalExecutor                     yt-dlp
AnalyticsAgent        DockerCodeExecutor     LocalCodeExecutor                        Python, pandas
WebScraperAgent       SimpleDockerExecutor   Falls back to local playwright/requests  playwright or requests
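Executor selection is effectively a single branch on the flag. The stub classes below only mirror the names in the table, and select_executor is an illustrative sketch, not the library's code:

```python
class DockerCodeExecutor:
    """Runs code inside a container (requires a Docker daemon)."""

class LocalCodeExecutor:
    """Runs code via local subprocess (no Docker daemon needed)."""

def select_executor(use_docker: bool):
    # Mirrors the use_docker flag from agent_config.yaml or the env var
    return DockerCodeExecutor() if use_docker else LocalCodeExecutor()

print(type(select_executor(False)).__name__)  # LocalCodeExecutor
```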

Important Notes

  • Default is true — existing deployments are completely unaffected.
  • This is intended for running inside the fat Docker image where all dependencies already exist, not for bare developer machines.
  • The docker_shared/ directory structure is used by both modes for consistent file organization and cross-agent handoffs.
  • The code_execution_policy safety layer applies in both modes: runs through LocalCodeExecutor are checked just as Docker-based runs are.
  • Local executors raise clear errors if dependencies are missing (e.g., "ffmpeg not found", "yt-dlp not installed").

Typical Railway Deployment

# Use the fat image as base — all agent deps included
FROM sgosain/amb-ubuntu-python-public-pod:latest

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# No Docker daemon needed
ENV AMBIVO_AGENTS_DOCKER_USE_DOCKER=false
CMD ["gunicorn", "your_app:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

Troubleshooting

Common Issues

  1. Redis Connection Failed

    # Check if Redis is running
    redis-cli ping # Should return "PONG"
    
  2. Docker Not Available

    # Check Docker is running
    docker ps
    
  3. Agent Creation Errors

    from ambivo_agents import ModeratorAgent
    try:
        moderator, context = ModeratorAgent.create(user_id="test")
        print(f"Success: {context.session_id}")
        await moderator.cleanup_session()
    except Exception as e:
        print(f"Error: {e}")
    
  4. Import Errors

    python -c "from ambivo_agents import ModeratorAgent; print('Import success')"
    

Debug Mode

Enable verbose logging:

service:
  log_level: "DEBUG"
  log_to_file: true

Security Considerations

  • Docker Isolation: All code execution happens in isolated containers
  • Network Restrictions: Containers run with network_disabled=True by default
  • Resource Limits: Memory and CPU limits prevent resource exhaustion
  • API Key Management: Store sensitive keys in environment variables
  • Input Sanitization: All user inputs are validated and sanitized
  • Session Isolation: Each agent session is completely isolated

Contributing

We welcome contributions! Please see our Contributing Guidelines for details.

Development Setup

# Clone repository
git clone https://github.com/ambivo-corp/ambivo-agents.git
cd ambivo-agents

# Install in development mode (base only)
pip install -e .

# Install with all extras (recommended for local development)
pip uninstall ambivo-agents -y && pip install -e ".[all]"

# Test ModeratorAgent
python -c "
from ambivo_agents import ModeratorAgent
import asyncio

async def test():
    moderator, context = ModeratorAgent.create(user_id='test')
    response = await moderator.chat('Hello!')
    print(f'Response: {response}')
    await moderator.cleanup_session()

asyncio.run(test())
"

Publishing to PyPI

Build & Upload

# Clean previous builds
rm -rf dist/ build/

# Build — IMPORTANT: use setuptools < 75 to produce Metadata-Version 2.1.
# setuptools >= 75 generates Metadata-Version 2.4 which adds a `license-file`
# field that twine 6.x rejects ("unrecognized or malformed field 'license-file'")
# and twine 5.x rejects ("Metadata is missing required fields: Name, Version").
# Pinning setuptools below 75 avoids this entirely.
pip install 'setuptools>=61.0,<75' wheel
python -m build --no-isolation

# Verify metadata is 2.1 (not 2.4)
python -c "import glob; from pkginfo import Wheel; w = Wheel(glob.glob('dist/*.whl')[0]); print(w.metadata_version)"
# Expected: 2.1

# Upload to PyPI
python -m twine upload dist/*

Tag & Push

# Tag the release
git tag -a v1.x.x -m "v1.x.x - Description"
git push origin main --tags

Why setuptools < 75?

setuptools >= 75 produces Metadata-Version: 2.4 which includes a license-file field. This field is not recognized by current versions of twine (both 5.x and 6.x), causing upload failures:

  • twine 5.x: InvalidDistribution: Metadata is missing required fields: Name, Version (pkginfo can't parse 2.4)
  • twine 6.x: InvalidDistribution: Invalid distribution metadata: unrecognized or malformed field 'license-file'

Building with setuptools < 75 produces Metadata-Version: 2.1 which is fully compatible. The --no-isolation flag ensures the locally installed setuptools is used instead of an auto-downloaded latest version.
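
If you prefer not to depend on pkginfo at all, the same check can be done with only the standard library, since a wheel is a zip archive containing a .dist-info/METADATA file:

```python
# Stdlib-only check of the Metadata-Version in a built wheel.
import glob
import zipfile

def wheel_metadata_version(wheel_path):
    """Read Metadata-Version from the wheel's .dist-info/METADATA entry."""
    with zipfile.ZipFile(wheel_path) as whl:
        metadata_name = next(
            name for name in whl.namelist()
            if name.endswith(".dist-info/METADATA")
        )
        for line in whl.read(metadata_name).decode().splitlines():
            if line.startswith("Metadata-Version:"):
                return line.split(":", 1)[1].strip()

for path in glob.glob("dist/*.whl"):
    print(path, wheel_metadata_version(path))
```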

License

MIT License - see LICENSE file for details.

Author

Hemant Gosain 'Sunny'

Attributions & Third-Party Technologies

This project leverages several open-source libraries and commercial services:

Core Technologies

  • Docker: Container runtime for secure code execution
  • Redis: In-memory data store for session management
  • Python: Core programming language

AI/ML Frameworks

  • OpenAI: GPT models and API services
  • Anthropic: Claude models and API services
  • AWS Bedrock: Cloud-based AI model access
  • LangChain: Framework for building AI applications (by LangChain, Inc.)
  • LlamaIndex: Data framework for LLM applications (by Jerry Liu)
  • Hugging Face: Model hub and transformers library

Data Processing

  • pandas: Data analysis and manipulation library
  • DuckDB: In-process SQL OLAP database
  • Qdrant: Vector database for semantic search
  • tabulate: ASCII table formatting library

Media & Web

  • FFmpeg: Multimedia processing framework
  • YouTube: Video platform (via public APIs)
  • yt-dlp: YouTube video downloader library
  • Brave Search: Web search API service
  • Beautiful Soup: HTML/XML parsing library

Development Tools

  • pytest: Testing framework
  • black: Code formatting tool
  • Docker Hub: Container image repository

Legal Disclaimer

IMPORTANT: This software is provided "as is" without warranty of any kind. Users are responsible for:

  1. API Compliance: Ensuring compliance with all third-party service terms (OpenAI, Anthropic, AWS, YouTube, etc.)
  2. Data Privacy: Protecting user data and complying with applicable privacy laws
  3. Usage Limits: Respecting rate limits and usage policies of external services
  4. Security: Implementing appropriate security measures for production use
  5. Licensing: Ensuring compliance with all third-party library licenses

The authors and contributors are not liable for any damages arising from the use of this software. Users should thoroughly test and validate the software before production deployment.

Third-Party Services: This library integrates with external services that have their own terms of service, privacy policies, and usage limitations. Users must comply with all applicable terms.

Web Scraping & Content Access: Users must practice ethical web scraping by respecting robots.txt, rate limits, and website terms of service. YouTube content access must comply with YouTube's Terms of Service and API policies - downloading copyrighted content without permission is prohibited.


Developed by the Ambivo team.

Query Across Multiple Knowledge Bases

You can query multiple knowledge bases by passing kb_names via metadata on either the ExecutionContext or the AgentMessage. The agent accepts kb_names as a list of strings, a list of dicts ({kb_name, description}), or a JSON string.

from ambivo_agents.agents.knowledge_base import KnowledgeBaseAgent
from ambivo_agents.core.base import ExecutionContext, AgentMessage
import asyncio

async def demo():
    agent = KnowledgeBaseAgent()

    # 1) Via ExecutionContext.metadata — list of strings
    ctx = ExecutionContext(
        user_id="u",
        session_id="s",
        conversation_id="c",
        metadata={"kb_names": ["product_docs", "engineering_wiki"]},
    )
    resp = await agent.process_message("What changed in v2.0 API?", context=ctx)
    print(resp.content)
    print(resp.metadata) # includes used_kbs, primary_kb, sources_dict

    # 2) Via AgentMessage.metadata — list of dicts
    msg = AgentMessage(
        id="m1",
        sender_id="u",
        recipient_id=agent.agent_id,
        content="Summarize PTO policy.",
        metadata={
            "kb_names": [
                {"kb_name": "hr_policies", "description": "HR docs"},
                {"kb_name": "employee_handbook"},
            ]
        },
    )
    resp2 = await agent.process_message(msg)
    print(resp2.content)
    print(resp2.metadata)

asyncio.run(demo())

See examples/knowledge_base_multiple.py for a more complete example, including passing kb_names as a JSON string.
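
To illustrate the three accepted shapes, here is a hypothetical normalization helper (not the agent's actual internal logic) that reduces any of them to a list of {kb_name, description} dicts:

```python
import json

def normalize_kb_names(kb_names):
    """Normalize kb_names given as a JSON string, a list of strings,
    or a list of {kb_name, description} dicts (illustrative only)."""
    if isinstance(kb_names, str):  # JSON string form
        kb_names = json.loads(kb_names)
    normalized = []
    for item in kb_names:
        if isinstance(item, str):  # plain name
            normalized.append({"kb_name": item, "description": ""})
        else:  # dict form; description is optional
            normalized.append({
                "kb_name": item["kb_name"],
                "description": item.get("description", ""),
            })
    return normalized

print(normalize_kb_names('["product_docs", "engineering_wiki"]'))
```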
