Enhanced Model Context Protocol (MCP) server for intelligent Kusto table discovery, advanced analytics, and query optimization
Enhanced Kusto Table Search MCP Server
A production-grade Model Context Protocol (MCP) server built with FastMCP for intelligent Kusto table discovery, advanced analytics, query optimization, and enterprise-ready reliability.
What Makes This Special?
This is a battle-tested, enterprise-ready MCP server designed for AI agents working with Azure Data Explorer (Kusto) at scale:
Core Capabilities
- Smart Search: Search 9,799+ tables using natural language queries
- Anti-Hallucination: Schema validation, sampling-based query building, zero invented column names
- Query Handles: Server-side result caching with disk persistence keeps large results out of the context window
- Reusable Workflows: 10 production-ready query templates for common patterns
- Fast: Lazy loading, LRU caching, <100ms for cached searches
- Observable: Built-in performance monitoring and comprehensive metrics
Production-Ready Features (NEW!)
- Real Kusto Integration: Native Azure SDK connectivity with multi-auth support
- Advanced Analytics: Statistical analysis, correlation detection, anomaly detection, time-series analysis
- Query Optimization: Intelligent anti-pattern detection, complexity scoring, automatic optimization
- Enterprise Reliability: Circuit breakers, exponential backoff retry, bulkhead isolation, rate limiting
- Persistent Caching: Disk-backed query results with LRU eviction and lazy loading
- Template Library: 10 production templates for monitoring, performance, security, and cost optimization
- Workflow Builder: AI-powered extraction of Kusto queries from docs into reusable templates (NEW!)
Perfect For
- AI Agents building Kusto queries without hallucinating column names
- Data Engineers exploring large Kusto estates with thousands of tables
- Automated Workflows needing reliable, repeatable query patterns with fault tolerance
- Analytics Teams performing statistical analysis and anomaly detection on cached results
- DevOps Teams monitoring performance, optimizing queries, and ensuring system reliability
- Security Teams detecting threats and analyzing suspicious activity patterns
- FinOps Teams performing cost analysis and resource optimization
Quick Start
Prerequisites
- Python 3.9 or higher
- Azure credentials with Kusto access (Managed Identity, Azure CLI, or Device Auth)
- FastMCP installed (pip install fastmcp)
- Azure SDK packages: azure-kusto-data>=4.0.0, azure-identity>=1.14.0
Installation
# Clone the repository
git clone <your-repo-url>
cd cache-kusto-info
# Install dependencies
pip install -r requirements.txt
# Setup connection strings (copy from example)
cp connection_strings.json.example connection_strings.json
# Edit connection_strings.json with your cluster details
# Run the server
python kusto_server.py
Configuration for Claude Desktop
Add to your Claude Desktop config (~/Library/Application Support/Claude/claude_desktop_config.json on macOS):
{
"mcpServers": {
"kusto-table-search": {
"command": "python",
"args": ["/path/to/cache-kusto-info/kusto_server.py"],
"env": {}
}
}
}
Using with FastMCP Client
import asyncio

from fastmcp import Client

import kusto_server


async def main():
    # Connect in-memory to the server object defined in kusto_server.py
    async with Client(kusto_server.mcp) as client:
        # Search for tables
        result = await client.call_tool(
            "search_kusto_tables",
            arguments={"query": "wireserver requests", "limit": 5},
        )
        print(result)


asyncio.run(main())
Key Features Explained
1. Real Kusto Connectivity (NEW!)
Connect to Azure Data Explorer using the official Azure SDK with automatic authentication fallback:
# Automatic authentication chain:
# 1. Managed Identity (for Azure-hosted services)
# 2. Azure CLI (for local development)
# 3. Device Auth (interactive fallback)
### Configuration
Configure clusters in cache/connection_strings.json. Setting use_real_client to true enables the Azure SDK:
```json
{
  "clusters": {
    "mycluster": {
      "cluster_url": "https://mycluster.westus.kusto.windows.net",
      "use_real_client": true
    }
  }
}
```
Built-in resilience:
- Circuit breaker (protects against cascading failures)
- Exponential backoff retry (3 attempts with jitter)
- Bulkhead isolation (20 concurrent queries max)
- Rate limiting (token bucket algorithm)
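The retry behaviour listed above can be sketched in a few lines. This is a minimal illustration of exponential backoff with jitter, not the server's actual code: the function name `retry_with_backoff`, its parameters, and the 10% jitter factor are assumptions made for the example.

```python
import random
import time


def retry_with_backoff(func, max_attempts=3, base_delay=1.0, jitter=0.1,
                       sleep=time.sleep):
    """Call func(), retrying on any exception with exponentially growing delays.

    Hypothetical helper: names and defaults are illustrative only.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt)         # doubles on every retry
            delay += random.uniform(0, jitter * delay)  # jitter spreads out retries
            sleep(delay)
```

Passing `sleep` as a parameter keeps the helper easy to test, because a test can record the delays instead of actually waiting.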
2. Advanced Analytics Engine (NEW!)
Perform sophisticated analysis on cached query results without polluting context:
# Statistical Analysis
stats = query_handle_statistical_analysis(
handle="qh_abc123",
columns=["ResponseTime", "RequestCount"]
)
# Returns: mean, median, stddev, P50/P75/P90/P95/P99, skewness, kurtosis
# Correlation Detection
correlations = query_handle_correlation_analysis(
handle="qh_abc123",
numeric_columns=["CPU", "Memory", "Latency"]
)
# Returns: Pearson correlation matrix
# Anomaly Detection (3 methods)
anomalies = query_handle_outlier_detection(
handle="qh_abc123",
column="RequestLatency",
method="iqr", # or "zscore" or "modified_zscore"
threshold=1.5
)
# Returns: outlier values, indices, statistics
# Time Series Analysis
trends = query_handle_time_series_analysis(
handle="qh_abc123",
time_column="Timestamp",
value_column="ErrorRate",
window_size=10
)
# Returns: trend (increasing/decreasing/stable), rate_of_change, moving_averages
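To make the "iqr" method above concrete, here is a small standard-library sketch of interquartile-range outlier detection. It assumes the usual Tukey fence rule (values beyond `threshold` times the IQR from the quartiles); the function name `iqr_outliers` and the shape of the returned dictionary are hypothetical and may differ from what `query_handle_outlier_detection` returns.

```python
import statistics


def iqr_outliers(values, threshold=1.5):
    """Flag values outside [Q1 - t*IQR, Q3 + t*IQR] (illustrative sketch)."""
    q1, _median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    lower = q1 - threshold * iqr
    upper = q3 + threshold * iqr
    indices = [i for i, v in enumerate(values) if v < lower or v > upper]
    return {
        "lower_bound": lower,
        "upper_bound": upper,
        "indices": indices,
        "values": [values[i] for i in indices],
    }


latencies = [10, 12, 11, 13, 12, 11, 10, 95]
print(iqr_outliers(latencies)["values"])
```

A larger `threshold` widens the fences and flags fewer points, which is why the parameter is exposed in the tool call above.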
3. Query Optimization (NEW!)
Intelligent query analysis with anti-pattern detection and automatic optimization:
# Comprehensive Query Analysis
analysis = query_analyze_optimization(
query="""
MyTable
| where EventTime > ago(7d)
| extend ComputedField = tostring(RawData)
| order by EventTime desc
| take 100
"""
)
# Returns:
# - Anti-patterns detected (no time filter, SELECT *, inefficient ordering, etc.)
# - Complexity score (0-100+)
# - Estimated cost (low/medium/high/very high)
# - Optimization suggestions
# - Optimized query
# Index Recommendations
indexes = query_suggest_indexes(
query="MyTable | where UserId == 'user123' and EventTime > ago(1h)",
table_name="MyTable"
)
# Returns: recommended indexes (range, datetime, hash) with justification
# Query Comparison
comparison = query_compare_performance(
query1="MyTable | where EventTime > ago(1d) | take 1000",
query2="MyTable | where EventTime > ago(1d) | summarize count()"
)
# Returns: side-by-side complexity, cost, anti-patterns, recommendation
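As a rough idea of what heuristic anti-pattern detection can look like, the sketch below scans a KQL string with regular expressions. The three rules and their labels (`no_time_filter`, `contains_operator`, `unbounded_result`) are assumptions chosen for illustration. They are not the server's actual rule set, and a text scan like this can be fooled by string literals or comments.

```python
import re


def find_anti_patterns(query):
    """Return labels for a few common KQL smells (hypothetical rule set)."""
    findings = []
    # No obvious time predicate anywhere in the query
    if not re.search(r"\bago\s*\(|\bbetween\s*\(|\bdatetime\s*\(", query):
        findings.append("no_time_filter")
    # 'contains' does a substring scan; 'has' can use the term index
    if re.search(r"\bcontains\b", query):
        findings.append("contains_operator")
    # Nothing bounds or aggregates the result set
    if not re.search(r"\|\s*(take|limit|top|summarize|count)\b", query):
        findings.append("unbounded_result")
    return findings


print(find_anti_patterns("MyTable | where Message contains 'error'"))
```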
4. Production Query Templates (NEW!)
10 battle-tested templates for common scenarios:
# Error Analysis
template_render(
name="error_analysis_timerange",
parameters={
"table": "ApplicationLogs",
"start_time": "ago(6h)",
"end_time": "now()",
"min_severity": 3
}
)
# Performance Percentiles (SLA monitoring)
template_render(
name="performance_percentiles",
parameters={
"table": "RequestMetrics",
"time_column": "RequestTime",
"value_column": "Latency",
"timespan": "1h"
}
)
# Security Threat Detection
template_render(
name="security_suspicious_activity",
parameters={
"table": "SecurityEvents",
"lookback": "24h",
"user_column": "UserPrincipalName"
}
)
# Cost Analysis
template_render(
name="cost_analysis_by_resource",
parameters={
"table": "BillingData",
"timespan": "30d"
}
)
Available Templates: error_analysis_timerange, performance_percentiles, top_users_by_activity, anomaly_detection_timeseries, data_quality_check, hourly_trend_analysis, failure_rate_by_component, resource_utilization_peaks, security_suspicious_activity, cost_analysis_by_resource
5. Smart Table Search with Lazy Caching
Traditional approach: Load all schemas upfront (slow, memory-intensive)
Our approach: Load schemas on demand as you search (fast, efficient)
# Search is instant - schemas loaded only when needed
search_kusto_tables(
query="node health monitoring tables",
method="hybrid", # Combines keyword + fuzzy + semantic
limit=10
)
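The following sketch shows one plausible way to combine keyword and fuzzy matching into a single score. It is only an illustration: the semantic component is left out, and the function names, the CamelCase tokenizer, and the 0.6/0.4 weights are all assumptions rather than the server's real ranking logic.

```python
import difflib
import re


def split_table_name(name):
    """Split a CamelCase or snake_case table name into lowercase tokens."""
    return [t.lower() for t in re.findall(r"[A-Z]?[a-z0-9]+|[A-Z]+(?![a-z])", name)]


def hybrid_score(query, table_name, keyword_weight=0.6, fuzzy_weight=0.4):
    """Blend exact token overlap with a fuzzy string similarity (sketch)."""
    query_tokens = query.lower().split()
    name_tokens = set(split_table_name(table_name))
    overlap = sum(1 for t in query_tokens if t in name_tokens)
    keyword = overlap / len(query_tokens) if query_tokens else 0.0
    fuzzy = difflib.SequenceMatcher(
        None, "".join(query_tokens), table_name.lower()
    ).ratio()
    return keyword_weight * keyword + fuzzy_weight * fuzzy


def rank_tables(query, table_names, limit=10):
    """Return table names sorted by descending hybrid score."""
    ranked = sorted(table_names, key=lambda n: hybrid_score(query, n), reverse=True)
    return ranked[:limit]


print(rank_tables("node health", ["BillingData", "NodeHealthEvents", "WireserverRequests"]))
```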
6. Anti-Hallucination Query Building
AI agents love to invent column names. We prevent this:
# Step 1: Sample table to get ACTUAL column names
schema = sample_table_for_query_building(
cluster="admeus",
database="AdmeusDB",
table="NodeHealthEvents"
)
# Step 2: Build query using ONLY the real column names
# ✅ Uses: EventTime, NodeName, Status (from actual schema)
# ❌ Never invents: Timestamp, Node, State
7. Query Handle System with Persistence
Large query results pollute context windows. We solve this with disk-backed caching:
# Execute query, get handle instead of 10,000 rows
handle = execute_query_with_handle(
cluster="admeus",
database="AdmeusDB",
table="LargeTable",
query="LargeTable | where EventTime > ago(1d)"
)
# Results automatically persisted to disk (survives server restart)
# LRU eviction keeps memory usage under control
# Lazy loading: handles loaded from disk only when accessed
# Analyze results WITHOUT loading into context
query_handle_analyze(
handle=handle,
operation="count_by",
column="ErrorType"
)
# Returns: {"Critical": 1234, "Warning": 5678}
# Check cache statistics
cache_stats()
# Returns: memory_handles, disk_handles, cache_hit_rate, disk_reads, disk_writes
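The combination of disk persistence, LRU eviction, and lazy loading described above can be sketched as follows. This is an illustrative model only: the class name `HandleCache` is hypothetical, and it stores results as JSON for simplicity, whereas the project notes elsewhere that it uses pickle serialization.

```python
import json
from collections import OrderedDict
from pathlib import Path


class HandleCache:
    """In-memory LRU in front of a directory of persisted results (sketch)."""

    def __init__(self, directory, max_in_memory=1000):
        self.dir = Path(directory)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.max_in_memory = max_in_memory
        self._memory = OrderedDict()  # oldest entry first

    def put(self, handle, rows):
        # Persist first so the result survives a restart
        (self.dir / f"{handle}.json").write_text(json.dumps(rows))
        self._remember(handle, rows)

    def get(self, handle):
        if handle in self._memory:
            self._memory.move_to_end(handle)  # mark as most recently used
            return self._memory[handle]
        path = self.dir / f"{handle}.json"
        if not path.exists():
            raise KeyError(handle)
        rows = json.loads(path.read_text())  # lazy load from disk on first access
        self._remember(handle, rows)
        return rows

    def _remember(self, handle, rows):
        self._memory[handle] = rows
        self._memory.move_to_end(handle)
        while len(self._memory) > self.max_in_memory:
            self._memory.popitem(last=False)  # evict least recently used
```

Eviction only drops the in-memory copy. The file stays on disk, so an evicted handle is reloaded transparently the next time it is requested.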
8. Reusable Query Templates
Save common patterns, reuse with different parameters:
# Create template
template_create(
name="error_rate_analysis",
query="""
{table}
| where EventTime > ago({timespan})
| where Severity >= {min_severity}
| summarize ErrorCount=count() by bin(EventTime, {bin_size}), ErrorType
""",
parameters=[
{"name": "table", "type": "string", "description": "Table name"},
{"name": "timespan", "type": "timespan", "default_value": "1h"},
{"name": "min_severity", "type": "number", "default_value": 3},
{"name": "bin_size", "type": "timespan", "default_value": "5m"}
]
)
# Reuse template
query = template_render(
name="error_rate_analysis",
parameters={
"table": "ApplicationLogs",
"timespan": "6h",
"min_severity": 4
}
)
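A minimal sketch of how rendering with defaults might work, assuming the `{name}` placeholders are substituted with Python's `str.format`. The function `render_template` is hypothetical and only mirrors the parameter format shown above. Note that this simple approach would break on KQL that contains literal braces, such as dynamic JSON values.

```python
def render_template(query, parameters, values):
    """Fill {placeholders} from values, falling back to default_value (sketch)."""
    declared = {p["name"] for p in parameters}
    unknown = set(values) - declared
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")

    resolved = {}
    for param in parameters:
        name = param["name"]
        if name in values:
            resolved[name] = values[name]
        elif "default_value" in param:
            resolved[name] = param["default_value"]
        else:
            raise ValueError(f"missing required parameter: {name}")
    return query.format(**resolved)


params = [
    {"name": "table", "type": "string"},
    {"name": "timespan", "type": "timespan", "default_value": "1h"},
    {"name": "min_severity", "type": "number", "default_value": 3},
]
template = "{table} | where EventTime > ago({timespan}) | where Severity >= {min_severity}"
print(render_template(template, params, {"table": "ApplicationLogs", "timespan": "6h"}))
```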
9. Intelligent Workflow Builder (NEW!)
Transform documentation into reusable workflow templates automatically:
# Use the prompt to analyze docs with Kusto queries
# The AI will:
# 1. Extract all KQL queries from markdown
# 2. Identify parameters (times, IDs, filters)
# 3. Generate descriptive names and tags
# 4. Create parameterized templates
# Simply invoke the prompt and provide a doc path
prompt_create_workflows_from_docs()
# → Provide: docs/troubleshooting/kusto-queries.md
# → AI extracts 5 queries
# → Creates 5 reusable workflow templates
# → Ready to execute with template_execute
# Example transformation:
# Input (doc): "ago(24h)" and "ErrorCode == '500'"
# Output (template): {time_window} and {error_code} parameters
# Discover created workflows
workflow_list(category="troubleshooting")
# → Shows all troubleshooting workflows with params
# Execute workflow with one command
template_execute(
template_name="wireserver_error_analysis",
parameters={"time_window": "6h", "error_code": "503"}
)
See: docs/WORKFLOW_BUILDER_GUIDE.md for complete documentation
Tool Reference
Core Search Tools
| Tool | Purpose | Key Parameters |
|---|---|---|
| `search_kusto_tables` | Find tables using natural language | query, method, limit |
| `get_table_details` | Get comprehensive table info | cluster, database, table |
| `sample_table_for_query_building` | Anti-hallucination: Get real schema | cluster, database, table |
Query Handle Tools
| Tool | Purpose | Key Parameters |
|---|---|---|
| `execute_query_with_handle` | Execute query, return handle | query, limit |
| `query_handle_analyze` | Analyze cached results | handle, operation, column |
| `query_handle_list` | List all active handles | include_expired |
| `query_handle_validate` | Check if handle is valid | handle |
| `query_handle_get_sample` | Get sample rows from handle | handle, num_rows |
Advanced Analytics Tools (NEW!)
| Tool | Purpose | Key Parameters |
|---|---|---|
| `query_handle_statistical_analysis` | Statistical analysis on cached results | handle, columns |
| `query_handle_correlation_analysis` | Detect correlations between columns | handle, numeric_columns |
| `query_handle_outlier_detection` | Anomaly detection (IQR/Z-score) | handle, column, method, threshold |
| `query_handle_time_series_analysis` | Trend detection and time-series analysis | handle, time_column, value_column, window_size |
Query Optimization Tools (NEW!)
| Tool | Purpose | Key Parameters |
|---|---|---|
| `query_analyze_optimization` | Comprehensive query analysis | query |
| `query_suggest_indexes` | Index recommendations | query, table_name |
| `query_compare_performance` | Side-by-side query comparison | query1, query2 |
Template Tools
| Tool | Purpose | Key Parameters |
|---|---|---|
| `template_create` | Create reusable query template | name, query, parameters |
| `template_list` | List all templates (10 production templates included) | tags, search |
| `template_render` | Generate query from template | name, parameters |
| `template_get` | Get template details | name |
| `template_delete` | Delete custom template | name |
Cache & Performance Tools
| Tool | Purpose | Key Parameters |
|---|---|---|
| `cache_stats` | Get comprehensive cache statistics | - |
| `cache_clear` | Clear expired cache entries | - |
| `performance_stats` | Get performance metrics | operation (optional) |
Anti-Hallucination Best Practices
✅ DO: Always Sample Before Querying
# CORRECT WORKFLOW
1. search_kusto_tables("error logs")
2. sample_table_for_query_building(cluster, db, table)
3. Build query using EXACT column names from step 2
4. execute_query_with_handle(query)
❌ DON'T: Assume Column Names
# WRONG - Assumes "Timestamp" exists
query = "ErrorLogs | where Timestamp > ago(1h)"
# RIGHT - Uses actual column from schema
schema = sample_table_for_query_building(...)
time_col = schema["schema"]["primary_time_column"] # e.g., "EventTime"
query = f"ErrorLogs | where {time_col} > ago(1h)"
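One way to enforce this rule in code is to check every column a query intends to use against the sampled schema before executing it. The helper below is a hypothetical sketch (the name `check_columns` and its return shape are assumptions) that also suggests near matches for invented names using the standard library's `difflib`.

```python
import difflib


def check_columns(requested, actual_columns):
    """Map each unknown column name to a list of close real names (sketch)."""
    actual = set(actual_columns)
    problems = {}
    for name in requested:
        if name not in actual:
            problems[name] = difflib.get_close_matches(
                name, actual_columns, n=3, cutoff=0.5
            )
    return problems


schema_columns = ["EventTime", "NodeName", "Status"]
print(check_columns(["EventTime", "NodeNme", "Timestamp"], schema_columns))
```

An empty result means every requested column exists. Anything else should block execution until the query is rebuilt from real column names.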
Running the Server
Local Development (Default)
# Run with STDIO transport (for Claude Desktop)
python kusto_server.py
Alternative Transports
# Run with HTTP transport
fastmcp run kusto_server.py --transport http --port 8000
# Run with SSE transport
fastmcp run kusto_server.py --transport sse
Testing
# Run all tests
pytest
# Run with coverage
pytest --cov=src --cov-report=html
# Run specific test file
pytest tests/test_fastmcp_server.py -v
# Run integration tests only
pytest tests/test_fastmcp_server.py::TestKustoServerIntegration -v
# Run resilience pattern tests
pytest tests/test_resilience.py -v
# Run real Kusto client tests
pytest tests/test_kusto_client.py -v
Recent Enhancements
See OVERNIGHT_ENHANCEMENTS_V3.md for comprehensive documentation of recent production-ready improvements:
1. Real Kusto Client Integration
- Native Azure SDK connectivity (azure-kusto-data 4.0+)
- Multi-auth support: Managed Identity → Azure CLI → Device Auth
- Built-in resilience: circuit breakers, retry, bulkheads
- Impact: Production-ready Kusto connectivity with enterprise reliability
2. Advanced Analytics Engine
- Statistical analysis: mean, median, stddev, percentiles (P50-P99)
- Correlation detection: Pearson correlation matrices
- Anomaly detection: IQR, Z-score, Modified Z-score methods
- Time-series analysis: trend detection, rate of change, moving averages
- Data quality scoring: completeness, consistency, validity (0-100 score)
- Impact: Perform sophisticated analysis without context pollution
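For reference, a Pearson correlation matrix like the one mentioned above can be computed with nothing but the standard library. The sketch below is illustrative: the function names and the dict-of-dicts output are assumptions, and the real engine may use a different representation.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric lists."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    if spread_x == 0 or spread_y == 0:
        return float("nan")  # undefined when a column is constant
    return cov / (spread_x * spread_y)


def correlation_matrix(columns):
    """Build {col_a: {col_b: r}} for every pair of named columns."""
    names = list(columns)
    return {a: {b: pearson(columns[a], columns[b]) for b in names} for a in names}


matrix = correlation_matrix({
    "CPU": [10, 20, 30, 40],
    "Latency": [105, 210, 290, 405],
    "Idle": [90, 80, 70, 60],
})
print(round(matrix["CPU"]["Idle"], 3))
```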
3. Query Optimization Analyzer
- Anti-pattern detection: 10+ patterns (no time filter, SELECT *, inefficient ordering)
- Complexity scoring: 0-100+ with cost estimation
- Automatic optimization: query rewriting with best practices
- Index recommendations: range, datetime, hash indexes
- Side-by-side comparison: compare alternative queries
- Impact: Write better queries, reduce costs, improve performance
4. Enhanced Error Recovery System
- Circuit breaker: state machine with auto-recovery (5 failure threshold, 60s timeout)
- Exponential backoff retry: 3 attempts with jitter (1s → 2s → 4s)
- Bulkhead isolation: 20 concurrent query limit
- Fallback handlers: graceful degradation
- Rate limiting: token bucket algorithm
- Impact: Production-grade fault tolerance
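The circuit breaker state machine described above (closed, open after 5 failures, a trial call after the 60s timeout) can be sketched as below. This is a simplified illustration with hypothetical names. It is not thread-safe and is not the implementation in `resilience.py`.

```python
import time


class CircuitBreaker:
    """Minimal closed / open / half-open breaker (illustrative sketch)."""

    def __init__(self, failure_threshold=5, recovery_timeout=60.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    @property
    def state(self):
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half_open"  # timeout elapsed: allow one trial call
        return "open"

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            raise RuntimeError("circuit open: call rejected")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

Rejecting calls while the circuit is open gives a struggling cluster time to recover instead of piling more queries onto it.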
5. Query Result Caching with Persistence
- Disk-backed storage: survives server restarts
- LRU eviction: memory limit 1000 handles
- Lazy loading: handles loaded only when accessed
- Integrity verification: pickle serialization with validation
- Cache statistics: hit rate, disk I/O, memory usage
- Impact: Efficient memory usage, reliable caching
6. Advanced Query Template Library
- 10 production-ready templates: error analysis, performance, security, cost
- Comprehensive documentation: parameters, tags, execution times
- Common scenarios: monitoring, SLA tracking, threat detection, FinOps
- Impact: Accelerate common analysis patterns
Total Additions: 3,000+ lines of production code, 7 new files, 7 new MCP tools, comprehensive test coverage
Performance Characteristics
- Table Search: <100ms for cached queries, <500ms for first search
- Schema Cache: <10ms for cached lookups, <2s for fresh fetch
- Query Handles: <50ms for analytics operations, persistent across restarts
- Memory: <200MB typical, <500MB with large caches, LRU eviction at 1000 handles
- Disk I/O: Lazy loading, handles loaded only when accessed
- Resilience: Circuit breaker opens after 5 failures, 60s timeout, auto-recovery
- Concurrency: Bulkhead limits 20 concurrent Kusto queries, prevents resource exhaustion
- Retry: Exponential backoff (1s → 2s → 4s) with jitter, 3 max attempts
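The concurrency limit listed above is a classic bulkhead. A minimal asyncio sketch, assuming a semaphore-based design, looks like this. The `Bulkhead` class and its `peak` counter are hypothetical and exist only to make the behaviour observable.

```python
import asyncio


class Bulkhead:
    """Cap the number of coroutines running at once (illustrative sketch)."""

    def __init__(self, max_concurrent=20):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self.active = 0
        self.peak = 0  # highest concurrency observed so far

    async def run(self, coro_func, *args):
        async with self._semaphore:  # waits here when the limit is reached
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await coro_func(*args)
            finally:
                self.active -= 1


async def demo():
    bulkhead = Bulkhead(max_concurrent=3)  # created inside the running loop

    async def fake_query(i):
        await asyncio.sleep(0.01)  # stands in for a Kusto round trip
        return i * 2

    results = await asyncio.gather(*(bulkhead.run(fake_query, i) for i in range(10)))
    return bulkhead, results
```

Running `asyncio.run(demo())` executes all ten fake queries, but never more than three at the same time.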
Architecture
kusto_server.py (FastMCP server - 20+ tools)
├── @mcp.tool decorators (20+ tools including analytics & optimization)
├── @mcp.resource (2 help resources)
└── @mcp.prompt (2 reusable prompts)
src/
├── core/                             # Infrastructure
│   ├── logging_config.py             # Structured logging
│   ├── config.py                     # Configuration management
│   ├── performance.py                # Performance monitoring
│   ├── exceptions.py                 # Custom exceptions
│   └── resilience.py                 # NEW: Circuit breakers, retry, bulkheads, rate limiting
├── services/                         # Business logic
│   ├── schema_cache_service.py       # Lazy schema caching
│   ├── query_handle_service.py       # NEW: Disk-backed result caching with LRU
│   ├── query_template_service.py     # Template management (10 production templates)
│   ├── query_templates.json          # NEW: Production query template library
│   ├── kql_query_builder_service.py  # KQL query generation
│   ├── analytics_engine.py           # NEW: Statistical analysis, correlation, anomalies
│   └── query_optimizer.py            # NEW: Anti-pattern detection, query optimization
├── tools/                            # High-level tools
│   ├── table_discovery.py            # Table search orchestration
│   └── query_sampler.py              # Anti-hallucination sampling
└── utils/                            # Utilities
    ├── anti_hallucination.py         # Schema validation
    ├── kusto_client.py               # NEW: Real Azure SDK + Mock clients with resilience
    └── helpers.py                    # Helper functions
tests/                                # Comprehensive test suite
├── test_fastmcp_server.py            # Server integration tests
├── test_resilience.py                # NEW: Circuit breaker, retry, bulkhead tests
├── test_kusto_client.py              # NEW: Real & mock client tests
├── test_ai_query_builder.py          # NEW: AI query building tests
├── test_query_handle_service.py      # Query handle tests
└── test_schema_cache_service.py      # Schema cache tests
Contributing
Contributions are welcome! Please see DEVELOPMENT_GUIDE.md for development practices and OVERNIGHT_ENHANCEMENTS_V3.md for recent feature additions.
License
MIT License - see LICENSE file for details
Acknowledgments
- Built with FastMCP by @jlowin
- Powered by Azure SDK for Python
- Inspired by the need for trustworthy AI-powered query building
- Special thanks to the Azure Data Explorer team
Latest Update: Major production-ready enhancements including real Azure SDK integration, advanced analytics, query optimization, enterprise resilience patterns, persistent caching, and 10 production query templates. See OVERNIGHT_ENHANCEMENTS_V3.md for details.
Support
- Documentation
- Issue Tracker
- Discussions
Legacy Files (Pre-FastMCP Migration)
The following files are from the previous MCP SDK implementation and are kept for reference:
- mcp_table_search_server.py - Original MCP SDK server
- enhanced_mcp_server.py - Previous enhanced server (now replaced by kusto_server.py)
Use kusto_server.py for all new development.