Skip to main content

MCP ベースのマルチエージェント開発キット: ストリーミング対応の MCP クライアント / サーバーをまとめた統合ライブラリ

Project description

agent-hub-kit

MCP (Model Context Protocol) ベースの マルチエージェント開発キット。 MCP クライアントと MCP サーバーの両方を、トークン単位ストリーミング対応 で簡潔に書けるようにする 1 つのパッケージ。複数のエージェントリポジトリ から同じプロトコルで呼び合えるようにすることを目的とする。

特徴

  • トークン単位ストリーミング: notifications/progress.message に JSON エンベロープを載せて text / think / log / image / stl / video / emotion などを多重化。LLM のトークンを 1 文字ずつ流せる
  • 複数 MCP サーバー集約: 1 つの AgentHubClient で複数の MCP サーバーへ 並列接続し、ツール名でルーティング (AsyncExitStack で安全に管理)
  • 中断 (cancellation): async forbreak するだけで MCP の notifications/cancelled が自動送出される薄いラッパ
  • 画像入力規約: ツール引数 images: list[str] を data URI 形式で渡す 慣習を統一
  • 標準準拠: 中身は MCP SDK (mcp パッケージ) のラッパで、独自プロトコル なし。他の MCP クライアント/サーバーとも互換

インストール

uv add agent-hub-kit
# または
pip install agent-hub-kit

未リリース版を使う場合:

uv add "agent-hub-kit @ git+https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git"

要件: Python 3.13+。主要な依存は mcp, httpx, pydantic, llm-graph-kit

アーキテクチャ

              ┌────────────────────────────────────────┐
              │           AgentHubClient               │
              │     (複数 MCP サーバーを集約)          │
              └──────┬──────────────┬──────────────┬──┘
                     │ MCP/HTTP     │ MCP/HTTP     │ MCP/HTTP
                     ▼              ▼              ▼
          ┌──────────────────────────────────────────┐
          │           各 MCP サーバー                │
          │   EventStreamer で progress notification │
          │   としてトークン単位ストリーム配信        │
          └──────────────────────────────────────────┘
役割 主要 API MCP プロトコル上の立ち位置
MCP クライアント (呼び出し側) AgentHubClient client
MCP サーバー (応答側) EventStreamer / stream_text_iter server

クイックスタート

サーバーとクライアントを 1 ファイルずつ作って動かす最小例。

1. サーバー (my_server.py)

"""自前 MCP サーバーの最小例。"""
import asyncio
from mcp.server.fastmcp import Context, FastMCP
from agent_hub_kit import EventStreamer

mcp = FastMCP("my-app")

@mcp.tool()
async def echo(text: str, ctx: Context) -> str:
    """入力テキストを 1 文字ずつ text イベントとしてストリーム配信。"""
    stream = EventStreamer(ctx, node="my-app")
    await stream.log(f"受信: {len(text)} 文字")
    for ch in text:
        await stream.text(ch)
        await asyncio.sleep(0.02)
    return text

if __name__ == "__main__":
    mcp.settings.host = "127.0.0.1"
    mcp.settings.port = 8001
    mcp.run(transport="streamable-http")

2. クライアント (my_client.py)

"""自前 MCP サーバーを呼び出すクライアント。"""
import asyncio
from agent_hub_kit import AgentHubClient

async def main():
    async with AgentHubClient() as client:
        await client.connect_http("my_app", "http://127.0.0.1:8001/mcp")

        async for ev in client.call_tool_stream("echo", {"text": "Hello"}):
            if ev.type == "text":
                print(ev.content, end="", flush=True)
            elif ev.type == "log":
                print(f"\n[log] {ev.content}")
            elif ev.type == "result":
                print(f"\n[result] {ev.content!r}")

asyncio.run(main())

3. 実行 (2 ターミナル)

# ターミナル A
python my_server.py

# ターミナル B
python my_client.py

Hello が 1 文字ずつストリームで届き、最後に [result] 'Hello' が表示 される。

API リファレンス

公開 API はすべて from agent_hub_kit import ... で取得可能:

