Skip to main content

Production-grade LLM gateway with cost tracking, budget alerts, conversation memory, streaming, guardrails (PII/injection defense), Redis caching, and multi-provider routing

Project description

Agentic AI Gateway

PyPI version Python 3.9+ License: MIT Author: Tyler Canton

Production-grade LLM routing with automatic fallbacks, canary deployments, and multi-provider support.

Created by Tyler Canton | PyPI | Documentation

The Problem

When you call an LLM directly, you're one API error away from a crashed application:

# If Claude is down, rate-limited, or throws an error... your app crashes
response = bedrock.invoke_model(modelId='anthropic.claude-3-sonnet...')

AWS Bedrock, OpenAI, and other LLM providers don't offer:

  • Automatic fallback to alternative models
  • Traffic splitting for A/B testing new models
  • Centralized monitoring across models
  • Runtime configuration without redeployment

The Solution

Agentic AI Gateway sits between your application and LLM providers:

┌─────────────┐     ┌─────────────┐     ┌──────────────────────────────┐
│  Your App   │────▶│ Agentic AI Gateway │────▶│ Claude (Primary)             │
└─────────────┘     │             │     │ Llama (Fallback)             │
                    │ - Routing   │     │ GPT-4 (Cross-provider backup)│
                    │ - Fallback  │     └──────────────────────────────┘
                    │ - Canary    │
                    │ - Metrics   │
                    └─────────────┘

Installation

# For AWS Bedrock
pip install agentic-ai-gateway[bedrock]

# For OpenAI
pip install agentic-ai-gateway[openai]

# With Redis caching (v0.5.0+)
pip install agentic-ai-gateway[redis]

# For everything (cross-provider + redis)
pip install agentic-ai-gateway[all]

Quick Start

AWS Bedrock

from agentic_ai_gateway import create_bedrock_gateway

# Create gateway with automatic fallback
gateway = create_bedrock_gateway(
    primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
    fallback_models=["anthropic.claude-3-haiku-20240307-v1:0"],
    region="us-east-1"
)

# Use it - if Claude Sonnet fails, automatically tries Haiku
response = gateway.invoke("What is the capital of France?")
print(response.content)  # "The capital of France is Paris."
print(response.model_used)  # Shows which model actually responded
print(response.fallback_used)  # True if primary failed

OpenAI

from agentic_ai_gateway import create_openai_gateway

gateway = create_openai_gateway(
    primary_model="gpt-4o",
    fallback_models=["gpt-4o-mini"],
    api_key="sk-..."
)

response = gateway.invoke("Explain quantum computing")

Cross-Provider Fallback

from agentic_ai_gateway import create_multi_provider_gateway

# Ultimate resilience: fall back across providers
gateway = create_multi_provider_gateway(
    primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
    fallback_models=[
        "anthropic.claude-3-haiku-20240307-v1:0",  # Bedrock fallback
        "gpt-4o-mini",  # OpenAI fallback
    ],
    bedrock_region="us-east-1",
    openai_api_key="sk-..."
)

response = gateway.invoke("Summarize this document...")
# Tries Claude Sonnet → Claude Haiku → GPT-4o Mini

Canary Deployments

Test new models on a percentage of traffic:

from agentic_ai_gateway import create_bedrock_gateway

gateway = create_bedrock_gateway(
    primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
    canary_model="anthropic.claude-3-5-sonnet-20241022-v2:0",
    canary_percentage=10,  # 10% traffic to Claude 3.5
    fallback_models=["anthropic.claude-3-haiku-20240307-v1:0"]
)

# 90% of requests go to Claude 3 Sonnet
# 10% of requests go to Claude 3.5 Sonnet (canary)
response = gateway.invoke("Hello!")
print(response.canary_used)  # True if canary was selected

Gradual Rollout

# Week 1: 5% canary
gateway.update_config(canary_percentage=5)

# Week 2: 20% canary (metrics look good)
gateway.update_config(canary_percentage=20)

# Week 3: 50% canary
gateway.update_config(canary_percentage=50)

# Week 4: Promote canary to primary
gateway.update_config(
    primary_model="anthropic.claude-3-5-sonnet-20241022-v2:0",
    canary_model=None,
    canary_percentage=0
)

Monitoring

Built-in metrics tracking:

