SOTA Agent - Universal Agent Workflow Template
A generic, production-ready template for integrating AI agents into any application or data pipeline.
This is a TEMPLATE - use it to build agent workflows for any domain!
Originally designed for fraud detection, this architecture template applies to any domain requiring AI agent integration:
- Fraud Detection & Risk Analysis
- Customer Support & Chatbots
- Content Moderation & Policy Enforcement
- Healthcare & Diagnosis Support
- Data Quality & Anomaly Detection
- Analytics & Report Generation
- Any Agent-Powered Workflow
Quick Start
Installation
# Basic installation
pip install sota-agent-framework
# With optional features
pip install sota-agent-framework[mcp] # MCP tool calling
pip install sota-agent-framework[ray] # Distributed execution
pip install sota-agent-framework[databricks] # Databricks integration
pip install sota-agent-framework[optimization] # DSPy + TextGrad
pip install sota-agent-framework[all] # Everything
# Or install from GitHub
pip install git+https://github.com/somasekar278/universal-agent-template.git
Generate Your First Project
# Generate a complete project for your domain
sota-generate --domain "your_domain" --output ./your-project
# Navigate and run
cd your-project
python examples/example_usage.py  # Works immediately!
For Contributors/Development
If you're cloning the repo to contribute:
git clone https://github.com/somasekar278/universal-agent-template.git
cd universal-agent-template
./setup.sh # or setup.bat on Windows
python template_generator.py --domain "test"
Integrate Into Existing Code (3 lines)
from agents import AgentRouter
router = AgentRouter.from_yaml("config/agents.yaml") # 1. Load
result = await router.route("your_agent", input_data) # 2. Execute
# That's it!
See the Getting Started Guide for a detailed 5-minute walkthrough
Benchmark Your Agents
The framework includes a production-grade evaluation suite for comprehensive agent testing:
# Install with benchmarking support
pip install sota-agent-framework[dev]
# Run benchmarks
sota-benchmark run --suite fraud --agents all --report md
# View auto-generated leaderboard
cat benchmark_results/leaderboard.md
Features:
- Multi-metric evaluation (tool calls, planning, hallucination, latency, coherence, accuracy)
- Auto-generated leaderboards ranking agents
- Multiple report formats (Markdown, JSON, HTML)
- Regression testing for CI/CD
- Parallel execution for fast evaluation
See the Benchmarking Guide for complete documentation
Agent-Governed Memory System
Intelligent memory management where agents decide what to store, retrieve, and forget:
from memory import MemoryManager, MemoryType, MemoryImportance
# Initialize memory
memory = MemoryManager()
# Agent stores (auto-detects importance and type)
await memory.store(
    content="User prefers dark mode at night",
    importance=MemoryImportance.HIGH
)
# Agent retrieves with semantic search
memories = await memory.retrieve(
    query="What are user preferences?",
    strategy="hybrid"  # semantic + recency + importance
)
# Agent reflects and consolidates
summary = await memory.reflect()
# Agent forgets old data
forgotten = await memory.forget()
Features:
- 5 Memory Types - Short-term, long-term, episodic, semantic, procedural
- Semantic Search - Vector embeddings for similarity-based retrieval
- Reflection - Agents create insights and summaries from memories
- Smart Forgetting - Time/importance/capacity-based policies
- Memory Graphs - Track relationships and patterns
- Context Budgeting - Automatic token management for LLMs
- Shared Memory - Private and shared memory spaces across agents
See the Memory System Guide for complete documentation
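The "hybrid" retrieval strategy above blends semantic similarity, recency, and importance. A minimal sketch of how such a blended score could be computed (the `Memory` shape, weights, and half-life are illustrative assumptions, not the framework's internals):

```python
import math
import time
from dataclasses import dataclass

@dataclass
class Memory:
    content: str
    importance: float   # 0.0-1.0, set when stored
    created_at: float   # unix timestamp
    similarity: float   # cosine similarity to the query, precomputed

def hybrid_score(m: Memory, now: float, half_life_s: float = 86_400.0,
                 w_sem: float = 0.5, w_rec: float = 0.3, w_imp: float = 0.2) -> float:
    """Blend semantic similarity, exponential recency decay, and importance."""
    recency = math.exp(-(now - m.created_at) / half_life_s)
    return w_sem * m.similarity + w_rec * recency + w_imp * m.importance

now = time.time()
memories = [
    Memory("prefers dark mode", importance=0.9, created_at=now - 3600, similarity=0.8),
    Memory("asked about billing", importance=0.4, created_at=now - 7 * 86_400, similarity=0.9),
]
# Recent, important memory outranks the more similar but stale one
ranked = sorted(memories, key=lambda m: hybrid_score(m, now), reverse=True)
```

The weights are a tuning knob: raising `w_rec` biases retrieval toward short-term context, raising `w_sem` toward topical relevance.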
Reasoning Optimization
Advanced reasoning optimization for continuously improving agents:
from reasoning import ReasoningOptimizer, TrajectoryOptimizer, CoTDistiller
# Initialize optimizer
optimizer = ReasoningOptimizer(agent)
# Optimize execution
result = await optimizer.optimize(input_data)
# Learn from execution
await optimizer.learn_from_execution(
    trajectory=execution_trajectory,
    reasoning_chain=agent_reasoning,
    reward=0.85  # Reward signal
)
# Get optimization report
report = optimizer.get_optimization_report()
Features:
- Trajectory Optimization - Learn optimal action sequences from past executions
- CoT Distillation - Compress reasoning chains (50%+ token savings)
- Feedback Loops - Critique → Revise → Retry for self-improvement
- Policy Constraints - Enforce safety, cost, and latency guardrails
- RL-Style Tuning - Optimize hyperparameters via reward signals
See the Reasoning Optimization Guide for complete documentation
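The Critique → Revise → Retry feedback loop can be sketched as a plain control structure; here the `critique` and `revise` callables stand in for LLM calls, and the threshold and round limit are arbitrary choices, not framework defaults:

```python
from typing import Callable

def feedback_loop(answer: str,
                  critique: Callable[[str], float],
                  revise: Callable[[str, float], str],
                  threshold: float = 0.8,
                  max_rounds: int = 3) -> str:
    """Revise and retry until the critic's score clears the threshold
    or the round budget runs out."""
    for _ in range(max_rounds):
        score = critique(answer)
        if score >= threshold:
            break
        answer = revise(answer, score)
    return answer

# Toy stand-ins: score by length, "revise" by elaborating.
result = feedback_loop(
    "short",
    critique=lambda a: min(len(a) / 20, 1.0),
    revise=lambda a, s: a + " (elaborated)",
)
```

Capping `max_rounds` is what keeps a self-improvement loop from spending unbounded tokens when the critic never becomes satisfied.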
Prompt Optimization (DSPy + TextGrad)
Advanced prompt optimization using DSPy for task prompts and TextGrad for system prompts:
from optimization import PromptOptimizer, OptimizationPipeline
# Initialize optimizer
optimizer = PromptOptimizer()
# Optimize system prompt with TextGrad
system_result = await optimizer.optimize(
    prompt="You are a fraud detection expert.",
    prompt_type="system",
    evaluation_data=eval_data,
    objective="Maximize accuracy while being concise"
)
# Optimize task prompt with DSPy
task_result = await optimizer.optimize(
    prompt="Classify the transaction",
    prompt_type="task",
    training_data=train_data,
    task="fraud_detection"
)
# Run full optimization pipeline
pipeline = OptimizationPipeline()
result = await pipeline.run(
    agent_config=agent_config,
    training_data=train_data,
    evaluation_data=eval_data,
    stages=["system", "task", "test"]
)
# A/B test variants
from optimization import ABTestFramework
framework = ABTestFramework()
test_result = await framework.run_test(
    variants=[baseline, optimized],
    test_data=test_cases
)
Features:
- DSPy Integration - Few-shot learning for task prompts
- TextGrad Optimization - Gradient-based system prompt refinement
- Multi-Stage Pipelines - System → Task → A/B Test
- Statistical Testing - Confidence intervals and significance
- Unity Catalog Integration - Auto-versioning of optimized prompts
- Performance Tracking - Optimization history and metrics
See the Optimization Guide for complete documentation
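For the statistical testing step, a standard two-proportion z-test is one way to decide whether an optimized variant significantly beats the baseline. The `ABTestFramework` internals are not shown in this document; the sketch below is just the textbook statistic, with made-up counts:

```python
import math

def two_proportion_z(success_a: int, n_a: int, success_b: int, n_b: int) -> float:
    """z statistic for H0: the two variants have equal success rates."""
    p_a, p_b = success_a / n_a, success_b / n_b
    pooled = (success_a + success_b) / (n_a + n_b)  # pooled success rate under H0
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    return (p_b - p_a) / se

# Baseline prompt: 70/100 correct; optimized prompt: 85/100 correct
z = two_proportion_z(70, 100, 85, 100)
significant = abs(z) > 1.96  # two-sided test at 95% confidence
```

With these counts z is roughly 2.5, so the improvement would clear the 95% bar; with smaller samples the same 15-point gap might not.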
Databricks-Native Visualization
Built-in observability and debugging for Databricks notebooks:
from visualization import DatabricksVisualizer
# Works natively in Databricks notebooks
viz = DatabricksVisualizer()
# Execution graph (Mermaid diagram)
viz.show_execution_graph(trace)
# Timeline (Plotly chart)
viz.show_timeline(trace)
# Tool call replay
viz.show_tool_calls(tool_calls)
# Decision inspection
viz.explain_decision(decision, context)
# Log to MLflow
viz.log_to_mlflow(trace)
# Create interactive widget
create_databricks_widget(trace)
Features:
- Execution Graphs - Mermaid diagrams showing agent workflow
- Timeline Visualization - Plotly charts for execution timing
- Tool Call Replay - Interactive tool call inspection
- Decision Explainer - "Why did the agent do this?"
- Prompt Comparison - Side-by-side version diffs
- MLflow Integration - Auto-log visualizations to MLflow
- Databricks Widgets - Interactive notebook controls
Designed for Databricks:
- Uses displayHTML() for native rendering
- Integrates with MLflow UI
- Works with Databricks widgets
- Also works in Jupyter/standalone
See the Visualization Guide for complete documentation
Why Use This Template?
Universal Design - Works for any domain, not just fraud detection
Plug-and-Play - 3 lines to integrate into existing pipelines
Configuration-Driven - Enable/disable agents via YAML, zero code changes
SLA-Aware - Control inline vs async execution based on your requirements
Production-Ready - Battle-tested patterns, not toy examples
Complete Stack - Includes telemetry, evaluation, optimization, deployment
Template Generator - Scaffold new projects in seconds
Built-in Benchmarking - Comprehensive eval suite with leaderboards
Architecture Overview
This project implements a domain-agnostic, plug-and-play agent framework that integrates into existing data pipelines with minimal code changes. The architecture leverages:
- Ephemeral Agents: Task-specific narrative agents that spin up on-demand
- Hot LLM Pools: Always-on GPU endpoints via Databricks Model Serving
- Prompt Optimization: DSPy for task prompts, TextGrad for system prompts
- Memory & Context: Lakebase for conversation history and embeddings
- MCP Tool Calling: Standardized tool interfaces via Model Context Protocol
- Observability: OTEL → Zerobus → Delta Lake telemetry pipeline
- Evaluation: MLflow custom scorers and continuous feedback loops
Key Features
Plug-and-Play Integration - Add to existing pipelines with 3 lines of code
Configuration-Driven - Enable/disable agents via YAML, no code changes
LangGraph Orchestration - Plan → Act → Critique → Re-plan loops for autonomous workflows
SLA-Aware Execution - Control inline vs offline based on requirements
Type-Safe - Pydantic schemas validate all data at runtime
ASGI Support - FastAPI endpoints, SSE streaming, async HTTP
Agent-to-Agent (A2A) - Event-driven agent communication via NATS/Redis (optional)
Domain-Agnostic - Works for fraud, risk, support, compliance, or any use case
Prompt Optimization - DSPy for task prompts, TextGrad for system prompts
Comprehensive Telemetry - All events streamed to Delta Lake via Zerobus
Memory Management - Lakebase for vector embeddings and conversation history
MCP Tool Integration - Standardized external tool calling (v1.25.0+)
MLflow Tracking - Experiment tracking, evaluation, and model registry
Unity Catalog - Centralized prompt and model versioning
Multi-Tenant Ready - Schema adapters handle any customer format
Agent Benchmarking - Multi-metric eval suite with auto-generated leaderboards
Agent-Governed Memory - Intelligent storage, retrieval, reflection, and forgetting
Reasoning Optimization - Trajectory tuning, CoT distillation, feedback loops, RL-style tuning
Databricks-Native Visualization - Execution graphs, timelines, tool replay, decision inspection
YAML-Configurable - All infrastructure and runtime settings via unified YAML
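The "Multi-Tenant Ready" point above relies on schema adapters that map each customer's field names onto the canonical schema. A minimal illustration of the idea (the tenant names, field names, and `FIELD_MAPS` shape are invented for this sketch; the framework's actual adapter API lives in shared/adapters/):

```python
from typing import Any

# Per-tenant mapping: canonical field name -> customer's field name (hypothetical)
FIELD_MAPS: dict[str, dict[str, str]] = {
    "acme":   {"transaction_id": "txn_ref", "amount": "amt_usd"},
    "globex": {"transaction_id": "id",      "amount": "value"},
}

def adapt(tenant: str, record: dict[str, Any]) -> dict[str, Any]:
    """Rename tenant-specific fields to the canonical schema so every
    downstream agent sees one consistent shape."""
    mapping = FIELD_MAPS[tenant]
    return {canonical: record[source] for canonical, source in mapping.items()}

canonical = adapt("acme", {"txn_ref": "t-1", "amt_usd": 42.0})
```

Keeping the mapping in data (here a dict, in the framework a YAML adapter config) means onboarding a new customer format needs no agent code changes.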
Project Structure
.
├── agents/            # Agent framework (CORE)
│   ├── base.py        # - Base agent interfaces
│   ├── config.py      # - Configuration loader
│   ├── registry.py    # - Agent registry + router
│   └── execution/     # - Pluggable execution backends
├── shared/            # Shared libraries
│   ├── schemas/       # - Pydantic data models (type-safe)
│   └── adapters/      # - Schema adaptation framework
├── config/            # Configuration (plug-and-play)
│   ├── agents/        # - Agent configurations (YAML)
│   └── adapters/      # - Customer schema adapters
├── services/          # Deployable services
├── optimization/      # Prompt optimization (DSPy/TextGrad)
├── memory/            # Lakebase integration
├── orchestration/     # Databricks Workflows + LangGraph
├── mcp-servers/       # Model Context Protocol tools
├── evaluation/        # MLflow scorers and metrics
├── telemetry/         # OTEL → Zerobus → Delta
├── uc-registry/       # Unity Catalog integration
├── data/              # Synthetic testbed
├── infrastructure/    # Deployment configs (DABS)
├── experiments/       # Notebooks + MLflow tracking
├── tests/             # Unit, integration, load tests
└── docs/              # Documentation
See Project Structure for detailed breakdown with key concepts.
Data Schemas
All data structures are defined using Pydantic models in shared/schemas/:
- transactions.py - Transaction records and payment data
- fraud_signals.py - Velocity, amount, location, device signals
- contexts.py - Merchant and customer profiles
- agent_io.py - Agent inputs, outputs, tool calls (MCP-ready)
- evaluation.py - Evaluation records and scorer metrics
- telemetry.py - OTEL traces for Zerobus ingestion
See shared/schemas/README.md for detailed documentation.
Quick Start (Plug-and-Play)
Add agents to your existing pipeline in 3 lines:
from agents import AgentRouter
from shared.schemas import AgentInput
# 1. Load agents from config (one line!)
router = AgentRouter.from_yaml("config/agents.yaml")
# 2. Convert your data to AgentInput (Pydantic validates!)
agent_input = AgentInput(
    request_id=record.id,
    data=YourDomainData(**record.dict()),  # Your domain-specific data
    # ... your contexts
)
# 3. Route to agent (inline or offline based on config!)
result = await router.route("your_agent", agent_input)
# That's it! Agent runs according to your config.
# No code changes to enable/disable or switch execution modes.
Configuration controls everything:
# config/agents.yaml
agents:
  your_agent:
    class: "your_package.YourAgent"
    execution_mode: "offline"  # or "inline" if SLA allows
    enabled: true              # Change to false to disable
    timeout: 30
Works for any domain: Fraud detection, risk analysis, customer support, compliance, content moderation, etc.
See Configuration System for details.
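How such a config drives execution can be sketched with the parsed YAML as a plain dict (as PyYAML would produce it). The dispatch logic below is illustrative only; the real `AgentRouter` may behave differently:

```python
# Parsed form of config/agents.yaml
config = {
    "agents": {
        "your_agent": {
            "class": "your_package.YourAgent",
            "execution_mode": "offline",
            "enabled": True,
            "timeout": 30,
        }
    }
}

def route(agent_name: str, payload: dict) -> dict:
    """Dispatch per config: skip disabled agents, pick the execution mode."""
    cfg = config["agents"][agent_name]
    if not cfg["enabled"]:
        return {"status": "skipped", "reason": "agent disabled"}
    mode = cfg["execution_mode"]
    # inline -> run now and block the caller; offline -> enqueue for async work
    return {"status": "queued" if mode == "offline" else "completed",
            "agent": agent_name, "timeout": cfg["timeout"]}

result = route("your_agent", {"request_id": "r-1"})
```

Because the decision is read from config at call time, flipping `enabled` or `execution_mode` in YAML changes behavior with no code edits, which is the plug-and-play claim above.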
Getting Started
Prerequisites
- Python 3.9+
- Databricks workspace with:
- Model Serving endpoint
- Unity Catalog
- Lakebase access
- Zerobus server endpoint (for telemetry)
Installation
# Clone the repository
git clone <repo-url>
cd "SOTA Agent"
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Or install in development mode
pip install -e ".[dev]"
Configuration
# Copy example config
cp .env.example .env
# Edit .env with your Databricks credentials
# - DATABRICKS_HOST
# - DATABRICKS_TOKEN
# - MODEL_SERVING_ENDPOINT
# - UNITY_CATALOG_NAME
# - ZEROBUS_ENDPOINT
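A hedged sketch of reading those settings at startup with only the standard library (the variable names follow the `.env` above; failing fast on missing values is a design choice of this sketch, not documented framework behavior):

```python
import os

REQUIRED = [
    "DATABRICKS_HOST",
    "DATABRICKS_TOKEN",
    "MODEL_SERVING_ENDPOINT",
    "UNITY_CATALOG_NAME",
    "ZEROBUS_ENDPOINT",
]

def load_settings(env=os.environ) -> dict:
    """Collect required settings, raising early if any are missing
    so misconfiguration surfaces at startup rather than mid-request."""
    missing = [k for k in REQUIRED if not env.get(k)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {missing}")
    return {k: env[k] for k in REQUIRED}

# Demo with dummy values; in practice the .env is loaded into os.environ
settings = load_settings({k: f"dummy-{k.lower()}" for k in REQUIRED})
```

In a real deployment a loader like python-dotenv would populate `os.environ` from the `.env` file before this runs.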
Databricks Stack
| Component | Technology |
|---|---|
| LLM Inference | Databricks Model Serving |
| Orchestration | LangGraph + Databricks Workflows |
| Tracing & Evaluation | Databricks MLflow |
| Memory/Vector Store | Lakebase |
| Telemetry Sink | Zerobus → Delta Lake |
| Prompt Registry | Unity Catalog |
| Dashboards | Databricks SQL |
| Compute | Databricks Clusters / Serverless |
Development
Run Tests
# Run all tests
pytest
# Run with coverage
pytest --cov=. --cov-report=html
# Run specific test suite
pytest tests/unit/
pytest tests/integration/
Code Quality
# Format code
black .
# Lint
ruff check .
# Type check
mypy .
Architecture Flows
Realtime Path (Low-latency)
Transaction → Event Collector → Ephemeral Narrative Agent → MCP Tool Calls → LLM Pool → Risk Narrative → Dashboard/Alerts
Async Path (Optimization)
MLflow Scorers → Evaluate High-Risk Txns → Log Metrics → DSPy/TextGrad Optimization → Update Prompts in UC → Deploy to Agents
MCP Integration
All tool calls use Model Context Protocol for standardization:
# Tool call schema (MCP-ready)
tool_call = ToolCall(
    tool_id="call_123",
    tool_name="merchant_context",
    tool_server="uc-query-server",
    arguments={"merchant_id": "mch_001"}
)
# Tool result
tool_result = ToolResult(
    tool_call_id="call_123",
    success=True,
    result=merchant_data,
    latency_ms=45.2
)
See mcp-servers/ for tool implementations.
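A simplified, standard-library rendering of the `ToolCall`/`ToolResult` shapes above. The real schemas in shared/schemas/agent_io.py are Pydantic models; this dataclass sketch only mirrors the fields shown in the example:

```python
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class ToolCall:
    tool_id: str
    tool_name: str
    tool_server: str                           # which MCP server handles the call
    arguments: dict[str, Any] = field(default_factory=dict)

@dataclass
class ToolResult:
    tool_call_id: str                          # links the result back to its call
    success: bool
    result: Any = None
    latency_ms: Optional[float] = None

call = ToolCall("call_123", "merchant_context", "uc-query-server",
                {"merchant_id": "mch_001"})
res = ToolResult(call.tool_id, success=True,
                 result={"name": "Acme"}, latency_ms=45.2)
```

The `tool_call_id` back-reference is what lets asynchronous tool results be matched to their originating calls in telemetry and replay.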
Telemetry
All events flow through OTEL → Zerobus → Delta Lake:
- Agent start/complete/error
- MCP tool calls
- LLM requests/responses
- Stream chunks
- Evaluation results
Query telemetry in Unity Catalog:
SELECT * FROM main.telemetry.agent_traces
WHERE transaction_id = 'txn_123'
ORDER BY timestamp DESC;
Prompt Optimization
DSPy (Task Prompts)
# Optimize reasoning pipeline
from optimization.dspy import MIPROOptimizer
optimizer = MIPROOptimizer(training_data)
optimized_prompt = optimizer.optimize(baseline_prompt)
TextGrad (System Prompts)
# Optimize system prompt with guardrails
from optimization.textgrad import SystemPromptOptimizer
optimizer = SystemPromptOptimizer(feedback_data)
optimized_system = optimizer.optimize(system_prompt)
Synthetic Data
Generate idempotent test data:
# Generate synthetic transactions
python -m data.synthetic.generate --seed 42 --count 5000
# Output: data/synthetic/raw/transactions.parquet
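"Idempotent" here means the same seed always yields the same records. A minimal sketch of that property (the field names below are illustrative, not the generator CLI's actual output schema):

```python
import random

def generate_transactions(seed: int, count: int) -> list[dict]:
    """Deterministic synthetic transactions: same seed -> same output."""
    rng = random.Random(seed)  # isolated RNG, unaffected by global random state
    return [
        {
            "transaction_id": f"txn_{i:05d}",
            "amount": round(rng.uniform(1.0, 500.0), 2),
            "is_fraud": rng.random() < 0.02,  # ~2% positive class
        }
        for i in range(count)
    ]

a = generate_transactions(seed=42, count=5)
b = generate_transactions(seed=42, count=5)
assert a == b  # idempotent across runs
```

Using an instance-level `random.Random(seed)` rather than the module-level functions keeps the generator reproducible even if other code reseeds the global RNG.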
Contributing
- Create a feature branch
- Make changes with tests
- Run linters and tests
- Submit pull request
License
MIT
Documentation
Start Here
- Getting Started - 5-minute quick start guide
- Template Guide - Comprehensive guide for any domain
- Cross-Domain Examples - 8 real-world examples
- Documentation Index - Complete documentation map
Core Documentation
- Project Structure - Code organization and key concepts
- Configuration System - YAML-based configuration
- Schema Documentation - Data schemas and adaptation
- Use Cases - Advanced usage patterns
Tools
- Template Generator - python template_generator.py --help
- Example Integrations - examples/plug_and_play_integration.py
Contact
For questions, see docs/ or contact the team.