from agent_hub_kit import (
    AgentHubClient,       # MCP クライアント
    ToolRef,              # tools/list で発見した 1 ツールの情報
    StreamEvent,          # ストリーム配信される 1 イベント
    ENVELOPE_VERSION,     # JSON エンベロープのバージョン
    encode_envelope,      # サーバー側で内部利用 (通常は EventStreamer 経由)
    decode_envelope,      # クライアント側で内部利用 (通常は自動)
    EventStreamer,        # サーバー側のストリーミング送出ヘルパー
    stream_text_iter,     # テキストイテレータを text イベントとして流すヘルパー
)

AgentHubClient

複数 MCP サーバーへの接続を束ねる async context manager。

async with AgentHubClient() as client:
    refs = await client.connect_http("server_a", "http://localhost:8001/mcp")
    ...
メソッド 説明
connect_http(server_name: str, url: str) -> list[ToolRef] Streamable HTTP で 1 サーバーに接続し、tools/list 結果を返す
connect_all_http(endpoints: Mapping[str, str]) -> list[ToolRef] 複数エンドポイントを一括接続。{"name": "http://.../mcp", ...}
list_tools() -> dict[str, list[ToolRef]] 接続中の全サーバーから tools/list を取り直す。{server: [ToolRef]}
find_tool(tool_name: str) -> ToolRef | None キャッシュ済みのツール情報を返す
call_tool_stream(tool_name, arguments, *, server=None, queue_maxsize=256) -> AsyncIterator[StreamEvent] ツールを呼び出し、StreamEvent を逐次 yield する async generator

call_tool_stream() は中断 (cancellation) も簡単:

async for ev in client.call_tool_stream("echo", {"text": "..."}):
    if ev.type == "text":
        print(ev.content)
        if should_stop():
            break   # ← MCP の notifications/cancelled が自動送出される

EventStreamer

MCP サーバー側でツール内から使うストリーミング送出ヘルパー。FastMCPContext をラップする。

@mcp.tool()
async def my_tool(user_request: str, ctx: Context) -> str:
    stream = EventStreamer(ctx, node="my-agent")
    await stream.log("開始")
    for chunk in run_llm(user_request):
        await stream.text(chunk)
    return final_text
メソッド 配信される StreamEvent.type 用途
text(chunk: str) text LLM トークン等のテキスト断片
think(chunk: str) think 思考プロセス (UI でグレー表示等)
log(message: str) log 進捗ログ
image(payload: dict) image 画像 (URL / base64)
stl(payload: dict) stl STL モデル情報
video(payload: dict) video 動画情報
emotion(payload: dict) emotion_data 感情パラメータ
error(message: str) error ツール失敗扱いにせず継続したい時のエラー
send(event_type: str, content: Any) 任意 上記以外の独自種別を送る場合の汎用 API

stream_text_iter(stream, chunks)

テキストイテレータをまとめて text イベントとして配信し、連結結果を返す ヘルパー。同期/非同期どちらのイテレータも受け付ける。

full_text = await stream_text_iter(stream, llm.stream(prompt))

StreamEvent

クライアントが受信するイベント 1 件。call_tool_stream() が yield する型。

@dataclass(slots=True)
class StreamEvent:
    type: str                    # "text" / "think" / "log" / ... / "result" / "error" / "cancelled"
    content: Any = None          # 種別依存ペイロード
    node: str | None = None      # 送信元サーバー / エージェント名
    raw: dict | None = None      # 元の JSON エンベロープ (デバッグ用)

イベント種別一覧

クライアントが StreamEvent.type で受け取りうる値:

type 内容 発生源
text LLM トークン等のテキスト断片 stream.text()
think 思考プロセス stream.think()
log 進捗ログ stream.log()
image 画像 stream.image()
stl STL モデル stream.stl()
video 動画 stream.video()
emotion_data 感情パラメータ stream.emotion()
result ツールの最終戻り値 call_tool_streamcall_tool 完了時に 1 度だけ yield
error エラー stream.error() / call_tool 例外 / ツール側 isError
cancelled キャンセル完了 クライアント側で中断した時

エンベロープ形式

notifications/progress.message に載せる JSON 形式:

{
  "v": 1,
  "type": "text",
  "content": "あ",
  "node": "my-agent"
}
  • v: ENVELOPE_VERSION (現在 1)
  • type: 上記イベント種別のいずれか、または任意文字列
  • content: 種別依存ペイロード (text は str、image は dict 等)
  • node: 送信元の識別子 (任意。UI で吹き出し色分け等に使う想定)

JSON ではない通常文字列が来た場合は type="text" として解釈される (前方互換)。

パターン集

複数の MCP サーバーをひとつのクライアントから呼ぶ

async with AgentHubClient() as client:
    await client.connect_all_http({
        "topology": "http://localhost:8001/mcp",
        "simulation": "http://localhost:8002/mcp",
        "rendering": "http://localhost:8003/mcp",
    })

    # ツール名で自動ルーティング (どの server に居るかは tools/list で発見済み)
    async for ev in client.call_tool_stream("optimize_beam", {"user_request": "..."}):
        handle(ev)

並列でツール呼び出し

async def call(label):
    async for ev in client.call_tool_stream("ask", {"user_request": label}):
        if ev.type == "result":
            return ev.content
    return ""

async with AgentHubClient() as client:
    await client.connect_http("agent", "http://localhost:8001/mcp")
    a, b = await asyncio.gather(call("質問 A"), call("質問 B"))

中断 (cancellation)

async for ev in client.call_tool_stream("long_running", args):
    if ev.type == "text":
        print(ev.content, end="")
        if user_pressed_stop():
            break   # ← MCP の notifications/cancelled が自動で送出される

画像を入力として渡す

サーバー側はツール引数に images: list[str] | None = None を取り、data URI 形式の base64 文字列リストを受け取る規約。

# クライアント側
import base64
with open("input.png", "rb") as f:
    b64 = base64.b64encode(f.read()).decode()
images = [f"data:image/png;base64,{b64}"]

async for ev in client.call_tool_stream(
    "analyze_image",
    {"user_request": "この画像を説明して", "images": images},
):
    ...
# サーバー側
@mcp.tool()
async def analyze_image(
    user_request: str,
    ctx: Context,
    images: list[str] | None = None,
) -> str:
    stream = EventStreamer(ctx, node="vision-agent")
    if images:
        for uri in images:
            _, b64 = uri.split(",", 1)
            img_bytes = base64.b64decode(b64)
            # ... VLM 等に渡す
    ...

エラーハンドリング

async for ev in client.call_tool_stream("flaky_tool", args):
    if ev.type == "error":
        # ツール内で stream.error() されたか、ツールが isError=True で完了した
        log.warning(f"server error: {ev.content}")
    elif ev.type == "result":
        ...

接続自体が失敗する場合は connect_http が例外を投げる:

try:
    await client.connect_http("server", url)
except Exception as e:
    log.error(f"接続失敗: {e!r}")

llm-graph-kit と組み合わせて LLM エージェントを書く

llm-graph-kit で宣言した グラフベースのエージェントを MCP サーバーとして公開するパターン。 llm-graph-kit のイベント ({"type": "answer_text", ...} 等) を EventStreamer のメソッドにマップする:

from mcp.server.fastmcp import Context, FastMCP
from llm_graph_kit import LLMGraph, NodeState
from agent_hub_kit import EventStreamer

class MyAgent:
    def __init__(self, llm):
        self.llm = llm

    def build_graph(self) -> LLMGraph:
        g = LLMGraph(state_schema=...)
        g.add_node("answer", self._answer)
        ...
        return g

    def _answer(self, state: NodeState):
        for chunk in self.llm.respond(...):
            yield {"type": "answer_text", "content": chunk}
        return {"answer": "..."}

    def run(self, question: str):
        yield from self.build_graph().run({"question": question})


mcp = FastMCP("my-llm-agent")
agent = MyAgent(llm=...)

@mcp.tool()
async def ask(user_request: str, ctx: Context) -> str:
    stream = EventStreamer(ctx, node="my-llm-agent")
    answer = ""
    for event in agent.run(user_request):
        if event["type"] == "answer_text":
            answer += event["content"]
            await stream.text(event["content"])
        elif event["type"] == "log":
            await stream.log(event["content"])
    return answer