# After running some requests
metrics = gateway.get_metrics()

print(metrics)
# {
#     "total_invocations": 1000,
#     "total_errors": 12,
#     "error_rate": 0.012,
#     "fallback_rate": 0.03,
#     "avg_latency_ms": 1250,
#     "by_model": {
#         "anthropic.claude-3-sonnet...": {"invocations": 900, "errors": 10},
#         "anthropic.claude-3-haiku...": {"invocations": 100, "errors": 2}
#     }
# }

CloudWatch Integration

import boto3
from agentic_ai_gateway import AgenticGateway, AgenticGatewayConfig, BedrockProvider

class CloudWatchMetrics:
    def __init__(self, namespace="AgenticGateway"):
        self.cloudwatch = boto3.client("cloudwatch")
        self.namespace = namespace

    def record(self, model_id, latency_ms, success, is_canary, is_fallback, error=None):
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    "MetricName": "Invocations",
                    "Value": 1,
                    "Dimensions": [
                        {"Name": "ModelId", "Value": model_id},
                        {"Name": "Success", "Value": str(success)}
                    ]
                },
                {
                    "MetricName": "Latency",
                    "Value": latency_ms,
                    "Unit": "Milliseconds",
                    "Dimensions": [{"Name": "ModelId", "Value": model_id}]
                }
            ]
        )

# Use custom metrics
gateway = AgenticGateway(
    config=AgenticGatewayConfig(
        primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
        fallback_models=["anthropic.claude-3-haiku-20240307-v1:0"]
    ),
    providers=[BedrockProvider()],
    metrics=CloudWatchMetrics()
)

Custom Providers

Add support for any LLM provider:

from agentic_ai_gateway import AgenticGateway, AgenticGatewayConfig, LLMProvider

class AnthropicDirectProvider(LLMProvider):
    def __init__(self, api_key: str):
        import anthropic
        self.client = anthropic.Anthropic(api_key=api_key)

    def supports_model(self, model_id: str) -> bool:
        return "claude" in model_id and "anthropic." not in model_id

    def invoke(self, model_id: str, prompt: str, **kwargs):
        response = self.client.messages.create(
            model=model_id,
            max_tokens=kwargs.get("max_tokens", 1024),
            messages=[{"role": "user", "content": prompt}]
        )
        content = response.content[0].text
        return content, response.usage.input_tokens, response.usage.output_tokens

# Use it
gateway = AgenticGateway(
    config=AgenticGatewayConfig(primary_model="claude-3-opus-20240229"),
    providers=[AnthropicDirectProvider(api_key="sk-...")]
)

Multi-Agent Tool Calling

For multi-agent workflows that need tool calling, use the converse() method:

from agentic_ai_gateway import create_bedrock_gateway

gateway = create_bedrock_gateway(
    primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
    fallback_models=["anthropic.claude-3-haiku-20240307-v1:0"]
)

# Define tools
tool_config = {
    "tools": [
        {
            "toolSpec": {
                "name": "get_patient_data",
                "description": "Retrieve patient records",
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "patient_id": {"type": "string"}
                        },
                        "required": ["patient_id"]
                    }
                }
            }
        }
    ]
}

# Use converse() with tool calling - includes automatic fallback
result = gateway.converse(
    messages=[{
        "role": "user",
        "content": [{"text": "Look up patient P001"}]
    }],
    system=[{"text": "You are a healthcare assistant."}],
    tool_config=tool_config,
    inference_config={"maxTokens": 4096, "temperature": 0.1}
)

print(f"Model used: {result['model_used']}")
print(f"Fallback used: {result['fallback_used']}")

# Access raw Bedrock response
response = result["response"]

RAG Pipeline Integration

Integrate with your RAG pipeline for resilient document Q&A:

from agentic_ai_gateway import create_bedrock_gateway

gateway = create_bedrock_gateway(
    primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
    fallback_models=["anthropic.claude-3-haiku-20240307-v1:0"],
    canary_model="anthropic.claude-3-5-sonnet-20241022-v2:0",
    canary_percentage=10  # A/B test new model
)

