ais-langchain
Production-grade LangChain integration for AIS Protocol
Connect AIS agents to modern LangGraph workflows with enterprise resilience features.
What is This?
This package enables seamless integration between AIS Protocol agents and LangChain's modern LangGraph framework.
Think: HTTP for AI agents + LangChain's powerful workflows = multi-agent nirvana.
Features
Production-Ready
- Modern LangGraph - uses the latest langgraph with create_react_agent
- Automatic Retry - exponential backoff with configurable jitter
- Circuit Breaker - prevents cascading failures
- Response Caching - reduces latency by up to 160x
- Connection Pooling - efficient resource usage
- Structured Logging - production-grade observability
- Performance Metrics - tracks latency and success/failure rates
- Health Checks - monitors agent availability
- Type Safety - full Python type hints
Multi-Agent Orchestration
- ManagedAISTools - coordinates multiple specialized agents
- Dynamic Routing - routes to agents based on capabilities
- Capability Discovery - automatic tool generation
- Session Management - stateful multi-turn conversations
Quick Start
Installation
pip install ais-langchain ais-protocol langchain-core langgraph langchain-openai
Basic Usage
import asyncio
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel, Field
from ais_protocol import AISClient
from ais_langchain import create_enhanced_ais_tool, EnhancedAISToolConfig, RetryConfig, CircuitBreakerConfig

# 1. Define the argument schema
class CalculateArgs(BaseModel):
    operation: str = Field(description="Operation: add, subtract, multiply, divide")
    a: float = Field(description="First number")
    b: float = Field(description="Second number")

async def main():
    # 2. Connect to the AIS agent
    client = AISClient(
        agent_id='agent://example.com/my-client',
        agent_name='My Client'
    )
    await client.connect('http://localhost:8000')

    # 3. Create a production-grade tool
    calculator_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
        client=client,
        capability='calculate',
        args_schema=CalculateArgs,
        retry=RetryConfig(max_attempts=3),
        circuit_breaker=CircuitBreakerConfig(failure_threshold=5),
        cache=True
    ))

    # 4. Use the tool in LangGraph
    model = ChatOpenAI(model='gpt-4o-mini')
    agent = create_react_agent(model, [calculator_tool])

    # 5. Run!
    result = await agent.ainvoke({
        'messages': [{'role': 'user', 'content': 'What is 42 times 17?'}]
    })

asyncio.run(main())
Examples
Simple Tool
from ais_langchain import create_ais_tool
# Basic tool (no resilience features)
simple_tool = create_ais_tool(
client=client,
capability='greet'
)
Production-Grade Tool
from ais_langchain import (
create_enhanced_ais_tool,
EnhancedAISToolConfig,
Logger,
LoggerConfig,
LogLevel,
MetricsCollector,
RetryConfig,
CircuitBreakerConfig,
)
logger = Logger.get_instance_sync(LoggerConfig(level=LogLevel.INFO, pretty=True))
metrics = MetricsCollector()
production_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
client=client,
capability='process_data',
retry=RetryConfig(
max_attempts=3,
initial_delay=1.0,
backoff_multiplier=2.0
),
circuit_breaker=CircuitBreakerConfig(
failure_threshold=5,
reset_timeout=30.0
),
cache=True,
cache_ttl=60.0,
logger=logger,
metrics=metrics
))
Multi-Agent Management
from ais_langchain import ManagedAISTools
# Create managed tools for multiple agents
managed = ManagedAISTools(client)
tools = managed.create_all_tools(
retry=RetryConfig(max_attempts=3),
circuit_breaker=CircuitBreakerConfig(failure_threshold=5),
cache=True,
schemas={
'calculate': CalculateArgs,
'process_text': ProcessTextArgs
}
)
# Get diagnostics
health = await managed.get_health()
metrics = managed.get_metrics()
diagnostics = await managed.get_diagnostics()
Performance
Caching Impact
Without caching:
- Average latency: ~800ms per call
- Network overhead: high
With caching (60s TTL):
- First call: ~800ms
- Cached calls: ~5ms
- Speedup: 160x
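The 160x figure is simply the ratio of the two latencies above; a quick sanity check (the ~800ms and ~5ms numbers are the illustrative figures from this section, not guarantees):

```python
uncached_ms = 800  # typical first-call latency from the figures above
cached_ms = 5      # typical cache-hit latency
speedup = uncached_ms / cached_ms
print(f"speedup: {speedup:.0f}x")  # -> speedup: 160x
```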
Resilience Impact
Without retry/circuit breaker:
- Transient failures turn into errors
- Cascading failures are possible
- Manual recovery is needed
With retry + circuit breaker:
- 95%+ success rate despite network issues
- Automatic recovery
- Cascading failures are prevented
- The system is self-healing
Architecture
Tool Adapter
Converts AIS capabilities into LangChain tools:

AIS Agent                 LangChain
    |                         |
    +-- capability_1  --->  Tool 1
    +-- capability_2  --->  Tool 2
    +-- capability_3  --->  Tool 3
Resilience Layers

LangGraph Request
    |
    +-- Response Cache (optional)
    |     |-- Hit  -> Return cached
    |     |-- Miss -> Continue
    |
    +-- Circuit Breaker
    |     |-- OPEN      -> Fail fast
    |     |-- HALF_OPEN -> Test
    |     |-- CLOSED    -> Continue
    |
    +-- Retry Logic
    |     |-- Success -> Return
    |     |-- Failure -> Retry with backoff
    |
    +-- AIS Agent
          |-- Execute capability
API Reference
Core Functions
create_ais_tool()
Create a basic LangChain tool from an AIS capability.
def create_ais_tool(
client: AISClient,
capability: str,
name: Optional[str] = None,
description: Optional[str] = None,
args_schema: Optional[Type[BaseModel]] = None,
timeout: Optional[float] = None,
) -> StructuredTool
create_enhanced_ais_tool()
Create a production-grade tool with resilience features.
@dataclass
class EnhancedAISToolConfig:
client: AISClient
capability: str
name: Optional[str] = None
description: Optional[str] = None
args_schema: Optional[Type[BaseModel]] = None
timeout: Optional[float] = None
retry: Optional[RetryConfig] = None
circuit_breaker: Optional[CircuitBreakerConfig] = None
cache: bool = False
cache_ttl: float = 60.0
logger: Optional[Logger] = None
metrics: Optional[MetricsCollector] = None
ManagedAISTools
Manage multiple tools with shared infrastructure.
managed = ManagedAISTools(
client,
logger=logger,
metrics=metrics,
cache=cache,
cache_ttl=60.0
)
# Create all tools
tools = managed.create_all_tools(
retry=RetryConfig(...),
circuit_breaker=CircuitBreakerConfig(...),
cache=True
)
# Get diagnostics
health = await managed.get_health()
metrics = managed.get_metrics()
diagnostics = await managed.get_diagnostics()
Resilience Patterns
with_retry()
Execute function with retry logic.
result = await with_retry(
lambda: client.call('capability', params),
RetryConfig(
max_attempts=3,
initial_delay=1.0,
backoff_multiplier=2.0,
jitter=0.1
)
)
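The waits produced by such a configuration follow the standard exponential-backoff formula: the n-th retry waits initial_delay * backoff_multiplier**n, perturbed by the jitter fraction. A minimal sketch of the schedule (an illustration of the pattern, not ais_langchain's internal code):

```python
import random

def backoff_delays(max_attempts: int, initial_delay: float,
                   backoff_multiplier: float, jitter: float) -> list[float]:
    """Delay before each retry: initial_delay * multiplier**n,
    perturbed by up to +/- jitter (as a fraction of the base delay)."""
    delays = []
    for attempt in range(max_attempts - 1):  # no delay after the final attempt
        base = initial_delay * backoff_multiplier ** attempt
        delays.append(base * (1 + random.uniform(-jitter, jitter)))
    return delays

# With max_attempts=3, initial_delay=1.0, multiplier=2.0 and no jitter,
# the waits are 1s then 2s before the second and third attempts.
print(backoff_delays(3, 1.0, 2.0, 0.0))  # -> [1.0, 2.0]
```

The jitter term spreads out retries from many clients so they do not hammer a recovering service in lockstep.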
CircuitBreaker
Implement circuit breaker pattern.
breaker = CircuitBreaker(CircuitBreakerConfig(
failure_threshold=5,
reset_timeout=30.0,
on_open=lambda: print('Circuit OPEN'),
on_close=lambda: print('Circuit CLOSED')
))
result = await breaker.execute(lambda: some_function())
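The state machine behind the breaker can be sketched synchronously. This is a minimal illustration of the CLOSED -> OPEN -> HALF_OPEN cycle; the class name and internals here are hypothetical, not the library's:

```python
import time

class TinyCircuitBreaker:
    """Minimal sketch of the circuit breaker pattern (not ais_langchain's code)."""

    def __init__(self, failure_threshold: int, reset_timeout: float):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.state = 'CLOSED'
        self.opened_at = 0.0

    def call(self, fn):
        if self.state == 'OPEN':
            if time.monotonic() - self.opened_at >= self.reset_timeout:
                self.state = 'HALF_OPEN'  # let one trial request through
            else:
                raise RuntimeError('circuit OPEN: failing fast')
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed trial, or too many consecutive failures, opens the circuit
            if self.state == 'HALF_OPEN' or self.failures >= self.failure_threshold:
                self.state = 'OPEN'
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success resets the count
        self.state = 'CLOSED'
        return result
```

After failure_threshold consecutive failures the breaker fails fast without touching the downstream agent at all; once reset_timeout elapses, a single trial call decides whether the circuit closes again.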
ResponseCache
Cache responses.
cache = ResponseCache(ttl=60.0)
cached = await cache.get('capability', params)
if not cached:
result = await client.call('capability', params)
await cache.set('capability', params, result)
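Under the hood, a TTL cache of this sort timestamps each entry and keys it on the capability plus its parameters. A synchronous sketch of that bookkeeping (the real ResponseCache is async, and the key scheme here is an assumption):

```python
import time
from typing import Any, Optional

class TinyResponseCache:
    """Sketch of a TTL cache keyed on (capability, params); illustrative only."""

    def __init__(self, ttl: float):
        self.ttl = ttl
        self._store: dict[tuple, tuple[float, Any]] = {}

    def _key(self, capability: str, params: dict) -> tuple:
        # Sort params so {'a': 1, 'b': 2} and {'b': 2, 'a': 1} hit the same entry
        return (capability, tuple(sorted(params.items())))

    def get(self, capability: str, params: dict) -> Optional[Any]:
        entry = self._store.get(self._key(capability, params))
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at > self.ttl:
            return None  # expired
        return value

    def set(self, capability: str, params: dict, value: Any) -> None:
        self._store[self._key(capability, params)] = (time.monotonic(), value)
```

Keying on sorted parameters is what makes a repeated call with the same arguments a cache hit, which is where the latency savings in the Performance section come from.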
Observability
Logger
Structured logging.
logger = Logger.get_instance_sync(LoggerConfig(
level=LogLevel.INFO,
pretty=True
))
logger.debug('Message', {'context': 'data'})
logger.info('Message', {'context': 'data'})
logger.warn('Message', {'context': 'data'})
logger.error('Message', error, {'context': 'data'})
MetricsCollector
Collect performance metrics.
metrics = MetricsCollector()
metrics.increment_counter_sync('requests_total', 1, {'endpoint': '/api'})
metrics.record_histogram_sync('request_duration_ms', 245, {'endpoint': '/api'})
metrics.set_gauge_sync('active_connections', 10)
stats = metrics.get_histogram_stats('request_duration_ms')
print(stats['p95']) # 95th percentile
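A p95 statistic like the one above is the usual percentile-by-rank calculation over raw samples; a quick sketch (the collector's exact interpolation method may differ):

```python
import math

def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile: the value at position ceil(pct/100 * n)
    of the sorted samples."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]

# For request durations of 1..100 ms, the nearest-rank p95 is 95 ms:
latencies = list(range(1, 101))
print(percentile(latencies, 95))  # -> 95
```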
HealthChecker
Monitor health.
health = HealthChecker()
async def check_database():
connected = await db.ping()
return {
'status': HealthStatus.HEALTHY if connected else HealthStatus.UNHEALTHY,
'message': 'DB down' if not connected else 'DB connected'
}
health.register('database', check_database)
result = await health.check()
print(result.status) # HEALTHY | DEGRADED | UNHEALTHY
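A natural aggregation rule for the overall status is "worst individual check wins". A sketch of that policy (the severity ordering here is an assumption for illustration, not the checker's documented behavior):

```python
from enum import Enum

class Status(Enum):
    # Assumed severity ordering, mirroring the HEALTHY/DEGRADED/UNHEALTHY states above
    HEALTHY = 0
    DEGRADED = 1
    UNHEALTHY = 2

def aggregate(check_results: dict[str, Status]) -> Status:
    """Overall status is the worst individual status: one UNHEALTHY
    dependency makes the whole service UNHEALTHY."""
    if not check_results:
        return Status.HEALTHY
    return max(check_results.values(), key=lambda s: s.value)

print(aggregate({'database': Status.HEALTHY, 'cache': Status.DEGRADED}))
```

Under this rule, DEGRADED surfaces as soon as any single check degrades, so dashboards notice partial outages before they become total ones.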
Use Cases
1. Multi-Framework Integration
LangChain agents calling AutoGPT, CrewAI, or custom agents:
# LangChain -> AIS -> any agent framework
autogpt_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
client=autogpt_client,
capability='research',
retry=RetryConfig(max_attempts=3),
cache=True
))
crewai_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
client=crewai_client,
capability='analyze',
retry=RetryConfig(max_attempts=3),
cache=True
))
agent = create_react_agent(model, [autogpt_tool, crewai_tool])
2. Microservices for AI
Each capability is an independent service:
nlp_client = AISClient(...)
await nlp_client.connect('http://nlp-service:8001')
vision_client = AISClient(...)
await vision_client.connect('http://vision-service:8002')
speech_client = AISClient(...)
await speech_client.connect('http://speech-service:8003')
Production Checklist
Before deploying to production:
- Configure retry logic for your use case
- Set appropriate circuit breaker thresholds
- Enable caching for read-heavy workloads
- Set up health checks
- Monitor performance metrics
- Configure structured logging
- Set connection pool sizes
- Configure timeouts appropriately
- Test failure scenarios
- Set up alerting
Troubleshooting
Common Issues
"Cannot connect to AIS agent"
# Make sure agent is running
curl http://localhost:8000/health
"Circuit breaker is OPEN"
# Reset manually or wait for timeout
await managed_tools.reset_circuit_breakers()
"Cache hit rate is low"
# Check stats
stats = cache.get_stats()
print(stats)
License
Apache-2.0 - See LICENSE for details
Contributing
Contributions welcome! See CONTRIBUTING.md
Links
- PyPI: https://pypi.org/project/ais-langchain/
- GitHub: https://github.com/ais-protocol/ais-langchain-python
- AIS Protocol: https://github.com/ais-protocol/ais-python
- LangChain: https://python.langchain.com/
Built for LangChain
This integration was built as a gift to the LangChain community.
Let's make multi-agent AI interoperable!