完全な動作例は本リポジトリの examples/llm_agent_server.py (DummyLLM 版) および examples/llm_agent_server_mlx.py (実 LLM 版) を参照。

サーバーを auto-reload で開発する

mcp.run(transport="streamable-http") の代わりに ASGI アプリを取り出して uvicorn を使えば --reload が利用できる:

# my_server_reload.py
from my_server import mcp

if __name__ != "__main__":
    app = mcp.streamable_http_app()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "my_server_reload:app",
        host="127.0.0.1", port=8001,
        reload=True,
        reload_dirs=["."],
    )
python my_server_reload.py   # .py を編集すると自動再起動

LLM ロードが重い場合は、再起動コストとのトレードオフがあるので、軽量 モデルで開発するか、--reload を使わず手動再起動が現実的な場合も多い。

リポジトリ内の実行サンプル

本リポジトリをクローンした場合は examples/ に動作確認用のサンプルが 入っている。uv sync 後に以下で動く:

サンプル 起動コマンド LLM OS
examples/llm_agent_server.py uv run python examples/llm_agent_server.py DummyLLM 不問
examples/llm_agent_server_mlx.py uv run python examples/llm_agent_server_mlx.py augllm.MlxLLM macOS + MLX
examples/llm_agent_server_reload.py uv run python examples/llm_agent_server_reload.py DummyLLM (+ --reload) 不問
examples/llm_agent_server_mlx_reload.py uv run python examples/llm_agent_server_mlx_reload.py MlxLLM (+ --reload) macOS + MLX

クライアントはどのサーバーに対しても共通:

uv run python main.py            # 標準 (内部で examples/llm_agent_client.py を実行)
# または
uv run python examples/llm_agent_client.py

ポート切替は環境変数 LLM_AGENT_PORT (デフォルト 8201) で。サーバーと クライアントで同じ値を指定する。

なぜ MCP の素の SDK ではなく本キットなのか

MCP SDK (mcp パッケージ) だけでも client/server の双方向通信は可能だが、マルチエージェント開発 では以下が頻出のため、本キットでカプセル化している:

  1. トークン単位ストリーミング: notifications/progress.message に JSON エンベロープを載せて多種類のイベントを多重化
  2. 複数サーバー集約: 1 つのクライアントから複数の MCP サーバーへ 並列接続し、ツール名で自動ルーティング
  3. 中断 (cancellation): async forbreak するだけで notifications/cancelled が自動送出される
  4. 画像入力規約: tool 引数 images: list[str] を data URI 形式で 渡す慣習を統一

開発

git clone https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git
cd agent-hub-kit
uv sync
uv run pytest -v

テストは tests/test_llm_agent_integration.py に集約されており、 examples/llm_agent_server.py をサブプロセス起動して公開 API のみで end-to-end (発見・ストリーミング・中断・並行呼び出し) を検証する。

ライセンス

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agent_hub_kit-0.1.0.tar.gz (13.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agent_hub_kit-0.1.0-py3-none-any.whl (15.7 kB view details)

Uploaded Python 3

File details

Details for the file agent_hub_kit-0.1.0.tar.gz.

File metadata

  • Download URL: agent_hub_kit-0.1.0.tar.gz
  • Upload date:
  • Size: 13.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.10

File hashes

Hashes for agent_hub_kit-0.1.0.tar.gz
Algorithm Hash digest
SHA256 8421c35fe325c51c760f21093e232f610c7458337953cfa710a6a865b8bb9850
MD5 f88af6990e6f43cd5fe2209d30c5b483
BLAKE2b-256 90bd11626cf7332d6ca23f6f50249e81014e7f31adbfbf240b9ea46a33732ea5

See more details on using hashes here.

File details

Details for the file agent_hub_kit-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: agent_hub_kit-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 15.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.10

File hashes

Hashes for agent_hub_kit-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8bd91649c998f9b3d74fbc0e3d0a34f73c747614457f64e75783733c0d8818c6
MD5 9301ef729b6318a65038cbc82cd39d30
BLAKE2b-256 f678ae79bdd00d55720bfb1459d2ae23f6e8f3faf2bff4e01e965cad17bf240b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page