Full AI orchestration engine: LangGraph + LlamaIndex + MCP + A2A in 20 lines
Project description
nexus-ai-orchestrator
Full AI orchestration engine — LangGraph multi-turn memory, tool execution, streaming, multi-agent supervision, and voice I/O in one library.
import asyncio
from nexus import Nexus, NexusConfig, LLMConfig, LLMProvider
config = NexusConfig(llm=LLMConfig(provider=LLMProvider.OPENAI, model="gpt-4o-mini"))
async def main():
async with await Nexus.create(config) as nexus:
reply = await nexus.run("What is the capital of France?")
print(reply) # Paris
asyncio.run(main())
Install
pip install nexus-ai-orchestrator[anthropic] # Anthropic / Claude
pip install nexus-ai-orchestrator[openai] # OpenAI / GPT
pip install nexus-ai-orchestrator[google] # Google / Gemini
pip install nexus-ai-orchestrator[all] # all providers + redis + postgres
Features
Multi-turn memory
async with await Nexus.create(config) as nx:
await nx.run("My name is Alice.", thread_id="user-123")
reply = await nx.run("What's my name?", thread_id="user-123")
print(reply) # Alice
Streaming
async with await Nexus.create(config) as nx:
async for token in nx.stream("Tell me a story"):
print(token, end="", flush=True)
Typed event streaming (tool spinners, cost tickers)
async with await Nexus.create(config) as nx:
async for event in nx.stream_events("Search the web for nexus-ai"):
if event["type"] == "token":
print(event["data"], end="")
elif event["type"] == "tool_start":
print(f"\n[calling {event['data']}...]")
Tool registration
from nexus import ToolDefinition, ToolParameter, ToolSource
async def get_weather(city: str) -> str:
return f"Sunny and 72°F in {city}"
tool = ToolDefinition(
name="get_weather",
description="Returns current weather for a city.",
parameters=[ToolParameter(name="city", type="string", description="City name", required=True)],
source=ToolSource.NATIVE,
handler=get_weather,
)
async with await Nexus.create(config) as nx:
nx.add_tool(tool)
reply = await nx.run("What's the weather in Tokyo?")
Multi-agent supervision
from nexus.agents import SubAgent, SubAgentConfig, Supervisor
async with await Nexus.create(config) as nx:
researcher = SubAgent(SubAgentConfig(name="Researcher", role="Research and find facts"))
writer = SubAgent(SubAgentConfig(name="Writer", role="Write clear summaries"))
crew = Supervisor([researcher, writer], nexus=nx)
reply = await crew.run("Research and summarise the history of the internet")
Performance monitoring — auto-fire bad agents
from nexus.agents import PerformanceMonitor
monitor = PerformanceMonitor(consecutive_threshold=3, on_fired=lambda name, reason: print(f"{name} fired: {reason}"))
crew = Supervisor([agent], nexus=nx, monitor=monitor)
Branch threads (fork conversation history)
async with await Nexus.create(config) as nx:
await nx.run("The answer is 42.", thread_id="main")
await nx.branch_thread("main", "branch-a")
# branch-a starts with full context from main
reply = await nx.run("What is the answer?", thread_id="branch-a")
User profile system prompts
from nexus import NexusConfig
from nexus.memory.profile import UserProfile
async with await Nexus.create(config) as nx:
nx._profile_store.save("alice", {"name": "Alice", "language": "French"})
reply = await nx.run("Bonjour!", user_id="alice") # responds with Alice's context
Batch runs
async with await Nexus.create(config) as nx:
results = await nx.batch_run(["Q1", "Q2", "Q3"], concurrency=3)
Human-in-the-loop approval
async def my_approval(tool_calls):
for call in tool_calls:
print(f"Approve {call['name']}({call['args']})? [y/n]")
if input() != "y":
return None
return tool_calls
async with await Nexus.create(config) as nx:
reply = await nx.run_with_approval("Delete old files", approval_callback=my_approval)
Voice I/O (STT → LLM → TTS)
from nexus import NexusConfig
from nexus.core.config import VoiceConfig
config = NexusConfig(llm=..., voice=VoiceConfig(stt_api_key="...", tts_api_key="..."))
async with await Nexus.create(config) as nx:
audio_out = await nx.run_voice(audio_bytes, thread_id="user-123")
Security
- Prompt-injection detection — built-in regex guard with
warnandblockmodes - SQL read-only guard —
SELECT-only enforcement with multi-statement andOUTFILEblocking - Path traversal prevention — user IDs sanitized before any filesystem access
- Token budget — hard per-run token cap to control costs
from nexus.core.security import InjectionDetectionConfig, SecurityConfig
from nexus.core.config import NexusConfig
cfg = NexusConfig(security=SecurityConfig(injection=InjectionDetectionConfig(enabled=True, mode="block")))
Model routing
from nexus.core.config import LLMConfig, ModelRouter, RoutingConfig
config = NexusConfig(llm=LLMConfig(
provider=LLMProvider.OPENAI,
model="gpt-4o",
router=ModelRouter(
fallback=RoutingConfig(model="gpt-4o-mini"), # on tool-call errors
tool_call=RoutingConfig(model="gpt-4o-mini"), # cheaper model for tool turns
summarizer=RoutingConfig(model="gpt-4o-mini"), # long-context summarization
)
))
Configuration reference
from nexus import NexusConfig, LLMConfig, LLMProvider
from nexus.core.config import RAGConfig, VoiceConfig, SecurityConfig
from nexus.core.token_budget import TokenBudgetConfig
config = NexusConfig(
llm=LLMConfig(
provider=LLMProvider.OPENAI,
model="gpt-4o-mini",
api_key="sk-...", # or set OPENAI_API_KEY env var
temperature=0.7,
max_tokens=4096,
),
rag=RAGConfig(enabled=False), # disable RAG if not needed
budget=TokenBudgetConfig(max_tokens_per_run=50_000),
security=SecurityConfig(),
)
What's inside
| Component | Description |
|---|---|
nexus.Nexus |
Top-level async context manager |
nexus.brain.GraphBrain |
LangGraph execution engine with checkpointing |
nexus.core.registry.ToolRegistry |
Async-first tool registry with monitor wiring |
nexus.agents.Supervisor |
Multi-agent router with PerformanceMonitor integration |
nexus.agents.monitor.PerformanceMonitor |
Rolling-window failure tracker, auto-fires bad components |
nexus.core.security |
Injection guard, content policy, thread policy |
nexus.memory.profile.UserProfile |
Per-user JSON fact store for system-prompt preambles |
nexus.voice |
STT → LLM → TTS pipeline (OpenAI Whisper + ElevenLabs) |
nexus.serve.app |
FastAPI app with SSE streaming endpoints |
Requirements
- Python 3.11+
- One of:
anthropic,openai,google-generativeai - LangGraph 1.1+
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file nexus_ai_orchestrator-0.1.0.tar.gz.
File metadata
- Download URL: nexus_ai_orchestrator-0.1.0.tar.gz
- Upload date:
- Size: 12.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
84b590b466d70b2467058422bc8c178b649912a9abeb53592271c3a59c6baae6
|
|
| MD5 |
d47808319a3f1ab0e84720dac053a402
|
|
| BLAKE2b-256 |
fab02cfa00e15ebb37ca7c4ef2df53c593de8bde3a000afa1af6958f2e102d19
|
File details
Details for the file nexus_ai_orchestrator-0.1.0-py3-none-any.whl.
File metadata
- Download URL: nexus_ai_orchestrator-0.1.0-py3-none-any.whl
- Upload date:
- Size: 15.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
92af7d7a53e293405fdb880384b4ef2ee1b4f52189da87aee9a5e043fa5bdbca
|
|
| MD5 |
e0ccf7deb0c8307ecd949ae9a0278a88
|
|
| BLAKE2b-256 |
6c20f0d6e54b890e5011d14ac70772b61a9514c80085fc0994b786b20396ff5f
|