Full AI orchestration engine: LangGraph + LlamaIndex + MCP + A2A in 20 lines

nexus-ai-orchestrator

Full AI orchestration engine — LangGraph multi-turn memory, tool execution, streaming, multi-agent supervision, and voice I/O in one library.

import asyncio
from nexus import Nexus, NexusConfig, LLMConfig, LLMProvider

config = NexusConfig(llm=LLMConfig(provider=LLMProvider.OPENAI, model="gpt-4o-mini"))

async def main():
    async with await Nexus.create(config) as nexus:
        reply = await nexus.run("What is the capital of France?")
        print(reply)  # Paris

asyncio.run(main())

Install

pip install nexus-ai-orchestrator[anthropic]   # Anthropic / Claude
pip install nexus-ai-orchestrator[openai]      # OpenAI / GPT
pip install nexus-ai-orchestrator[google]      # Google / Gemini
pip install nexus-ai-orchestrator[all]         # all providers + redis + postgres

Features

Multi-turn memory

async with await Nexus.create(config) as nx:
    await nx.run("My name is Alice.", thread_id="user-123")
    reply = await nx.run("What's my name?", thread_id="user-123")
    print(reply)  # Alice

Streaming

async with await Nexus.create(config) as nx:
    async for token in nx.stream("Tell me a story"):
        print(token, end="", flush=True)

Typed event streaming (tool spinners, cost tickers)

async with await Nexus.create(config) as nx:
    async for event in nx.stream_events("Search the web for nexus-ai"):
        if event["type"] == "token":
            print(event["data"], end="")
        elif event["type"] == "tool_start":
            print(f"\n[calling {event['data']}...]")
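The event loop above can be factored into a small consumer that separates token text from tool activity. This is a minimal sketch that assumes only the event shape shown above (a dict with "type" and "data" keys); `fake_events` is a hypothetical stand-in for `nx.stream_events(...)` so the snippet runs without an API key.

```python
import asyncio
from typing import AsyncIterator


async def fake_events() -> AsyncIterator[dict]:
    """Hypothetical stand-in for nx.stream_events(); yields the event shape shown above."""
    yield {"type": "tool_start", "data": "web_search"}
    yield {"type": "token", "data": "Nexus "}
    yield {"type": "token", "data": "found."}


async def render(events: AsyncIterator[dict]) -> tuple[str, list[str]]:
    """Collect token text and the names of the tools that were started."""
    text_parts: list[str] = []
    tools: list[str] = []
    async for event in events:
        if event["type"] == "token":
            text_parts.append(event["data"])
        elif event["type"] == "tool_start":
            tools.append(event["data"])
        # Unknown event types are skipped so new types do not break the loop.
    return "".join(text_parts), tools


text, tools = asyncio.run(render(fake_events()))
print(text, tools)
```

Skipping unknown event types keeps a UI working if the library later emits additional event kinds.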

Tool registration

from nexus import ToolDefinition, ToolParameter, ToolSource

async def get_weather(city: str) -> str:
    return f"Sunny and 72°F in {city}"

tool = ToolDefinition(
    name="get_weather",
    description="Returns current weather for a city.",
    parameters=[ToolParameter(name="city", type="string", description="City name", required=True)],
    source=ToolSource.NATIVE,
    handler=get_weather,
)

async with await Nexus.create(config) as nx:
    nx.add_tool(tool)
    reply = await nx.run("What's the weather in Tokyo?")
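Conceptually, registering a tool maps its name to an async handler that is called with the arguments the model supplies. The sketch below illustrates that idea with a hypothetical `MiniRegistry` class; it is not the library's `ToolRegistry`, and it leaves out parameter validation and monitoring.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[..., Awaitable[str]]


class MiniRegistry:
    """Hypothetical, simplified registry: maps tool names to async handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def add(self, name: str, handler: Handler) -> None:
        self._handlers[name] = handler

    async def call(self, name: str, **kwargs) -> str:
        # Fail loudly on unknown names instead of silently ignoring the call.
        if name not in self._handlers:
            raise KeyError(f"unknown tool: {name}")
        return await self._handlers[name](**kwargs)


async def get_weather(city: str) -> str:
    return f"Sunny and 72°F in {city}"


registry = MiniRegistry()
registry.add("get_weather", get_weather)
result = asyncio.run(registry.call("get_weather", city="Tokyo"))
print(result)
```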

Multi-agent supervision

from nexus.agents import SubAgent, SubAgentConfig, Supervisor

async with await Nexus.create(config) as nx:
    researcher = SubAgent(SubAgentConfig(name="Researcher", role="Research and find facts"))
    writer = SubAgent(SubAgentConfig(name="Writer", role="Write clear summaries"))
    crew = Supervisor([researcher, writer], nexus=nx)
    reply = await crew.run("Research and summarise the history of the internet")

Performance monitoring — auto-fire bad agents

from nexus.agents import PerformanceMonitor, Supervisor

monitor = PerformanceMonitor(consecutive_threshold=3, on_fired=lambda name, reason: print(f"{name} fired: {reason}"))
# `agent` is a SubAgent and `nx` is an open Nexus instance, as in the previous example.
crew = Supervisor([agent], nexus=nx, monitor=monitor)
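To make the `consecutive_threshold` idea concrete, here is a minimal sketch of a tracker that fires a component after a run of failures. `ConsecutiveFailureTracker` is a hypothetical class written for illustration; it models only the consecutive-failure rule and the `on_fired(name, reason)` callback shape shown above, not the library's actual monitor.

```python
from typing import Callable


class ConsecutiveFailureTracker:
    """Hypothetical sketch: fire a component after N failures in a row."""

    def __init__(self, threshold: int, on_fired: Callable[[str, str], None]) -> None:
        self.threshold = threshold
        self.on_fired = on_fired
        self._streaks: dict[str, int] = {}
        self.fired: set[str] = set()

    def record(self, name: str, ok: bool) -> None:
        if name in self.fired:
            return  # already fired; ignore further results
        if ok:
            self._streaks[name] = 0  # any success resets the streak
            return
        self._streaks[name] = self._streaks.get(name, 0) + 1
        if self._streaks[name] >= self.threshold:
            self.fired.add(name)
            self.on_fired(name, f"{self._streaks[name]} consecutive failures")


events: list[tuple[str, str]] = []
tracker = ConsecutiveFailureTracker(3, lambda name, reason: events.append((name, reason)))
# Two failures, one success (resets the streak), then three failures in a row.
for ok in [False, False, True, False, False, False]:
    tracker.record("Researcher", ok)
print(events)
```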

Branch threads (fork conversation history)

async with await Nexus.create(config) as nx:
    await nx.run("The answer is 42.", thread_id="main")
    await nx.branch_thread("main", "branch-a")
    # branch-a starts with full context from main
    reply = await nx.run("What is the answer?", thread_id="branch-a")
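Branching can be pictured as copying a thread's history under a new ID, after which the two threads diverge independently. The sketch below uses a hypothetical in-memory `ThreadStore`; the library itself relies on LangGraph checkpointing, which this example does not model.

```python
import copy


class ThreadStore:
    """Hypothetical in-memory store: thread_id -> list of message dicts."""

    def __init__(self) -> None:
        self._threads: dict[str, list[dict]] = {}

    def append(self, thread_id: str, role: str, content: str) -> None:
        self._threads.setdefault(thread_id, []).append({"role": role, "content": content})

    def branch(self, source_id: str, new_id: str) -> None:
        if new_id in self._threads:
            raise ValueError(f"thread already exists: {new_id}")
        # Deep copy so later edits to one thread never leak into the other.
        self._threads[new_id] = copy.deepcopy(self._threads[source_id])

    def history(self, thread_id: str) -> list[dict]:
        return list(self._threads.get(thread_id, []))


store = ThreadStore()
store.append("main", "user", "The answer is 42.")
store.branch("main", "branch-a")
store.append("branch-a", "user", "What is the answer?")
print(len(store.history("main")), len(store.history("branch-a")))
```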

User profile system prompts

from nexus import NexusConfig
from nexus.memory.profile import UserProfile

async with await Nexus.create(config) as nx:
    nx._profile_store.save("alice", {"name": "Alice", "language": "French"})
    reply = await nx.run("Bonjour!", user_id="alice")  # responds with Alice's context

Batch runs

async with await Nexus.create(config) as nx:
    results = await nx.batch_run(["Q1", "Q2", "Q3"], concurrency=3)
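A `concurrency` limit of this kind is commonly implemented with a semaphore around each task. The following is a sketch of that general pattern, not the library's implementation; `bounded_gather` and `fake_run` are hypothetical names, and `fake_run` stands in for `nx.run`.

```python
import asyncio
from typing import Awaitable, Callable


async def bounded_gather(
    prompts: list[str],
    worker: Callable[[str], Awaitable[str]],
    concurrency: int,
) -> list[str]:
    """Run worker over prompts with at most `concurrency` calls in flight."""
    sem = asyncio.Semaphore(concurrency)

    async def run_one(prompt: str) -> str:
        async with sem:
            return await worker(prompt)

    # asyncio.gather returns results in the order of its arguments.
    return await asyncio.gather(*(run_one(p) for p in prompts))


async def fake_run(prompt: str) -> str:
    """Hypothetical stand-in for nx.run(); simulates a short network call."""
    await asyncio.sleep(0.01)
    return prompt.upper()


results = asyncio.run(bounded_gather(["q1", "q2", "q3"], fake_run, concurrency=2))
print(results)
```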

Human-in-the-loop approval

async def my_approval(tool_calls):
    for call in tool_calls:
        print(f"Approve {call['name']}({call['args']})? [y/n]")
        if input() != "y":
            return None
    return tool_calls

async with await Nexus.create(config) as nx:
    reply = await nx.run_with_approval("Delete old files", approval_callback=my_approval)
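An approval callback does not have to be interactive. The sketch below applies a policy instead: it follows the same contract as the callback above (return the tool calls to approve, or `None` to reject), and assumes each call is a dict with "name" and "args" keys as in that example. The `SAFE_TOOLS` allowlist is hypothetical.

```python
import asyncio
from typing import Optional

SAFE_TOOLS = {"get_weather", "search"}  # hypothetical allowlist


async def allowlist_approval(tool_calls: list[dict]) -> Optional[list[dict]]:
    """Approve the batch only if every call targets an allowlisted tool."""
    for call in tool_calls:
        if call["name"] not in SAFE_TOOLS:
            return None  # mirrors the interactive callback: None rejects the batch
    return tool_calls


approved = asyncio.run(
    allowlist_approval([{"name": "get_weather", "args": {"city": "Tokyo"}}])
)
rejected = asyncio.run(
    allowlist_approval([{"name": "delete_files", "args": {"path": "/tmp"}}])
)
print(approved, rejected)
```

For interactive prompts, note that a plain `input()` call blocks the event loop; `await asyncio.to_thread(input)` is one way to keep other tasks running while waiting for the answer.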

Voice I/O (STT → LLM → TTS)

from nexus import NexusConfig
from nexus.core.config import VoiceConfig

config = NexusConfig(llm=..., voice=VoiceConfig(stt_api_key="...", tts_api_key="..."))
async with await Nexus.create(config) as nx:
    audio_out = await nx.run_voice(audio_bytes, thread_id="user-123")

Security

  • Prompt-injection detection — built-in regex guard with warn and block modes
  • SQL read-only guard — SELECT-only enforcement with multi-statement and OUTFILE blocking
  • Path traversal prevention — user IDs sanitized before any filesystem access
  • Token budget — hard per-run token cap to control costs

from nexus.core.security import InjectionDetectionConfig, SecurityConfig
from nexus.core.config import NexusConfig

cfg = NexusConfig(security=SecurityConfig(injection=InjectionDetectionConfig(enabled=True, mode="block")))
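To illustrate what two of the guards listed above check for, here is a rough standalone sketch of a SELECT-only test and a user-ID sanitizer. Both functions are hypothetical and written for illustration; they are not the library's code, and a regex check is not a substitute for a real SQL parser or database-level permissions.

```python
import re

# File-export clauses that a read-only guard would typically refuse.
_FORBIDDEN = re.compile(r"\bINTO\s+(OUTFILE|DUMPFILE)\b", re.IGNORECASE)


def is_read_only_sql(query: str) -> bool:
    """Rough check: a single SELECT statement with no file-export clause."""
    stripped = query.strip().rstrip(";").strip()
    if ";" in stripped:
        # Conservative: also rejects semicolons inside string literals.
        return False
    if not re.match(r"(?i)^select\b", stripped):
        return False
    return _FORBIDDEN.search(stripped) is None


def sanitize_user_id(user_id: str) -> str:
    """Replace every character that is not safe in a file name."""
    cleaned = re.sub(r"[^A-Za-z0-9_-]", "_", user_id)
    if not cleaned:
        raise ValueError("empty user id")
    return cleaned


print(is_read_only_sql("SELECT * FROM users;"))
print(is_read_only_sql("SELECT 1; DROP TABLE users"))
print(sanitize_user_id("../../etc/passwd"))
```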

Model routing

from nexus import NexusConfig, LLMProvider
from nexus.core.config import LLMConfig, ModelRouter, RoutingConfig

config = NexusConfig(llm=LLMConfig(
    provider=LLMProvider.OPENAI,
    model="gpt-4o",
    router=ModelRouter(
        fallback=RoutingConfig(model="gpt-4o-mini"),       # on tool-call errors
        tool_call=RoutingConfig(model="gpt-4o-mini"),      # cheaper model for tool turns
        summarizer=RoutingConfig(model="gpt-4o-mini"),     # long-context summarization
    )
))
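The routing idea can be pictured as a lookup from the kind of turn to a model name, falling back to the primary model when no override is set. The sketch below is illustrative only: `Routes`, `pick_model`, and the turn labels are hypothetical and do not reflect how the library selects models internally.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Routes:
    """Hypothetical mirror of the router fields shown above."""
    default: str
    fallback: Optional[str] = None
    tool_call: Optional[str] = None
    summarizer: Optional[str] = None


def pick_model(routes: Routes, turn: str) -> str:
    """Return the model for a turn; turn labels here are assumptions."""
    override = {
        "tool_call": routes.tool_call,
        "summarize": routes.summarizer,
        "error": routes.fallback,
    }.get(turn)
    # Use the primary model when no override is configured for this turn.
    return override or routes.default


routes = Routes(default="gpt-4o", fallback="gpt-4o-mini", tool_call="gpt-4o-mini")
print(pick_model(routes, "chat"), pick_model(routes, "tool_call"), pick_model(routes, "summarize"))
```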

Configuration reference

from nexus import NexusConfig, LLMConfig, LLMProvider
from nexus.core.config import RAGConfig, VoiceConfig, SecurityConfig
from nexus.core.token_budget import TokenBudgetConfig

config = NexusConfig(
    llm=LLMConfig(
        provider=LLMProvider.OPENAI,
        model="gpt-4o-mini",
        api_key="sk-...",           # or set OPENAI_API_KEY env var
        temperature=0.7,
        max_tokens=4096,
    ),
    rag=RAGConfig(enabled=False),   # disable RAG if not needed
    budget=TokenBudgetConfig(max_tokens_per_run=50_000),
    security=SecurityConfig(),
)
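A hard per-run cap such as `max_tokens_per_run` amounts to a counter that refuses any charge that would cross the limit. The sketch below shows that idea with hypothetical `RunBudget` and `TokenBudgetExceeded` names; how the library actually counts tokens and reports an exhausted budget is not covered here.

```python
class TokenBudgetExceeded(RuntimeError):
    """Raised when a charge would push usage past the cap (hypothetical name)."""


class RunBudget:
    """Hypothetical per-run token counter with a hard cap."""

    def __init__(self, max_tokens_per_run: int) -> None:
        self.max_tokens = max_tokens_per_run
        self.used = 0

    def charge(self, tokens: int) -> int:
        """Record usage and return the tokens remaining; refuse to cross the cap."""
        if self.used + tokens > self.max_tokens:
            raise TokenBudgetExceeded(
                f"{self.used + tokens} tokens would exceed the cap of {self.max_tokens}"
            )
        self.used += tokens
        return self.max_tokens - self.used


budget = RunBudget(max_tokens_per_run=50_000)
remaining = budget.charge(30_000)
print(remaining)
```

Checking before incrementing means a rejected charge leaves the counter unchanged, so the caller can retry with a smaller request.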

What's inside

Component                                Description
nexus.Nexus                              Top-level async context manager
nexus.brain.GraphBrain                   LangGraph execution engine with checkpointing
nexus.core.registry.ToolRegistry         Async-first tool registry with monitor wiring
nexus.agents.Supervisor                  Multi-agent router with PerformanceMonitor integration
nexus.agents.monitor.PerformanceMonitor  Rolling-window failure tracker, auto-fires bad components
nexus.core.security                      Injection guard, content policy, thread policy
nexus.memory.profile.UserProfile         Per-user JSON fact store for system-prompt preambles
nexus.voice                              STT → LLM → TTS pipeline (OpenAI Whisper + ElevenLabs)
nexus.serve.app                          FastAPI app with SSE streaming endpoints

Requirements

  • Python 3.10+
  • One of: anthropic, openai, google-generativeai
  • LangGraph 1.1+

License

MIT
