Multi-Agent AI System for automation
Ambivo Agents - Multi-Agent AI System
A toolkit for AI-powered automation including data analytics with DuckDB, media processing, knowledge base operations, web scraping, YouTube downloads, HTTP/REST API integration, and more.
Alpha Release Disclaimer
This library is currently in alpha. While functional, it may contain bugs, undergo breaking changes, and lack complete documentation. Evaluate and test it thoroughly before considering it for production use.
For production scenarios, we recommend:
- Extensive testing in your specific environment
- Implementing proper error handling and monitoring
- Having rollback plans in place
- Staying updated with releases for critical fixes
Table of Contents
- Quick Start
- Agent Creation
- Features
- Available Agents
- Workflow System
- System Messages
- Prerequisites
- Installation
- Configuration
- Configuration Methods
- Project Structure
- Usage Examples
- Streaming System
- Session Management
- Web API Integration
- Command Line Interface
- Architecture
- Docker Setup
- Running Without Docker (Containerized Deployment)
- Troubleshooting
- Security Considerations
- Contributing
- License
- Author
- Support
Quick Start
ModeratorAgent Example
The ModeratorAgent automatically routes queries to specialized agents:
from ambivo_agents import ModeratorAgent
import asyncio
async def main():
    # Create the moderator
    moderator, context = ModeratorAgent.create(user_id="john")
    print(f"Session: {context.session_id}")
    # Auto-routing examples
    response1 = await moderator.chat("Download audio from https://youtube.com/watch?v=example")
    response2 = await moderator.chat("Search for latest AI trends")
    response3 = await moderator.chat("Extract audio from video.mp4 as MP3")
    response4 = await moderator.chat("GET https://api.github.com/users/octocat")
    response5 = await moderator.chat("Load data from sales.csv and analyze it")
    response6 = await moderator.chat("What is machine learning?")
    # Check available agents
    status = await moderator.get_agent_status()
    print(f"Available agents: {list(status['active_agents'].keys())}")
    # Cleanup
    await moderator.cleanup_session()
asyncio.run(main())
KnowledgeSynthesisAgent Example
The KnowledgeSynthesisAgent combines multiple knowledge sources with quality assessment:
from ambivo_agents.agents.knowledge_synthesis import KnowledgeSynthesisAgent
from ambivo_agents.agents.response_quality_assessor import QualityLevel
import asyncio
async def main():
    # Create synthesis agent directly (NOT through ModeratorAgent)
    synthesis_agent, context = KnowledgeSynthesisAgent.create(
        user_id="researcher",
        quality_threshold=QualityLevel.GOOD,
        max_iterations=3,
        enable_auto_scraping=True
    )
    print(f"Session: {context.session_id}")
    # Multi-source synthesis examples using process_with_quality_assessment
    response1 = await synthesis_agent.process_with_quality_assessment(
        "search across knowledge bases: kb1, kb2 for: AI trends"
    )
    response2 = await synthesis_agent.process_with_quality_assessment(
        "prioritize web search - latest developments in robotics 2025"
    )
    response3 = await synthesis_agent.process_with_quality_assessment(
        "comprehensive search - research blockchain technology"
    )
    # Check quality assessment
    print(f"Quality: {response1.get('quality_assessment', {}).get('quality_level', 'unknown')}")
    print(f"Sources: {response1.get('quality_assessment', {}).get('sources_used', [])}")
    # Cleanup
    await synthesis_agent.cleanup_session()
asyncio.run(main())
Simple KnowledgeSynthesisAgent with Specific Knowledge Bases
For targeted knowledge base queries with automatic collection detection:
from ambivo_agents.agents.knowledge_synthesis import KnowledgeSynthesisAgent
import asyncio
async def simple_example():
    # Setup - Create agent and configure knowledge bases
    agent, context = KnowledgeSynthesisAgent.create(user_id="user123")
    agent.update_context_metadata(available_knowledge_bases=[
        'research_trends_in_cryptocurrency_20250816_193439',
        'research_trends_in_robotics_tech_2025_20250812_172007'
    ])
    agent._load_available_collections() # Required after setting knowledge bases
    try:
        # Method 1: Full quality assessment (recommended)
        result = await agent.process_with_quality_assessment(
            "What are cryptocurrency trends?"
        )
        print(f"Response: {result['response']}")
        print(f"Quality: {result['quality_assessment']['quality_level']}")
        print(f"Sources: {result['quality_assessment']['sources_used']}")
        # Method 2: Simple chat interface
        response = await agent.chat("Tell me about robotics developments")
        print(f"Chat response: {response}")
    finally:
        # Always cleanup
        await agent.cleanup_session()
asyncio.run(simple_example())
Command Line Usage
# Install and run
pip install ambivo-agents
# Interactive mode
ambivo-agents
# Single commands
ambivo-agents -q "Download audio from https://youtube.com/watch?v=example"
ambivo-agents -q "Search for Python tutorials"
ambivo-agents -q "Load data from sales.csv and analyze it"
ambivo-agents -q "GET https://jsonplaceholder.typicode.com/posts/1"
Agent Creation
ModeratorAgent (Recommended)
from ambivo_agents import ModeratorAgent
# Create moderator with auto-routing
moderator, context = ModeratorAgent.create(user_id="john")
# Chat with automatic agent selection
result = await moderator.chat("Download audio from https://youtube.com/watch?v=example")
# Cleanup
await moderator.cleanup_session()
Use ModeratorAgent for:
- Multi-purpose applications
- Intelligent routing between capabilities
- Context-aware conversations
- Simplified development
Direct Agent Creation
from ambivo_agents import YouTubeDownloadAgent
# Create specific agent
agent, context = YouTubeDownloadAgent.create(user_id="john")
# Use agent directly
result = await agent._download_youtube_audio("https://youtube.com/watch?v=example")
# Cleanup
await agent.cleanup_session()
Use Direct Creation for:
- Single-purpose applications
- Specific workflows with known requirements
- Performance-critical applications
- Custom integrations
Features
Core Capabilities
- ModeratorAgent: Intelligent multi-agent orchestrator with automatic routing
- Smart Routing: Automatically routes queries to appropriate specialized agents
- Data Analytics: In-memory DuckDB integration with CSV/XLS ingestion and text-based visualizations
- File Ingestion & Processing: All agents can read/parse JSON, CSV, XML, YAML files and insert into databases
- Context Memory: Maintains conversation history across interactions
- Docker Integration: Secure, isolated execution environment
- Redis Memory: Persistent conversation memory with compression
- Multi-Provider LLM: Automatic failover between OpenAI, Anthropic, and AWS Bedrock
- Configuration-Driven: All features controlled via agent_config.yaml
- Workflow System: Multi-agent workflows with parallel and sequential execution
- System Messages: Customizable system prompts for agent behavior control
Available Agents
ModeratorAgent
Enhanced with Skill Assignment System
- Intelligent orchestrator that routes to specialized agents
- Context-aware multi-turn conversations
- Automatic agent selection based on query analysis
- Session management and cleanup
- Workflow execution and coordination
- NEW: Skill Assignment - Assign external capabilities that take priority over agent routing
- Smart Skill Routing - Assigned skills are checked first; if none match, the request falls back to normal agent routing
- Unified Interface - Single agent that can handle both assigned skills and general orchestration
- Use Cases: Custom integrations with existing agent orchestration, priority skill handling
KnowledgeSynthesisAgent
Advanced Multi-Source Orchestrator with Quality Assessment
IMPORTANT: KnowledgeSynthesisAgent is NOT a specialized agent that gets routed to by ModeratorAgent. It IS an enhanced ModeratorAgent that you use directly as your primary orchestrator.
Key Capabilities:
- Multi-Source Synthesis - Intelligently combines knowledge bases, web search, and web scraping
- Quality Assessment - Built-in ResponseQualityAssessor evaluates and improves responses
- Adaptive Strategy - Automatically selects optimal search strategy based on query analysis
- Multiple Knowledge Base Support - Query across multiple knowledge bases simultaneously
- Web Search Integration - Supplements knowledge with real-time web search results
- Smart Web Scraping - Automatically scrapes relevant URLs for deeper information
- Iterative Improvement - Refines responses until quality threshold is met
- Intelligent Routing - Inherits all ModeratorAgent capabilities plus synthesis features
Architecture:
KnowledgeSynthesisAgent extends ModeratorAgent
Inherits: Agent routing, session management, workflows
Adds: Multi-source synthesis, quality assessment, intelligent orchestration
Key Differences:
| Feature | ModeratorAgent | KnowledgeSynthesisAgent |
|---|---|---|
| Purpose | Routes queries to specialized agents | Multi-source information synthesis |
| Response | Simple agent routing | Quality-assessed, synthesized responses |
| Sources | Single agent per query | Multiple sources per query |
| Quality Control | Basic agent responses | Built-in quality assessment & iteration |
| Use Case | General orchestration | Research & information synthesis |
| Usage Pattern | Router/orchestrator | Direct primary agent |
Wrong Usage Pattern:
# DON'T DO THIS - Won't work!
moderator, context = ModeratorAgent.create(...)
# Expecting to route to KnowledgeSynthesisAgent - this fails
response = await moderator.chat("synthesize information...") # Fails
Correct Usage Pattern:
# DO THIS - Works correctly!
from ambivo_agents.agents.knowledge_synthesis import KnowledgeSynthesisAgent
from ambivo_agents.agents.response_quality_assessor import QualityLevel
# Use KnowledgeSynthesisAgent directly as your orchestrator
synthesis_agent, context = KnowledgeSynthesisAgent.create(
user_id="user123",
quality_threshold=QualityLevel.GOOD,
max_iterations=3,
enable_auto_scraping=True
)
# Multi-knowledge base query
response = await synthesis_agent.process_with_quality_assessment(
"search across knowledge bases: kb1, kb2 for: AI trends"
)
# Web-supplemented query
response = await synthesis_agent.process_with_quality_assessment(
"prioritize web search - latest developments in robotics 2025"
)
# Comprehensive research
response = await synthesis_agent.process_with_quality_assessment(
"comprehensive search - research blockchain technology trends"
)
await synthesis_agent.cleanup_session()
Quality-Assessed Response Format:
{
'success': True,
'response': 'Synthesized answer from multiple sources...',
'quality_assessment': {
'quality_level': 'good', # excellent|good|fair|poor|unacceptable
'confidence_score': 0.85, # 0.0 - 1.0
'sources_used': ['knowledge_base', 'web_search'],
'strengths': ['Comprehensive coverage', 'Current information'],
'weaknesses': ['Could use more specific examples']
},
'query_analysis': {
'query_type': 'current_events',
'strategy_used': 'web_first',
'sources_consulted': 2
},
'metadata': {
'iterations': 1,
'total_sources_consulted': 2,
'user_preferences': {}
}
}
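The fields in this response can be read defensively, since some keys may be absent on failure. The following is a minimal sketch that assumes only the dictionary shape shown above; `summarize_result` is a hypothetical helper name and is not part of the library.

```python
from typing import Any


def summarize_result(result: dict[str, Any]) -> str:
    """Build a one-line summary from a quality-assessed response dict."""
    qa = result.get("quality_assessment", {})
    level = qa.get("quality_level", "unknown")
    confidence = qa.get("confidence_score")
    sources = ", ".join(qa.get("sources_used", [])) or "none"
    iterations = result.get("metadata", {}).get("iterations", 0)
    # Format the confidence only when the assessor actually reported one
    conf_text = f"{confidence:.2f}" if confidence is not None else "n/a"
    return f"quality={level} confidence={conf_text} sources={sources} iterations={iterations}"


sample = {
    "success": True,
    "response": "Synthesized answer from multiple sources...",
    "quality_assessment": {
        "quality_level": "good",
        "confidence_score": 0.85,
        "sources_used": ["knowledge_base", "web_search"],
    },
    "metadata": {"iterations": 1},
}
print(summarize_result(sample))
```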
Query Patterns:
- "search across knowledge bases: kb1, kb2, kb3 for: [query]" - Multi-KB search
- "prioritize web search - [query]" - Web-first strategy
- "check knowledge base first - [query]" - KB-first strategy
- "comprehensive search - [query]" - Use all sources
- "synthesize information about [topic]" - Full synthesis mode
Use Cases:
- Research across multiple knowledge bases
- Web-supplemented knowledge queries
- Quality-assured information synthesis
- Multi-source fact checking
- Comprehensive topic research
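To make the query patterns listed earlier in this section concrete, here is a rough sketch of how such prefixes could be parsed. It is not the library's parser: `parse_query` is a hypothetical function, and the strategy labels other than `web_first` and `knowledge_first` are illustrative assumptions.

```python
import re

# Prefixes copied from the documented query patterns; labels are illustrative.
_PREFIX_STRATEGIES = {
    "prioritize web search": "web_first",
    "check knowledge base first": "knowledge_first",
    "comprehensive search": "all_sources",
}
_MULTI_KB = re.compile(
    r"^search across knowledge bases:\s*(?P<kbs>.+?)\s+for:\s*(?P<query>.+)$",
    re.IGNORECASE,
)


def parse_query(text: str) -> dict:
    """Split a patterned query into strategy, knowledge bases, and the bare query."""
    text = text.strip()
    match = _MULTI_KB.match(text)
    if match:
        kbs = [kb.strip() for kb in match.group("kbs").split(",") if kb.strip()]
        return {"strategy": "multi_kb", "knowledge_bases": kbs,
                "query": match.group("query").strip()}
    for prefix, strategy in _PREFIX_STRATEGIES.items():
        if text.lower().startswith(prefix):
            # Drop the prefix and the " - " separator that follows it
            rest = text[len(prefix):].lstrip(" -")
            return {"strategy": strategy, "knowledge_bases": [], "query": rest}
    return {"strategy": "default", "knowledge_bases": [], "query": text}
```

Anything that matches no pattern, including the "synthesize information about" form, falls through to the `default` branch in this sketch.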
Configuration:
- Enabled by default (inherits from ModeratorAgent)
- Environment Control: AMBIVO_AGENTS_MODERATOR_ENABLED=true/false
- No separate enablement needed - uses existing ModeratorAgent configuration
Multi-Knowledge Base Processing Logic
Understanding how multiple knowledge bases are handled across different agents:
ModeratorAgent - Simple Routing
User Query → Intent Analysis → Route to KnowledgeBaseAgent → Return Result
- No multi-KB logic - simply routes to KnowledgeBaseAgent
- Single hop - passes query through without modification
- Fallback routing - falls back to AssistantAgent if KB agent fails
KnowledgeBaseAgent - Advanced Multi-KB Selection
Query → Normalize KB List → Score KBs → Select Top 2 → Query Each → Pick Best by Source Count → Return
Detailed Flow (Single-Pass, No Iteration):
Step 1: KB Input Normalization
# Accepts multiple input formats:
kb_names = ["kb1", "kb2"] # List of strings
kb_names = [{"kb_name": "kb1", "description": "..."}, ...] # List of objects
kb_names = '["kb1", "kb2"]' # JSON string
# Normalizes to: [{"kb_name": "kb1", "description": None}, ...]
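The normalization step can be sketched as a small standalone function. This is an illustration of the behavior described above, not the library's code; `normalize_kb_names` is a hypothetical name, and treating a non-JSON string as a single KB name is an assumption.

```python
import json
from typing import Any


def normalize_kb_names(kb_names: Any) -> list[dict[str, Any]]:
    """Normalize a KB list given as strings, dicts, or a JSON string."""
    if isinstance(kb_names, str):
        try:
            kb_names = json.loads(kb_names)
        except json.JSONDecodeError:
            kb_names = [kb_names]  # assumption: a plain string is one KB name
    if not isinstance(kb_names, list):
        kb_names = [kb_names] if kb_names else []
    normalized = []
    for entry in kb_names:
        if isinstance(entry, str):
            normalized.append({"kb_name": entry, "description": None})
        elif isinstance(entry, dict) and entry.get("kb_name"):
            normalized.append(
                {"kb_name": entry["kb_name"], "description": entry.get("description")}
            )
        # Entries of any other type are silently skipped in this sketch
    return normalized
```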
Step 2: KB Scoring Algorithm
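def score_kb_for_query(kb_entry, user_message, topics):
    # Extract meaningful keywords (>3 chars, no stopwords)
    keywords = extract_keywords(user_message + topics)
    score = 0.0
    # Read the name and optional description from the normalized KB entry
    kb_name = kb_entry["kb_name"]
    kb_description = kb_entry.get("description") or ""
    kb_text = f"{kb_name} {kb_description}".lower()
    # Base scoring: keyword overlap
    for keyword in keywords:
        if keyword in kb_text:
            score += 1.0
    # Domain bonuses (legal_keywords_present and finance_keywords_present are
    # assumed to be flags derived from the query keywords)
    if "legal" in kb_text and legal_keywords_present:
        score += 1.5
    if "finance" in kb_text and finance_keywords_present:
        score += 1.0
    return score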
# Example scoring:
# Query: "blockchain technology trends"
# - crypto_research_kb: score = 2.0 (matches "blockchain", "technology")
# - legal_documents_kb: score = 0.0 (no matches)
# - tech_trends_kb: score = 2.0 (matches "technology", "trends")
Step 3: KB Selection (Performance Optimization)
# Sort by score and select top 2 candidates
scored_kbs = [(kb, score) for kb, score in kb_scores]
scored_kbs.sort(key=lambda x: x[1], reverse=True)
# Select top 2 with positive scores
top_kbs = [kb for kb, score in scored_kbs if score > 0][:2]
# Fallback: use first 2 if no positive scores
if not top_kbs:
    top_kbs = original_kb_list[:2]
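Steps 2 and 3 can be combined into a runnable end-to-end sketch. It keeps only the keyword-overlap part of the scoring (the domain bonuses are omitted), and the stopword list and function names are illustrative assumptions rather than the library's implementation.

```python
import re

STOPWORDS = {"about", "from", "that", "this", "what", "with"}  # illustrative subset


def extract_keywords(text: str) -> set[str]:
    """Lowercased words longer than 3 characters that are not stopwords."""
    words = re.findall(r"[a-z0-9]+", text.lower())
    return {w for w in words if len(w) > 3 and w not in STOPWORDS}


def score_kb(kb: dict, query: str) -> float:
    """Count query keywords that occur in the KB name or description."""
    kb_text = f"{kb['kb_name']} {kb.get('description') or ''}".lower()
    return float(sum(1 for kw in extract_keywords(query) if kw in kb_text))


def select_top_kbs(kbs: list[dict], query: str, limit: int = 2) -> list[dict]:
    """Return up to `limit` KBs with positive scores, else the first `limit` KBs."""
    scored = sorted(((kb, score_kb(kb, query)) for kb in kbs),
                    key=lambda pair: pair[1], reverse=True)
    top = [kb for kb, score in scored if score > 0][:limit]
    return top or kbs[:limit]


kbs = [
    {"kb_name": "legal_documents_kb", "description": None},
    {"kb_name": "crypto_research_kb", "description": "blockchain technology research"},
    {"kb_name": "tech_trends_kb", "description": "technology trends"},
]
selected = select_top_kbs(kbs, "blockchain technology trends")
print([kb["kb_name"] for kb in selected])
```

Because Python's sort is stable, KBs with equal scores keep their original order, which is what makes the selection deterministic for a given input list.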
Step 4: Multi-KB Querying (Single Pass - No Iteration Loop)
async def query_multiple_kbs(selected_kbs, query):
    results = {}
    best_answer = None
    best_kb = None
    best_source_count = -1
    # Process each KB exactly once (no retries or iterations)
    for kb in selected_kbs:  # Usually 2 KBs
        # Single attempt per KB
        result = await query_single_kb(kb.name, query)
        results[kb.name] = result
        if result.success:
            source_count = len(result.source_details)
            # Winner = KB with most document sources
            if source_count > best_source_count:
                best_answer = result.answer
                best_source_count = source_count
                best_kb = kb.name
    # Metadata covers every queried KB, not just the winner (illustrative shape)
    full_metadata = {"best_kb": best_kb, "results": results}
    return best_answer, full_metadata
Step 5: Best Answer Selection
- Criteria: Number of source documents (not quality assessment)
- Winner takes all: Returns single best answer (no synthesis)
- Metadata: Includes results from all queried KBs
Important Characteristics:
- No iteration loops - each KB queried exactly once
- No quality assessment - purely source count based
- No answer synthesis - returns single best answer
- Performance optimized - maximum 2 KBs processed
- Deterministic - same query always produces same KB selection
KnowledgeSynthesisAgent - Multi-Source with Quality Iteration
Query → Analyze Strategy → Gather from All Sources → Quality Assessment → [Iterate if Needed] → Synthesize Final Answer
Detailed Flow (Iterative with Quality Control):
Step 1: Query Analysis & Strategy
analysis = await analyze_query(query)
# Determines: knowledge_first, web_first, parallel, or adaptive strategy
# Based on: query type, time sensitivity, complexity
Step 2: Multi-Source Gathering
# Parallel execution across all source types
sources = await asyncio.gather(
gather_from_knowledge_base(query), # Routes to KnowledgeBaseAgent
gather_from_web_search(query), # Routes to WebSearchAgent
gather_from_web_scraping(query) # Routes to WebScraperAgent
)
Step 3: Quality Assessment & Iteration Loop
for iteration in range(max_iterations): # Usually 3 iterations max
    assessment = await quality_assessor.assess_response(sources, query)
    # Check if quality meets threshold
    if assessment.quality_level >= quality_threshold:
        break # Success - exit iteration loop
    # If quality insufficient, gather additional sources
    if assessment.needs_additional_sources:
        for suggested_source in assessment.suggested_sources:
            additional_response = await gather_from_source(suggested_source, query)
            sources.append(additional_response)
# Final synthesis of all gathered information
final_answer = await synthesize_responses(sources, query)
Step 4: Quality-Driven Source Integration
- Quality metrics: accuracy, completeness, relevance, currency
- Iterative refinement: adds sources until quality threshold met
- Intelligent synthesis: combines information from all sources
- Source attribution: tracks which sources contributed to answer
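The iterate-until-threshold loop can be tried in isolation with stub functions. In this sketch the `QualityLevel` enum is a local stand-in (an `IntEnum` so that levels compare with `>=`), and `refine_until_good`, `assess`, and `gather_more` are hypothetical names; none of this is the library's actual assessor.

```python
import asyncio
from enum import IntEnum


class QualityLevel(IntEnum):
    # Local stand-in, ordered so levels can be compared; values are illustrative.
    UNACCEPTABLE = 0
    POOR = 1
    FAIR = 2
    GOOD = 3
    EXCELLENT = 4


async def refine_until_good(sources, assess, gather_more,
                            threshold=QualityLevel.GOOD, max_iterations=3):
    """Re-assess and add sources until the threshold is met or iterations run out."""
    level = QualityLevel.UNACCEPTABLE
    iterations = 0
    for iterations in range(1, max_iterations + 1):
        level = await assess(sources)
        if level >= threshold:
            break  # quality is sufficient, stop gathering
        sources.append(await gather_more(iterations))
    return level, iterations, sources


async def demo():
    # Toy assessor: quality grows with the number of sources gathered so far.
    async def assess(sources):
        return QualityLevel(min(len(sources), QualityLevel.EXCELLENT))

    async def gather_more(iteration):
        return f"extra-source-{iteration}"

    return await refine_until_good(["kb"], assess, gather_more)


print(asyncio.run(demo()))
```

With the toy assessor, one starting source needs two extra sources before the `GOOD` threshold is reached on the third assessment.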
Comparison Summary
| Aspect | ModeratorAgent | KnowledgeBaseAgent | KnowledgeSynthesisAgent |
|---|---|---|---|
| Multi-KB Handling | None (routes only) | Advanced selection logic | Routes to KB + other sources |
| KB Selection | No selection | Score-based top 2 selection | Uses KB agent's selection |
| Iteration | No | No (single-pass) | Yes (quality-driven) |
| Answer Selection | First response | Most sources wins | Quality assessment |
| Synthesis | None | None (single answer) | Multi-source synthesis |
| Performance | Fast (single route) | Fast (max 2 KBs) | Slower (comprehensive) |
| Quality Control | Basic | Source count based | Advanced quality assessment |
| Use Case | General routing | Multi-KB queries | Research & comprehensive answers |
When to Use Which Agent
Use ModeratorAgent when:
- Simple routing to appropriate agents
- General-purpose applications
- Fast response times needed
Use KnowledgeBaseAgent directly when:
- Specifically need to query multiple knowledge bases
- Want deterministic KB selection logic
- Need metadata about which KBs were used
Use KnowledgeSynthesisAgent when:
- Need comprehensive, high-quality answers
- Want information from multiple source types
- Quality assessment and iteration are important
- Research-oriented tasks requiring synthesis
Database Agent (Optional)
Best for: Database connections, data exploration, and basic queries
- Multi-Database Support: MongoDB, MySQL, and PostgreSQL connections
- Schema Analysis: Automatic database structure discovery and exploration
- Natural Language Queries: Convert conversational requests to SQL/MongoDB queries
- File Ingestion: Direct JSON/CSV import into database tables
- Safety-First Design: Read-only mode by default, simple SELECT queries only
- Export Integration: Seamless handoff to AnalyticsAgent for complex analysis
- Intentionally Limited: Simple queries only (no JOINs, window functions, CTEs)
- Use Cases: Data exploration, basic queries, file imports, database connections
- Note: Requires the database extra: pip install ambivo-agents[database]
Analytics Agent
Best for: Complex data analysis, advanced SQL, and statistical operations
- Advanced SQL Engine: Full DuckDB support with complex operations
- Complex JOINs: INNER, LEFT, RIGHT, OUTER joins across multiple datasets
- Window Functions: ROW_NUMBER(), RANK(), SUM() OVER(), statistical analysis
- Advanced Aggregations: GROUP BY with HAVING, complex statistical functions
- CTEs & Subqueries: WITH clauses, correlated subqueries, complex logic
- UNION Operations: Combine result sets with UNION/UNION ALL
- Multi-File Analysis: Load and join multiple CSV/XLSX files simultaneously
- Statistical Functions: Percentiles, correlations, trend analysis, outlier detection
- Visualization: Text-based charts and intelligent chart recommendations
- Docker Security: All operations run in isolated containers
- Use Cases: Business intelligence, complex analytics, statistical modeling, data science
When to Use Which Agent:
- DatabaseAgent: Simple queries, database connections, data exploration
- AnalyticsAgent: Complex analysis, joins, statistical operations, advanced SQL
Assistant Agent
Enhanced with Skill Assignment System
- General purpose conversational AI with intelligent skill routing
- Context-aware responses and multi-turn conversations
- Customizable system messages
Skill Assignment
Instead of manually routing to specialized agents, you assign skills to AssistantAgent or ModeratorAgent. They automatically detect when to use these skills and handle the technical details internally.
from ambivo_agents import AssistantAgent
assistant = AssistantAgent.create_simple(user_id="user123")
# Assign an API skill from an OpenAPI spec
await assistant.assign_api_skill(
api_spec_path="/path/to/openapi.yaml",
base_url="https://api.example.com/v1",
api_token="your-token"
)
# Natural language requests now route to the API automatically
response = await assistant.chat("create a lead for John Doe")
# Agent: detects intent -> spawns APIAgent -> makes API call -> returns natural response
await assistant.cleanup_session()
Available skill types:
# 1. API Skills -- triggers on: "create lead", "call api", "list contacts", etc.
await agent.assign_api_skill(
api_spec_path="path/to/openapi.yaml",
base_url="https://api.example.com",
api_token="your-token",
skill_name="my_api" # optional
)
# 2. Database Skills -- triggers on: "query database", "show data", "recent sales", etc.
await agent.assign_database_skill(
connection_string="postgresql://user:pass@host:5432/db",
skill_name="main_db",
description="Customer database"
)
# 3. Knowledge Base Skills -- triggers on: "search docs", "company policy", etc.
await agent.assign_kb_skill(
documents_path="/path/to/docs/",
collection_name="company_docs",
skill_name="knowledge",
temporary=True, # use temp KB with TTL lifecycle (default: True)
ttl_hours=24, # TTL for temporary KBs
vectordb_api_url=None, # override vectordb-api URL (default: from config)
vectordb_api_token=None, # override auth token for vectordb-api
)
Priority: Skills are checked before normal routing. In AssistantAgent, skills run before conversation processing. In ModeratorAgent, skills take priority over agent routing. If no skills match, normal behavior continues.
# List assigned skills
skills = agent.list_assigned_skills()
# {'api_skills': ['my_api'], 'database_skills': ['main_db'], 'kb_skills': [], 'total_skills': 2}
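The skills-first priority can be pictured as a dispatcher that tries assigned skills in order and only then falls back. The sketch below uses simple substring triggers as a stand-in for the library's intent detection; `SkillFirstRouter` and its methods are hypothetical names.

```python
import asyncio


class SkillFirstRouter:
    """Check assigned skills before falling back to a default handler."""

    def __init__(self, fallback):
        self._skills = []  # (name, trigger phrases, handler), in assignment order
        self._fallback = fallback

    def assign_skill(self, name, triggers, handler):
        self._skills.append((name, [t.lower() for t in triggers], handler))

    async def chat(self, message):
        lowered = message.lower()
        for name, triggers, handler in self._skills:
            # Substring matching is a simplification of real intent detection
            if any(trigger in lowered for trigger in triggers):
                return await handler(message)
        return await self._fallback(message)


async def api_skill(message):
    return f"[api] {message}"


async def normal_routing(message):
    return f"[default] {message}"


router = SkillFirstRouter(fallback=normal_routing)
router.assign_skill("my_api", ["create lead", "list contacts"], api_skill)
print(asyncio.run(router.chat("Please create lead for John")))
print(asyncio.run(router.chat("What is machine learning?")))
```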
Code Executor Agent
- Secure Python and Bash execution in Docker
- Isolated environment with resource limits
- Real-time output streaming
Web Search Agent
- Multi-provider search (Brave, AVES APIs)
- Academic search capabilities
- Automatic provider failover
Web Scraper Agent
- Proxy-enabled scraping (ScraperAPI compatible)
- Playwright and requests-based scraping
- Batch URL processing with rate limiting
Knowledge Base Agent
- Document ingestion (PDF, DOCX, TXT, web URLs)
- Vector similarity search with Qdrant
- Semantic question answering
Media Editor Agent
- Audio/video processing with FFmpeg
- Format conversion, resizing, trimming
- Audio extraction and volume adjustment
YouTube Download Agent
- Download videos and audio from YouTube
- Docker-based execution with pytubefix
- Automatic title sanitization and metadata extraction
Gather Agent
Intelligent conversational form-filling with natural language understanding
- Conversational Interface: Ask questions one at a time with natural flow
- Multiple Question Types: free-text, yes-no, single-select, multi-select
- Smart Questionnaire Loading: JSON/YAML from chat, files, or URLs
- Conditional Logic: Advanced dependent question workflows
- Natural Language Parsing (NEW): Understand conversational responses
- "Absolutely!" → "Yes" for yes/no questions
- "I'd prefer email" → maps to email option in single-select
- "Both AWS and Azure" → maps to multiple selections
- "We have about 4 people" → maps to "3-5 people" range
- Graceful Fallback: Standard exact matching when NLP is disabled
- Session Persistence: Remember answers across conversation (~1 hour)
- API Submission: Configurable endpoint with collection status tracking
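The library relies on an LLM for natural language parsing. As a rough, rule-based illustration of the same mappings, the sketch below handles the yes/no and numeric-range examples from the list above; the word lists and function names are assumptions made for this example.

```python
import re
from typing import Optional

YES_WORDS = {"yes", "yeah", "yep", "absolutely", "sure", "definitely"}
NO_WORDS = {"no", "nope", "never"}


def parse_yes_no(answer: str) -> Optional[str]:
    """Map a free-form reply to "Yes" or "No"; None if it is unclear."""
    words = set(re.findall(r"[a-z']+", answer.lower()))
    if words & YES_WORDS and not words & NO_WORDS:
        return "Yes"
    if words & NO_WORDS and not words & YES_WORDS:
        return "No"
    return None


def match_range_option(answer: str, options: list[str]) -> Optional[str]:
    """Pick the "low-high ..." option whose range contains the first number in the answer."""
    number = re.search(r"\d+", answer)
    if not number:
        return None
    value = int(number.group())
    for option in options:
        bounds = re.match(r"\s*(\d+)\s*-\s*(\d+)", option)
        if bounds and int(bounds.group(1)) <= value <= int(bounds.group(2)):
            return option
    return None
```

Returning `None` for unclear answers leaves room for the exact-matching fallback or a follow-up question.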
Configuration:
# Enable natural language understanding (requires LLM)
gather:
  enable_natural_language_parsing: true # Default: false
# Or via environment variable:
# export AMBIVO_AGENTS_GATHER_ENABLE_NATURAL_LANGUAGE_PARSING=true
Usage
# Interactive CLI demo (waits for your input after each question)
python examples/gather_cli.py
# Load questionnaire from a file path or URL
python examples/gather_cli.py --path ./questionnaire.yaml
python examples/gather_cli.py --path https://example.com/questionnaire.json
# Do a real HTTP submission (configure endpoint in agent_config.yaml)
python examples/gather_cli.py --real-submit
# Minimal scripted example
python examples/gather_simple.py
Configuration
agent_capabilities:
  enable_gather: true
gather:
  submission_endpoint: "https://your-api.example.com/submit"
  submission_method: "POST"
  submission_headers:
    Authorization: "Bearer your-api-token"
    Content-Type: "application/json"
  memory_ttl_seconds: 3600
  # Optional: Validate free-text answers with LLM before proceeding
  enable_llm_answer_validation: false
  answer_validation:
    default_min_length: 1
  # Optional: Let LLM rewrite prompts (off by default for predictability)
  enable_llm_prompt_rewrite: false
See also: examples/gather_cli.py and examples/gather_simple.py for end-to-end demos. You can also wrap GatherAgent behind a small REST API for chatbot UIs (start/reply/finish/abort pattern).
API Agent
- Comprehensive HTTP/REST API integration
- Multiple authentication methods (Bearer, API Key, Basic, OAuth2)
- Pre-fetch authentication with automatic token refresh
- Built-in retry logic with exponential backoff
- Security features (domain filtering, SSL verification)
- Support for all HTTP methods (GET, POST, PUT, DELETE, etc.)
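Retry with exponential backoff, mentioned in the list above, generally follows the pattern sketched here. This is a generic illustration rather than the API Agent's internals: `retry_with_backoff` and its parameters are hypothetical, and it retries only on `ConnectionError` to keep the example small.

```python
import asyncio


async def retry_with_backoff(call, max_attempts=3, base_delay=0.5, factor=2.0,
                             sleep=asyncio.sleep):
    """Retry an async callable, waiting base_delay * factor**n between attempts.

    Returns (result, delays) so callers can see how long was waited.
    """
    delays = []
    for attempt in range(max_attempts):
        try:
            result = await call()
            return result, delays
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * factor ** attempt
            delays.append(delay)
            await sleep(delay)
```

The `sleep` parameter is injectable so the waits can be skipped in tests.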
API Agent Configuration
Timeout Settings:
The API Agent supports configurable timeout settings for different use cases:
# Environment variable configuration
export AMBIVO_AGENTS_API_AGENT_TIMEOUT_SECONDS=46 # Custom timeout in seconds
# Or in agent_config.yaml
api_agent:
  timeout_seconds: 46 # Request timeout
  max_safe_timeout: 46 # Requests above this use Docker for safety
  force_docker_above_timeout: false # Enable Docker for long-running requests
Localhost and Domain Access:
For testing with local services or specific domain restrictions:
# Allow localhost access (disabled by default for security)
export AMBIVO_AGENTS_API_AGENT_ALLOWED_DOMAINS="127.0.0.1,localhost,api.example.com"
export AMBIVO_AGENTS_API_AGENT_BLOCKED_DOMAINS="" # Clear default blocks
# Or in agent_config.yaml
api_agent:
  allowed_domains:
    - "127.0.0.1"
    - "localhost"
    - "api.example.com"
    - "*.trusted-domain.com" # Wildcards supported
  blocked_domains: [] # Override default localhost blocks
  # Default security settings (recommended for production)
  # allowed_domains: null # Allows all except blocked
  # blocked_domains:
  #   - "localhost"
  #   - "127.0.0.1"
  #   - "0.0.0.0"
  #   - "169.254.169.254" # AWS metadata service
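The interaction between the allow-list, the block-list, and wildcards can be sketched with the standard library. The evaluation order here (blocked patterns first, then the allow-list) is an assumption, and `is_url_allowed` is a hypothetical helper, not the agent's own check.

```python
from fnmatch import fnmatch
from urllib.parse import urlparse

# Defaults taken from the commented configuration above
DEFAULT_BLOCKED = ["localhost", "127.0.0.1", "0.0.0.0", "169.254.169.254"]


def is_url_allowed(url, allowed_domains=None, blocked_domains=None):
    """Apply blocked patterns first, then the allow-list if one is set."""
    host = (urlparse(url).hostname or "").lower()
    blocked = DEFAULT_BLOCKED if blocked_domains is None else blocked_domains
    if any(fnmatch(host, pattern) for pattern in blocked):
        return False
    if allowed_domains is None:
        return True  # no allow-list: everything not blocked is permitted
    return any(fnmatch(host, pattern) for pattern in allowed_domains)
```

Passing `blocked_domains=[]` mirrors the configuration that clears the default localhost blocks.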
Usage Examples:
from ambivo_agents import APIAgent
# Test with local transcription service
async def test_local_api():
    agent = APIAgent.create_simple(user_id="tester")
    # Natural language API call
    response = await agent.chat("""
    Make a POST request to http://127.0.0.1:8002/kh/transcribe with:
    - Authorization: Bearer your-jwt-token
    - Content-Type: application/json
    - Body: {"s3_url": "https://your-bucket.s3.amazonaws.com/audio.wav"}
    """)
    await agent.cleanup_session()
# Direct API request with custom timeout
async def direct_api_call():
    from ambivo_agents.agents.api_agent import APIRequest, HTTPMethod
    agent = APIAgent.create_simple(user_id="api_user")
    request = APIRequest(
        url="http://127.0.0.1:8002/kh/transcribe",
        method=HTTPMethod.POST,
        headers={
            "Authorization": "Bearer your-jwt-token",
            "Content-Type": "application/json"
        },
        json_data={"s3_url": "https://your-bucket.s3.amazonaws.com/audio.wav"},
        timeout=46 # Custom timeout in seconds
    )
    response = await agent.make_api_request(request)
    print(f"Status: {response.status_code}")
    print(f"Duration: {response.duration_ms}ms")
    await agent.cleanup_session()
Security Notes:
- Localhost access is blocked by default for security
- Always use allowed_domains in production to restrict API access
- Set appropriate timeouts to prevent long-running requests
- Consider using max_safe_timeout for requests that might take longer
Workflow System
The workflow system enables multi-agent orchestration with sequential and parallel execution patterns:
Basic Workflow Usage
from ambivo_agents.core.workflow import WorkflowBuilder, WorkflowPatterns
from ambivo_agents import ModeratorAgent
async def workflow_example():
    # Create moderator with agents
    moderator, context = ModeratorAgent.create(
        user_id="workflow_user",
        enabled_agents=['web_search', 'web_scraper', 'knowledge_base']
    )
    # Create search -> scrape -> ingest workflow
    workflow = WorkflowPatterns.create_search_scrape_ingest_workflow(
        moderator.specialized_agents['web_search'],
        moderator.specialized_agents['web_scraper'],
        moderator.specialized_agents['knowledge_base']
    )
    # Execute workflow
    result = await workflow.execute(
        "Research renewable energy trends and store in knowledge base",
        context.to_execution_context()
    )
    if result.success:
        print(f"Workflow completed in {result.execution_time:.2f}s")
        print(f"Nodes executed: {result.nodes_executed}")
    await moderator.cleanup_session()
Advanced Workflow Features
from ambivo_agents import ModeratorAgent
from ambivo_agents.core.enhanced_workflow import (
    AdvancedWorkflowBuilder, EnhancedModeratorAgent
)
async def advanced_workflow():
    # Create enhanced moderator
    base_moderator, context = ModeratorAgent.create(user_id="advanced_user")
    enhanced_moderator = EnhancedModeratorAgent(base_moderator)
    # Natural language workflow triggers
    response1 = await enhanced_moderator.process_message_with_workflows(
        "I need agents to reach consensus on climate solutions"
    )
    response2 = await enhanced_moderator.process_message_with_workflows(
        "Create a debate between agents about AI ethics"
    )
    # Check workflow status
    status = await enhanced_moderator.get_workflow_status()
    print(f"Available workflows: {status['advanced_workflows']['registered']}")
Workflow Patterns
All patterns are implemented in ambivo_agents/core/workflow.py and enhanced_workflow.py:
| Pattern | Factory Method | Implementation |
|---|---|---|
| Sequential | WorkflowBuilder + execute() | Iterates nodes in order, passes each response as input to the next agent |
| Parallel | WorkflowBuilder + execute_parallel() | Groups nodes into execution levels, runs each level with asyncio.gather() |
| Consensus | AdvancedWorkflowPatterns.create_consensus_workflow() | Loops agents up to 5 iterations; checks agreement via keyword scoring (agree/yes/correct vs disagree/no/wrong). Consensus detection is keyword-based, not LLM-based. |
| Debate | AdvancedWorkflowPatterns.create_debate_workflow() | Multi-round structured debate between debater agents with a moderator agent at start and end |
| Error Recovery | AdvancedWorkflowPatterns.create_error_recovery_workflow() | Conditional routing: primary agent runs first; on error (message type or keyword), falls back through a chain of backup agents |
| Map-Reduce | AdvancedWorkflowBuilder.add_map_reduce_pattern() | Parallel mapper agents all feed into a single reducer agent |
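The keyword-based consensus check from the table can be approximated as follows. The keyword sets come from the table; the whole-word matching and the rule that every response must lean towards agreement are assumptions of this sketch, and the function names are hypothetical.

```python
import re

AGREE_WORDS = {"agree", "yes", "correct"}
DISAGREE_WORDS = {"disagree", "no", "wrong"}


def agreement_score(response: str) -> int:
    """Positive when agreement keywords outnumber disagreement keywords."""
    # Whole-word tokens, so "disagree" is not also counted as "agree"
    words = re.findall(r"[a-z]+", response.lower())
    agree = sum(w in AGREE_WORDS for w in words)
    disagree = sum(w in DISAGREE_WORDS for w in words)
    return agree - disagree


def has_consensus(responses: list[str]) -> bool:
    """True when every response leans towards agreement."""
    return bool(responses) and all(agreement_score(r) > 0 for r in responses)
```

A keyword check like this is cheap and predictable, but it cannot detect negation or sarcasm ("I would not say that is correct"), which is the trade-off of not using an LLM here.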
Concurrency model: All async concurrency uses asyncio (no Celery or external task queues). Parallel workflows use asyncio.gather() for concurrent agent execution within a single process.
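Level-based parallel execution with `asyncio.gather()` can be illustrated with a small dependency graph. This sketch is independent of the library: `execution_levels` and `run_levels` are hypothetical names, and the graph is given as a plain dict from node name to the set of nodes it depends on.

```python
import asyncio


def execution_levels(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes into levels where each node's dependencies sit in earlier levels."""
    remaining = {node: set(deps) for node, deps in dependencies.items()}
    levels = []
    while remaining:
        ready = sorted(node for node, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("cycle or unknown dependency in workflow graph")
        levels.append(ready)
        for node in ready:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(ready)
    return levels


async def run_levels(dependencies, run_node):
    """Run each level concurrently with asyncio.gather(), one level after another."""
    outputs = {}
    for level in execution_levels(dependencies):
        results = await asyncio.gather(*(run_node(node) for node in level))
        outputs.update(zip(level, results))
    return outputs


graph = {
    "search": set(),
    "scrape_a": {"search"},
    "scrape_b": {"search"},
    "ingest": {"scrape_a", "scrape_b"},
}
print(execution_levels(graph))
```

Nodes within a level have no dependencies on each other, so they can safely share one `gather()` call; the next level starts only after the whole previous level has finished.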
System Messages
System messages control agent behavior and responses. Each agent supports custom system prompts:
Default System Messages
# Agents come with role-specific system messages
assistant_agent = AssistantAgent.create_simple(user_id="user")
# Default: "You are a helpful AI assistant. Provide accurate, thoughtful responses..."
code_agent = CodeExecutorAgent.create_simple(user_id="user")
# Default: "You are a code execution specialist. Write clean, well-commented code..."
Custom System Messages
from ambivo_agents import AssistantAgent
# Create agent with custom system message
custom_system = """You are a technical documentation specialist.
Always provide detailed explanations with code examples.
Use professional terminology and structured responses."""
agent, context = AssistantAgent.create(
user_id="doc_specialist",
system_message=custom_system
)
response = await agent.chat("Explain REST API design principles")
Moderator System Messages
from ambivo_agents import ModeratorAgent
# Custom moderator behavior
moderator_system = """You are a project management assistant.
Route technical queries to appropriate agents and provide
executive summaries of complex multi-agent interactions."""
moderator, context = ModeratorAgent.create(
    user_id="pm_user",
    system_message=moderator_system
)
System Message Features
- Context Integration: System messages work with conversation history
- Agent-Specific: Each agent type has optimized default prompts
- Workflow Aware: System messages adapt to workflow contexts
- Provider Compatibility: Works with all LLM providers (OpenAI, Anthropic, Bedrock)
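As a rough picture of how a system message and conversation history can be combined, the sketch below assembles a provider-neutral message list. `build_messages` and its `max_history` parameter are hypothetical names for illustration; the library's internal prompt assembly may differ, and providers vary in how they accept the system prompt (Anthropic's Messages API, for example, takes it as a separate `system` parameter).

```python
def build_messages(system_message, history, user_message, max_history=10):
    """Assemble a chat payload: system prompt first, then recent turns."""
    recent = history[-max_history:] if max_history > 0 else []
    messages = [{"role": "system", "content": system_message}]
    messages.extend(recent)
    messages.append({"role": "user", "content": user_message})
    return messages


history = [
    {"role": "user", "content": "What is REST?"},
    {"role": "assistant", "content": "REST is an architectural style."},
]
payload = build_messages(
    "You are a technical documentation specialist.",
    history,
    "Explain REST API design principles",
)
```

Keeping the system message at the front and trimming only the history means the agent's role stays stable even as older turns drop out of the window.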
Prerequisites
Required
- Python 3.11+
- Docker (for code execution, media processing, YouTube downloads)
- Redis (Cloud Redis recommended)
API Keys (Optional - based on enabled features)
- OpenAI API Key (for GPT models)
- Anthropic API Key (for Claude models)
- AWS Credentials (for Bedrock models)
- Brave Search API Key (for web search)
- AVES API Key (for web search)
- ScraperAPI/Proxy credentials (for web scraping)
- Qdrant Cloud API Key (for Knowledge Base operations)
- Redis Cloud credentials (for memory management)
Installation
1. Install Dependencies
Core Installation (Python 3.11-3.13):
pip install ambivo-agents
The core package includes OpenAI, Anthropic, Redis, Docker, and essential utilities. All LLM calls use the provider SDKs directly (no LangChain required).
With Optional Features:
# Quote the extras so shells such as zsh do not treat the brackets as glob patterns
# Web capabilities (APIAgent, WebScraperAgent)
pip install "ambivo-agents[web]"
# Media processing (YouTubeDownloadAgent)
pip install "ambivo-agents[media]"
# AWS Bedrock LLM support
pip install "ambivo-agents[aws]"
# Database support (DatabaseAgent - MongoDB, MySQL, PostgreSQL)
pip install "ambivo-agents[database]"
# Document processing (PDF, DOCX, PPTX, images, unstructured)
pip install "ambivo-agents[documents]"
# Data analytics (AnalyticsAgent - pandas)
pip install "ambivo-agents[analytics]"
# Async utilities (aiohttp, aiofiles, aiosqlite)
pip install "ambivo-agents[async]"
# Knowledge base (KnowledgeBaseAgent - LlamaIndex, Qdrant, LangChain)
# NOTE: Python 3.11-3.12 only - LlamaIndex/LangChain do not support 3.13
pip install "ambivo-agents[knowledge]"
# All runtime extras (Python 3.11-3.13)
pip install "ambivo-agents[full]"
# Everything including dev tools (Python 3.11-3.13)
pip install "ambivo-agents[all]"
# Full extras + knowledge base (Python 3.11-3.12 only)
pip install "ambivo-agents[all-ml]"
Extras Comparison: [full] vs [all] vs [all-ml]
| Extra | Includes | Use Case |
|---|---|---|
| [full] | web, media, aws, database, documents, analytics, async | Production / integration: all runtime agent capabilities, no dev tooling |
| [all] | [full] + dev tools (pytest, black, isort, pre-commit) | Development: for contributors working on ambivo-agents itself |
| [all-ml] | [all] + knowledge (LlamaIndex, LangChain, Qdrant) | Development with KB: Python 3.11-3.12 only |
Recommendation: Use [full] for applications that consume ambivo-agents as a dependency. Use [all] or [all-ml] only when developing or testing the ambivo-agents package itself.
Note: None of the bundle extras except [all-ml] includes [knowledge] (LlamaIndex/LangChain/Qdrant). If your application needs KnowledgeBaseAgent, either install [knowledge] separately or use [all-ml]. Applications like vectordb-api that already list LlamaIndex and LangChain in their own requirements.txt do not need the [knowledge] extra.
Python Version Compatibility
| Extra | Python 3.11 | Python 3.12 | Python 3.13 |
|---|---|---|---|
| Core (pip install ambivo-agents) | Yes | Yes | Yes |
| [web], [media], [aws], [database] | Yes | Yes | Yes |
| [documents], [analytics], [async] | Yes | Yes | Yes |
| [full], [all] | Yes | Yes | Yes |
| [knowledge] | Yes | Yes | No |
| [all-ml] | Yes | Yes | No |
The [knowledge] extra depends on LangChain and LlamaIndex for vector database integration (Qdrant). These libraries do not yet support Python 3.13. All other features work across Python 3.11-3.13.
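The compatibility rules above can be encoded in a few lines. This is only a sketch: `recommended_extras` is a hypothetical helper, not something shipped with the package, and it simply restates the table (knowledge-base extras on 3.11-3.12 only, everything else on 3.11-3.13).

```python
import sys


def recommended_extras(version=sys.version_info[:2], need_knowledge_base=False, dev=False):
    """Return the extras to install for a Python version and use case."""
    if not (3, 11) <= tuple(version) <= (3, 13):
        raise RuntimeError("ambivo-agents targets Python 3.11-3.13")
    if need_knowledge_base:
        if tuple(version) >= (3, 13):
            raise RuntimeError("[knowledge] and [all-ml] require Python 3.11-3.12")
        # [all-ml] already bundles knowledge; [full] does not
        return ["all-ml"] if dev else ["full", "knowledge"]
    return ["all"] if dev else ["full"]


def pip_command(extras):
    """Format a pip command; several extras are comma-separated inside the brackets."""
    return f'pip install "ambivo-agents[{",".join(extras)}]"'
```

For example, `pip_command(recommended_extras((3, 12), need_knowledge_base=True))` yields `pip install "ambivo-agents[full,knowledge]"`.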
Local Development (recommended):
# Python 3.13
pip uninstall ambivo-agents -y && pip install -e ".[all]"
# pip uninstall ambivo-agents -y && pip install -e <local-path-to-ambivo-agent>
# Python 3.11-3.12 (includes knowledge base)
pip uninstall ambivo-agents -y && pip install -e ".[all-ml]"
2. Setup Docker Images
docker pull sgosain/amb-ubuntu-python-public-pod
Note: If deploying inside the Docker image itself (e.g., on Railway, Render), you can skip this step and set use_docker: false in config. See Running Without Docker below.
3. Setup Redis
Recommended: Cloud Redis
# In agent_config.yaml
redis:
  host: "your-redis-cloud-endpoint.redis.cloud"
  port: 6379
  password: "your-redis-password"
Alternative: Local Redis
# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest
Configuration
Create agent_config.yaml in your project root:
# Redis Configuration (Required)
redis:
  host: "your-redis-cloud-endpoint.redis.cloud"
  port: 6379
  db: 0
  password: "your-redis-password"

# LLM Configuration (Required - at least one provider)
llm:
  preferred_provider: "openai"
  temperature: 0.7
  openai_api_key: "your-openai-key"
  anthropic_api_key: "your-anthropic-key"
  aws_access_key_id: "your-aws-key"
  aws_secret_access_key: "your-aws-secret"
  aws_region: "us-east-1"

# Agent Capabilities
agent_capabilities:
  enable_knowledge_base: true
  enable_web_search: true
  enable_code_execution: true
  enable_file_processing: true
  enable_web_ingestion: true
  enable_api_calls: true
  enable_web_scraping: true
  enable_proxy_mode: true
  enable_media_editor: true
  enable_youtube_download: true
  enable_analytics: true

# ModeratorAgent default agents
moderator:
  default_enabled_agents:
    - knowledge_base
    - web_search
    - assistant
    - media_editor
    - youtube_download
    - code_executor
    - web_scraper
    - analytics

# Service-specific configurations
web_search:
  brave_api_key: "your-brave-api-key"
  avesapi_api_key: "your-aves-api-key"

knowledge_base:
  qdrant_url: "https://your-cluster.qdrant.tech"
  qdrant_api_key: "your-qdrant-api-key"
  chunk_size: 1024
  chunk_overlap: 20
  similarity_top_k: 5

youtube_download:
  docker_image: "sgosain/amb-ubuntu-python-public-pod"
  download_dir: "./youtube_downloads"
  timeout: 600
  memory_limit: "1g"
  default_audio_only: true

analytics:
  docker_image: "sgosain/amb-ubuntu-python-public-pod"
  input_subdir: "analytics"
  output_subdir: "analytics"
  temp_subdir: "analytics"
  handoff_subdir: "analytics"
  timeout: 120
  memory_limit: "2g"

docker:
  timeout: 60
  memory_limit: "512m"
  images: ["sgosain/amb-ubuntu-python-public-pod"]
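Before starting an application it can help to sanity-check the parsed configuration. The sketch below works on a plain dict (for instance the result of parsing agent_config.yaml) and reports missing required pieces. `validate_config` is a hypothetical helper, and the Qdrant rule is an assumption based on the sample config, not a documented requirement.

```python
def validate_config(config: dict) -> list[str]:
    """Return a list of human-readable problems; empty means the basics are present."""
    problems = []

    # Redis is marked as required in the sample config
    redis = config.get("redis") or {}
    if not redis.get("host"):
        problems.append("redis.host is required")

    # At least one LLM provider credential must be configured
    llm = config.get("llm") or {}
    has_openai = bool(llm.get("openai_api_key"))
    has_anthropic = bool(llm.get("anthropic_api_key"))
    has_bedrock = bool(llm.get("aws_access_key_id") and llm.get("aws_secret_access_key"))
    if not (has_openai or has_anthropic or has_bedrock):
        problems.append("llm needs at least one provider credential")

    # Assumption: the knowledge base needs a Qdrant URL when it is enabled
    capabilities = config.get("agent_capabilities") or {}
    knowledge_base = config.get("knowledge_base") or {}
    if capabilities.get("enable_knowledge_base") and not knowledge_base.get("qdrant_url"):
        problems.append("knowledge_base.qdrant_url is required when enable_knowledge_base is true")

    return problems
```

With PyYAML installed, the dict could come from `yaml.safe_load(open("agent_config.yaml"))`. Note that placeholder strings such as "your-openai-key" still count as present, so this only catches missing keys, not invalid credentials.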
Configuration Methods
The library supports two configuration methods:
1. Environment Variables (Recommended for Production)
Quick Start with Environment Variables:
# Download and edit the full template
curl -L -o set_env.sh https://github.com/ambivo-corp/ambivo-agents/raw/main/set_env_template.sh
chmod +x set_env.sh
# Edit the template with your credentials, then source it
source set_env.sh
Replace ALL placeholder values with your actual credentials:
- Redis connection details
- LLM API keys (OpenAI/Anthropic)
- Web Search API keys (Brave/AVES)
- Knowledge Base credentials (Qdrant)
- Web Scraping proxy (ScraperAPI)
Minimal Environment Setup:
# Required - Redis
export AMBIVO_AGENTS_REDIS_HOST="your-redis-host.redis.cloud"
export AMBIVO_AGENTS_REDIS_PORT="6379"
export AMBIVO_AGENTS_REDIS_PASSWORD="your-redis-password"
# Required - At least one LLM provider
export AMBIVO_AGENTS_LLM_OPENAI_API_KEY="sk-your-openai-key"
# Optional - Enable specific agents
export AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_YOUTUBE_DOWNLOAD="true"
export AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_WEB_SEARCH="true"
export AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_ANALYTICS="true"
# Run your application
python your_app.py
2. YAML Configuration (Traditional)
Use the full YAML template:
# Download and edit the full template
curl -L -o agent_config_sample.yaml https://github.com/ambivo-corp/ambivo-agents/raw/main/agent_config_sample.yaml
# Copy to your config file and edit with your credentials
cp agent_config_sample.yaml agent_config.yaml
Replace ALL placeholder values in agent_config.yaml with your actual credentials and keep the file in your project root.
Docker Deployment with Environment Variables
# docker-compose.yml
version: '3.8'
services:
  ambivo-app:
    image: your-app:latest
    environment:
      - AMBIVO_AGENTS_REDIS_HOST=redis
      - AMBIVO_AGENTS_REDIS_PORT=6379
      - AMBIVO_AGENTS_LLM_OPENAI_API_KEY=${OPENAI_API_KEY}
      - AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_YOUTUBE_DOWNLOAD=true
      - AMBIVO_AGENTS_AGENT_CAPABILITIES_ENABLE_ANALYTICS=true
    volumes:
      - ./downloads:/app/downloads
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - redis
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
Note: Environment variables take precedence over YAML configuration. The agent_config.yaml file is optional when using environment variables.
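The naming pattern in the examples (AMBIVO_AGENTS_ + section + key) and the precedence rule can be sketched as follows. This is not the library's loader: `apply_env_overrides`, the `KNOWN_SECTIONS` list, and the type coercion are assumptions made to show one way such variables could be mapped onto the YAML structure.

```python
import copy

PREFIX = "AMBIVO_AGENTS_"
# Section names contain underscores, so the sketch matches against a known list
KNOWN_SECTIONS = ("agent_capabilities", "knowledge_base", "web_search", "redis", "llm")


def _coerce(raw: str):
    """Turn 'true'/'false' into bool and digit strings into int; keep other values."""
    lowered = raw.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    if lowered.isdigit():
        return int(lowered)
    return raw


def apply_env_overrides(config: dict, environ: dict) -> dict:
    """Return a copy of config where matching environment variables win."""
    merged = copy.deepcopy(config)
    for name, raw in environ.items():
        if not name.startswith(PREFIX):
            continue
        key = name[len(PREFIX):].lower()
        # Try longer section names first to avoid ambiguous prefixes
        for section in sorted(KNOWN_SECTIONS, key=len, reverse=True):
            if key.startswith(section + "_"):
                merged.setdefault(section, {})[key[len(section) + 1:]] = _coerce(raw)
                break
    return merged
```

Passing `os.environ` as `environ` would apply the real process environment; using a plain dict, as here, keeps the function easy to test.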
Project Structure
ambivo_agents/
    agents/  # Agent implementations
        analytics.py  # Analytics Agent (DuckDB data analysis)
        api_agent.py  # API Agent (HTTP/REST integration)
        assistant.py  # Assistant Agent (general conversation)
        code_executor.py  # Code Executor Agent (Docker-based execution)
        database_agent.py  # Database Agent (MongoDB, MySQL, PostgreSQL)
        knowledge_base.py  # Knowledge Base Agent (Qdrant vector search)
        media_editor.py  # Media Editor Agent (FFmpeg processing)
        moderator.py  # ModeratorAgent (main orchestrator)
        web_scraper.py  # Web Scraper Agent (Playwright-based)
        web_search.py  # Web Search Agent (Brave/AVES search)
        youtube_download.py  # YouTube Download Agent (pytubefix)
    config/  # Configuration management
    core/  # Core functionality
        base.py
        llm.py
        memory.py
        workflow.py  # Basic workflow system
        enhanced_workflow.py  # Advanced workflow patterns
    executors/  # Execution environments
    services/  # Service layer
    __init__.py  # Package initialization
    cli.py  # Command line interface
Skill Assignment System
Overview
The Skill Assignment System allows AssistantAgent and ModeratorAgent to be "assigned" external capabilities like API specifications, database connections, and knowledge bases. The agents then intelligently detect when to use these skills and internally spawn specialized agents on-demand.
Key Features
- Intelligent Intent Detection - Automatically detects when user requests should use assigned skills
- Dynamic Agent Spawning - Creates APIAgent, DatabaseAgent, KnowledgeBaseAgent internally as needed
- Natural Language Translation - Converts technical responses to conversational language
- Graceful Fallback - Falls back to normal agent behavior when no skills match
- Priority System - Assigned skills take precedence over normal agent routing
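The priority and fallback behaviour listed above can be pictured with a toy router. Everything here is hypothetical: the real system detects intent internally, whereas this sketch uses plain keyword matching and invented class names purely to show "assigned skills first, normal routing otherwise".

```python
from dataclasses import dataclass, field


@dataclass
class AssignedSkill:
    name: str
    kind: str  # "api", "database", or "kb"
    keywords: tuple[str, ...]


@dataclass
class SkillRouter:
    skills: list[AssignedSkill] = field(default_factory=list)

    def assign(self, skill: AssignedSkill) -> None:
        self.skills.append(skill)

    def route(self, message: str) -> str:
        """Return the first matching skill, else fall back to normal routing."""
        text = message.lower()
        for skill in self.skills:
            if any(keyword in text for keyword in skill.keywords):
                return f"{skill.kind}:{skill.name}"
        return "normal_routing"


router = SkillRouter()
router.assign(AssignedSkill("my_api", "api", ("lead", "contact")))
router.assign(AssignedSkill("sales_database", "database", ("sales", "database")))
```

With this setup, "create a lead for John Doe" resolves to the API skill, while a message that matches no skill falls through to the agent's usual behaviour.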
API Skill Assignment
from ambivo_agents import AssistantAgent

async def api_skill_example():
    # Create agent
    assistant = AssistantAgent.create_simple(user_id="developer")
    # Assign API skill from OpenAPI spec
    result = await assistant.assign_api_skill(
        api_spec_path="/path/to/openapi.yaml",
        base_url="https://api.example.com/v1",
        api_token="your-api-token",
        skill_name="my_api"
    )
    # Now natural language API requests work automatically!
    response = await assistant.chat("create a lead for John Doe")
    # Agent automatically:
    # 1. Detects API intent
    # 2. Spawns APIAgent internally
    # 3. Makes the API call
    # 4. Returns natural language response
    await assistant.cleanup_session()
Database Skill Assignment
async def database_skill_example():
    assistant = AssistantAgent.create_simple(user_id="analyst")
    # Assign database skill
    await assistant.assign_database_skill(
        connection_string="postgresql://user:pass@localhost:5432/sales_db",
        skill_name="sales_database",
        description="Sales and customer data"
    )
    # Natural language database queries
    response = await assistant.chat("show me recent sales data")
    # Internally creates DatabaseAgent and executes query
    await assistant.cleanup_session()
Knowledge Base Skill Assignment
async def kb_skill_example():
    assistant = AssistantAgent.create_simple(user_id="support")
    # Assign knowledge base skill
    await assistant.assign_kb_skill(
        documents_path="/path/to/company/docs/",
        collection_name="company_knowledge",
        skill_name="company_docs"
    )
    # Document search requests
    response = await assistant.chat("what do our docs say about pricing?")
    # Internally creates KnowledgeBaseAgent and searches documents
    await assistant.cleanup_session()
Multiple Skills with ModeratorAgent
from ambivo_agents import ModeratorAgent

async def multiple_skills_example():
    moderator = ModeratorAgent.create_simple(
        user_id="power_user",
        enabled_agents=["assistant", "api_agent", "database_agent"]
    )
    # Assign multiple skills
    await moderator.assign_api_skill("/path/to/api_spec.yaml", "https://api.example.com")
    await moderator.assign_database_skill("postgresql://localhost/db", "main_db")
    await moderator.assign_kb_skill("/docs/", skill_name="knowledge")
    # Skills take priority over agent routing
    response1 = await moderator.chat("create a lead")  # → Uses API skill
    response2 = await moderator.chat("query the database")  # → Uses DB skill
    response3 = await moderator.chat("search documentation")  # → Uses KB skill
    response4 = await moderator.chat("what's the weather?")  # → Normal routing
    await moderator.cleanup_session()
Skill Management
async def skill_management():
    assistant = AssistantAgent.create_simple(user_id="user")
    # Assign skills
    await assistant.assign_api_skill("/api/spec.yaml", skill_name="api1")
    await assistant.assign_database_skill("mysql://localhost/db", "db1")
    # Check assigned skills
    skills = assistant.list_assigned_skills()
    print(f"Assigned skills: {skills}")
    # Output: {'api_skills': ['api1'], 'database_skills': ['db1'], 'kb_skills': [], 'total_skills': 2}
    # Agent status includes skill information
    status = assistant.get_agent_status()
    print(f"Capabilities: {status['capabilities']}")
    print(f"Assigned skills: {status['assigned_skills']}")
    await assistant.cleanup_session()
Usage Examples
ModeratorAgent with Auto-Routing
from ambivo_agents import ModeratorAgent
import asyncio

async def basic_moderator():
    moderator, context = ModeratorAgent.create(user_id="demo_user")
    # Auto-routing examples
    examples = [
        "Download audio from https://youtube.com/watch?v=example",
        "Search for latest artificial intelligence news",
        "Load data from sales.csv and analyze trends",
        "Extract audio from video.mp4 as high quality MP3",
        "What is machine learning and how does it work?",
    ]
    for query in examples:
        response = await moderator.chat(query)
        print(f"Response: {response[:100]}...")
    await moderator.cleanup_session()

asyncio.run(basic_moderator())
Context-Aware Conversations
async def context_conversation():
    moderator, context = ModeratorAgent.create(user_id="context_demo")
    # Initial request
    response1 = await moderator.chat("Download audio from https://youtube.com/watch?v=example")
    # Follow-up using context
    response2 = await moderator.chat("Actually, download the video instead of just audio")
    # Another follow-up
    response3 = await moderator.chat("Get information about that video")
    await moderator.cleanup_session()
YouTube Downloads
from ambivo_agents import YouTubeDownloadAgent

async def download_youtube():
    agent, context = YouTubeDownloadAgent.create(user_id="media_user")
    # Download audio
    result = await agent._download_youtube_audio(
        "https://youtube.com/watch?v=example"
    )
    if result['success']:
        print(f"Audio downloaded: {result['filename']}")
        print(f"Path: {result['file_path']}")
    await agent.cleanup_session()
Database Operations
DatabaseAgent - Basic Queries & Exploration
from ambivo_agents import DatabaseAgent

async def database_exploration_demo():
    """DatabaseAgent - Perfect for database connections and basic queries"""
    agent = DatabaseAgent.create_simple(user_id="db_user")
    # Connect to databases
    await agent.chat("Connect to MySQL database at localhost:3306, database: mydb, username: user, password: pass")
    # OR: await agent.chat("Connect to MongoDB using URI mongodb://localhost:27017/myapp")
    # OR: await agent.chat("Connect to PostgreSQL at localhost:5432 database mydb user postgres password secret")
    # Schema discovery and exploration
    schema = await agent.chat("show me the database schema")
    tables = await agent.chat("list all tables and collections")
    structure = await agent.chat("describe the users table structure")
    # Simple natural language queries (safety-limited)
    users = await agent.chat("show me all users")  # → SELECT * FROM users LIMIT 10
    count = await agent.chat("count total orders")  # → SELECT COUNT(*) FROM orders
    recent = await agent.chat("show recent sales")  # → SELECT * FROM sales ORDER BY date DESC LIMIT 10
    # File ingestion into database
    await agent.chat("ingest users.csv into users table")
    await agent.chat("load sales.json into MongoDB sales collection")
    # Export data for complex analysis
    await agent.chat("export sales data for analytics")  # → Hands off to AnalyticsAgent
    await agent.cleanup_session()
AnalyticsAgent - Advanced SQL & Complex Analysis
from ambivo_agents import AnalyticsAgent

async def advanced_analytics_demo():
    """AnalyticsAgent - Advanced SQL operations and complex analysis"""
    agent = AnalyticsAgent.create_simple(user_id="analyst")
    # Load multiple datasets for complex analysis
    await agent.chat("load data from sales.csv, customers.csv, and products.xlsx")
    # Complex JOINs and multi-table analysis
    result = await agent.chat("""
    Find top customers by revenue with their order history:
    JOIN sales with customers and calculate total revenue per customer
    """)
    # → Generates: SELECT c.name, c.email, SUM(s.amount) as total_revenue, COUNT(s.id) as order_count
    #   FROM customers c JOIN sales s ON c.id = s.customer_id
    #   GROUP BY c.id, c.name, c.email ORDER BY total_revenue DESC LIMIT 10
    # Window functions for advanced analytics
    trends = await agent.chat("""
    Calculate monthly sales trends with running totals and growth rates
    """)
    # → Generates: SELECT month, sales,
    #   SUM(sales) OVER (ORDER BY month) as running_total,
    #   LAG(sales) OVER (ORDER BY month) as prev_month,
    #   (sales - LAG(sales) OVER (ORDER BY month)) / LAG(sales) OVER (ORDER BY month) * 100 as growth_rate
    #   FROM monthly_sales ORDER BY month
    # Common Table Expressions (CTEs) for complex logic
    cohort = await agent.chat("""
    Analyze customer cohort retention using CTEs to track repeat purchases
    """)
    # → Generates complex CTE-based cohort analysis
    # Statistical analysis and correlations
    stats = await agent.chat("find correlations between price, quantity, and customer satisfaction")
    outliers = await agent.chat("identify outliers in sales data using statistical methods")
    seasonality = await agent.chat("analyze seasonal patterns in sales with time series functions")
    # Advanced aggregations with HAVING clauses
    segments = await agent.chat("""
    Group customers by purchase behavior and find high-value segments
    """)
    # → Generates: SELECT segment, COUNT(*) as customers, AVG(total_spent) as avg_spent
    #   FROM customer_segments GROUP BY segment HAVING AVG(total_spent) > 1000
    # UNION operations for combining datasets
    combined = await agent.chat("combine Q1 and Q2 sales data and analyze trends")
    await agent.cleanup_session()
Database to Analytics Workflow - Best of Both Worlds
async def complete_data_workflow():
    """Combining DatabaseAgent exploration with AnalyticsAgent advanced analysis"""
    from ambivo_agents import ModeratorAgent
    # Use ModeratorAgent for automatic routing
    moderator = ModeratorAgent.create_simple(
        user_id="workflow_user",
        enabled_agents=["database_agent", "analytics", "assistant"]
    )
    # Step 1: DatabaseAgent - Connect and explore (automatic routing)
    await moderator.chat("Connect to MySQL localhost:3306 database ecommerce user admin password secret")
    schema = await moderator.chat("show me the database schema and table relationships")
    # Step 2: DatabaseAgent - Export data for complex analysis
    await moderator.chat("export sales data joined with customer data for advanced analytics")
    # Step 3: AnalyticsAgent - Advanced analysis (automatic routing)
    analysis = await moderator.chat("""
    Analyze the exported sales data:
    1. Calculate customer lifetime value using window functions
    2. Identify seasonal trends with time series analysis
    3. Find correlations between customer demographics and purchase behavior
    4. Create customer segmentation using statistical clustering
    """)
    # Step 4: AnalyticsAgent - Generate insights and recommendations
    insights = await moderator.chat("create executive summary with key insights and recommendations")
    await moderator.cleanup_session()
Feature Comparison Summary
| Capability | DatabaseAgent | AnalyticsAgent |
|---|---|---|
| Database Connections | MySQL, PostgreSQL, MongoDB | File-based only |
| Schema Discovery | Full database exploration | File schema analysis |
| Simple Queries | Basic SELECT, COUNT, etc. | All SQL operations |
| Complex JOINs | Safety-limited | Full JOIN support |
| Window Functions | Not supported | Complete support |
| CTEs & Subqueries | Not supported | Advanced SQL |
| Statistical Analysis | Basic only | Advanced statistics |
| Multi-File Analysis | Single connection | Load multiple files |
| File Ingestion | Direct to database | In-memory processing |
| Best Use Case | Database exploration & connection | Complex analysis & business intelligence |
File Reading and Database Ingestion
All agents have built-in file reading capabilities for JSON, CSV, XML, and YAML files. Database insertion requires the optional database package.
from ambivo_agents import AssistantAgent

async def read_file_and_insert_to_database():
    """Reads a JSON file and attempts database insertion with graceful fallback"""
    # Step 1: Read and parse file (always available)
    agent = AssistantAgent.create_simple(user_id="file_user")
    result = await agent.read_and_parse_file("./data/users.json")
    if not result['success']:
        print(f" Failed to read file: {result.get('error', 'Unknown error')}")
        await agent.cleanup_session()
        return
    json_data = result['parse_result']['data']
    print(f" Successfully loaded {len(json_data)} records from users.json")
    # Step 2: Attempt database insertion
    try:
        from ambivo_agents import DatabaseAgent
        # DatabaseAgent is available - proceed with insertion
        db_agent = DatabaseAgent.create_simple(user_id="db_user")
        # Connect to MongoDB
        await db_agent.chat("connect to mongodb://localhost:27017 database myapp")
        # Insert the data
        response = await db_agent.chat(f"insert this data into users collection: {json_data}")
        print(f" Successfully inserted data into MongoDB: {response}")
        await db_agent.cleanup_session()
    except ImportError:
        # DatabaseAgent not available - provide polite warning and alternatives
        print("\n Database insertion not available")
        print(" To enable database features: pip install ambivo-agents[database]")
        print("\n Available alternatives:")
        print(" • File successfully read and parsed")
        print(" • Data can be transformed to other formats")
        # Show what we can still do
        csv_result = await agent.convert_json_to_csv(json_data)
        if csv_result['success']:
            print(" • Converted to CSV format (available for export)")
    await agent.cleanup_session()

# Alternative: Natural language approach with graceful handling
async def natural_language_file_ingestion():
    """Uses natural language commands with automatic fallback"""
    try:
        from ambivo_agents import DatabaseAgent
        agent = DatabaseAgent.create_simple(user_id="user")
        # Full database workflow available
        await agent.chat("connect to mongodb://localhost:27017 database myapp")
        response = await agent.chat("read users.json file and insert all records into users collection")
        print(f" Database ingestion completed: {response}")
        await agent.cleanup_session()
    except ImportError:
        # Fallback to file reading only
        from ambivo_agents import AssistantAgent
        agent = AssistantAgent.create_simple(user_id="user")
        print(" DatabaseAgent not installed. Reading file only...")
        response = await agent.chat("read and analyze the users.json file structure")
        print(f" File analysis: {response}")
        print(" Install database support with: pip install ambivo-agents[database]")
        await agent.cleanup_session()
Data Analytics
from ambivo_agents import AnalyticsAgent

async def analytics_demo():
    agent, context = AnalyticsAgent.create(user_id="analyst_user")
    # Load and analyze CSV data
    response = await agent.chat("load data from sales.csv and analyze it")
    print(f"Analysis: {response}")
    # Schema exploration
    schema = await agent.chat("show me the schema of the current dataset")
    print(f"Schema: {schema}")
    # Natural language queries
    top_sales = await agent.chat("what are the top 5 products by sales?")
    print(f"Top Sales: {top_sales}")
    # SQL queries
    sql_result = await agent.chat("SELECT region, SUM(sales) as total FROM data GROUP BY region")
    print(f"SQL Result: {sql_result}")
    # Visualizations
    chart = await agent.chat("create a bar chart showing sales by region")
    print(f"Chart: {chart}")
    await agent.cleanup_session()

# Context Preservation Example
async def context_preservation_demo():
    """Demonstrates context/state preservation between chat messages"""
    agent = AnalyticsAgent.create_simple(user_id="user123")
    try:
        # Load data once
        await agent.chat("load data from transactions.xlsx and analyze it")
        # Multiple queries without reload - uses cached context
        schema = await agent.chat("show schema")  # Uses cached data
        top_items = await agent.chat("what are the top 5 amounts?")  # Uses cached data
        summary = await agent.chat("summary statistics")  # Uses cached data
        counts = await agent.chat("count by category")  # Uses cached data
        print("All queries executed using cached dataset - no reload needed!")
    finally:
        await agent.cleanup_session()  # Clean up resources
Knowledge Base Operations
from ambivo_agents import KnowledgeBaseAgent

async def knowledge_base_demo():
    agent, context = KnowledgeBaseAgent.create(user_id="kb_user")
    # Ingest document
    result = await agent._ingest_document(
        kb_name="company_kb",
        doc_path="/path/to/document.pdf",
        custom_meta={"department": "HR", "type": "policy"}
    )
    if result['success']:
        # Query the knowledge base
        answer = await agent._query_knowledge_base(
            kb_name="company_kb",
            query="What is the remote work policy?"
        )
        if answer['success']:
            print(f"Answer: {answer['answer']}")
    await agent.cleanup_session()
API Integration
from ambivo_agents import APIAgent
from ambivo_agents.agents.api_agent import APIRequest, AuthConfig, HTTPMethod, AuthType

async def api_integration_demo():
    agent, context = APIAgent.create(user_id="api_user")
    # Basic GET request
    request = APIRequest(
        url="https://jsonplaceholder.typicode.com/posts/1",
        method=HTTPMethod.GET
    )
    response = await agent.make_api_request(request)
    if not response.error:
        print(f"Status: {response.status_code}")
        print(f"Data: {response.json_data}")
    # POST with authentication
    auth_config = AuthConfig(
        auth_type=AuthType.BEARER,
        token="your-api-token"
    )
    post_request = APIRequest(
        url="https://api.example.com/data",
        method=HTTPMethod.POST,
        auth_config=auth_config,
        json_data={"name": "test", "value": "123"}
    )
    post_response = await agent.make_api_request(post_request)
    # Google OAuth2 with pre-fetch
    google_auth = AuthConfig(
        auth_type=AuthType.BEARER,
        pre_auth_url="https://www.googleapis.com/oauth2/v4/token",
        pre_auth_method=HTTPMethod.POST,
        pre_auth_payload={
            "client_id": "your-client-id",
            "client_secret": "your-secret",
            "refresh_token": "your-refresh-token",
            "grant_type": "refresh_token"
        },
        token_path="access_token"
    )
    sheets_request = APIRequest(
        url="https://sheets.googleapis.com/v4/spreadsheets/sheet-id/values/A1:B10",
        method=HTTPMethod.GET,
        auth_config=google_auth
    )
    # APIAgent will automatically fetch access_token first, then make the request
    sheets_response = await agent.make_api_request(sheets_request)
    await agent.cleanup_session()

# Conversational API usage
async def conversational_api():
    agent = APIAgent.create_simple(user_id="chat_user")
    # Use natural language for API requests
    response = await agent.chat("GET https://jsonplaceholder.typicode.com/users/1")
    print(response)
    response = await agent.chat(
        "POST https://api.example.com/data with headers: {\"Authorization\": \"Bearer token\"} "
        "and data: {\"message\": \"Hello API\"}"
    )
    print(response)
    await agent.cleanup_session()
Context Manager Pattern
from ambivo_agents import ModeratorAgent, AgentSession
import asyncio

async def main():
    # Auto-cleanup with context manager
    async with AgentSession(ModeratorAgent, user_id="sarah") as moderator:
        response = await moderator.chat("Download audio from https://youtube.com/watch?v=example")
        print(response)
    # Moderator automatically cleaned up

asyncio.run(main())
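For readers curious how such auto-cleanup can work, the sketch below builds a similar wrapper with contextlib.asynccontextmanager. It is not the AgentSession implementation: `agent_session` and `StubAgent` are hypothetical names, and the stub only mimics the `chat()` and `cleanup_session()` methods used throughout this document.

```python
import asyncio
from contextlib import asynccontextmanager


class StubAgent:
    """Stand-in for an agent; records whether cleanup ran."""

    def __init__(self, user_id):
        self.user_id = user_id
        self.cleaned_up = False

    async def chat(self, message):
        return f"[{self.user_id}] {message}"

    async def cleanup_session(self):
        self.cleaned_up = True


@asynccontextmanager
async def agent_session(agent_cls, **kwargs):
    """Create an agent and guarantee cleanup, even if the body raises."""
    agent = agent_cls(**kwargs)
    try:
        yield agent
    finally:
        await agent.cleanup_session()


async def demo():
    async with agent_session(StubAgent, user_id="sarah") as agent:
        reply = await agent.chat("hello")
    return agent, reply
```

The `finally` clause is the important part: cleanup runs whether the block exits normally or through an exception, which is what removes the need for explicit `cleanup_session()` calls.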
Workflow Examples
from ambivo_agents import ModeratorAgent
from ambivo_agents.core.workflow import WorkflowBuilder

async def custom_workflow():
    # Create agents
    moderator, context = ModeratorAgent.create(user_id="workflow_demo")
    # Build custom workflow
    builder = WorkflowBuilder()
    builder.add_agent(moderator.specialized_agents['web_search'], "search")
    builder.add_agent(moderator.specialized_agents['assistant'], "analyze")
    builder.add_edge("search", "analyze")
    builder.set_start_node("search")
    builder.set_end_node("analyze")
    workflow = builder.build()
    # Execute workflow
    result = await workflow.execute(
        "Research AI safety and provide analysis",
        context.to_execution_context()
    )
    print(f"Workflow result: {result.success}")
    await moderator.cleanup_session()
Streaming System
The library uses an SSE-aligned StreamChunk system with structured event types for real-time streaming responses.
SSE Event Types
Since v1.4.1, streaming events align with the Server-Sent Events wire format:
from ambivo_agents.core.base import SSEEventType, StreamSubType
# SSE event types (used in StreamChunk.to_dict() as the "type" field):
SSEEventType.START # "stream_start" - Stream initialized
SSEEventType.CONTENT # "stream_chunk" - Text token (append to buffer)
SSEEventType.STATUS # "stream_status" - Progress info (do NOT show in chat)
SSEEventType.COMPLETE # "stream_complete" - Final answer + metadata
SSEEventType.ERROR # "stream_error" - Error event
SSEEventType.CANCELLED # "stream_cancelled" - Stream was cancelled
SSEEventType.KEEPALIVE # "keepalive" - Connection keep-alive (ignore)
# Legacy StreamSubType (still supported for backward compatibility):
StreamSubType.CONTENT # Maps to SSEEventType.CONTENT
StreamSubType.STATUS # Maps to SSEEventType.STATUS
StreamSubType.RESULT # Maps to SSEEventType.COMPLETE
StreamSubType.ERROR # Maps to SSEEventType.ERROR
StreamSubType.METADATA # Maps to SSEEventType.STATUS
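The legacy-to-SSE mapping in the comments above can be written as a lookup table. The enums below are local stand-ins so the snippet runs on its own; the SSE string values come from the listing above, while the legacy string values (in particular "metadata") are assumptions.

```python
from enum import Enum


class SSEEventType(str, Enum):
    CONTENT = "stream_chunk"
    STATUS = "stream_status"
    COMPLETE = "stream_complete"
    ERROR = "stream_error"


class StreamSubType(str, Enum):
    CONTENT = "content"
    STATUS = "status"
    RESULT = "result"
    ERROR = "error"
    METADATA = "metadata"  # assumed value


# Mapping as described in the comments above
LEGACY_TO_SSE = {
    StreamSubType.CONTENT: SSEEventType.CONTENT,
    StreamSubType.STATUS: SSEEventType.STATUS,
    StreamSubType.RESULT: SSEEventType.COMPLETE,
    StreamSubType.ERROR: SSEEventType.ERROR,
    StreamSubType.METADATA: SSEEventType.STATUS,
}


def to_sse_type(sub_type: StreamSubType) -> str:
    """Return the SSE "type" string for a legacy sub-type."""
    return LEGACY_TO_SSE[sub_type].value
```

Note that both STATUS and METADATA collapse to "stream_status", so the mapping is not reversible; clients that need the distinction would have to read the sub_type field as well.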
Basic Streaming Usage
from ambivo_agents import ModeratorAgent
from ambivo_agents.core.base import StreamSubType

async def streaming_example():
    moderator, context = ModeratorAgent.create(user_id="stream_user")
    async for chunk in moderator.chat_stream("Search for Python tutorials"):
        if chunk.sub_type == StreamSubType.CONTENT:
            print(chunk.text, end='', flush=True)
        elif chunk.sub_type == StreamSubType.STATUS:
            print(f"\n[{chunk.text.strip()}]", end='', flush=True)
        elif chunk.sub_type == StreamSubType.ERROR:
            print(f"\n[ERROR: {chunk.text}]", end='', flush=True)
    await moderator.cleanup_session()
Streaming in Web APIs (SSE Format)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from ambivo_agents import ModeratorAgent
import json

app = FastAPI()

# Request schema assumed for this example; it is not defined by the library
class ChatRequest(BaseModel):
    message: str
    user_id: str = "anonymous"  # assumed default for clients that send only a message

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    moderator, context = ModeratorAgent.create(user_id=request.user_id)

    async def generate_stream():
        # Send session event
        yield f"event: session\ndata: {context.session_id}\n\n"
        async for chunk in moderator.chat_stream(request.message):
            # to_dict() returns SSE-aligned type field
            yield f"data: {json.dumps(chunk.to_dict())}\n\n"
        yield "event: done\ndata:\n\n"

    return StreamingResponse(generate_stream(), media_type="text/event-stream")
SSE Wire Format
The StreamChunk.to_dict() method produces the JSON payload carried in each SSE data: field:
{"type": "stream_start", "text": "", "sub_type": "status", "metadata": {}, "timestamp": "..."}
{"type": "stream_chunk", "text": "The answer is", "sub_type": "content", "metadata": {}, "timestamp": "..."}
{"type": "stream_status", "text": "Querying knowledge base...", "sub_type": "status", "metadata": {}, "timestamp": "..."}
{"type": "stream_complete", "text": "", "sub_type": "result", "metadata": {"complete": true}, "timestamp": "..."}
{"type": "stream_error", "text": "Error message", "sub_type": "error", "metadata": {}, "timestamp": "..."}
Critical buffering rule: buffer += data.text -- never trim, strip, or add spaces between chunks.
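The buffering rule can also be exercised outside a browser. The following is a minimal Python sketch of a consumer for the wire format shown above. The helper names `parse_sse_blocks` and `accumulate_text` are hypothetical and not part of ambivo_agents; the sketch assumes each event carries a single `data:` line, as in the examples above.

```python
import json


def parse_sse_blocks(raw: str):
    """Yield (event_name, data) pairs from a raw SSE payload."""
    for block in raw.split("\n\n"):
        if not block:
            continue
        event_name, data = None, None
        for line in block.split("\n"):
            if line.startswith("event: "):
                event_name = line[len("event: "):].strip()
            elif line.startswith("data: "):
                data = line[len("data: "):]
        yield event_name, data


def accumulate_text(raw: str) -> str:
    """Concatenate stream_chunk text verbatim: no strip, no added spaces."""
    buffer = ""
    for event_name, data in parse_sse_blocks(raw):
        if event_name == "done":
            break
        if event_name is not None or not data:
            continue  # skip named events (e.g. session) and empty data lines
        payload = json.loads(data)
        if payload["type"] == "stream_chunk":
            buffer += payload["text"]  # append exactly as received
    return buffer
```

Because chunks may begin or end with whitespace (for example `" is"`), stripping any of them would fuse words together in the final text.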
Frontend Integration
async function streamChat(baseUrl, message, sessionId, callbacks) {
const response = await fetch(`${baseUrl}/chat/stream`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, session_id: sessionId }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let partial = '';
let textBuffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
partial += decoder.decode(value, { stream: true });
const events = partial.split('\n\n');
partial = events.pop();
for (const block of events) {
const lines = block.split('\n');
let eventName = null, dataLine = null;
for (const line of lines) {
if (line.startsWith('event: ')) eventName = line.slice(7).trim();
else if (line.startsWith('data: ')) dataLine = line.slice(6);
}
if (eventName === 'session') { callbacks.onSession?.(dataLine.trim()); continue; }
if (eventName === 'done') return;
if (!dataLine) continue;
const data = JSON.parse(dataLine);
switch (data.type) {
case 'stream_chunk': textBuffer += data.text; callbacks.onChunk?.(textBuffer, data.text); break;
case 'stream_complete': callbacks.onComplete?.(data.text || textBuffer, data.metadata); break;
case 'stream_error': callbacks.onError?.(data.text); break;
case 'stream_status': callbacks.onStatus?.(data.text); break;
}
}
}
}
Session Management
Understanding Session vs Conversation IDs
The library uses two identifiers for context management:
- session_id: Represents a broader user session or application context
- conversation_id: Represents a specific conversation thread within a session
# Single conversation (most common)
moderator, context = ModeratorAgent.create(
user_id="john",
session_id="user_john_main",
conversation_id="user_john_main" # Same as session_id
)
# Multiple conversations per session
session_key = "user_john_tenant_abc"
# Conversation 1: Data Analysis
moderator1, context1 = ModeratorAgent.create(
user_id="john",
session_id=session_key,
conversation_id="john_data_analysis_conv"
)
# Conversation 2: YouTube Downloads
moderator2, context2 = ModeratorAgent.create(
user_id="john",
session_id=session_key,
conversation_id="john_youtube_downloads_conv"
)
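To make the relationship between the two identifiers concrete, here is a small sketch in which one session holds several independent conversation threads. `SessionStore` is a hypothetical illustration of the ID hierarchy only; it does not reflect how ambivo_agents stores history internally.

```python
from collections import defaultdict


class SessionStore:
    """Toy store: session_id -> conversation_id -> list of messages."""

    def __init__(self):
        self._sessions = defaultdict(lambda: defaultdict(list))

    def append(self, session_id: str, conversation_id: str, message: str) -> None:
        self._sessions[session_id][conversation_id].append(message)

    def history(self, session_id: str, conversation_id: str) -> list:
        # .get() avoids creating empty entries on lookup
        return list(self._sessions.get(session_id, {}).get(conversation_id, []))

    def conversations(self, session_id: str) -> list:
        return sorted(self._sessions.get(session_id, {}))


store = SessionStore()
store.append("user_john_tenant_abc", "john_data_analysis_conv", "Load sales.csv")
store.append("user_john_tenant_abc", "john_youtube_downloads_conv", "Download audio")
```

Each conversation keeps its own history while both remain reachable through the shared session key.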
Web API Integration
from ambivo_agents import ModeratorAgent
import asyncio
import time
import uuid

class ChatAPI:
    def __init__(self):
        # One moderator per session_id, reused across requests
        self.active_moderators = {}

    async def chat_endpoint(self, request_data):
        user_message = request_data.get('message', '')
        user_id = request_data.get('user_id', f"user_{uuid.uuid4()}")
        session_id = request_data.get('session_id', f"session_{user_id}_{int(time.time())}")
        try:
            if session_id not in self.active_moderators:
                moderator, context = ModeratorAgent.create(
                    user_id=user_id,
                    session_id=session_id
                )
                self.active_moderators[session_id] = moderator
            else:
                moderator = self.active_moderators[session_id]
            response_content = await moderator.chat(user_message)
            return {
                'success': True,
                'response': response_content,
                'session_id': session_id,
                'timestamp': time.time()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'timestamp': time.time()
            }

    async def cleanup_session(self, session_id):
        if session_id in self.active_moderators:
            await self.active_moderators[session_id].cleanup_session()
            del self.active_moderators[session_id]
Command Line Interface
# Interactive mode with auto-routing
ambivo-agents
# Single queries
ambivo-agents -q "Download audio from https://youtube.com/watch?v=example"
ambivo-agents -q "Search for latest AI trends"
ambivo-agents -q "Load data from sales.csv and show top products"
ambivo-agents -q "Extract audio from video.mp4"
# Check agent status
ambivo-agents status
# Test all agents
ambivo-agents --test
# Debug mode
ambivo-agents --debug -q "test query"
Architecture
ModeratorAgent Architecture
The ModeratorAgent acts as an intelligent orchestrator:
[User Query]
↓
[ModeratorAgent] ← Analyzes intent and context
↓
[Intent Analysis] ← Uses LLM + patterns + keywords
↓
[Route Selection] ← Chooses best agent(s)
↓
[Specialized Agent] ← YouTubeAgent, SearchAgent, etc.
↓
[Response] ← Combined and contextualized
↓
[User]
Workflow Architecture
[WorkflowBuilder] → [Workflow Definition]
↓ ↓
[Workflow Executor] → [Sequential/Parallel Execution]
↓ ↓
[State Management] → [Persistent Checkpoints]
↓ ↓
[Result Aggregation] → [Final Response]
Memory System
- Redis-based persistence with compression and caching
- Built-in conversation history for every agent
- Session-aware context with automatic cleanup
- Multi-session support with isolation
LLM Provider Management
- Automatic failover between OpenAI, Anthropic, AWS Bedrock
- Rate limiting and error handling
- Provider rotation based on availability and performance
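As an illustration of the failover idea only, the sketch below tries a list of providers in order and returns the first successful result. It is not the library's implementation: `complete_with_failover`, `AllProvidersFailed`, and the two stand-in providers are hypothetical names, and no real LLM API is called.

```python
import asyncio


class AllProvidersFailed(Exception):
    """Raised when every provider in the list has failed."""


async def complete_with_failover(prompt, providers):
    """Try each (name, async_callable) pair in order; return the first success."""
    errors = {}
    for name, call in providers:
        try:
            return name, await call(prompt)
        except Exception as exc:  # a real system would catch narrower errors
            errors[name] = str(exc)
    raise AllProvidersFailed(errors)


# Stand-in providers for demonstration only; no network calls are made.
async def flaky_provider(prompt):
    raise RuntimeError("rate limited")


async def healthy_provider(prompt):
    return f"echo: {prompt}"
```

Calling `asyncio.run(complete_with_failover("hi", [("primary", flaky_provider), ("secondary", healthy_provider)]))` falls through to the second provider after the first one raises.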
Docker Setup
Consolidated Docker File Sharing System
Ambivo Agents uses a consolidated Docker-shared directory structure for all file operations across agents. This provides consistent, secure, and efficient file sharing between your host filesystem and Docker containers.
Consolidated Directory Structure
All Docker-based agents (AnalyticsAgent, MediaEditorAgent, CodeExecutorAgent, WebScraperAgent, APIAgent) use the same base structure:
Your Project Directory/
  docker_shared/ # Consolidated base directory
    input/ # Read-only input files
      analytics/ → /docker_shared/input/analytics (AnalyticsAgent)
      media/ → /docker_shared/input/media (MediaEditorAgent)
      code/ → /docker_shared/input/code (CodeExecutorAgent)
      scraper/ → /docker_shared/input/scraper (WebScraperAgent)
    output/ # Read-write output files
      analytics/ → /docker_shared/output/analytics (Analysis results)
      media/ → /docker_shared/output/media (Processed media)
      code/ → /docker_shared/output/code (Code execution results)
      scraper/ → /docker_shared/output/scraper (Scraped data)
    temp/ # Read-write temporary workspace
      analytics/ → /docker_shared/temp/analytics (Analytics temp files)
      media/ → /docker_shared/temp/media (Media processing temp)
      code/ → /docker_shared/temp/code (Code execution temp)
    handoff/ # Read-write inter-agent file sharing
      database/ → /docker_shared/handoff/database (Database exports)
      analytics/ → /docker_shared/handoff/analytics (Analytics results)
      media/ → /docker_shared/handoff/media (Media for processing)
    work/ → /docker_shared/work # General workspace
How the System Works
When you request operations like "convert data.csv to xlsx" or "process video.mp4":
- File Detection: System detects file paths in your request
- Directory Setup: Auto-creates agent-specific subdirectories in ./docker_shared/
- File Copying: Copies your files to appropriate input directories
- Docker Execution: Runs containers with consistent /docker_shared/ mount points
- Result Retrieval: Outputs appear in agent-specific output directories
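The directory setup and file copying steps can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the library's code: `stage_input_file` and `container_path` are hypothetical helpers, and the only layout assumed is the `input/`, `output/`, and `temp/` structure described above.

```python
import shutil
from pathlib import Path


def stage_input_file(source: str, agent: str, base_dir: str = "./docker_shared") -> Path:
    """Copy a host file into <base_dir>/input/<agent>/ and return the staged path."""
    src = Path(source)
    if not src.is_file():
        raise FileNotFoundError(f"Input file not found: {src}")
    # Create the agent-specific subdirectories on demand
    for kind in ("input", "output", "temp"):
        (Path(base_dir) / kind / agent).mkdir(parents=True, exist_ok=True)
    destination = Path(base_dir) / "input" / agent / src.name
    return Path(shutil.copy2(src, destination))


def container_path(staged: Path, base_dir: str = "./docker_shared") -> str:
    """Translate a staged host path to the path seen inside the container."""
    relative = staged.resolve().relative_to(Path(base_dir).resolve())
    return "/docker_shared/" + relative.as_posix()
```

For example, staging `sales.csv` for the `code` agent yields a host path under `./docker_shared/input/code/`, which the container sees as `/docker_shared/input/code/sales.csv`.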
Inter-Agent Workflows
The consolidated structure enables seamless workflows between agents:
Database → Analytics Workflow:
1. DatabaseAgent exports data → ./docker_shared/handoff/database/export.csv
2. AnalyticsAgent automatically → reads from /docker_shared/handoff/database/
3. AnalyticsAgent processes data → outputs to /docker_shared/output/analytics/
4. Results available at → ./docker_shared/output/analytics/results.json
Agent Handoff Mechanism
Output vs Handoff Directory Logic:
The system uses two distinct destination directories based on the agent's role in the workflow:
- output/ directories: For final deliverables - when an agent produces end results for user consumption
  - MediaEditorAgent image/video processing → docker_shared/output/media/
  - AnalyticsAgent charts and reports → docker_shared/output/analytics/
  - CodeExecutorAgent script results → docker_shared/output/code/
- handoff/ directories: For inter-agent communication - when one agent needs to pass data to another agent
  - DatabaseAgent exports for AnalyticsAgent → docker_shared/handoff/database/
  - WebScraperAgent data for KnowledgeBaseAgent → docker_shared/handoff/scraper/
  - MediaEditorAgent processed files for further processing → docker_shared/handoff/media/
Decision Logic:
- Terminal operations (user-requested, final results) → output/
- Workflow operations (agent-to-agent data passing) → handoff/
The handoff system uses the handoff_subdir parameter to enable seamless file transfers between agents:
DatabaseAgent → AnalyticsAgent Handoff:
# DatabaseAgent automatically exports to handoff directory
result = await db_agent.chat("export sales data for analytics",
handoff_subdir="sales_analysis_2024")
# Creates: ./docker_shared/handoff/database/sales_analysis_2024/
# AnalyticsAgent automatically detects handoff files
analytics_result = await analytics_agent.chat("analyze sales data",
handoff_subdir="sales_analysis_2024")
# Reads from: ./docker_shared/handoff/database/sales_analysis_2024/
Handoff Directory Management:
- Automatic creation: Subdirectories created based on handoff_subdir parameter
- File naming: {agent_type}_{timestamp}_{operation}.{ext}
- Cleanup: Handoff files older than 24 hours automatically removed
- Thread-safe: Multiple concurrent handoffs supported
- Cross-platform: Works consistently across Windows, macOS, and Linux
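The naming pattern and the age-based cleanup can be approximated as follows. This is a sketch under assumptions: `handoff_filename` and `remove_stale_files` are hypothetical helpers, the timestamp format is a guess (the README does not specify one), and file age is judged by modification time.

```python
import time
from datetime import datetime
from pathlib import Path
from typing import Optional


def handoff_filename(agent_type: str, operation: str, ext: str,
                     now: Optional[datetime] = None) -> str:
    """Build a name following {agent_type}_{timestamp}_{operation}.{ext}."""
    # Timestamp format is an assumption made for this example
    stamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
    return f"{agent_type}_{stamp}_{operation}.{ext.lstrip('.')}"


def remove_stale_files(directory: str, max_age_hours: float = 24) -> int:
    """Delete files whose modification time is older than max_age_hours."""
    cutoff = time.time() - max_age_hours * 3600
    removed = 0
    for path in Path(directory).rglob("*"):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return removed
```

`remove_stale_files` returns the number of files it deleted, which makes it easy to log cleanup activity.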
Configuration in agent_config.yaml:
docker:
shared_base_dir: "./docker_shared"
agent_subdirs:
database: ["handoff/database"]
analytics: ["input/analytics", "output/analytics", "temp/analytics", "handoff/analytics"]
media: ["input/media", "output/media", "temp/media", "handoff/media"]
Enhanced Fallback (CSV→XLSX Conversion):
1. User: "convert sales.csv to xlsx"
2. ModeratorAgent detects file operation need
3. Copies sales.csv → ./docker_shared/input/code/sales.csv
4. CodeExecutorAgent processes → from /docker_shared/input/code/sales.csv
5. Outputs converted file → to /docker_shared/output/code/sales.xlsx
6. User accesses result at → ./docker_shared/output/code/sales.xlsx
Media Processing Workflow:
1. User places video → ./docker_shared/input/media/input.mp4
2. MediaEditorAgent processes → from /docker_shared/input/media/input.mp4
3. Outputs processed file → to /docker_shared/output/media/output.mp3
4. User gets result from → ./docker_shared/output/media/output.mp3
Third-Party Developer Integration
For developers building custom agents:
from ambivo_agents.core import get_shared_manager
# Get the consolidated shared manager
shared_manager = get_shared_manager()
# Prepare environment for your custom agent
input_path, output_path = shared_manager.prepare_agent_environment(
agent="my_custom_agent",
input_files=["./my_data.csv"]
)
# Get Docker volume configuration
volumes = shared_manager.get_docker_volumes()
# volumes = {
# '/path/to/docker_shared/input': {'bind': '/docker_shared/input', 'mode': 'ro'},
# '/path/to/docker_shared/output': {'bind': '/docker_shared/output', 'mode': 'rw'},
# # ... other mounts
# }
# In your Docker container, access files at:
# - Input: /docker_shared/input/my_custom_agent/
# - Output: /docker_shared/output/my_custom_agent/
# - Temp: /docker_shared/temp/my_custom_agent/
# - Handoff: /docker_shared/handoff/my_custom_agent/
# After processing, check results:
output_files = shared_manager.list_outputs("my_custom_agent")
latest_output = shared_manager.get_latest_output("my_custom_agent", ".xlsx")
Example Usage
import asyncio
from ambivo_agents import ModeratorAgent

async def process_files_with_consolidated_structure():
    # Create moderator with auto-routing
    moderator, context = ModeratorAgent.create(user_id="file_processor")
    # File operations use consolidated Docker structure
    await moderator.chat("convert sales_data.csv to xlsx format")  # → ./docker_shared/output/code/
    await moderator.chat("extract audio from video.mp4 as MP3")  # → ./docker_shared/output/media/
    await moderator.chat("analyze customer_data.csv and chart")  # → ./docker_shared/output/analytics/
    # All results organized by agent type in docker_shared/output/
    await moderator.cleanup_session()

# Run the example
asyncio.run(process_files_with_consolidated_structure())
File Locations After Operations
# Directory structure after various operations
your-project/
  sales_data.csv # Your original files
  video.mp4
  customer_data.csv
  docker_shared/ # Consolidated results
    output/
      code/
        sales_data.xlsx # CSV→XLSX conversion
      media/
        video_audio.mp3 # Audio extraction
      analytics/
        analysis_report.json # Data analysis
        customer_charts.png # Generated charts
Configuration
The consolidated structure is configured in agent_config.yaml:
docker:
shared_base_dir: "./docker_shared" # Host base directory
work_dir: "/opt/ambivo/work_dir" # Container working directory
agent_subdirs:
analytics: ["input/analytics", "output/analytics", "temp/analytics", "handoff/analytics"]
media: ["input/media", "output/media", "temp/media", "handoff/media"]
code: ["input/code", "output/code", "temp/code", "handoff/code"]
database: ["handoff/database"]
scraper: ["output/scraper", "temp/scraper", "handoff/scraper"]
Third-Party Developer Project Structure
When developers install ambivo-agents via pip install ambivo-agents, the Docker shared directory is created relative to their project root. Here's how the directory structure would look:
my-ai-project/ # Third-party developer's project
  main.py # Their application code
  requirements.txt # Including ambivo-agents
  agent_config.yaml # Their configuration file
  data/ # Their project data
    input_files.csv
    documents.pdf
  docker_shared/ # Auto-created by ambivo-agents
    input/ # Container read-only mounts
      analytics/ # For data analysis tasks
        uploaded_data.csv
      media/ # For media processing
        video_to_process.mp4
      code/ # For code execution
        user_script.py
    output/ # Container write-enabled results
      analytics/ # Analysis results
        report.json
        charts.png
      media/ # Processed media
        audio_extracted.mp3
        compressed_video.mp4
      code/ # Code execution results
        execution_results.txt
    temp/ # Temporary files during processing
      analytics/
      media/
      code/
    handoff/ # Cross-agent file sharing
      analytics/ # Database → Analytics
      database/ # Database exports
      media/ # Media processing handoffs
      scraper/ # Web scraper results
    work/ # Container workspace
  venv/ # Their virtual environment
    lib/python3.x/site-packages/
      ambivo_agents/ # Installed package
Environment Variable Configuration:
Developers can customize the shared directory location:
# In their .env or environment
export AMBIVO_AGENTS_DOCKER_SHARED_BASE_DIR="/custom/path/shared"
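A sketch of how such an override is typically resolved is shown below. The environment variable name and the `./docker_shared` default come from this README; the helper `resolve_shared_base_dir` and the rule that a non-empty environment value wins are assumptions for illustration.

```python
import os
from pathlib import Path

DEFAULT_SHARED_DIR = "./docker_shared"
ENV_VAR = "AMBIVO_AGENTS_DOCKER_SHARED_BASE_DIR"


def resolve_shared_base_dir(environ=None) -> Path:
    """Return the shared base directory, preferring the environment variable."""
    env = os.environ if environ is None else environ
    value = env.get(ENV_VAR, "").strip()
    # Fall back to the default when the variable is unset or empty
    return Path(value or DEFAULT_SHARED_DIR).expanduser()
```

Passing a dictionary instead of reading `os.environ` directly keeps the function easy to test.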
Example Usage in Developer's Code:
# my-ai-project/main.py
import asyncio
from ambivo_agents.agents.analytics import AnalyticsAgent
from ambivo_agents.agents.moderator import ModeratorAgent

async def main():
    # Create agents - they automatically use configured shared directory
    moderator = ModeratorAgent.create_simple(user_id="developer123")
    # Process data - files are managed in docker_shared/
    response = await moderator.chat("analyze the sales data in my CSV file")
    print(response)
    # The docker_shared/ directory is automatically created and managed
    # Input files are accessible at docker_shared/input/analytics/
    # Results appear in docker_shared/output/analytics/
    await moderator.cleanup_session()

# await is only valid inside a coroutine, so run main() through asyncio
asyncio.run(main())
Benefits for Third-Party Developers:
- Isolated: Each project gets its own docker_shared/ directory
- Portable: Directory structure is relative to project root
- Configurable: Can be customized via environment variables
- Auto-managed: Created and organized automatically
- Secure: Container access is properly restricted
Security & Permissions
- Input Security: All input directories mounted read-only (ro)
- Output Isolation: Each agent has isolated output directories
- Network Isolation: Docker containers run with network_disabled=True
- Memory Limits: Configurable memory restrictions per agent
- Auto-Cleanup: Temporary files cleaned based on age (configurable)
- Permission Control: Directory permissions managed automatically
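These settings correspond to keyword arguments accepted by the Docker SDK for Python (`containers.run`). The sketch below only assembles the arguments, so it runs without a Docker daemon. `build_run_kwargs` is a hypothetical helper, and the `512m` memory limit is an assumed placeholder rather than the library's default.

```python
from pathlib import Path


def build_run_kwargs(base_dir: str, image: str, mem_limit: str = "512m") -> dict:
    """Assemble keyword arguments for docker-py's containers.run()."""
    base = Path(base_dir).resolve()
    # input/ is read-only; the other mount points are writable
    modes = {"input": "ro", "output": "rw", "temp": "rw", "handoff": "rw"}
    volumes = {
        str(base / name): {"bind": f"/docker_shared/{name}", "mode": mode}
        for name, mode in modes.items()
    }
    return {
        "image": image,
        "volumes": volumes,
        "network_disabled": True,  # no network access from inside the container
        "mem_limit": mem_limit,    # assumed value; tune per agent
        "remove": True,            # delete the container once it exits
    }
```

With the `docker` package installed and a daemon running, the result could be passed as `docker.from_env().containers.run(command="ls /docker_shared", **kwargs)`.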
File Access Security Configuration
Restricted Directories Protection:
The system includes built-in protection against accessing sensitive system directories:
# agent_config.yaml
security:
file_access:
restricted_directories:
- "/etc" # System configuration
- "/root" # Root user directory
- "/var/log" # System logs
- "/proc" # Process information
- "/sys" # System information
- "/dev" # Device files
- "/boot" # Boot files
- "~/.ssh" # SSH keys
- "~/.aws" # AWS credentials
- "~/.config" # User configuration
- "/usr/bin" # System binaries
- "/usr/sbin" # System admin binaries
Environment Variable Configuration:
# Alternative to YAML configuration
export AMBIVO_AGENTS_FILE_ACCESS_RESTRICTED_DIRS="/etc,/var/log,/sys,/proc,/dev"
How Restricted Directories Work:
- Path Resolution: Uses Path.expanduser().resolve() for proper path handling
- Security by Default: Common sensitive directories blocked by default
- Symbolic Link Protection: Resolves symbolic links to prevent bypass attempts
- Cross-Platform: Works on Windows, macOS, and Linux
- Agent Coverage: Protects both BaseAgent.read_file() and DatabaseAgent.ingest_file_to_mongodb()
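The resolution-based check can be sketched in a few lines. This is an illustration of the technique, not the library's implementation; `is_restricted` is a hypothetical function. Both the candidate path and each restricted directory are resolved before comparison, so symlinks and `..` segments cannot sidestep the check.

```python
from pathlib import Path


def is_restricted(path: str, restricted_dirs) -> bool:
    """Return True if path resolves to a location inside any restricted directory."""
    target = Path(path).expanduser().resolve()
    for entry in restricted_dirs:
        root = Path(entry).expanduser().resolve()
        # Compare whole path components, so "/etcetera" is not treated as under "/etc"
        if target == root or root in target.parents:
            return True
    return False
```

Comparing against `target.parents` rather than using a string prefix test avoids false matches between sibling directories that share a name prefix.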
Example Usage:
from ambivo_agents import DatabaseAgent
# This will be blocked by default security settings
result = await db_agent.chat("ingest data from /etc/passwd")
# Returns: {"success": False, "error": "Access denied: File path '/etc/passwd' is in a restricted directory"}
# This works normally (assuming file exists)
result = await db_agent.chat("ingest data from ./data/users.csv")
# Returns: {"success": True, ...}
Security Best Practices:
- Always use the default restricted directories in production
- Add custom restricted paths for your specific environment
- Test security settings before deployment
- Monitor access attempts to restricted directories
- Regular audits of file access patterns
Monitoring & Maintenance
from ambivo_agents.core import get_shared_manager
shared_manager = get_shared_manager()
# Monitor disk usage
usage = shared_manager.get_disk_usage()
print(f"Total usage: {usage['total_bytes'] / (1024**3):.2f} GB")
# Cleanup old temporary files
cleaned_count = shared_manager.cleanup_temp_files(max_age_hours=24)
print(f"Cleaned {cleaned_count} temporary files")
# List outputs for specific agent
output_files = shared_manager.list_outputs("analytics")
print(f"Analytics outputs: {output_files}")
Supported File Types & Detection
The system automatically detects file paths in natural language and supports:
Data Files: .csv, .xlsx, .xls, .json, .xml, .parquet
Media Files: .mp4, .avi, .mov, .mp3, .wav, .flac
Text Files: .txt, .md, .log, .py, .js, .sql
Documents: .pdf (read-only)
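One plausible way to detect such paths in free text is a regular expression built from the extension lists above. The sketch below is an assumption about the approach, not the library's detector: `detect_files`, `FILE_PATTERN`, and the category labels are hypothetical.

```python
import re
from pathlib import PurePath

# Extension groups copied from the lists above; the category labels are illustrative
CATEGORY_BY_EXTENSION = {
    **dict.fromkeys([".csv", ".xlsx", ".xls", ".json", ".xml", ".parquet"], "data"),
    **dict.fromkeys([".mp4", ".avi", ".mov", ".mp3", ".wav", ".flac"], "media"),
    **dict.fromkeys([".txt", ".md", ".log", ".py", ".js", ".sql"], "text"),
    ".pdf": "document",
}

# Longest extensions first so "xlsx" is tried before "xls"
_EXTENSIONS = "|".join(
    re.escape(ext[1:]) for ext in sorted(CATEGORY_BY_EXTENSION, key=len, reverse=True)
)
FILE_PATTERN = re.compile(
    rf"(?<![\w./-])([\w./-]+\.(?:{_EXTENSIONS}))(?!\w)", re.IGNORECASE
)


def detect_files(request: str):
    """Return (path, category) pairs for supported file paths found in the text."""
    found = []
    for match in FILE_PATTERN.finditer(request):
        path = match.group(1)
        category = CATEGORY_BY_EXTENSION[PurePath(path).suffix.lower()]
        found.append((path, category))
    return found
```

A bare format name such as "xlsx" in "convert data.csv to xlsx" is not matched, because the pattern requires a dot before the extension.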
# These requests automatically trigger file sharing:
"convert data.csv to xlsx" → Detects: data.csv → ./docker_shared/input/code/
"extract audio from video.mp4" → Detects: video.mp4 → ./docker_shared/input/media/
"analyze quarterly_report.xlsx" → Detects: quarterly_report.xlsx → ./docker_shared/input/analytics/
"scrape data from website" → No file detected → ./docker_shared/output/scraper/
Docker Image Configuration
Default Image: sgosain/amb-ubuntu-python-public-pod
Custom Docker Image for Consolidated Structure:
FROM sgosain/amb-ubuntu-python-public-pod
# Install additional packages for your use case
RUN pip install openpyxl xlsxwriter plotly seaborn
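# Create consolidated mount points (listed explicitly because RUN uses /bin/sh,
# which on Ubuntu-based images does not perform brace expansion)
RUN mkdir -p /docker_shared/input /docker_shared/output /docker_shared/temp /docker_shared/handoff /docker_shared/work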
# Add custom scripts that work with consolidated structure
COPY your-scripts/ /opt/scripts/
# Set working directory
WORKDIR /docker_shared/work
# Example script that uses consolidated paths
RUN echo '#!/bin/bash\n\
echo "Input files: $(ls -la /docker_shared/input/)"\n\
echo "Output directory: /docker_shared/output/"\n\
echo "Temp directory: /docker_shared/temp/"\n\
echo "Handoff directory: /docker_shared/handoff/"' > /opt/scripts/show_structure.sh
RUN chmod +x /opt/scripts/show_structure.sh
Troubleshooting
Directory Issues:
# Check if docker_shared structure exists
ls -la docker_shared/
# Verify agent subdirectories
ls -la docker_shared/output/
File Access Issues:
# Check permissions
chmod 755 docker_shared/
find docker_shared/ -type d -exec chmod 755 {} \;
# Verify Docker can access the directory
docker run --rm -v $(pwd)/docker_shared:/docker_shared alpine ls -la /docker_shared
Volume Mount Issues:
# Test consolidated volume mounting
docker run --rm \
-v $(pwd)/docker_shared/input:/docker_shared/input:ro \
-v $(pwd)/docker_shared/output:/docker_shared/output:rw \
alpine ls -la /docker_shared/
Benefits of Consolidated Structure
- Consistency: All agents use the same directory structure
- Inter-Agent Workflows: Seamless file handoffs between agents
- Security: Proper read-only/read-write permissions
- Organization: Files organized by agent and purpose
- Monitoring: Centralized disk usage and cleanup
- Third-Party Integration: Easy for custom agent development
- Auto-Management: Directories created and managed automatically
Running Without Docker (Containerized Deployment)
When deploying ambivo_agents inside the sgosain/amb-ubuntu-python-public-pod Docker image (e.g., on Railway, Render, or similar platforms that don't support Docker-in-Docker), you can disable Docker executor spawning. The image already contains all required dependencies (ffmpeg, pytubefix, playwright, pandas, duckdb, etc.).
Configuration
Via YAML (agent_config.yaml):
docker:
use_docker: false
shared_base_dir: "./docker_shared" # still used for file organization
# ... rest of docker config remains the same
Via Environment Variable:
export AMBIVO_AGENTS_DOCKER_USE_DOCKER=false
How It Works
| Mode | use_docker | What Happens |
|---|---|---|
| Docker (default) | true | Agents spawn Docker containers using sgosain/amb-ubuntu-python-public-pod (requires a Docker daemon) |
| Local | false | Agents use local subprocess execution (Python, Bash, ffmpeg, pytubefix); no Docker daemon needed |
When use_docker: false, each agent uses a local executor:
| Agent | Docker Executor | Local Executor | Host Requirement |
|---|---|---|---|
| CodeExecutorAgent | DockerCodeExecutor | LocalCodeExecutor | Python, Bash |
| MediaEditorAgent | MediaDockerExecutor | MediaLocalExecutor | ffmpeg, ffprobe |
| YouTubeDownloadAgent | YouTubeDockerExecutor | YouTubeLocalExecutor | pytubefix |
| AnalyticsAgent | DockerCodeExecutor | LocalCodeExecutor | Python, pandas |
| WebScraperAgent | SimpleDockerExecutor | Falls back to local playwright/requests | playwright or requests |
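Before switching to local mode, the host requirements in the table can be verified with a short preflight check. This sketch uses only the standard library; `missing_dependencies` and `require` are hypothetical helpers, and the message wording is modeled on the error examples in this README rather than taken from the library's source. Module names are expected to be top-level package names.

```python
import importlib.util
import shutil


def missing_dependencies(binaries=(), modules=()):
    """Return human-readable messages for each missing binary or Python module."""
    problems = []
    for name in binaries:
        if shutil.which(name) is None:  # executable not on PATH
            problems.append(f"{name} not found")
    for name in modules:
        if importlib.util.find_spec(name) is None:  # package not importable
            problems.append(f"{name} not installed")
    return problems


def require(binaries=(), modules=()):
    """Raise a RuntimeError listing everything that is missing, if anything is."""
    problems = missing_dependencies(binaries, modules)
    if problems:
        raise RuntimeError("; ".join(problems))
```

For media processing, for instance, `require(binaries=["ffmpeg", "ffprobe"])` would fail fast at startup instead of partway through a job.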
Important Notes
- Default is true, so existing deployments are completely unaffected.
- This is intended for running inside the fat Docker image where all dependencies already exist, not for bare developer machines.
- The docker_shared/ directory structure is used by both modes for consistent file organization and cross-agent handoffs.
- The code_execution_policy safety layer applies in both modes, including local execution through LocalCodeExecutor.
- Local executors raise clear errors if dependencies are missing (e.g., "ffmpeg not found", "pytubefix not installed").
Typical Railway Deployment
# Use the fat image as base — all agent deps included
FROM sgosain/amb-ubuntu-python-public-pod:latest
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# No Docker daemon needed
ENV AMBIVO_AGENTS_DOCKER_USE_DOCKER=false
CMD ["gunicorn", "your_app:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
Troubleshooting
Common Issues
- Redis Connection Failed
  # Check if Redis is running
  redis-cli ping  # Should return "PONG"
- Docker Not Available
  # Check Docker is running
  docker ps
- Agent Creation Errors
  from ambivo_agents import ModeratorAgent
  import asyncio
  async def check():
      try:
          moderator, context = ModeratorAgent.create(user_id="test")
          print(f"Success: {context.session_id}")
          await moderator.cleanup_session()
      except Exception as e:
          print(f"Error: {e}")
  asyncio.run(check())
- Import Errors
  python -c "from ambivo_agents import ModeratorAgent; print('Import success')"
Debug Mode
Enable verbose logging:
service:
log_level: "DEBUG"
log_to_file: true
Security Considerations
- Docker Isolation: All code execution happens in isolated containers
- Network Restrictions: Containers run with network_disabled=True by default
- Resource Limits: Memory and CPU limits prevent resource exhaustion
- API Key Management: Store sensitive keys in environment variables
- Input Sanitization: All user inputs are validated and sanitized
- Session Isolation: Each agent session is completely isolated
Contributing
We welcome contributions! Please see our Contributing Guidelines for details.
Development Setup
# Clone repository
git clone https://github.com/ambivo-corp/ambivo-agents.git
cd ambivo-agents
# Install in development mode (base only)
pip install -e .
# Install with all extras (recommended for local development)
pip uninstall ambivo-agents -y && pip install -e ".[all]"
# Test ModeratorAgent
python -c "
from ambivo_agents import ModeratorAgent
import asyncio

async def test():
    moderator, context = ModeratorAgent.create(user_id='test')
    response = await moderator.chat('Hello!')
    print(f'Response: {response}')
    await moderator.cleanup_session()

asyncio.run(test())
"
Publishing to PyPI
Build & Upload
# Clean previous builds
rm -rf dist/ build/
# Build — IMPORTANT: use setuptools < 75 to produce Metadata-Version 2.1.
# setuptools >= 75 generates Metadata-Version 2.4 which adds a `license-file`
# field that twine 6.x rejects ("unrecognized or malformed field 'license-file'")
# and twine 5.x rejects ("Metadata is missing required fields: Name, Version").
# Pinning setuptools below 75 avoids this entirely.
pip install 'setuptools>=61.0,<75' wheel
python -m build --no-isolation
# Verify metadata is 2.1 (not 2.4)
python -c "import glob; from pkginfo import Wheel; w = Wheel(glob.glob('dist/ambivo_agents-*.whl')[0]); print(w.metadata_version)"
# Expected: 2.1
# Upload to PyPI
python -m twine upload dist/*
Tag & Push
# Tag the release
git tag -a v1.x.x -m "v1.x.x - Description"
git push origin main --tags
Why setuptools < 75?
setuptools >= 75 produces Metadata-Version: 2.4 which includes a license-file field. This field is not recognized by current versions of twine (both 5.x and 6.x), causing upload failures:
- twine 5.x: InvalidDistribution: Metadata is missing required fields: Name, Version (pkginfo can't parse 2.4)
- twine 6.x: InvalidDistribution: Invalid distribution metadata: unrecognized or malformed field 'license-file'
Building with setuptools < 75 produces Metadata-Version: 2.1 which is fully compatible. The --no-isolation flag ensures the locally installed setuptools is used instead of an auto-downloaded latest version.
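If pkginfo is not installed, the metadata version can also be read directly from the wheel, since a wheel is a zip archive containing a `*.dist-info/METADATA` file in email-header format. The helper `wheel_metadata_version` below is a hypothetical stdlib-only sketch.

```python
import zipfile
from email.parser import Parser


def wheel_metadata_version(wheel_path: str) -> str:
    """Read the Metadata-Version header from a wheel's *.dist-info/METADATA file."""
    with zipfile.ZipFile(wheel_path) as archive:
        names = [n for n in archive.namelist() if n.endswith(".dist-info/METADATA")]
        if not names:
            raise ValueError(f"No METADATA file found in {wheel_path}")
        text = archive.read(names[0]).decode("utf-8")
    # METADATA uses RFC 822 style headers, so the email parser can read it
    return Parser().parsestr(text, headersonly=True)["Metadata-Version"]
```

Running it against the built wheel in `dist/` before calling twine gives an early signal that the setuptools pin took effect.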
License
MIT License - see LICENSE file for details.
Author
Hemant Gosain 'Sunny'
- Company: Ambivo
- Email: info@ambivo.com
Support
- Email: info@ambivo.com
- Website: https://www.ambivo.com
- Issues: GitHub Issues
Attributions & Third-Party Technologies
This project leverages several open-source libraries and commercial services:
Core Technologies
- Docker: Container runtime for secure code execution
- Redis: In-memory data store for session management
- Python: Core programming language
AI/ML Frameworks
- OpenAI: GPT models and API services
- Anthropic: Claude models and API services
- AWS Bedrock: Cloud-based AI model access
- LangChain: Framework for building AI applications (by LangChain, Inc.)
- LlamaIndex: Data framework for LLM applications (by Jerry Liu)
- Hugging Face: Model hub and transformers library
Data Processing
- pandas: Data analysis and manipulation library
- DuckDB: In-process SQL OLAP database
- Qdrant: Vector database for semantic search
- tabulate: ASCII table formatting library
Media & Web
- FFmpeg: Multimedia processing framework
- YouTube: Video platform (via public APIs)
- pytubefix: YouTube video downloader library
- Brave Search: Web search API service
- Beautiful Soup: HTML/XML parsing library
Development Tools
- pytest: Testing framework
- black: Code formatting tool
- Docker Hub: Container image repository
Legal Disclaimer
IMPORTANT: This software is provided "as is" without warranty of any kind. Users are responsible for:
- API Compliance: Ensuring compliance with all third-party service terms (OpenAI, Anthropic, AWS, YouTube, etc.)
- Data Privacy: Protecting user data and complying with applicable privacy laws
- Usage Limits: Respecting rate limits and usage policies of external services
- Security: Implementing appropriate security measures for production use
- Licensing: Ensuring compliance with all third-party library licenses
The authors and contributors are not liable for any damages arising from the use of this software. Users should thoroughly test and validate the software before production deployment.
Third-Party Services: This library integrates with external services that have their own terms of service, privacy policies, and usage limitations. Users must comply with all applicable terms.
Web Scraping & Content Access: Users must practice ethical web scraping by respecting robots.txt, rate limits, and website terms of service. YouTube content access must comply with YouTube's Terms of Service and API policies - downloading copyrighted content without permission is prohibited.
Developed by the Ambivo team.
Query Across Multiple Knowledge Bases
You can query multiple knowledge bases by passing kb_names via metadata on either the ExecutionContext or the AgentMessage. The agent accepts kb_names as a list of strings, a list of dicts ({kb_name, description}), or a JSON string.
from ambivo_agents.agents.knowledge_base import KnowledgeBaseAgent
from ambivo_agents.core.base import ExecutionContext, AgentMessage
import asyncio

async def demo():
    agent = KnowledgeBaseAgent()
    # 1) Via ExecutionContext.metadata: list of strings
    ctx = ExecutionContext(
        user_id="u",
        session_id="s",
        conversation_id="c",
        metadata={"kb_names": ["product_docs", "engineering_wiki"]},
    )
    resp = await agent.process_message("What changed in v2.0 API?", context=ctx)
    print(resp.content)
    print(resp.metadata)  # includes used_kbs, primary_kb, sources_dict
    # 2) Via AgentMessage.metadata: list of dicts
    msg = AgentMessage(
        id="m1",
        sender_id="u",
        recipient_id=agent.agent_id,
        content="Summarize PTO policy.",
        metadata={
            "kb_names": [
                {"kb_name": "hr_policies", "description": "HR docs"},
                {"kb_name": "employee_handbook"},
            ]
        },
    )
    resp2 = await agent.process_message(msg)
    print(resp2.content)
    print(resp2.metadata)

asyncio.run(demo())
See examples/knowledge_base_multiple.py for a more complete example, including passing kb_names as a JSON string.
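The three accepted input shapes can be reduced to one canonical form. The sketch below shows one way to do that; `normalize_kb_names` is a hypothetical helper, and the choice of an empty string for a missing description is an assumption, not documented library behavior.

```python
import json


def normalize_kb_names(value):
    """Return a list of {"kb_name": ..., "description": ...} dicts."""
    if isinstance(value, str):
        value = json.loads(value)  # JSON string form
    if not isinstance(value, list):
        raise TypeError("kb_names must be a list or a JSON-encoded list")
    normalized = []
    for item in value:
        if isinstance(item, str):
            normalized.append({"kb_name": item, "description": ""})
        elif isinstance(item, dict) and "kb_name" in item:
            normalized.append(
                {"kb_name": item["kb_name"], "description": item.get("description", "")}
            )
        else:
            raise ValueError(f"Unsupported kb_names entry: {item!r}")
    return normalized
```

Normalizing once at the boundary means the rest of the query code only has to handle a single shape.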