Full AI orchestration engine: LangGraph + LlamaIndex + MCP + A2A in 20 lines

nexus-ai-orchestrator

Full AI orchestration engine — LangGraph multi-turn memory, tool execution, streaming, multi-agent supervision, and voice I/O in one library.

import asyncio
from nexus import Nexus, NexusConfig, LLMConfig, LLMProvider

config = NexusConfig(llm=LLMConfig(provider=LLMProvider.OPENAI, model="gpt-4o-mini"))

async def main():
    async with await Nexus.create(config) as nexus:
        reply = await nexus.run("What is the capital of France?")
        print(reply)  # Paris

asyncio.run(main())

Install

pip install nexus-ai-orchestrator[anthropic]   # Anthropic / Claude
pip install nexus-ai-orchestrator[openai]      # OpenAI / GPT
pip install nexus-ai-orchestrator[google]      # Google / Gemini
pip install nexus-ai-orchestrator[all]         # all providers + redis + postgres

Features

Multi-turn memory

async with await Nexus.create(config) as nx:
    await nx.run("My name is Alice.", thread_id="user-123")
    reply = await nx.run("What's my name?", thread_id="user-123")
    print(reply)  # Alice

Streaming

async with await Nexus.create(config) as nx:
    async for token in nx.stream("Tell me a story"):
        print(token, end="", flush=True)
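
If the complete reply is also needed once streaming ends, the tokens can be accumulated while they are printed. The sketch below is a minimal illustration that runs without an API key: `fake_stream` is a hypothetical stand-in for `nx.stream`, and it assumes each yielded token is a plain string, as the loop above suggests.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for nx.stream(): yields string tokens."""
    for token in ["Once ", "upon ", "a ", "time."]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield token


async def collect(stream: AsyncIterator[str]) -> str:
    """Print tokens as they arrive and return the joined text."""
    parts: list[str] = []
    async for token in stream:
        print(token, end="", flush=True)
        parts.append(token)
    print()
    return "".join(parts)


story = asyncio.run(collect(fake_stream("Tell me a story")))
```

With a real instance, `collect(nx.stream("Tell me a story"))` would presumably work the same way inside the `async with` block.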

Typed event streaming (tool spinners, cost tickers)

async with await Nexus.create(config) as nx:
    async for event in nx.stream_events("Search the web for nexus-ai"):
        if event["type"] == "token":
            print(event["data"], end="")
        elif event["type"] == "tool_start":
            print(f"\n[calling {event['data']}...]")

Tool registration

from nexus import ToolDefinition, ToolParameter, ToolSource

async def get_weather(city: str) -> str:
    return f"Sunny and 72°F in {city}"

tool = ToolDefinition(
    name="get_weather",
    description="Returns current weather for a city.",
    parameters=[ToolParameter(name="city", type="string", description="City name", required=True)],
    source=ToolSource.NATIVE,
    handler=get_weather,
)

async with await Nexus.create(config) as nx:
    nx.add_tool(tool)
    reply = await nx.run("What's the weather in Tokyo?")

Multi-agent supervision

from nexus.agents import SubAgent, SubAgentConfig, Supervisor

async with await Nexus.create(config) as nx:
    researcher = SubAgent(SubAgentConfig(name="Researcher", role="Research and find facts"))
    writer = SubAgent(SubAgentConfig(name="Writer", role="Write clear summaries"))
    crew = Supervisor([researcher, writer], nexus=nx)
    reply = await crew.run("Research and summarise the history of the internet")

Performance monitoring — auto-fire bad agents

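from nexus.agents import PerformanceMonitor, SubAgent, SubAgentConfig, Supervisor

# Fire an agent after three consecutive failures and report why.
monitor = PerformanceMonitor(consecutive_threshold=3, on_fired=lambda name, reason: print(f"{name} fired: {reason}"))

async with await Nexus.create(config) as nx:
    agent = SubAgent(SubAgentConfig(name="Researcher", role="Research and find facts"))
    crew = Supervisor([agent], nexus=nx, monitor=monitor)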
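
To make the `consecutive_threshold` idea concrete, here is a minimal sketch of one plausible reading: a success resets an agent's failure streak, and the callback fires once the streak reaches the threshold. `ConsecutiveFailureTracker` is a hypothetical class written for illustration; it is not the library's `PerformanceMonitor`, whose internals may differ.

```python
from collections import defaultdict
from typing import Callable


class ConsecutiveFailureTracker:
    """Illustrative only: not the library's PerformanceMonitor."""

    def __init__(self, threshold: int, on_fired: Callable[[str, str], None]):
        self.threshold = threshold
        self.on_fired = on_fired
        self.streaks: defaultdict[str, int] = defaultdict(int)
        self.fired: set[str] = set()

    def record(self, name: str, ok: bool) -> None:
        """Record one result; a success resets the failure streak."""
        if name in self.fired:
            return  # already fired, nothing more to track
        if ok:
            self.streaks[name] = 0
            return
        self.streaks[name] += 1
        if self.streaks[name] >= self.threshold:
            self.fired.add(name)
            self.on_fired(name, f"{self.streaks[name]} consecutive failures")


events: list[tuple[str, str]] = []
tracker = ConsecutiveFailureTracker(3, lambda name, reason: events.append((name, reason)))
for ok in [False, False, True, False, False, False]:
    tracker.record("Researcher", ok)
print(events)
```

The success in the middle of the sequence resets the streak, so the callback fires only after the final three failures.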

Branch threads (fork conversation history)

async with await Nexus.create(config) as nx:
    await nx.run("The answer is 42.", thread_id="main")
    await nx.branch_thread("main", "branch-a")
    # branch-a starts with full context from main
    reply = await nx.run("What is the answer?", thread_id="branch-a")
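
Conceptually, a fork copies the history so far and lets the two threads diverge afterwards. The sketch below shows that idea with a hypothetical in-memory `ThreadStore`; it is an assumption about the semantics of `branch_thread`, not a description of how the library stores checkpoints.

```python
import copy


class ThreadStore:
    """Illustrative in-memory store: thread_id -> list of messages."""

    def __init__(self) -> None:
        self.threads: dict[str, list[dict]] = {}

    def append(self, thread_id: str, role: str, content: str) -> None:
        """Add one message to a thread, creating the thread if needed."""
        self.threads.setdefault(thread_id, []).append({"role": role, "content": content})

    def branch(self, source_id: str, target_id: str) -> None:
        """Fork by deep-copying history so the two threads diverge independently."""
        if target_id in self.threads:
            raise ValueError(f"thread {target_id!r} already exists")
        self.threads[target_id] = copy.deepcopy(self.threads[source_id])


store = ThreadStore()
store.append("main", "user", "The answer is 42.")
store.branch("main", "branch-a")
store.append("branch-a", "user", "What is the answer?")
print(len(store.threads["main"]), len(store.threads["branch-a"]))
```

The deep copy is the design choice that matters here: messages added to `branch-a` never show up in `main`.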

User profile system prompts

from nexus import NexusConfig
from nexus.memory.profile import UserProfile

async with await Nexus.create(config) as nx:
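    # Note: _profile_store is a private attribute (leading underscore), so this call may change between releases.
    nx._profile_store.save("alice", {"name": "Alice", "language": "French"})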
    reply = await nx.run("Bonjour!", user_id="alice")  # responds with Alice's context

Batch runs

async with await Nexus.create(config) as nx:
    results = await nx.batch_run(["Q1", "Q2", "Q3"], concurrency=3)
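
A `concurrency` limit of this kind is commonly built from a semaphore plus `asyncio.gather`. The sketch below shows that pattern under the assumption that `batch_run` behaves similarly; `bounded_batch` and `fake_run` are hypothetical names, and `fake_run` stands in for `nx.run` so the example runs offline.

```python
import asyncio
from typing import Awaitable, Callable


async def bounded_batch(
    prompts: list[str],
    worker: Callable[[str], Awaitable[str]],
    concurrency: int = 3,
) -> list[str]:
    """Run worker over prompts with at most `concurrency` calls in flight."""
    gate = asyncio.Semaphore(concurrency)

    async def run_one(prompt: str) -> str:
        async with gate:
            return await worker(prompt)

    # gather() returns results in input order, regardless of finish order.
    return await asyncio.gather(*(run_one(p) for p in prompts))


async def fake_run(prompt: str) -> str:
    """Hypothetical stand-in for nx.run()."""
    await asyncio.sleep(0.01)
    return f"answer to {prompt}"


results = asyncio.run(bounded_batch(["Q1", "Q2", "Q3", "Q4"], fake_run, concurrency=2))
print(results)
```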

Human-in-the-loop approval

async def my_approval(tool_calls):
    for call in tool_calls:
        print(f"Approve {call['name']}({call['args']})? [y/n]")
        if input() != "y":
            return None
    return tool_calls

async with await Nexus.create(config) as nx:
    reply = await nx.run_with_approval("Delete old files", approval_callback=my_approval)
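
In a service there is usually nobody at a terminal to type `y`, so a policy-based callback can take the place of `input()`. The sketch below follows the contract of `my_approval` above (return the calls to approve, or `None` to reject) and assumes each call is a dict with `name` and `args` keys, as that example implies. The tool names in `SAFE_TOOLS` are hypothetical.

```python
import asyncio

SAFE_TOOLS = {"get_weather", "search_docs"}  # hypothetical tool names


async def allowlist_approval(tool_calls):
    """Approve the batch only if every requested tool is on the allowlist.

    Follows the contract of my_approval above: return the calls to
    approve, or None to reject.
    """
    for call in tool_calls:
        if call["name"] not in SAFE_TOOLS:
            return None
    return tool_calls


approved = asyncio.run(allowlist_approval([{"name": "get_weather", "args": {"city": "Tokyo"}}]))
rejected = asyncio.run(allowlist_approval([{"name": "delete_files", "args": {"path": "/tmp"}}]))
print(approved is not None, rejected is None)
```

It would be passed in the same way as the interactive version, for example `approval_callback=allowlist_approval`.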

Voice I/O (STT → LLM → TTS)

from nexus import NexusConfig
from nexus.core.config import VoiceConfig

config = NexusConfig(llm=..., voice=VoiceConfig(stt_api_key="...", tts_api_key="..."))
async with await Nexus.create(config) as nx:
    audio_out = await nx.run_voice(audio_bytes, thread_id="user-123")

Security

  • Prompt-injection detection: built-in regex guard with warn and block modes
  • SQL read-only guard: SELECT-only enforcement with multi-statement and OUTFILE blocking
  • Path traversal prevention: user IDs sanitized before any filesystem access
  • Token budget: hard per-run token cap to control costs

from nexus.core.security import InjectionDetectionConfig, SecurityConfig
from nexus.core.config import NexusConfig

cfg = NexusConfig(security=SecurityConfig(injection=InjectionDetectionConfig(enabled=True, mode="block")))
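
To illustrate what the SQL read-only guard in the list above has to check, here is a deliberately naive sketch: a single statement, starting with SELECT, with no file export. `is_read_only_sql` is a hypothetical helper, not the library's guard, and it does not parse string literals or comments, so treat it as an illustration rather than a complete defence.

```python
import re

# Matches MySQL-style file exports such as "INTO OUTFILE '/tmp/x'".
_OUTFILE = re.compile(r"\binto\s+(outfile|dumpfile)\b", re.IGNORECASE)


def is_read_only_sql(query: str) -> bool:
    """Naive check: one statement, starts with SELECT, no file export."""
    stmt = query.strip()
    if stmt.endswith(";"):
        stmt = stmt[:-1].rstrip()  # allow a single trailing semicolon
    if not stmt or ";" in stmt:
        return False  # empty, or more than one statement
    if not re.match(r"select\b", stmt, re.IGNORECASE):
        return False  # anything other than SELECT is rejected
    return _OUTFILE.search(stmt) is None


for q in [
    "SELECT * FROM users;",
    "SELECT 1; DROP TABLE users",
    "SELECT * FROM t INTO OUTFILE '/tmp/x'",
]:
    print(q, "->", is_read_only_sql(q))
```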

Model routing

from nexus import LLMProvider, NexusConfig
from nexus.core.config import LLMConfig, ModelRouter, RoutingConfig

config = NexusConfig(llm=LLMConfig(
    provider=LLMProvider.OPENAI,
    model="gpt-4o",
    router=ModelRouter(
        fallback=RoutingConfig(model="gpt-4o-mini"),       # on tool-call errors
        tool_call=RoutingConfig(model="gpt-4o-mini"),      # cheaper model for tool turns
        summarizer=RoutingConfig(model="gpt-4o-mini"),     # long-context summarization
    )
))

Configuration reference

from nexus import NexusConfig, LLMConfig, LLMProvider
from nexus.core.config import RAGConfig, VoiceConfig, SecurityConfig
from nexus.core.token_budget import TokenBudgetConfig

config = NexusConfig(
    llm=LLMConfig(
        provider=LLMProvider.OPENAI,
        model="gpt-4o-mini",
        api_key="sk-...",           # or set OPENAI_API_KEY env var
        temperature=0.7,
        max_tokens=4096,
    ),
    rag=RAGConfig(enabled=False),   # disable RAG if not needed
    budget=TokenBudgetConfig(max_tokens_per_run=50_000),
    security=SecurityConfig(),
)
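
The `budget` entry above sets `max_tokens_per_run`. As a rough mental model of a hard per-run cap, the sketch below counts tokens and refuses any charge that would pass the limit. `RunBudget` and `BudgetExceeded` are hypothetical names for illustration; how the library actually counts and enforces tokens is not shown in this README.

```python
class BudgetExceeded(RuntimeError):
    """Raised when a run would go past its token cap."""


class RunBudget:
    """Illustrative per-run counter; not the library's TokenBudgetConfig."""

    def __init__(self, max_tokens_per_run: int) -> None:
        self.limit = max_tokens_per_run
        self.used = 0

    def charge(self, tokens: int) -> int:
        """Add usage and return the remaining allowance, or raise at the cap."""
        if self.used + tokens > self.limit:
            raise BudgetExceeded(f"{self.used + tokens} > {self.limit} tokens")
        self.used += tokens
        return self.limit - self.used


budget = RunBudget(max_tokens_per_run=50_000)
remaining = budget.charge(30_000)
print(remaining)
```

A second charge of 25,000 tokens on the same budget would raise `BudgetExceeded`, because the total would pass 50,000.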

What's inside

| Component | Description |
| --- | --- |
| nexus.Nexus | Top-level async context manager |
| nexus.brain.GraphBrain | LangGraph execution engine with checkpointing |
| nexus.core.registry.ToolRegistry | Async-first tool registry with monitor wiring |
| nexus.agents.Supervisor | Multi-agent router with PerformanceMonitor integration |
| nexus.agents.monitor.PerformanceMonitor | Rolling-window failure tracker, auto-fires bad components |
| nexus.core.security | Injection guard, content policy, thread policy |
| nexus.memory.profile.UserProfile | Per-user JSON fact store for system-prompt preambles |
| nexus.voice | STT → LLM → TTS pipeline (OpenAI Whisper + ElevenLabs) |
| nexus.serve.app | FastAPI app with SSE streaming endpoints |

Requirements

  • Python 3.11+
  • One of: anthropic, openai, google-generativeai
  • LangGraph 1.1+

License

MIT