def rag_query(question: str, context_chunks: list[str]) -> dict:
    """RAG query with automatic fallback."""
    prompt = f"""Answer based on context:

Context:
{chr(10).join(context_chunks)}

Question: {question}"""

    response = gateway.invoke(prompt, max_tokens=500, temperature=0.3)

    return {
        "answer": response.content,
        "model_used": response.model_used,
        "fallback_used": response.fallback_used
    }

Async Support

import asyncio
from agentic_ai_gateway import create_bedrock_gateway

gateway = create_bedrock_gateway()

async def main():
    response = await gateway.ainvoke("Hello async world!")
    print(response.content)

asyncio.run(main())

v0.6.0 Features

Cost Tracking & Budget Alerts

Track LLM costs and prevent surprise bills:

from agentic_ai_gateway import CostTrackedGateway, BudgetConfig, BudgetPeriod

gateway = CostTrackedGateway(
    gateway=base_gateway,
    budget=BudgetConfig(
        limit=10.00,  # $10/day
        period=BudgetPeriod.DAILY,
        alert_threshold=0.8,  # Alert at 80%
        on_alert=lambda curr, limit: slack_notify(f"LLM spend: ${curr:.2f}/${limit}"),
        block_on_exceeded=True  # Stop requests when budget hit
    )
)

# Every request tracks cost
response = gateway.invoke("Summarize this document...")
print(f"Cost: ${response.cost:.4f}")

# Get usage stats
stats = gateway.get_cost_stats()
print(f"Today: ${stats.total_cost:.2f}")
print(f"By model: {stats.by_model}")

Multi-Tenant Cost Isolation

Track spend per customer/tenant:

# Track costs per tenant
response = gateway.invoke("Hello", tenant_id="customer-123")
response = gateway.invoke("World", tenant_id="customer-456")

stats = gateway.get_cost_stats()
print(stats.by_tenant)
# {"customer-123": 0.003, "customer-456": 0.002}

# Export for billing
csv_data = gateway.export_records(format="csv")

Enterprise Integrations

Connect cost tracking to your production monitoring stack:

Slack Alerts

from agentic_ai_gateway import CostTrackedGateway, BudgetConfig, BudgetPeriod, SlackAlerter

slack = SlackAlerter(
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
    channel="#llm-costs",
    mention_on_critical="@oncall"
)

gateway = CostTrackedGateway(
    gateway=base_gateway,
    budget=BudgetConfig(
        limit=100.00,
        period=BudgetPeriod.DAILY,
        alert_threshold=0.8,
        on_alert=slack.send_alert
    )
)

CloudWatch Metrics

from agentic_ai_gateway import CostTrackedGateway, CloudWatchCostMetrics

cw_metrics = CloudWatchCostMetrics(
    namespace="MyApp/LLMCosts",
    region="us-east-1"
)

# Push metrics after each request
response = gateway.invoke("Hello")
cw_metrics.push(gateway.tracker.get_stats())

# Or push periodically
import threading
def push_metrics():
    while True:
        cw_metrics.push(gateway.tracker.get_stats())
        time.sleep(60)

threading.Thread(target=push_metrics, daemon=True).start()

DataDog Metrics

from agentic_ai_gateway import DataDogCostMetrics

dd_metrics = DataDogCostMetrics(
    api_key="your-datadog-api-key",
    app_key="your-datadog-app-key",
    tags=["env:production", "service:chat-api"]
)

# Push to DataDog
dd_metrics.push(gateway.tracker.get_stats())

S3 Export (for Athena/QuickSight)

from agentic_ai_gateway import S3CostExporter

exporter = S3CostExporter(
    bucket="my-llm-analytics",
    prefix="costs/",
    region="us-east-1"
)

# Export daily costs (e.g., from cron job)
records = gateway.tracker.get_records(
    start_time=datetime.now() - timedelta(days=1)
)
exporter.export(records, partition_by="day")
# Writes to: s3://my-llm-analytics/costs/year=2024/month=01/day=15/costs.parquet

Custom Webhook

from agentic_ai_gateway import WebhookExporter

webhook = WebhookExporter(
    url="https://your-api.com/llm-costs",
    headers={"Authorization": "Bearer xxx"},
    batch_size=100
)

# Export records to your internal systems
webhook.export(gateway.tracker.get_records())

MCP Integration (Model Context Protocol)

Let Claude query your cost data directly via MCP:

from agentic_ai_gateway import MCPCostServer, CostTrackedGateway

