Skip to main content

MCP ベースのマルチエージェント開発キット: ストリーミング対応の MCP クライアント / サーバーをまとめた統合ライブラリ

Project description

agent-hub-kit

MCP (Model Context Protocol) ベースの マルチエージェント開発キット。 MCP クライアントと MCP サーバーの両方を、トークン単位ストリーミング対応 で簡潔に書けるようにする 1 つのパッケージ。複数のエージェントリポジトリ から同じプロトコルで呼び合えるようにすることを目的とする。

特徴

  • トークン単位ストリーミング: notifications/progress.message に JSON エンベロープを載せて text / think / log / image / stl / video / emotion などを多重化。LLM のトークンを 1 文字ずつ流せる
  • 複数 MCP サーバー集約: 1 つの AgentHubClient で複数の MCP サーバーへ 並列接続し、ツール名でルーティング (AsyncExitStack で安全に管理)
  • 中断 (cancellation): async forbreak するだけで MCP の notifications/cancelled が自動送出される薄いラッパ
  • 画像入力規約: ツール引数 images: list[str] を data URI 形式で渡す 慣習を統一
  • 標準準拠: 中身は MCP SDK (mcp パッケージ) のラッパで、独自プロトコル なし。他の MCP クライアント/サーバーとも互換

インストール

uv add agent-hub-kit
# または
pip install agent-hub-kit

未リリース版を使う場合:

uv add "agent-hub-kit @ git+https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git"

要件: Python 3.13+。主要な依存は mcp, httpx, pydantic, llm-graph-kit

アーキテクチャ

              ┌────────────────────────────────────────┐
              │           AgentHubClient               │
              │     (複数 MCP サーバーを集約)          │
              └──────┬──────────────┬──────────────┬──┘
                     │ MCP/HTTP     │ MCP/HTTP     │ MCP/HTTP
                     ▼              ▼              ▼
          ┌──────────────────────────────────────────┐
          │           各 MCP サーバー                │
          │   EventStreamer で progress notification │
          │   としてトークン単位ストリーム配信        │
          └──────────────────────────────────────────┘
役割 主要 API MCP プロトコル上の立ち位置
MCP クライアント (呼び出し側) AgentHubClient client
MCP サーバー (応答側) EventStreamer / stream_text_iter server

クイックスタート

サーバーとクライアントを 1 ファイルずつ作って動かす最小例。

1. サーバー (my_server.py)

"""自前 MCP サーバーの最小例。"""
import asyncio
from mcp.server.fastmcp import Context, FastMCP
from agent_hub_kit import EventStreamer

mcp = FastMCP("my-app")

@mcp.tool()
async def echo(text: str, ctx: Context) -> str:
    """入力テキストを 1 文字ずつ text イベントとしてストリーム配信。"""
    stream = EventStreamer(ctx, node="my-app")
    await stream.log(f"受信: {len(text)} 文字")
    for ch in text:
        await stream.text(ch)
        await asyncio.sleep(0.02)
    return text

if __name__ == "__main__":
    mcp.settings.host = "127.0.0.1"
    mcp.settings.port = 8001
    mcp.run(transport="streamable-http")

2. クライアント (my_client.py)

"""自前 MCP サーバーを呼び出すクライアント。"""
import asyncio
from agent_hub_kit import AgentHubClient

async def main():
    async with AgentHubClient() as client:
        await client.connect_http("my_app", "http://127.0.0.1:8001/mcp")

        async for ev in client.call_tool_stream("echo", {"text": "Hello"}):
            if ev.type == "text":
                print(ev.content, end="", flush=True)
            elif ev.type == "log":
                print(f"\n[log] {ev.content}")
            elif ev.type == "result":
                print(f"\n[result] {ev.content!r}")

asyncio.run(main())

3. 実行 (2 ターミナル)

# ターミナル A
python my_server.py

# ターミナル B
python my_client.py

Hello が 1 文字ずつストリームで届き、最後に [result] 'Hello' が表示 される。

API リファレンス

公開 API はすべて from agent_hub_kit import ... で取得可能:

