Skip to main content

Provider-agnostic toolkit: dynamic MCP servers + agent skills for any LLM.

Project description

toolnexus

PyPI license

Build an agent in a few lines. Point at an mcp.json and a skills/ folder, call run(), and you have a working agent — MCP servers, agent skills, your own functions, and HTTP endpoints unified as one tool set, driving any LLM.

Right-sized. Not a framework (no builders, no config to wade through), not a toy that falls over the moment you need streaming or a retry. Everything a real agent needs — the loop, hooks, streaming, retries, memory — and nothing it doesn't.

The Python port of toolnexus — the same library, byte-identical, also in JavaScript, Go, Java, and C#. Built on the official MCP Python SDK (the mcp package). Python ≥ 3.11.

Install

pip install toolnexus

Quick start — a working agent in 5 lines

No mcp.json, no skills folder. The 10 built-in tools (bash, read, grep, webfetch, …) are on by default, so the model can actually do things right away:

import asyncio
from toolnexus import create_toolkit, create_client

async def main():
    tk = await create_toolkit()                          # built-in tools, on by default
    agent = create_client(
        base_url="https://openrouter.ai/api/v1", style="openai",
        model="deepseek/deepseek-chat",                  # any OpenRouter/OpenAI/Anthropic model
    )
    res = await agent.run("List the files here, then count them.", tk)
    print(res.text)
    await tk.close()

asyncio.run(main())
export OPENROUTER_API_KEY=...      # or OPENAI_API_KEY / ANTHROPIC_API_KEY

create_client reads the key from OPENROUTER_API_KEY / OPENAI_API_KEY / ANTHROPIC_API_KEY (no api_key= needed).

With MCP servers + skills

The MCP SDK is async, so the toolkit is async:

import asyncio
from toolnexus import create_toolkit, create_client


async def main():
    # 1. tools from an mcp.json + a skills/ folder
    tk = await create_toolkit(mcp_config="./mcp.json", skills_dir="./skills")

    # 2. point at any OpenAI- or Anthropic-style endpoint
    agent = create_client(
        base_url="https://openrouter.ai/api/v1",
        style="openai",                # or "anthropic"
        model="openai/gpt-4o-mini",
    )

    # 3. run — skills injected, tools called for you, looped to an answer
    res = await agent.run("Refund order 1234 for the customer.", tk)
    print(res.text)
    await tk.close()


asyncio.run(main())

The Toolkit is also an async context manager (async with await create_toolkit(...) as tk:) if you'd rather not call close() yourself.

Conversations / memory

run() is stateless — each call starts fresh. For a multi-turn thread that remembers, use ask(prompt, tk, id=...). Give it an id and the client's ConversationStore does the work: load that thread's transcript → run → save the updated transcript. The next ask with the same id continues where it left off. Call ask without an id and it's a stateless one-shot — identical to run.

agent = create_client(base_url="https://openrouter.ai/api/v1", style="openai",
                      model="openai/gpt-4o-mini")

await agent.ask("I trade NIFTY.", tk, id="trader-42")
res = await agent.ask("What do I trade?", tk, id="trader-42")
print(res.text)   # -> "NIFTY" — the second turn remembers the first

Every client has a store — by default an in-memory InMemoryConversationStore that lives as long as the client. To persist across processes (a file, a DB, Redis), pass your own to create_client:

from toolnexus import create_client, ConversationStore

class FileStore:                                  # implements ConversationStore
    async def get(self, id):                      # -> list[messages] | None
        ...
    async def save(self, id, messages):           # persist the updated transcript
        ...

agent = create_client(base_url=..., style="openai", model=..., store=FileStore())

ConversationStore is just two async methods — get(id) and save(id, messages). The A2A serve side uses the same store: an inbound peer's turns are keyed by their A2A contextId, so a served agent remembers a caller across tasks (see A2A agents).

Streaming with memory. The id also works while streaming. Pass on_text (a sync- or async-callable) to ask to stream text deltas as they arrive — ask still returns the final RunResult — or iterate stream() directly. With an id, the thread is loaded before the stream and saved on the terminal done event.

# block-style: stream deltas, still get the RunResult back — remembered under `id`
res = await agent.ask("Draft a reply.", tk, id="trader-42",
                      on_text=lambda delta: print(delta, end="", flush=True))

# async iterator: consume text + tool events; `id` makes it stateful (load before, save on done)
async for ev in agent.stream("And summarise it.", tk, id="trader-42"):
    if ev["type"] == "text":
        print(ev["delta"], end="", flush=True)
    elif ev["type"] == "done":
        print("\n", ev["result"].usage)

