
ais-langchain

Production-grade LangChain integration for AIS Protocol

Connect AIS agents to modern LangGraph workflows with enterprise resilience features.



🎯 What is This?

This package enables seamless integration between AIS Protocol agents and LangChain's modern LangGraph framework.

Think: HTTP for AI agents + LangChain's powerful workflows = Multi-Agent Nirvana 🚀


✨ Features

🏗️ Production-Ready

  • ✅ Modern LangGraph - Uses the latest langgraph with create_react_agent
  • ✅ Automatic Retry - Exponential backoff with configurable jitter
  • ✅ Circuit Breaker - Prevents cascading failures
  • ✅ Response Caching - Cuts repeat-call latency by up to 160x
  • ✅ Connection Pooling - Efficient resource usage
  • ✅ Structured Logging - Production-grade observability
  • ✅ Performance Metrics - Track latency and success/failure rates
  • ✅ Health Checks - Monitor agent availability
  • ✅ Type Safety - Full Python type hints

🎭 Multi-Agent Orchestration

  • ✅ ManagedAISTools - Coordinate multiple specialized agents
  • ✅ Dynamic Routing - Route to agents based on capabilities
  • ✅ Capability Discovery - Automatic tool generation
  • ✅ Session Management - Stateful multi-turn conversations

🚀 Quick Start

Installation

pip install ais-langchain ais-protocol langchain-core langgraph langchain-openai

Basic Usage

import asyncio
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from ais_protocol import AISClient
from ais_langchain import create_enhanced_ais_tool, EnhancedAISToolConfig, RetryConfig, CircuitBreakerConfig
from pydantic import BaseModel, Field

# 1. Define schema
class CalculateArgs(BaseModel):
    operation: str = Field(description="Operation: add, subtract, multiply, divide")
    a: float = Field(description="First number")
    b: float = Field(description="Second number")

async def main():
    # 2. Connect to AIS agent (await is only valid inside a coroutine)
    client = AISClient(
        agent_id='agent://example.com/my-client',
        agent_name='My Client'
    )
    await client.connect('http://localhost:8000')

    # 3. Create production-grade tool
    calculator_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
        client=client,
        capability='calculate',
        args_schema=CalculateArgs,
        retry=RetryConfig(max_attempts=3),
        circuit_breaker=CircuitBreakerConfig(failure_threshold=5),
        cache=True
    ))

    # 4. Use in LangGraph
    model = ChatOpenAI(model='gpt-4o-mini')
    agent = create_react_agent(model, [calculator_tool])

    # 5. Run!
    result = await agent.ainvoke({
        'messages': [{'role': 'user', 'content': 'What is 42 times 17?'}]
    })
    print(result['messages'][-1].content)

asyncio.run(main())

🎓 Examples

Simple Tool

from ais_langchain import create_ais_tool

# Basic tool (no resilience features)
simple_tool = create_ais_tool(
    client=client,
    capability='greet'
)

Production-Grade Tool

from ais_langchain import (
    create_enhanced_ais_tool,
    EnhancedAISToolConfig,
    Logger,
    LoggerConfig,
    LogLevel,
    MetricsCollector,
    RetryConfig,
    CircuitBreakerConfig,
)

logger = Logger.get_instance_sync(LoggerConfig(level=LogLevel.INFO, pretty=True))
metrics = MetricsCollector()

production_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
    client=client,
    capability='process_data',
    retry=RetryConfig(
        max_attempts=3,
        initial_delay=1.0,
        backoff_multiplier=2.0
    ),
    circuit_breaker=CircuitBreakerConfig(
        failure_threshold=5,
        reset_timeout=30.0
    ),
    cache=True,
    cache_ttl=60.0,
    logger=logger,
    metrics=metrics
))

Multi-Agent Management

from ais_langchain import ManagedAISTools

# Create managed tools for multiple agents
managed = ManagedAISTools(client)

tools = managed.create_all_tools(
    retry=RetryConfig(max_attempts=3),
    circuit_breaker=CircuitBreakerConfig(failure_threshold=5),
    cache=True,
    schemas={
        'calculate': CalculateArgs,
        'process_text': ProcessTextArgs
    }
)

# Get diagnostics
health = await managed.get_health()
metrics = managed.get_metrics()
diagnostics = await managed.get_diagnostics()

📊 Performance

Caching Impact

Without caching:
- Average latency: ~800ms per call
- Network overhead: High

With caching (60s TTL):
- First call: ~800ms
- Cached calls: ~5ms
- Speedup: 160x ⚡
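
The 160x figure is the per-call ratio (800 ms / 5 ms); the speedup you actually observe depends on your cache hit rate. A quick sanity check of the arithmetic (a standalone sketch, not part of the package):

```python
def amortized_latency_ms(hit_rate, miss_ms=800.0, hit_ms=5.0):
    """Expected per-call latency given a cache hit rate."""
    return hit_rate * hit_ms + (1 - hit_rate) * miss_ms

print(800.0 / 5.0)                # per-call speedup: 160.0
print(amortized_latency_ms(0.9))  # ~84.5 ms average at a 90% hit rate
```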

Resilience Impact

Without retry/circuit breaker:
- Transient failures → errors
- Cascading failures possible
- Manual recovery needed

With retry + circuit breaker:
- 95%+ success rate with network issues
- Automatic recovery
- Prevents cascade failures
- Self-healing system ✨

๐Ÿ—๏ธ Architecture

Tool Adapter

Converts AIS capabilities into LangChain tools:

AIS Agent                LangChain
   │                        │
   ├─ capability_1  ─→  Tool 1
   ├─ capability_2  ─→  Tool 2
   └─ capability_3  ─→  Tool 3

Resilience Layers

LangGraph Request
    │
    ├─→ Response Cache (optional)
    │   ├─ Hit → Return cached
    │   └─ Miss → Continue
    │
    ├─→ Circuit Breaker
    │   ├─ OPEN → Fail fast
    │   ├─ HALF_OPEN → Test
    │   └─ CLOSED → Continue
    │
    ├─→ Retry Logic
    │   ├─ Success → Return
    │   └─ Failure → Retry with backoff
    │
    └─→ AIS Agent
        └─ Execute capability
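
The layering above can be sketched as nested checks around the agent call. This is illustrative only: `call_agent` is a hypothetical stand-in, and the package's real (async, configurable) implementation differs:

```python
import asyncio

async def call_agent(params):
    # Hypothetical stand-in for an AIS capability call over the network.
    return {"result": params["a"] * params["b"]}

async def call_with_layers(params, cache, breaker_closed=True, max_attempts=3):
    key = tuple(sorted(params.items()))
    if key in cache:                       # 1. Response cache: return on hit
        return cache[key]
    if not breaker_closed:                 # 2. Circuit breaker: fail fast when OPEN
        raise RuntimeError("circuit OPEN")
    for attempt in range(max_attempts):    # 3. Retry with exponential backoff
        try:
            result = await call_agent(params)  # 4. The actual AIS agent call
            cache[key] = result
            return result
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(0.1 * 2 ** attempt)

cache = {}
print(asyncio.run(call_with_layers({"a": 6, "b": 7}, cache)))  # {'result': 42}
```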

📚 API Reference

Core Functions

create_ais_tool()

Create a basic LangChain tool from an AIS capability.

def create_ais_tool(
    client: AISClient,
    capability: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    args_schema: Optional[Type[BaseModel]] = None,
    timeout: Optional[float] = None,
) -> StructuredTool

create_enhanced_ais_tool()

Create a production-grade tool with resilience features.

@dataclass
class EnhancedAISToolConfig:
    client: AISClient
    capability: str
    name: Optional[str] = None
    description: Optional[str] = None
    args_schema: Optional[Type[BaseModel]] = None
    timeout: Optional[float] = None
    retry: Optional[RetryConfig] = None
    circuit_breaker: Optional[CircuitBreakerConfig] = None
    cache: bool = False
    cache_ttl: float = 60.0
    logger: Optional[Logger] = None
    metrics: Optional[MetricsCollector] = None

ManagedAISTools

Manage multiple tools with shared infrastructure.

managed = ManagedAISTools(
    client,
    logger=logger,
    metrics=metrics,
    cache=cache,
    cache_ttl=60.0
)

# Create all tools
tools = managed.create_all_tools(
    retry=RetryConfig(...),
    circuit_breaker=CircuitBreakerConfig(...),
    cache=True
)

# Get diagnostics
health = await managed.get_health()
metrics = managed.get_metrics()
diagnostics = await managed.get_diagnostics()

Resilience Patterns

with_retry()

Execute function with retry logic.

result = await with_retry(
    lambda: client.call('capability', params),
    RetryConfig(
        max_attempts=3,
        initial_delay=1.0,
        backoff_multiplier=2.0,
        jitter=0.1
    )
)
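
With those settings, the delay before retry k grows as initial_delay * backoff_multiplier**k, randomized by jitter. A standalone sketch of the schedule (the package's exact jitter convention is an assumption here; a ±fraction of the base delay is one common choice):

```python
import random

def backoff_delays(max_attempts=3, initial_delay=1.0,
                   backoff_multiplier=2.0, jitter=0.1, seed=0):
    """Delays slept between attempts: initial_delay * multiplier**k, +/- jitter."""
    rng = random.Random(seed)
    delays = []
    for k in range(max_attempts - 1):  # no delay after the final attempt
        base = initial_delay * backoff_multiplier ** k
        delays.append(base * (1 + rng.uniform(-jitter, jitter)))
    return delays

print(backoff_delays())  # roughly [1.0, 2.0], each within +/-10%
```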

CircuitBreaker

Implement circuit breaker pattern.

breaker = CircuitBreaker(CircuitBreakerConfig(
    failure_threshold=5,
    reset_timeout=30.0,
    on_open=lambda: print('Circuit OPEN'),
    on_close=lambda: print('Circuit CLOSED')
))

result = await breaker.execute(lambda: some_function())
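
The state machine behind this is small; here is a minimal synchronous sketch of the CLOSED/OPEN/HALF_OPEN transitions (illustrative only, not the package's code, which is async and supports the `on_open`/`on_close` callbacks shown above):

```python
import time

class TinyBreaker:
    """Sketch: CLOSED -> OPEN after N failures; OPEN -> HALF_OPEN after
    reset_timeout; HALF_OPEN -> CLOSED on success, back to OPEN on failure."""
    def __init__(self, failure_threshold=5, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def execute(self, fn):
        if self.state == "OPEN":
            if time.monotonic() - self.opened_at >= self.reset_timeout:
                self.state = "HALF_OPEN"  # allow a single probe call
            else:
                raise RuntimeError("circuit OPEN: failing fast")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold or self.state == "HALF_OPEN":
                self.state = "OPEN"
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        self.state = "CLOSED"
        return result

breaker = TinyBreaker(failure_threshold=2, reset_timeout=0.01)
for _ in range(2):
    try:
        breaker.execute(lambda: 1 / 0)  # two failures trip the breaker
    except ZeroDivisionError:
        pass
print(breaker.state)  # OPEN
```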

ResponseCache

Cache responses.

cache = ResponseCache(ttl=60.0)

cached = await cache.get('capability', params)
if not cached:
    result = await client.call('capability', params)
    await cache.set('capability', params, result)
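
Conceptually, a response cache of this shape is a dict keyed on the capability and its parameters, with per-entry expiry. A hedged synchronous sketch (not the package's implementation, whose API is async as shown above):

```python
import time

class TTLCache:
    """Minimal TTL cache sketch keyed on (capability, frozen params)."""
    def __init__(self, ttl=60.0):
        self.ttl = ttl
        self._store = {}

    def _key(self, capability, params):
        return (capability, tuple(sorted(params.items())))

    def get(self, capability, params):
        entry = self._store.get(self._key(capability, params))
        if entry is None:
            return None
        value, expires = entry
        if time.monotonic() > expires:       # expired: evict and miss
            del self._store[self._key(capability, params)]
            return None
        return value

    def set(self, capability, params, value):
        self._store[self._key(capability, params)] = (value, time.monotonic() + self.ttl)

ttl_cache = TTLCache(ttl=60.0)
assert ttl_cache.get("calculate", {"a": 1}) is None  # miss
ttl_cache.set("calculate", {"a": 1}, 42)
print(ttl_cache.get("calculate", {"a": 1}))  # 42
```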

Observability

Logger

Structured logging.

logger = Logger.get_instance_sync(LoggerConfig(
    level=LogLevel.INFO,
    pretty=True
))

logger.debug('Message', {'context': 'data'})
logger.info('Message', {'context': 'data'})
logger.warn('Message', {'context': 'data'})
logger.error('Message', error, {'context': 'data'})

MetricsCollector

Collect performance metrics.

metrics = MetricsCollector()

metrics.increment_counter_sync('requests_total', 1, {'endpoint': '/api'})
metrics.record_histogram_sync('request_duration_ms', 245, {'endpoint': '/api'})
metrics.set_gauge_sync('active_connections', 10)

stats = metrics.get_histogram_stats('request_duration_ms')
print(stats['p95'])  # 95th percentile
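
The `p95` value is the 95th percentile of recorded durations. One common way to compute it from samples is the nearest-rank method, sketched below (the package's collector may interpolate differently):

```python
import math

def percentile(samples, p):
    """Nearest-rank percentile: the smallest sample covering p% of the data."""
    ordered = sorted(samples)
    rank = math.ceil(p / 100 * len(ordered))
    return ordered[max(rank - 1, 0)]

durations = [120, 245, 180, 900, 150, 210, 175, 300, 130, 160]
print(percentile(durations, 95))  # 900: one slow outlier dominates the tail
```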

HealthChecker

Monitor health.

health = HealthChecker()

async def check_database():
    connected = await db.ping()
    return {
        'status': HealthStatus.HEALTHY if connected else HealthStatus.UNHEALTHY,
        'message': 'DB down' if not connected else 'DB connected'
    }

health.register('database', check_database)

result = await health.check()
print(result.status)  # HEALTHY | DEGRADED | UNHEALTHY
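
The overall status typically aggregates individual checks worst-wins: a single UNHEALTHY check marks the whole system UNHEALTHY. A sketch of that rule (the package's exact aggregation policy is an assumption here):

```python
from enum import IntEnum

class HealthStatus(IntEnum):
    HEALTHY = 0
    DEGRADED = 1
    UNHEALTHY = 2

def overall(check_results):
    """Worst individual status wins; no checks means HEALTHY."""
    return max((r["status"] for r in check_results), default=HealthStatus.HEALTHY)

checks = [
    {"status": HealthStatus.HEALTHY, "message": "DB connected"},
    {"status": HealthStatus.DEGRADED, "message": "cache slow"},
]
print(overall(checks).name)  # DEGRADED
```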

🎯 Use Cases

1. Multi-Framework Integration

LangChain agents calling AutoGPT, CrewAI, or custom agents:

# LangChain → AIS → Any Agent Framework
autogpt_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
    client=autogpt_client,
    capability='research',
    retry=RetryConfig(max_attempts=3),
    cache=True
))

crewai_tool = create_enhanced_ais_tool(EnhancedAISToolConfig(
    client=crewai_client,
    capability='analyze',
    retry=RetryConfig(max_attempts=3),
    cache=True
))

agent = create_react_agent(model, [autogpt_tool, crewai_tool])

2. Microservices for AI

Each capability is an independent service:

nlp_client = AISClient(...)
await nlp_client.connect('http://nlp-service:8001')

vision_client = AISClient(...)
await vision_client.connect('http://vision-service:8002')

speech_client = AISClient(...)
await speech_client.connect('http://speech-service:8003')

๐Ÿ† Production Checklist

Before deploying to production:

  • ✅ Configure retry logic for your use case
  • ✅ Set appropriate circuit breaker thresholds
  • ✅ Enable caching for read-heavy workloads
  • ✅ Set up health checks
  • ✅ Monitor performance metrics
  • ✅ Configure structured logging
  • ✅ Set connection pool sizes
  • ✅ Configure timeouts appropriately
  • ✅ Test failure scenarios
  • ✅ Set up alerting

🆘 Troubleshooting

Common Issues

"Cannot connect to AIS agent"

# Make sure agent is running
curl http://localhost:8000/health

"Circuit breaker is OPEN"

# Reset manually or wait for timeout
await managed_tools.reset_circuit_breakers()

"Cache hit rate is low"

# Check stats
stats = cache.get_stats()
print(stats)

๐Ÿ“ License

Apache-2.0 - See LICENSE for details


๐Ÿค Contributing

Contributions welcome! See CONTRIBUTING.md


🎉 Built for LangChain

This integration was built with ❤️ as a gift to the LangChain community.

Let's make multi-agent AI interoperable! 🚀