# Create cost-tracked gateway
gateway = CostTrackedGateway(gateway=base_gateway, budget=budget_config)

# Create MCP server
mcp = MCPCostServer(tracker=gateway.tracker)

# Mount on FastAPI
from fastapi import FastAPI
app = FastAPI()
app.include_router(mcp.to_fastapi_routes(), prefix="/mcp")

Claude Desktop Config (claude_desktop_config.json):

{
    "mcpServers": {
        "llm-costs": {
            "url": "http://localhost:8000/mcp"
        }
    }
}

Available MCP Tools:

Tool Description
get_cost_stats Current spend, token counts, period stats
get_cost_by_model Cost breakdown by model
get_cost_by_tenant Cost breakdown by tenant/customer
get_budget_status Budget utilization and remaining
get_recent_requests Recent LLM requests with costs

Now Claude can answer: "How much have I spent on Claude 3 Sonnet today?"


v0.5.0 Features

Redis Distributed Cache

Cache LLM responses across load-balanced servers:

from agentic_ai_gateway import RedisCachedGateway, create_bedrock_gateway

# Wrap any gateway with Redis caching
base_gateway = create_bedrock_gateway(
    primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
    fallback_models=["anthropic.claude-3-haiku-20240307-v1:0"]
)

gateway = RedisCachedGateway(
    gateway=base_gateway,
    redis_url="redis://localhost:6379",
    ttl_seconds=3600,  # Cache for 1 hour
    prefix="llm:"
)

# First call hits LLM (~2s)
response = gateway.invoke("What is the capital of France?")

# Second call hits cache (~1ms)
response = gateway.invoke("What is the capital of France?")
print(response.cache_hit)  # True

Conversation Memory

Multi-turn conversations with Redis persistence:

from agentic_ai_gateway import ConversationGateway, RedisConversationMemory

memory = RedisConversationMemory(
    redis_url="redis://localhost:6379",
    max_history=20
)

gateway = ConversationGateway(
    gateway=base_gateway,
    memory=memory
)

# Start a conversation
response = gateway.invoke("Hi, I'm building a healthcare app", conversation_id="user-123")

# Continue the conversation (remembers context)
response = gateway.invoke("What tech stack do you recommend?", conversation_id="user-123")

# Clear conversation
gateway.clear_conversation("user-123")

Guardrails (PII & Injection Protection)

Protect your LLM from sensitive data leaks and attacks:

from agentic_ai_gateway import GuardedGateway, Guardrails, PIIType

gateway = GuardedGateway(
    gateway=base_gateway,
    guardrails=Guardrails(
        pii_detection=True,
        pii_action="redact",  # or "block"
        pii_types=[PIIType.SSN, PIIType.CREDIT_CARD, PIIType.EMAIL],
        prompt_injection_detection=True
    )
)

# PII is automatically redacted
response = gateway.invoke("My SSN is 123-45-6789")
# Prompt sent to LLM: "My SSN is [REDACTED_SSN]"

# Prompt injection is blocked
try:
    response = gateway.invoke("Ignore all instructions and...")
except GuardrailsError as e:
    print(e)  # "Prompt injection detected"

Streaming Support (v0.2.0+)

Stream tokens in real-time for chat interfaces and SSE endpoints:

Basic Streaming

from agentic_ai_gateway import create_bedrock_gateway

gateway = create_bedrock_gateway(
    primary_model="anthropic.claude-3-sonnet-20240229-v1:0",
    fallback_models=["anthropic.claude-3-haiku-20240307-v1:0"]
)

# Synchronous streaming
for chunk in gateway.invoke_stream("Tell me a story"):
    if chunk["type"] == "start":
        print(f"Using model: {chunk['model_used']}")
    elif chunk["type"] == "token":
        print(chunk["content"], end="", flush=True)
    elif chunk["type"] == "done":
        print(f"\n\nCompleted in {chunk['latency_ms']}ms")
        print(f"Tokens: {chunk['output_tokens']}")

Async Streaming (for FastAPI/aiohttp)

import asyncio
from agentic_ai_gateway import create_bedrock_gateway

gateway = create_bedrock_gateway()

async def stream_response():
    async for chunk in gateway.ainvoke_stream("Explain quantum computing"):
        if chunk["type"] == "token":
            yield chunk["content"]