from agent_hub_kit import (
    AgentHubClient,         # MCP クライアント
    ToolRef,                # tools/list で発見した 1 ツールの情報
    StreamEvent,            # ストリーム配信される 1 イベント
    ENVELOPE_VERSION,       # JSON エンベロープのバージョン
    TERMINAL_EVENT_TYPES,   # ストリーム終端イベントの種別集合
    encode_envelope,        # サーバー側で内部利用 (通常は EventStreamer 経由)
    decode_envelope,        # クライアント側で内部利用 (通常は自動)
    EventStreamer,          # サーバー側のストリーミング送出ヘルパー
    stream_text_iter,       # テキストイテレータを text イベントとして流すヘルパー
    AgentHubError,          # ライブラリ例外の基底
    AgentHubToolError,      # call_tool() でツールが失敗した時の例外
    AgentHubCancelled,      # call_tool() がキャンセルで終わった時の例外
    as_image_uris,          # 複数画像入力を data URI のリストに変換
    encode_image_to_data_uri,  # 1 件の画像を data URI に変換
)

AgentHubClient

複数 MCP サーバーへの接続を束ねる async context manager。

async with AgentHubClient() as client:
    refs = await client.connect_http("server_a", "http://localhost:8001/mcp")
    ...

コンストラクタは AgentHubClient(*, on_event=callback) を受ける。指定すると yield される全イベントを観測できる (観測性 / メトリクス用、同期・非同期どちらも可)。

メソッド 説明
connect_http(server_name, url, *, probe_timeout=None, on_error="raise", replace=False) Streamable HTTP で接続。probe_timeout で TCP open 確認、on_error="skip"/"warn" で失敗吸収、replace=True で同名既存セッションを置き換え
connect_stdio(server_name, command, *, args=None, env=None, cwd=None, on_error="raise", replace=False) stdio トランスポート (Claude Desktop 型のサブプロセス起動) で接続
connect_all_http(endpoints, *, probe_timeout=None, on_error="raise") 複数エンドポイントを一括接続
disconnect(server_name) 指定サーバーのみ切断 (他には影響しない)
reconnect_http(server_name, url, *, probe_timeout=None, on_error="raise") 切断してから HTTP で再接続 (specialist 再起動への追随用)
list_tools() -> dict[str, list[ToolRef]] 接続中の全サーバーから tools/list を取り直す
find_tool(tool_name, *, server=None) -> ToolRef | None ツール情報を返す。server= で曖昧性解消可能
find_tools(tool_name) -> list[ToolRef] 同名 tool が複数サーバーに居る場合の全マッチを返す
diagnose_tool(tool_name) -> str 見つからない時の Did-you-mean メッセージを返す
call_tool(tool_name, arguments, *, server=None) -> Any ノンストリーミング版。中間イベントを無視して最終結果だけ返す。失敗時は AgentHubToolError、キャンセル時は AgentHubCancelled を投げる
call_tool_stream(tool_name, arguments, *, server=None, queue_maxsize=256) async iterator + async context manager。async for でも async with でも使える

call_tool_stream() の中断は 2 通り:

# 暗黙キャンセル (break で notifications/cancelled が自動送出)
async for ev in client.call_tool_stream("echo", {"text": "..."}):
    if ev.type == "text":
        print(ev.content)
        if should_stop():
            break

# 明示キャンセル (ctxmgr 形式)
async with client.call_tool_stream("echo", {"text": "..."}) as stream:
    async for ev in stream:
        if should_stop():
            await stream.cancel()
            break

最終結果だけ欲しい時は call_tool() で短く書ける:

try:
    result: str = await client.call_tool("echo", {"user_request": "abc"})
except AgentHubToolError as e:
    log.error(f"tool failed: {e.content}")
except AgentHubCancelled:
    log.info("cancelled by peer")

EventStreamer

MCP サーバー側でツール内から使うストリーミング送出ヘルパー。FastMCPContext をラップする。

