Domain-agnostic agent framework for integrating AI agents into data pipelines
SOTA Agent - Universal Agent Workflow Template
A generic, production-ready template for integrating AI agents into any application or data pipeline.
This is a TEMPLATE: use it to build agent workflows for any domain!
Originally designed for fraud detection, this architecture template applies to any domain requiring AI agent integration:
- Fraud Detection & Risk Analysis
- Customer Support & Chatbots
- Content Moderation & Policy Enforcement
- Healthcare & Diagnosis Support
- Data Quality & Anomaly Detection
- Analytics & Report Generation
- Any Agent-Powered Workflow
Quick Start
Installation
# Install from PyPI
pip install sota-agent-framework
# Or install from GitHub
pip install git+https://github.com/somasekar278/universal-agent-template.git
Path 1: Generate Your First Project
# Generate a complete project for your domain
sota-generate --domain "your_domain" --output ./your-project
# Navigate and run
cd your-project
python examples/example_usage.py  # Works immediately!
For Contributors/Development
If you're cloning the repo to contribute:
git clone https://github.com/somasekar278/universal-agent-template.git
cd universal-agent-template
./setup.sh # or setup.bat on Windows
python template_generator.py --domain "test"
Path 2: Integrate Into Existing Code (3 lines)
from agents import AgentRouter
router = AgentRouter.from_yaml("config/agents.yaml") # 1. Load
result = await router.route("your_agent", input_data) # 2. Execute
# That's it!
See the Getting Started Guide for a detailed 5-minute walkthrough.
Why Use This Template?
- Universal Design - Works for any domain, not just fraud detection
- Plug-and-Play - 3 lines to integrate into existing pipelines
- Configuration-Driven - Enable/disable agents via YAML, zero code changes
- SLA-Aware - Control inline vs offline execution based on your requirements
- Production-Ready - Battle-tested patterns, not toy examples
- Complete Stack - Includes telemetry, evaluation, optimization, deployment
- Template Generator - Scaffold new projects in seconds
Architecture Overview
This project implements a domain-agnostic, plug-and-play agent framework that integrates into existing data pipelines with minimal code changes. The architecture leverages:
- Ephemeral Agents: Task-specific narrative agents that spin up on-demand
- Hot LLM Pools: Always-on GPU endpoints via Databricks Model Serving
- Prompt Optimization: DSPy for task prompts, TextGrad for system prompts
- Memory & Context: Lakebase for conversation history and embeddings
- MCP Tool Calling: Standardized tool interfaces via Model Context Protocol
- Observability: OTEL → Zerobus → Delta Lake telemetry pipeline
- Evaluation: MLflow custom scorers and continuous feedback loops
Key Features
- Plug-and-Play Integration - Add to existing pipelines with 3 lines of code
- Configuration-Driven - Enable/disable agents via YAML, no code changes
- SLA-Aware Execution - Control inline vs offline based on requirements
- Type-Safe - Pydantic schemas validate all data at runtime
- ASGI Support - FastAPI endpoints, SSE streaming, async HTTP
- Agent-to-Agent (A2A) - Event-driven agent communication via NATS/Redis (optional)
- Domain-Agnostic - Works for fraud, risk, support, compliance, or any use case
- Prompt Optimization - DSPy for task prompts, TextGrad for system prompts
- Comprehensive Telemetry - All events streamed to Delta Lake via Zerobus
- Memory Management - Lakebase for vector embeddings and conversation history
- MCP Tool Integration - Standardized external tool calling
- MLflow Tracking - Experiment tracking, evaluation, and model registry
- Unity Catalog - Centralized prompt and model versioning
- Multi-Tenant Ready - Schema adapters handle any customer format
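The schema-adapter idea behind the multi-tenant claim can be sketched in a few lines. The code below is a hypothetical stand-in, not the framework's actual adapter API: a per-customer field map translates a raw payload into one canonical shape before validation.

```python
# Hypothetical sketch of a schema adapter (not the framework's real API):
# a per-customer field map converts raw payloads to one canonical shape.
from dataclasses import dataclass


@dataclass
class CanonicalTransaction:
    transaction_id: str
    amount: float
    currency: str


class SchemaAdapter:
    def __init__(self, field_map: dict):
        # field_map: canonical field name -> customer's field name
        self.field_map = field_map

    def adapt(self, raw: dict) -> CanonicalTransaction:
        return CanonicalTransaction(
            **{canonical: raw[source] for canonical, source in self.field_map.items()}
        )


# Customer A names its fields differently; the adapter normalizes them.
adapter = SchemaAdapter({"transaction_id": "txn_ref", "amount": "amt", "currency": "ccy"})
txn = adapter.adapt({"txn_ref": "txn_123", "amt": 42.5, "ccy": "USD"})
```

Onboarding a new customer then means writing a new field map under config/adapters/, not new code.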
Project Structure
.
├── agents/              # Agent framework (CORE)
│   ├── base.py          # Base agent interfaces
│   ├── config.py        # Configuration loader
│   ├── registry.py      # Agent registry + router
│   └── execution/       # Pluggable execution backends
├── shared/              # Shared libraries
│   ├── schemas/         # Pydantic data models (type-safe)
│   └── adapters/        # Schema adaptation framework
├── config/              # Configuration (plug-and-play)
│   ├── agents/          # Agent configurations (YAML)
│   └── adapters/        # Customer schema adapters
├── services/            # Deployable services
├── optimization/        # Prompt optimization (DSPy/TextGrad)
├── memory/              # Lakebase integration
├── orchestration/       # Databricks Workflows + LangGraph
├── mcp-servers/         # Model Context Protocol tools
├── evaluation/          # MLflow scorers and metrics
├── telemetry/           # OTEL → Zerobus → Delta
├── uc-registry/         # Unity Catalog integration
├── data/                # Synthetic testbed
├── infrastructure/      # Deployment configs (DABS)
├── experiments/         # Notebooks + MLflow tracking
├── tests/               # Unit, integration, load tests
└── docs/                # Documentation
See Project Structure for detailed breakdown with key concepts.
Data Schemas
All data structures are defined using Pydantic models in shared/schemas/:
- transactions.py - Transaction records and payment data
- fraud_signals.py - Velocity, amount, location, device signals
- contexts.py - Merchant and customer profiles
- agent_io.py - Agent inputs, outputs, tool calls (MCP-ready)
- evaluation.py - Evaluation records and scorer metrics
- telemetry.py - OTEL traces for Zerobus ingestion
See shared/schemas/README.md for detailed documentation.
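The real models are Pydantic classes under shared/schemas/. The stdlib dataclass below is only a stand-in to illustrate the validate-at-the-boundary idea; the field names are illustrative, not the framework's actual schema.

```python
# Stdlib stand-in for the Pydantic models: reject bad data at the boundary.
# Field names here are illustrative, not the framework's actual schema.
from dataclasses import dataclass


@dataclass
class Transaction:
    transaction_id: str
    amount: float

    def __post_init__(self):
        # Pydantic would do this automatically via validators/constraints.
        if self.amount < 0:
            raise ValueError("amount must be non-negative")


ok = Transaction(transaction_id="txn_1", amount=42.5)  # passes validation
try:
    Transaction(transaction_id="txn_2", amount=-5.0)   # rejected
except ValueError as err:
    rejected = str(err)
```

Because every agent input and output passes through such models, malformed records fail loudly at the pipeline edge instead of corrupting downstream state.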
Quick Start (Plug-and-Play)
Add agents to your existing pipeline in 3 lines:
from agents import AgentRouter
from shared.schemas import AgentInput
# 1. Load agents from config (one line!)
router = AgentRouter.from_yaml("config/agents.yaml")
# 2. Convert your data to AgentInput (Pydantic validates!)
agent_input = AgentInput(
    request_id=record.id,
    data=YourDomainData(**record.dict()),  # Your domain-specific data
    # ... your contexts
)
# 3. Route to agent (inline or offline based on config!)
result = await router.route("your_agent", agent_input)
# That's it! Agent runs according to your config.
# No code changes to enable/disable or switch execution modes.
Configuration controls everything:
# config/agents.yaml
agents:
  your_agent:
    class: "your_package.YourAgent"
    execution_mode: "offline"   # or "inline" if SLA allows
    enabled: true               # change to false to disable
    timeout: 30
Works for any domain: Fraud detection, risk analysis, customer support, compliance, content moderation, etc.
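To make the enable/disable behavior concrete, here is a minimal sketch of config-driven gating. The dict mirrors the YAML structure above, and the helper name is hypothetical, not part of the framework API.

```python
# Minimal sketch of config-driven agent gating (helper name is hypothetical).
# In practice this dict would be parsed from config/agents.yaml.
AGENTS_CONFIG = {
    "your_agent":   {"enabled": True,  "execution_mode": "offline", "timeout": 30},
    "legacy_agent": {"enabled": False, "execution_mode": "inline",  "timeout": 10},
}


def enabled_agents(config: dict) -> list:
    # Only agents with enabled: true are eligible for routing.
    return [name for name, cfg in config.items() if cfg.get("enabled")]


active = enabled_agents(AGENTS_CONFIG)  # ['your_agent']
```

Flipping `enabled: false` in the YAML removes an agent from routing with no code change, which is the whole point of the configuration-driven design.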
See Configuration System for details.
Getting Started
Prerequisites
- Python 3.9+
- Databricks workspace with:
- Model Serving endpoint
- Unity Catalog
- Lakebase access
- Zerobus server endpoint (for telemetry)
Installation
# Clone the repository
git clone <repo-url>
cd "SOTA Agent"
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Or install in development mode
pip install -e ".[dev]"
Configuration
# Copy example config
cp .env.example .env
# Edit .env with your Databricks credentials
# - DATABRICKS_HOST
# - DATABRICKS_TOKEN
# - MODEL_SERVING_ENDPOINT
# - UNITY_CATALOG_NAME
# - ZEROBUS_ENDPOINT
Databricks Stack
| Component | Technology |
|---|---|
| LLM Inference | Databricks Model Serving |
| Orchestration | LangGraph + Databricks Workflows |
| Tracing & Evaluation | Databricks MLflow |
| Memory/Vector Store | Lakebase |
| Telemetry Sink | Zerobus โ Delta Lake |
| Prompt Registry | Unity Catalog |
| Dashboards | Databricks SQL |
| Compute | Databricks Clusters / Serverless |
Development
Run Tests
# Run all tests
pytest
# Run with coverage
pytest --cov=. --cov-report=html
# Run specific test suite
pytest tests/unit/
pytest tests/integration/
Code Quality
# Format code
black .
# Lint
ruff check .
# Type check
mypy .
Architecture Flows
Realtime Path (Low-latency)
Transaction → Event Collector → Ephemeral Narrative Agent → MCP Tool Calls → LLM Pool → Risk Narrative → Dashboard/Alerts
Async Path (Optimization)
MLflow Scorers → Evaluate High-Risk Txns → Log Metrics → DSPy/TextGrad Optimization → Update Prompts in UC → Deploy to Agents
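The async path above amounts to a filter-optimize-publish loop. The sketch below is illustrative only: `optimize` and `publish` are hypothetical stand-ins for the DSPy/TextGrad optimization and Unity Catalog deployment steps.

```python
# Illustrative filter-optimize-publish loop for the async path.
# `optimize` and `publish` are hypothetical stand-ins, not framework API.
def optimization_cycle(scored_records, optimize, publish):
    # 1. Keep only high-risk records worth re-optimizing against
    high_risk = [r for r in scored_records if r["risk"] >= 0.8]
    # 2. Produce an updated prompt from feedback (DSPy/TextGrad in practice)
    new_prompt = optimize(high_risk)
    # 3. Publish to the prompt registry (Unity Catalog in practice)
    publish(new_prompt)
    return new_prompt


result = optimization_cycle(
    [{"risk": 0.9, "label": "fraud"}, {"risk": 0.2, "label": "ok"}],
    optimize=lambda records: f"prompt tuned on {len(records)} records",
    publish=lambda prompt: None,
)
```

Because the loop runs offline, it never sits on the realtime path's latency budget.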
MCP Integration
All tool calls use Model Context Protocol for standardization:
# Tool call schema (MCP-ready)
from shared.schemas import ToolCall, ToolResult

tool_call = ToolCall(
    tool_id="call_123",
    tool_name="merchant_context",
    tool_server="uc-query-server",
    arguments={"merchant_id": "mch_001"},
)

# Tool result
tool_result = ToolResult(
    tool_call_id="call_123",
    success=True,
    result=merchant_data,  # payload returned by the tool
    latency_ms=45.2,
)
See mcp-servers/ for tool implementations.
Telemetry
All events flow through OTEL → Zerobus → Delta Lake:
- Agent start/complete/error
- MCP tool calls
- LLM requests/responses
- Stream chunks
- Evaluation results
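Each telemetry event is essentially a structured record. The sketch below shows a plausible shape; the field names are assumptions for illustration, not the actual OTEL/Zerobus schema.

```python
# Illustrative shape of an agent-trace event; field names are assumptions,
# not the actual OTEL/Zerobus schema used by the framework.
import json
import time
import uuid


def agent_event(event_type: str, agent: str, **attrs) -> str:
    record = {
        "trace_id": str(uuid.uuid4()),
        "timestamp": time.time(),
        "event_type": event_type,  # e.g. agent_start, agent_complete, tool_call
        "agent": agent,
    }
    record.update(attrs)  # extra attributes, e.g. latency_ms
    return json.dumps(record)


evt = agent_event("agent_complete", "your_agent", latency_ms=45.2)
```

Serializing every event to one flat schema is what makes the downstream Delta Lake tables queryable with plain SQL.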
Query telemetry in Unity Catalog:
SELECT * FROM main.telemetry.agent_traces
WHERE transaction_id = 'txn_123'
ORDER BY timestamp DESC;
Prompt Optimization
DSPy (Task Prompts)
# Optimize reasoning pipeline
from optimization.dspy import MIPROOptimizer
optimizer = MIPROOptimizer(training_data)
optimized_prompt = optimizer.optimize(baseline_prompt)
TextGrad (System Prompts)
# Optimize system prompt with guardrails
from optimization.textgrad import SystemPromptOptimizer
optimizer = SystemPromptOptimizer(feedback_data)
optimized_system = optimizer.optimize(system_prompt)
Synthetic Data
Generate idempotent test data:
# Generate synthetic transactions
python -m data.synthetic.generate --seed 42 --count 5000
# Output: data/synthetic/raw/transactions.parquet
Contributing
- Create a feature branch
- Make changes with tests
- Run linters and tests
- Submit pull request
License
MIT
Documentation
Start Here
- Getting Started - 5-minute quick start guide
- Template Guide - Comprehensive guide for any domain
- Cross-Domain Examples - 8 real-world examples
- Documentation Index - Complete documentation map
Core Documentation
- Project Structure - Code organization and key concepts
- Configuration System - YAML-based configuration
- Schema Documentation - Data schemas and adaptation
- Use Cases - Advanced usage patterns
Tools
- Template Generator - python template_generator.py --help
- Example Integrations - examples/plug_and_play_integration.py
Contact
For questions, see docs/ or contact the team.