Observability / metrics

Zero-dependency, two outputs from one internal instrumentation — both opt-in, no cost when unused.

on_metric — a semantic event feed. Pass it to create_client and it receives a readable, snake_case dict at each significant point: {"event": "llm", "model", "status", "ms", "prompt_tokens", "completion_tokens"} per model call, {"event": "tool", "tool", "source", "is_error", "ms"} per tool call, and a terminal {"event": "run", "model", "turns", "tool_calls", "total_tokens", "ms", "error"?} per run/ask. Forward it anywhere (statsd, logs, OpenTelemetry).

agent = create_client(
    base_url=..., style="openai", model=...,
    on_metric=lambda ev: print("[metric]", ev["event"], ev),
)

agent.metrics() — built-in Prometheus text. The same events feed a tiny in-memory registry that renders the Prometheus text exposition format (no third-party dep). Mount it at GET /metrics:

from http.server import BaseHTTPRequestHandler, HTTPServer

class Handler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/metrics":
            body = agent.metrics().encode()
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

HTTPServer(("", 9090), Handler).serve_forever()

Series: toolnexus_llm_requests_total{model,status}, toolnexus_llm_tokens_total{type}, toolnexus_tool_calls_total{tool,source,is_error}, toolnexus_run_errors_total{model}, plus the toolnexus_llm_request_duration_seconds and toolnexus_tool_duration_seconds histograms. The rendered text is byte-identical across all five ports; OTLP push is a planned future companion.

Add your own tools

from toolnexus import define_tool, http_tool

# a plain function → a tool (schema inferred from the signature)
def add(a: float, b: float) -> str:
    """Add two numbers and return the sum."""
    return str(a + b)

tk.register(define_tool(add, name="add"))

# a REST endpoint → a tool
tk.register(http_tool(
    name="create_ticket", description="Create a ticket", method="POST",
    url="https://api.example.com/tickets",
    headers={"Authorization": "Bearer ${API_TOKEN}"},   # ${ENV} expands from os.environ, never logged
    input_schema={"type": "object", "properties": {"title": {"type": "string"}}, "required": ["title"]},
))

URL {placeholders} are filled from args; the rest become the JSON body. Non-2xx → ToolResult(output="HTTP <status>: <body>", is_error=True).

Built-in tools

A fifth source ships 10 built-in toolsbash, read, write, edit, grep, glob, webfetch, question, apply_patch, todowrite (names + input schemas match opencode) — so an agent can act with zero wiring. They appear in the tool schema (to_openai()/to_anthropic()/to_gemini()), like MCP tools — not the system prompt.

On by default. One global toggle turns the whole source off, or a per-tool tools map disables individual builtins on the all-on baseline:

tk = await create_toolkit(mcp_config="./mcp.json", builtins=False)
# also accepts {"disabled": True} or {"enabled": False}

# per-tool: drop bash, keep the other nine (unknown names ignored; whole-source-off still wins)
tk2 = await create_toolkit(mcp_config="./mcp.json", builtins={"tools": {"bash": False}})

bash/write/edit/apply_patch run commands and mutate the filesystem — the toggle is the off-switch for locked-down hosts.

A2A agents (agent-to-agent)

Call remote A2A agents (each of their skills becomes a tool) and serve your own toolkit as an agent other A2A peers can call. A genuine, minimal subset of real A2A (JSON-RPC 2.0; Agent Card at /.well-known/agent-card.json; SendMessage → poll GetTask). No streaming / push / auth in v1.

Outbound — call a remote agent. Each advertised skill becomes a tool named <agent>_<skill> (source="a2a"):

from toolnexus import create_toolkit, agent

tk = await create_toolkit(
    agents=[agent("https://researcher.example.com/.well-known/agent-card.json")],
)

# or add one at runtime (an Agent or a bare card URL):
await tk.add_agent("https://writer.example.com/.well-known/agent-card.json")

agent(card, *, headers=None, timeout=None, poll_every=None)headers support ${ENV} expansion (never logged); timeout / poll_every are milliseconds (300000 / 1000 defaults). A config file can also carry an agents block. A failing agent is isolated — contributes no tools, never fatal.

Inbound — serve your toolkit as an agent. The Agent Card is built from your SKILL.md skills (never raw tools):

from toolnexus import create_client

agent_client = create_client(base_url="https://openrouter.ai/api/v1", style="openai", model="openai/gpt-4o-mini")