@mcp.tool()
async def my_tool(user_request: str, ctx: Context) -> str:
    stream = EventStreamer(ctx, node="my-agent")
    await stream.log("開始")
    for chunk in run_llm(user_request):
        await stream.text(chunk)
    return final_text
メソッド 配信される StreamEvent.type 用途
text(chunk: str, *, node=None) text LLM トークン等のテキスト断片
think(chunk: str, *, node=None) think 思考プロセス (UI でグレー表示等)
log(message: str, *, node=None) log 進捗ログ
image(payload: dict, *, node=None) image 画像 (URL / base64)
stl(payload: dict, *, node=None) stl STL モデル情報
video(payload: dict, *, node=None) video 動画情報
emotion(payload: dict, *, node=None) emotion_data 感情パラメータ
error(message: str, *, node=None) error ツール失敗扱いにせず継続したい時のエラー
send(event_type, content, *, node=None) 任意 上記以外の独自種別を送る場合の汎用 API
with_node(node) 指定 node を既定とする fluent ラッパを返す

すべてのメソッドで node= キーワード引数を指定すると、その 1 イベントだけ インスタンス既定の node を上書きできる。多段エージェント (planner → worker → renderer 等) で内側エージェントの node をそのまま伝播させたいときに使う:

stream = EventStreamer(ctx, node="orchestrator")
for ev in inner_agent.run(...):
    await stream.send(ev["type"], ev["content"], node=ev.get("node"))

# fluent 形式
await stream.with_node("planner").text(chunk)

stream_text_iter(stream, chunks)

テキストイテレータをまとめて text イベントとして配信し、連結結果を返す ヘルパー。同期/非同期どちらのイテレータも受け付ける。

full_text = await stream_text_iter(stream, llm.stream(prompt))

StreamEvent

クライアントが受信するイベント 1 件。call_tool_stream() が yield する型。

@dataclass(slots=True)
class StreamEvent:
    type: str                    # "text" / "think" / "log" / ... / "result" / "error" / "cancelled"
    content: Any = None          # 種別依存ペイロード
    node: str | None = None      # 送信元サーバー / エージェント名
    raw: dict | None = None      # 元の JSON エンベロープ (デバッグ用)

イベント種別一覧

クライアントが StreamEvent.type で受け取りうる値:

type 内容 発生源 terminal?
text LLM トークン等のテキスト断片 stream.text()
think 思考プロセス stream.think()
log 進捗ログ stream.log()
image 画像 stream.image()
stl STL モデル stream.stl()
video 動画 stream.video()
emotion_data 感情パラメータ stream.emotion()
error 継続可能エラー stream.error()
result ツールの最終戻り値 (成功) call_tool 完了時に 1 度だけ
terminal_error 致命的エラー (ツール本体失敗 / isError=True / 例外) call_tool 失敗時に 1 度だけ
cancelled キャンセル完了 クライアント側で中断した時

StreamEvent.is_terminal プロパティが上記 ✅ 列の判定をそのまま返すので、 クライアント側は if ev.is_terminal: break のように分岐できる。継続可能な error と致命的な terminal_error を区別したい場合に便利。

エンベロープ形式

notifications/progress.message に載せる JSON 形式:

{
  "v": 1,
  "type": "text",
  "content": "あ",
  "node": "my-agent"
}
  • v: ENVELOPE_VERSION (現在 1)
  • type: 上記イベント種別のいずれか、または任意文字列
  • content: 種別依存ペイロード (text は str、image は dict 等)
  • node: 送信元の識別子 (任意。UI で吹き出し色分け等に使う想定)

JSON ではない通常文字列が来た場合は type="text" として解釈される (前方互換)。

パターン集

複数の MCP サーバーをひとつのクライアントから呼ぶ

async with AgentHubClient() as client:
    await client.connect_all_http({
        "topology": "http://localhost:8001/mcp",
        "simulation": "http://localhost:8002/mcp",
        "rendering": "http://localhost:8003/mcp",
    })

    # ツール名で自動ルーティング (どの server に居るかは tools/list で発見済み)
    async for ev in client.call_tool_stream("optimize_beam", {"user_request": "..."}):
        handle(ev)