FastAPI SSE Integration

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/api/v1/query/stream")
async def stream_query(request: QueryRequest):
    async def generate():
        # Emit start event
        yield f"data: {json.dumps({'type': 'start'})}\n\n"

        full_response = ""
        async for chunk in gateway.ainvoke_stream(request.prompt):
            if chunk["type"] == "token":
                full_response += chunk.get("content", "")
                yield f"data: {json.dumps({'type': 'token', 'content': chunk.get('content', '')})}\n\n"
            elif chunk["type"] == "done":
                yield f"data: {json.dumps({'type': 'done', 'model_used': chunk.get('model_used', 'unknown'), 'fallback_used': chunk.get('fallback_used', False)})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Streaming Event Types

The streaming API yields dictionaries with the following types:

Event Type Description Fields
start Stream started model_used, fallback_used, canary_used
token Content token content (the token text)
done Stream complete model_used, latency_ms, input_tokens, output_tokens, fallback_used
error Error occurred error (error message)

Streaming with Fallback

Streaming includes automatic fallback support. If the primary model fails before streaming begins, the gateway automatically tries fallback models:

# If Claude Sonnet fails during connection, automatically tries Haiku
for chunk in gateway.invoke_stream("Hello"):
    if chunk["type"] == "start":
        if chunk["fallback_used"]:
            print(f"⚠️ Using fallback model: {chunk['model_used']}")
    # ... handle other events

Note: Once streaming has started successfully, if an error occurs mid-stream, the gateway will emit an error event rather than attempting fallback (since partial content has already been delivered).

Examples

See the examples/ directory for complete integration examples:

Why Not Just Use...

Approach Limitation
Direct API calls No fallback, crashes on errors
Try/except wrapper Manual, error-prone, no canary
API Gateway (AWS) Doesn't understand LLM-specific routing
SageMaker endpoints Overkill for routing, designed for hosting

Agentic AI Gateway is purpose-built for LLM routing:

  • Model-aware fallback chains
  • Canary deployments with gradual rollout
  • Multi-provider support (Bedrock + OpenAI + custom)
  • Cost tracking with budget alerts (v0.6.0)
  • Multi-tenant cost isolation (v0.6.0)
  • Redis distributed cache for load-balanced apps (v0.5.0)
  • Conversation memory with persistence (v0.5.0)
  • Guardrails: PII detection & prompt injection defense (v0.5.0)
  • Zero infrastructure (it's just Python code)

Author

Tyler Canton - AI/ML Engineer specializing in production LLM systems

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License - Copyright (c) 2026 Tyler Canton

See LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agentic_ai_gateway-0.6.0.tar.gz (12.8 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agentic_ai_gateway-0.6.0-py3-none-any.whl (69.5 kB view details)

Uploaded Python 3

File details

Details for the file agentic_ai_gateway-0.6.0.tar.gz.

File metadata

  • Download URL: agentic_ai_gateway-0.6.0.tar.gz
  • Upload date:
  • Size: 12.8 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for agentic_ai_gateway-0.6.0.tar.gz
Algorithm Hash digest
SHA256 368bb1f4efb5e25daeae88e39e54becd1f808c44d1a8cc13a94dbfd9134e4619
MD5 37a26c2f7256210f04a25b0bbfd6dba9
BLAKE2b-256 e055b6ff08bcd96f6e6426c8703f6d5de7e952ed775fab3836a8038ef6b10fe4

See more details on using hashes here.

Provenance

The following attestation bundles were made for agentic_ai_gateway-0.6.0.tar.gz:

Publisher: publish.yml on tyler-canton/agentic-ai-gateway

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file agentic_ai_gateway-0.6.0-py3-none-any.whl.

File metadata

File hashes

Hashes for agentic_ai_gateway-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f3404572e30761cffd151aaa54e9d427e98d3b44492121691ceecc91d757d65b
MD5 57a772df01a4d8f71f8ab8e3a49eb67d
BLAKE2b-256 95f548663527d44d2faf1ab048cef2204d8fe72fa8dcdacdb3a7eab7e520e03c

See more details on using hashes here.

Provenance

The following attestation bundles were made for agentic_ai_gateway-0.6.0-py3-none-any.whl:

Publisher: publish.yml on tyler-canton/agentic-ai-gateway

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page