handle = await tk.serve("127.0.0.1:0", client=agent_client, a2a={
    "name": "research-agent",
    "description": "Answers research questions.",
    # "skills": ["hello-world"],   # subset of skills to advertise; omit ⇒ all
    "store": "memory",             # "memory" (default) | "file:<dir>" | a custom TaskStore
})
print(handle.url)                  # GET /.well-known/agent-card.json ; POST / (SendMessage / GetTask)
await handle.stop()

serve(addr, *, client, a2a=None, on_task=None) fulfils each inbound task through the client: a message carrying an A2A contextId goes through client.ask(..., id=contextId), so a peer's turns are remembered across tasks via the client's ConversationStore; without a contextId it's a stateless client.run. Task persistence is a separate pluggable TaskStore (in-memory default, "file:<dir>", or your own).

Bring your own loop

Don't want the host loop? Use the schema adapters and execute calls yourself:

tools  = tk.to_openai()        # or tk.to_anthropic() / tk.to_gemini()
system = tk.skills_prompt()    # skills catalog for your system prompt (opens with a preamble telling the model to use the skill tool)
# when the model returns a tool call { name, arguments }:
res = await tk.execute(name, arguments)   # -> ToolResult(output, is_error, metadata)

The four sources

Source How
MCP servers an mcp.json (mcpServers/servers/mcp); local stdio + remote streamable-HTTP, headers for auth
Agent skills a folder of <name>/SKILL.md; a skill tool loads each on demand + a system-prompt catalog
Native tools define_tool(fn) / the @tool decorator — a function becomes a tool
HTTP / REST http_tool(...) — an endpoint becomes a tool, ${ENV} headers

All four appear as one uniform Tool in tk.tools(), with source in "mcp" | "skill" | "custom".

API

Python Description
await create_toolkit(...) async factory → Toolkit
create_client(..., store=?, on_metric=?) the unified host loop; store is the ConversationStore (default in-memory); on_metric is the metric-event sink
await agent.run(prompt, tk) one stateless agent loop → RunResult(text, messages, tool_calls, usage, …)
await agent.ask(prompt, tk, *, id=None, on_text=None) with id: remembers the thread via store (get → run → save); without: one-shot (= run); on_text streams text deltas
agent.stream(prompt, tk, *, id=None) streaming variant — async-iterate text/tool/usage/done events; id ⇒ stateful
agent.metrics() Prometheus text exposition of cumulative metrics — mount at GET /metrics
ConversationStore / InMemoryConversationStore async get(id) / async save(id, messages) — implement for file/db; in-memory default
tk.tools() / tk.get(name) the uniform tools
await tk.execute(name, args, ctx=None) run a tool → ToolResult
tk.skills_prompt() system-prompt skill catalog
tk.mcp_status() per-server connection status
tk.to_openai() / to_anthropic() / to_gemini() provider tool schemas
tk.register(*tools) add native/http/custom tools
await tk.serve(addr, client=…, a2a=…) serve the toolkit as an A2A agent
await tk.close() disconnect MCP servers

More

Full docs, the other four language ports, the shared behavior spec, and runnable examples: https://github.com/muthuishere/toolnexus

MIT licensed.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

toolnexus-0.4.0.tar.gz (117.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

toolnexus-0.4.0-py3-none-any.whl (53.2 kB view details)

Uploaded Python 3

File details

Details for the file toolnexus-0.4.0.tar.gz.

File metadata

  • Download URL: toolnexus-0.4.0.tar.gz
  • Upload date:
  • Size: 117.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for toolnexus-0.4.0.tar.gz
Algorithm Hash digest
SHA256 a10201ec80fcb487581b4f96867a71b7497b2b02c0349d87ae1976b34fdbee7e
MD5 b05ad851dc9ab539f64779ec3e0f7c01
BLAKE2b-256 4b32d1ee347639cf65132da70c9becc6f1154e93bcce57052ea2c31f890d4834

See more details on using hashes here.

Provenance

The following attestation bundles were made for toolnexus-0.4.0.tar.gz:

Publisher: release.yml on muthuishere/toolnexus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file toolnexus-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: toolnexus-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 53.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for toolnexus-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f0c4b8ed92c88e9c9073cece35bf2db7e182062c1ced70d1e1b5e537e1eea029
MD5 77a73bc75a708016c02e64d923fdc7d7
BLAKE2b-256 29903284989ee522fcf25db15f4df239500c83a4796e3c38fe27790714fb9d94

See more details on using hashes here.

Provenance

The following attestation bundles were made for toolnexus-0.4.0-py3-none-any.whl:

Publisher: release.yml on muthuishere/toolnexus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page