並列でツール呼び出し

async def call(label):
    async for ev in client.call_tool_stream("ask", {"user_request": label}):
        if ev.type == "result":
            return ev.content
    return ""

async with AgentHubClient() as client:
    await client.connect_http("agent", "http://localhost:8001/mcp")
    a, b = await asyncio.gather(call("質問 A"), call("質問 B"))

中断 (cancellation)

async for ev in client.call_tool_stream("long_running", args):
    if ev.type == "text":
        print(ev.content, end="")
        if user_pressed_stop():
            break   # ← MCP の notifications/cancelled が自動で送出される

画像を入力として渡す

サーバー側はツール引数に images: list[str] | None = None を取り、data URI 形式の base64 文字列リストを受け取る規約。as_image_uris() ヘルパーで Path / bytes / PIL.Image.Image / 既に data URI な str をまとめて 変換できる:

from pathlib import Path
from agent_hub_kit import as_image_uris

images = as_image_uris([
    Path("a.png"),                              # ファイル: 拡張子から mime 推定
    b"\x89PNG\r\n...raw bytes...",              # bytes: 既定 image/png
    pil_image,                                  # PIL.Image.Image (PIL は optional)
    "data:image/jpeg;base64,...",               # 既に data URI ならパススルー
])

await client.call_tool(
    "analyze_image",
    {"user_request": "この画像を説明して", "images": images},
)
# サーバー側
@mcp.tool()
async def analyze_image(
    user_request: str,
    ctx: Context,
    images: list[str] | None = None,
) -> str:
    stream = EventStreamer(ctx, node="vision-agent")
    if images:
        for uri in images:
            _, b64 = uri.split(",", 1)
            img_bytes = base64.b64decode(b64)
            # ... VLM 等に渡す
    ...

接続の TCP probe / 失敗時の skip

specialist が動的に増減するオーケストレータでは、未起動エンドポイントへの 接続試行が BaseExceptionGroup を引きずって全体を巻き込まないようガードしたい。 connect_http (および connect_all_http) はこれを内蔵している:

async with AgentHubClient() as client:
    # 0.5 秒で TCP open しなければ無音 skip
    await client.connect_http(
        "maybe_offline", "http://127.0.0.1:9999/mcp",
        probe_timeout=0.5, on_error="skip",
    )

    # warn ログを出すだけにする
    await client.connect_all_http(
        {"a": "http://...", "b": "http://..."},
        probe_timeout=0.5, on_error="warn",
    )

on_error:

  • "raise" (default): 失敗時は例外を上げる
  • "skip": 失敗時は何もせず空リストを返す
  • "warn": warn ログを出して空リストを返す

MCP SDK が投げる BaseExceptionGroup もここで吸収するため、orchestrator 側で except BaseException を書く必要が無い。

エラーハンドリング

async for ev in client.call_tool_stream("flaky_tool", args):
    if ev.type == "error":
        # ツール内で stream.error() されたか、ツールが isError=True で完了した
        log.warning(f"server error: {ev.content}")
    elif ev.type == "result":
        ...

接続自体が失敗する場合は connect_http が例外を投げる:

try:
    await client.connect_http("server", url)
except Exception as e:
    log.error(f"接続失敗: {e!r}")

llm-graph-kit と組み合わせて LLM エージェントを書く

llm-graph-kit で宣言した グラフベースのエージェントを MCP サーバーとして公開するパターン。 llm-graph-kit のイベント ({"type": "answer_text", ...} 等) を EventStreamer のメソッドにマップする:

from mcp.server.fastmcp import Context, FastMCP
from llm_graph_kit import LLMGraph, NodeState
from agent_hub_kit import EventStreamer

class MyAgent:
    def __init__(self, llm):
        self.llm = llm

    def build_graph(self) -> LLMGraph:
        g = LLMGraph(state_schema=...)
        g.add_node("answer", self._answer)
        ...
        return g

    def _answer(self, state: NodeState):
        for chunk in self.llm.respond(...):
            yield {"type": "answer_text", "content": chunk}
        return {"answer": "..."}

    def run(self, question: str):
        yield from self.build_graph().run({"question": question})


mcp = FastMCP("my-llm-agent")
agent = MyAgent(llm=...)

@mcp.tool()
async def ask(user_request: str, ctx: Context) -> str:
    stream = EventStreamer(ctx, node="my-llm-agent")
    answer = ""
    for event in agent.run(user_request):
        if event["type"] == "answer_text":
            answer += event["content"]
            await stream.text(event["content"])
        elif event["type"] == "log":
            await stream.log(event["content"])
    return answer

完全な動作例は本リポジトリの examples/llm_agent_server.py (DummyLLM 版) および examples/llm_agent_server_mlx.py (実 LLM 版) を参照。

stdio MCP サーバーに接続する

Claude Desktop 型の「サブプロセスとして MCP サーバーを起動して stdin/stdout で会話する」パターン。

await client.connect_stdio(
    "local_tool",
    command=["python", "my_tool.py"],
)
# あるいは
await client.connect_stdio(
    "local_tool",
    command="python",
    args=["my_tool.py"],
    env={"FOO": "bar"},
)

接続後の call_tool / call_tool_stream / disconnect の使い方は HTTP と同じ。

specialist の動的増減 / 再起動に追随する

長時間動かす orchestrator では specialist 側が再起動 / 増減する。 disconnect / reconnect_http / connect_http(replace=True) で クライアントを組み直さずに状態を維持できる:

async with AgentHubClient() as client:
    # 起動時に接続。未起動なら無音 skip。
    await client.connect_all_http(specialists, probe_timeout=0.5, on_error="skip")

    # 後から特定 specialist を再起動 → 再接続
    await client.reconnect_http("topology", "http://localhost:8001/mcp")

    # もう要らなくなった specialist を切る (他には影響なし)
    await client.disconnect("legacy_agent")

観測性: イベント / メトリクスを覗く

AgentHubClient(on_event=callback) ですべての yield イベントを副流できる。 ロギング / Prometheus エクスポート / トレーシング等に使う:

def to_logger(ev: StreamEvent) -> None:
    logger.debug("event %s node=%s", ev.type, ev.node)

async with AgentHubClient(on_event=to_logger) as client:
    ...

非同期コールバックも受け付ける。コールバック側で例外が出ても本流は止まらない。

pytest: 同梱フィクスチャを使う

利用側プロジェクトの conftest.py で本キットの testing プラグインを読み込むと、 echo / llm specialist が起動した URL がそのままフィクスチャとして降ってくる:

# conftest.py
pytest_plugins = ["agent_hub_kit.testing"]
# tests/test_my_orchestrator.py
import pytest
from agent_hub_kit import AgentHubClient

@pytest.mark.asyncio
async def test_with_echo(echo_specialist_url: str) -> None:
    async with AgentHubClient() as client:
        await client.connect_http("echo", echo_specialist_url)
        assert await client.call_tool("echo", {"user_request": "hi"}) == "hi"

提供されるフィクスチャ:

  • echo_specialist_url : echo specialist を session スコープで起動
  • llm_specialist_url : llm (DummyLLM) specialist を session スコープで起動
  • free_port() : 空き port を取るヘルパー (自前 subprocess 用)

サーバーを auto-reload で開発する

mcp.run(transport="streamable-http") の代わりに ASGI アプリを取り出して uvicorn を使えば --reload が利用できる:

# my_server_reload.py
from my_server import mcp

if __name__ != "__main__":
    app = mcp.streamable_http_app()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "my_server_reload:app",
        host="127.0.0.1", port=8001,
        reload=True,
        reload_dirs=["."],
    )
python my_server_reload.py   # .py を編集すると自動再起動

LLM ロードが重い場合は、再起動コストとのトレードオフがあるので、軽量 モデルで開発するか、--reload を使わず手動再起動が現実的な場合も多い。

同梱サンプルサーバー (CLI)

ライブラリに最小サンプル specialist を同梱しており、写経せずに 1 行で立ち上げられる:

# echo: 受け取った文字列を 1 文字ずつ text イベントで返す (依存少)
python -m agent_hub_kit.examples echo --host 127.0.0.1 --port 8001

# llm: llm-graph-kit + DummyLLM の QA エージェント
python -m agent_hub_kit.examples llm --host 127.0.0.1 --port 8002

オンボーディング / smoke test / pytest fixture の subprocess 起動先として 使える。実装は agent_hub_kit.examples.echo.build_echo_server() / agent_hub_kit.examples.llm.build_llm_server() で取り出せるため、自前の FastMCP に差し込んでカスタマイズすることも可能。

リポジトリ内の実行サンプル

本リポジトリをクローンした場合は examples/ に動作確認用のサンプルが 入っている。uv sync 後に以下で動く:

サンプル 起動コマンド LLM OS
examples/llm_agent_server.py uv run python examples/llm_agent_server.py DummyLLM 不問
examples/llm_agent_server_mlx.py uv run python examples/llm_agent_server_mlx.py augllm.MlxLLM macOS + MLX
examples/llm_agent_server_reload.py uv run python examples/llm_agent_server_reload.py DummyLLM (+ --reload) 不問
examples/llm_agent_server_mlx_reload.py uv run python examples/llm_agent_server_mlx_reload.py MlxLLM (+ --reload) macOS + MLX

クライアントはどのサーバーに対しても共通:

uv run python main.py            # 標準 (内部で examples/llm_agent_client.py を実行)
# または
uv run python examples/llm_agent_client.py

ポート切替は環境変数 LLM_AGENT_PORT (デフォルト 8201) で。サーバーと クライアントで同じ値を指定する。

なぜ MCP の素の SDK ではなく本キットなのか

MCP SDK (mcp パッケージ) だけでも client/server の双方向通信は可能だが、マルチエージェント開発 では以下が頻出のため、本キットでカプセル化している:

  1. トークン単位ストリーミング: notifications/progress.message に JSON エンベロープを載せて多種類のイベントを多重化
  2. 複数サーバー集約: 1 つのクライアントから複数の MCP サーバーへ 並列接続し、ツール名で自動ルーティング
  3. 中断 (cancellation): async forbreak するだけで notifications/cancelled が自動送出される
  4. 画像入力規約: tool 引数 images: list[str] を data URI 形式で 渡す慣習を統一

開発

git clone https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git
cd agent-hub-kit
uv sync
uv run pytest -v

テストは tests/test_llm_agent_integration.py に集約されており、 examples/llm_agent_server.py をサブプロセス起動して公開 API のみで end-to-end (発見・ストリーミング・中断・並行呼び出し) を検証する。

ライセンス

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agent_hub_kit-0.1.1.tar.gz (26.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agent_hub_kit-0.1.1-py3-none-any.whl (32.9 kB view details)

Uploaded Python 3

File details

Details for the file agent_hub_kit-0.1.1.tar.gz.

File metadata

  • Download URL: agent_hub_kit-0.1.1.tar.gz
  • Upload date:
  • Size: 26.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.10

File hashes

Hashes for agent_hub_kit-0.1.1.tar.gz
Algorithm Hash digest
SHA256 b682fdb06b4b5ee2bd13865a2b202a74a5666d12c0a62475c22dbcf26dc23a85
MD5 ac19f221e5843ea3bd692f10e5720d7a
BLAKE2b-256 562d34be6cbf6077cbcc2db2ae0a0bee756c698b733464f09bea276c2d6fcabc

See more details on using hashes here.

File details

Details for the file agent_hub_kit-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: agent_hub_kit-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 32.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.10

File hashes

Hashes for agent_hub_kit-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 07302e3d967aea91df148380f28bb609723386de0f913ad63780c7987778c18d
MD5 a867482e22b42a4406bda2453ac6aeff
BLAKE2b-256 c127ac6a24a3ad3986cec364e2b76bcde3bcaac8b57fb8575fcb4bd813f9327f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page