MCP ベースのマルチエージェント開発キット: ストリーミング対応の MCP クライアント / サーバーをまとめた統合ライブラリ
Project description
agent-hub-kit
MCP (Model Context Protocol) ベースの マルチエージェント開発キット。 MCP クライアントと MCP サーバーの両方を、トークン単位ストリーミング対応 で簡潔に書けるようにする 1 つのパッケージ。複数のエージェントリポジトリ から同じプロトコルで呼び合えるようにすることを目的とする。
特徴
- トークン単位ストリーミング:
notifications/progress.messageに JSON エンベロープを載せてtext/think/log/image/stl/video/emotionなどを多重化。LLM のトークンを 1 文字ずつ流せる - 複数 MCP サーバー集約: 1 つの
AgentHubClientで複数の MCP サーバーへ 並列接続し、ツール名でルーティング (AsyncExitStackで安全に管理) - 中断 (cancellation):
async forをbreakするだけで MCP のnotifications/cancelledが自動送出される薄いラッパ - 画像入力規約: ツール引数
images: list[str]を data URI 形式で渡す 慣習を統一 - 標準準拠: 中身は MCP SDK (
mcpパッケージ) のラッパで、独自プロトコル なし。他の MCP クライアント/サーバーとも互換
インストール
uv add agent-hub-kit
# または
pip install agent-hub-kit
未リリース版を使う場合:
uv add "agent-hub-kit @ git+https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git"
要件: Python 3.13+。主要な依存は mcp, httpx, pydantic, llm-graph-kit。
アーキテクチャ
┌────────────────────────────────────────┐
│ AgentHubClient │
│ (複数 MCP サーバーを集約) │
└──────┬──────────────┬──────────────┬──┘
│ MCP/HTTP │ MCP/HTTP │ MCP/HTTP
▼ ▼ ▼
┌──────────────────────────────────────────┐
│ 各 MCP サーバー │
│ EventStreamer で progress notification │
│ としてトークン単位ストリーム配信 │
└──────────────────────────────────────────┘
| 役割 | 主要 API | MCP プロトコル上の立ち位置 |
|---|---|---|
| MCP クライアント (呼び出し側) | AgentHubClient |
client |
| MCP サーバー (応答側) | EventStreamer / stream_text_iter |
server |
クイックスタート
サーバーとクライアントを 1 ファイルずつ作って動かす最小例。
1. サーバー (my_server.py)
"""自前 MCP サーバーの最小例。"""
import asyncio
from mcp.server.fastmcp import Context, FastMCP
from agent_hub_kit import EventStreamer
mcp = FastMCP("my-app")
@mcp.tool()
async def echo(text: str, ctx: Context) -> str:
"""入力テキストを 1 文字ずつ text イベントとしてストリーム配信。"""
stream = EventStreamer(ctx, node="my-app")
await stream.log(f"受信: {len(text)} 文字")
for ch in text:
await stream.text(ch)
await asyncio.sleep(0.02)
return text
if __name__ == "__main__":
mcp.settings.host = "127.0.0.1"
mcp.settings.port = 8001
mcp.run(transport="streamable-http")
2. クライアント (my_client.py)
"""自前 MCP サーバーを呼び出すクライアント。"""
import asyncio
from agent_hub_kit import AgentHubClient
async def main():
async with AgentHubClient() as client:
await client.connect_http("my_app", "http://127.0.0.1:8001/mcp")
async for ev in client.call_tool_stream("echo", {"text": "Hello"}):
if ev.type == "text":
print(ev.content, end="", flush=True)
elif ev.type == "log":
print(f"\n[log] {ev.content}")
elif ev.type == "result":
print(f"\n[result] {ev.content!r}")
asyncio.run(main())
3. 実行 (2 ターミナル)
# ターミナル A
python my_server.py
# ターミナル B
python my_client.py
Hello が 1 文字ずつストリームで届き、最後に [result] 'Hello' が表示
される。
API リファレンス
公開 API はすべて from agent_hub_kit import ... で取得可能:
from agent_hub_kit import (
AgentHubClient, # MCP クライアント
ToolRef, # tools/list で発見した 1 ツールの情報
StreamEvent, # ストリーム配信される 1 イベント
ENVELOPE_VERSION, # JSON エンベロープのバージョン
TERMINAL_EVENT_TYPES, # ストリーム終端イベントの種別集合
encode_envelope, # サーバー側で内部利用 (通常は EventStreamer 経由)
decode_envelope, # クライアント側で内部利用 (通常は自動)
EventStreamer, # サーバー側のストリーミング送出ヘルパー
stream_text_iter, # テキストイテレータを text イベントとして流すヘルパー
AgentHubError, # ライブラリ例外の基底
AgentHubToolError, # call_tool() でツールが失敗した時の例外
AgentHubCancelled, # call_tool() がキャンセルで終わった時の例外
as_image_uris, # 複数画像入力を data URI のリストに変換
encode_image_to_data_uri, # 1 件の画像を data URI に変換
)
AgentHubClient
複数 MCP サーバーへの接続を束ねる async context manager。
async with AgentHubClient() as client:
refs = await client.connect_http("server_a", "http://localhost:8001/mcp")
...
コンストラクタは AgentHubClient(*, on_event=callback) を受ける。指定すると
yield される全イベントを観測できる (観測性 / メトリクス用、同期・非同期どちらも可)。
| メソッド | 説明 |
|---|---|
connect_http(server_name, url, *, probe_timeout=None, on_error="raise", replace=False) |
Streamable HTTP で接続。probe_timeout で TCP open 確認、on_error="skip"/"warn" で失敗吸収、replace=True で同名既存セッションを置き換え |
connect_stdio(server_name, command, *, args=None, env=None, cwd=None, on_error="raise", replace=False) |
stdio トランスポート (Claude Desktop 型のサブプロセス起動) で接続 |
connect_all_http(endpoints, *, probe_timeout=None, on_error="raise") |
複数エンドポイントを一括接続 |
disconnect(server_name) |
指定サーバーのみ切断 (他には影響しない) |
reconnect_http(server_name, url, *, probe_timeout=None, on_error="raise") |
切断してから HTTP で再接続 (specialist 再起動への追随用) |
list_tools() -> dict[str, list[ToolRef]] |
接続中の全サーバーから tools/list を取り直す |
find_tool(tool_name, *, server=None) -> ToolRef | None |
ツール情報を返す。server= で曖昧性解消可能 |
find_tools(tool_name) -> list[ToolRef] |
同名 tool が複数サーバーに居る場合の全マッチを返す |
diagnose_tool(tool_name) -> str |
見つからない時の Did-you-mean メッセージを返す |
call_tool(tool_name, arguments, *, server=None) -> Any |
ノンストリーミング版。中間イベントを無視して最終結果だけ返す。失敗時は AgentHubToolError、キャンセル時は AgentHubCancelled を投げる |
call_tool_stream(tool_name, arguments, *, server=None, queue_maxsize=256) |
async iterator + async context manager。async for でも async with でも使える |
call_tool_stream() の中断は 2 通り:
# 暗黙キャンセル (break で notifications/cancelled が自動送出)
async for ev in client.call_tool_stream("echo", {"text": "..."}):
if ev.type == "text":
print(ev.content)
if should_stop():
break
# 明示キャンセル (ctxmgr 形式)
async with client.call_tool_stream("echo", {"text": "..."}) as stream:
async for ev in stream:
if should_stop():
await stream.cancel()
break
最終結果だけ欲しい時は call_tool() で短く書ける:
try:
result: str = await client.call_tool("echo", {"user_request": "abc"})
except AgentHubToolError as e:
log.error(f"tool failed: {e.content}")
except AgentHubCancelled:
log.info("cancelled by peer")
EventStreamer
MCP サーバー側でツール内から使うストリーミング送出ヘルパー。FastMCP の
Context をラップする。
@mcp.tool()
async def my_tool(user_request: str, ctx: Context) -> str:
stream = EventStreamer(ctx, node="my-agent")
await stream.log("開始")
for chunk in run_llm(user_request):
await stream.text(chunk)
return final_text
| メソッド | 配信される StreamEvent.type |
用途 |
|---|---|---|
text(chunk: str, *, node=None) |
text |
LLM トークン等のテキスト断片 |
think(chunk: str, *, node=None) |
think |
思考プロセス (UI でグレー表示等) |
log(message: str, *, node=None) |
log |
進捗ログ |
image(payload: dict, *, node=None) |
image |
画像 (URL / base64) |
stl(payload: dict, *, node=None) |
stl |
STL モデル情報 |
video(payload: dict, *, node=None) |
video |
動画情報 |
emotion(payload: dict, *, node=None) |
emotion_data |
感情パラメータ |
error(message: str, *, node=None) |
error |
ツール失敗扱いにせず継続したい時のエラー |
send(event_type, content, *, node=None) |
任意 | 上記以外の独自種別を送る場合の汎用 API |
with_node(node) |
— | 指定 node を既定とする fluent ラッパを返す |
すべてのメソッドで node= キーワード引数を指定すると、その 1 イベントだけ
インスタンス既定の node を上書きできる。多段エージェント (planner → worker
→ renderer 等) で内側エージェントの node をそのまま伝播させたいときに使う:
stream = EventStreamer(ctx, node="orchestrator")
for ev in inner_agent.run(...):
await stream.send(ev["type"], ev["content"], node=ev.get("node"))
# fluent 形式
await stream.with_node("planner").text(chunk)
stream_text_iter(stream, chunks)
テキストイテレータをまとめて text イベントとして配信し、連結結果を返す ヘルパー。同期/非同期どちらのイテレータも受け付ける。
full_text = await stream_text_iter(stream, llm.stream(prompt))
StreamEvent
クライアントが受信するイベント 1 件。call_tool_stream() が yield する型。
@dataclass(slots=True)
class StreamEvent:
type: str # "text" / "think" / "log" / ... / "result" / "error" / "cancelled"
content: Any = None # 種別依存ペイロード
node: str | None = None # 送信元サーバー / エージェント名
raw: dict | None = None # 元の JSON エンベロープ (デバッグ用)
イベント種別一覧
クライアントが StreamEvent.type で受け取りうる値:
type |
内容 | 発生源 | terminal? |
|---|---|---|---|
text |
LLM トークン等のテキスト断片 | stream.text() |
— |
think |
思考プロセス | stream.think() |
— |
log |
進捗ログ | stream.log() |
— |
image |
画像 | stream.image() |
— |
stl |
STL モデル | stream.stl() |
— |
video |
動画 | stream.video() |
— |
emotion_data |
感情パラメータ | stream.emotion() |
— |
error |
継続可能エラー | stream.error() |
— |
result |
ツールの最終戻り値 (成功) | call_tool 完了時に 1 度だけ |
✅ |
terminal_error |
致命的エラー (ツール本体失敗 / isError=True / 例外) | call_tool 失敗時に 1 度だけ |
✅ |
cancelled |
キャンセル完了 | クライアント側で中断した時 | ✅ |
StreamEvent.is_terminal プロパティが上記 ✅ 列の判定をそのまま返すので、
クライアント側は if ev.is_terminal: break のように分岐できる。継続可能な
error と致命的な terminal_error を区別したい場合に便利。
エンベロープ形式
notifications/progress.message に載せる JSON 形式:
{
"v": 1,
"type": "text",
"content": "あ",
"node": "my-agent"
}
v:ENVELOPE_VERSION(現在 1)type: 上記イベント種別のいずれか、または任意文字列content: 種別依存ペイロード (text は str、image は dict 等)node: 送信元の識別子 (任意。UI で吹き出し色分け等に使う想定)
JSON ではない通常文字列が来た場合は type="text" として解釈される
(前方互換)。
パターン集
複数の MCP サーバーをひとつのクライアントから呼ぶ
async with AgentHubClient() as client:
await client.connect_all_http({
"topology": "http://localhost:8001/mcp",
"simulation": "http://localhost:8002/mcp",
"rendering": "http://localhost:8003/mcp",
})
# ツール名で自動ルーティング (どの server に居るかは tools/list で発見済み)
async for ev in client.call_tool_stream("optimize_beam", {"user_request": "..."}):
handle(ev)
並列でツール呼び出し
async def call(label):
async for ev in client.call_tool_stream("ask", {"user_request": label}):
if ev.type == "result":
return ev.content
return ""
async with AgentHubClient() as client:
await client.connect_http("agent", "http://localhost:8001/mcp")
a, b = await asyncio.gather(call("質問 A"), call("質問 B"))
中断 (cancellation)
async for ev in client.call_tool_stream("long_running", args):
if ev.type == "text":
print(ev.content, end="")
if user_pressed_stop():
break # ← MCP の notifications/cancelled が自動で送出される
画像を入力として渡す
サーバー側はツール引数に images: list[str] | None = None を取り、data URI
形式の base64 文字列リストを受け取る規約。as_image_uris() ヘルパーで
Path / bytes / PIL.Image.Image / 既に data URI な str をまとめて
変換できる:
from pathlib import Path
from agent_hub_kit import as_image_uris
images = as_image_uris([
Path("a.png"), # ファイル: 拡張子から mime 推定
b"\x89PNG\r\n...raw bytes...", # bytes: 既定 image/png
pil_image, # PIL.Image.Image (PIL は optional)
"data:image/jpeg;base64,...", # 既に data URI ならパススルー
])
await client.call_tool(
"analyze_image",
{"user_request": "この画像を説明して", "images": images},
)
# サーバー側
@mcp.tool()
async def analyze_image(
user_request: str,
ctx: Context,
images: list[str] | None = None,
) -> str:
stream = EventStreamer(ctx, node="vision-agent")
if images:
for uri in images:
_, b64 = uri.split(",", 1)
img_bytes = base64.b64decode(b64)
# ... VLM 等に渡す
...
接続の TCP probe / 失敗時の skip
specialist が動的に増減するオーケストレータでは、未起動エンドポイントへの
接続試行が BaseExceptionGroup を引きずって全体を巻き込まないようガードしたい。
connect_http (および connect_all_http) はこれを内蔵している:
async with AgentHubClient() as client:
# 0.5 秒で TCP open しなければ無音 skip
await client.connect_http(
"maybe_offline", "http://127.0.0.1:9999/mcp",
probe_timeout=0.5, on_error="skip",
)
# warn ログを出すだけにする
await client.connect_all_http(
{"a": "http://...", "b": "http://..."},
probe_timeout=0.5, on_error="warn",
)
on_error:
"raise"(default): 失敗時は例外を上げる"skip": 失敗時は何もせず空リストを返す"warn": warn ログを出して空リストを返す
MCP SDK が投げる BaseExceptionGroup もここで吸収するため、orchestrator
側で except BaseException を書く必要が無い。
エラーハンドリング
async for ev in client.call_tool_stream("flaky_tool", args):
if ev.type == "error":
# ツール内で stream.error() されたか、ツールが isError=True で完了した
log.warning(f"server error: {ev.content}")
elif ev.type == "result":
...
接続自体が失敗する場合は connect_http が例外を投げる:
try:
await client.connect_http("server", url)
except Exception as e:
log.error(f"接続失敗: {e!r}")
llm-graph-kit と組み合わせて LLM エージェントを書く
llm-graph-kit で宣言した
グラフベースのエージェントを MCP サーバーとして公開するパターン。
llm-graph-kit のイベント ({"type": "answer_text", ...} 等) を
EventStreamer のメソッドにマップする:
from mcp.server.fastmcp import Context, FastMCP
from llm_graph_kit import LLMGraph, NodeState
from agent_hub_kit import EventStreamer
class MyAgent:
def __init__(self, llm):
self.llm = llm
def build_graph(self) -> LLMGraph:
g = LLMGraph(state_schema=...)
g.add_node("answer", self._answer)
...
return g
def _answer(self, state: NodeState):
for chunk in self.llm.respond(...):
yield {"type": "answer_text", "content": chunk}
return {"answer": "..."}
def run(self, question: str):
yield from self.build_graph().run({"question": question})
mcp = FastMCP("my-llm-agent")
agent = MyAgent(llm=...)
@mcp.tool()
async def ask(user_request: str, ctx: Context) -> str:
stream = EventStreamer(ctx, node="my-llm-agent")
answer = ""
for event in agent.run(user_request):
if event["type"] == "answer_text":
answer += event["content"]
await stream.text(event["content"])
elif event["type"] == "log":
await stream.log(event["content"])
return answer
完全な動作例は本リポジトリの examples/llm_agent_server.py (DummyLLM 版)
および examples/llm_agent_server_mlx.py (実 LLM 版) を参照。
stdio MCP サーバーに接続する
Claude Desktop 型の「サブプロセスとして MCP サーバーを起動して stdin/stdout で会話する」パターン。
await client.connect_stdio(
"local_tool",
command=["python", "my_tool.py"],
)
# あるいは
await client.connect_stdio(
"local_tool",
command="python",
args=["my_tool.py"],
env={"FOO": "bar"},
)
接続後の call_tool / call_tool_stream / disconnect の使い方は
HTTP と同じ。
specialist の動的増減 / 再起動に追随する
長時間動かす orchestrator では specialist 側が再起動 / 増減する。
disconnect / reconnect_http / connect_http(replace=True) で
クライアントを組み直さずに状態を維持できる:
async with AgentHubClient() as client:
# 起動時に接続。未起動なら無音 skip。
await client.connect_all_http(specialists, probe_timeout=0.5, on_error="skip")
# 後から特定 specialist を再起動 → 再接続
await client.reconnect_http("topology", "http://localhost:8001/mcp")
# もう要らなくなった specialist を切る (他には影響なし)
await client.disconnect("legacy_agent")
観測性: イベント / メトリクスを覗く
AgentHubClient(on_event=callback) ですべての yield イベントを副流できる。
ロギング / Prometheus エクスポート / トレーシング等に使う:
def to_logger(ev: StreamEvent) -> None:
logger.debug("event %s node=%s", ev.type, ev.node)
async with AgentHubClient(on_event=to_logger) as client:
...
非同期コールバックも受け付ける。コールバック側で例外が出ても本流は止まらない。
pytest: 同梱フィクスチャを使う
利用側プロジェクトの conftest.py で本キットの testing プラグインを読み込むと、
echo / llm specialist が起動した URL がそのままフィクスチャとして降ってくる:
# conftest.py
pytest_plugins = ["agent_hub_kit.testing"]
# tests/test_my_orchestrator.py
import pytest
from agent_hub_kit import AgentHubClient
@pytest.mark.asyncio
async def test_with_echo(echo_specialist_url: str) -> None:
async with AgentHubClient() as client:
await client.connect_http("echo", echo_specialist_url)
assert await client.call_tool("echo", {"user_request": "hi"}) == "hi"
提供されるフィクスチャ:
echo_specialist_url: echo specialist を session スコープで起動llm_specialist_url: llm (DummyLLM) specialist を session スコープで起動free_port(): 空き port を取るヘルパー (自前 subprocess 用)
サーバーを auto-reload で開発する
mcp.run(transport="streamable-http") の代わりに ASGI アプリを取り出して
uvicorn を使えば --reload が利用できる:
# my_server_reload.py
from my_server import mcp
if __name__ != "__main__":
app = mcp.streamable_http_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"my_server_reload:app",
host="127.0.0.1", port=8001,
reload=True,
reload_dirs=["."],
)
python my_server_reload.py # .py を編集すると自動再起動
LLM ロードが重い場合は、再起動コストとのトレードオフがあるので、軽量 モデルで開発するか、--reload を使わず手動再起動が現実的な場合も多い。
同梱サンプルサーバー (CLI)
ライブラリに最小サンプル specialist を同梱しており、写経せずに 1 行で立ち上げられる:
# echo: 受け取った文字列を 1 文字ずつ text イベントで返す (依存少)
python -m agent_hub_kit.examples echo --host 127.0.0.1 --port 8001
# llm: llm-graph-kit + DummyLLM の QA エージェント
python -m agent_hub_kit.examples llm --host 127.0.0.1 --port 8002
オンボーディング / smoke test / pytest fixture の subprocess 起動先として
使える。実装は agent_hub_kit.examples.echo.build_echo_server() /
agent_hub_kit.examples.llm.build_llm_server() で取り出せるため、自前の
FastMCP に差し込んでカスタマイズすることも可能。
リポジトリ内の実行サンプル
本リポジトリをクローンした場合は examples/ に動作確認用のサンプルが
入っている。uv sync 後に以下で動く:
| サンプル | 起動コマンド | LLM | OS |
|---|---|---|---|
examples/llm_agent_server.py |
uv run python examples/llm_agent_server.py |
DummyLLM | 不問 |
examples/llm_agent_server_mlx.py |
uv run python examples/llm_agent_server_mlx.py |
augllm.MlxLLM |
macOS + MLX |
examples/llm_agent_server_reload.py |
uv run python examples/llm_agent_server_reload.py |
DummyLLM (+ --reload) | 不問 |
examples/llm_agent_server_mlx_reload.py |
uv run python examples/llm_agent_server_mlx_reload.py |
MlxLLM (+ --reload) | macOS + MLX |
クライアントはどのサーバーに対しても共通:
uv run python main.py # 標準 (内部で examples/llm_agent_client.py を実行)
# または
uv run python examples/llm_agent_client.py
ポート切替は環境変数 LLM_AGENT_PORT (デフォルト 8201) で。サーバーと
クライアントで同じ値を指定する。
なぜ MCP の素の SDK ではなく本キットなのか
MCP SDK (mcp パッケージ)
だけでも client/server の双方向通信は可能だが、マルチエージェント開発
では以下が頻出のため、本キットでカプセル化している:
- トークン単位ストリーミング:
notifications/progress.messageに JSON エンベロープを載せて多種類のイベントを多重化 - 複数サーバー集約: 1 つのクライアントから複数の MCP サーバーへ 並列接続し、ツール名で自動ルーティング
- 中断 (cancellation):
async forをbreakするだけでnotifications/cancelledが自動送出される - 画像入力規約: tool 引数
images: list[str]を data URI 形式で 渡す慣習を統一
開発
git clone https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git
cd agent-hub-kit
uv sync
uv run pytest -v
テストは tests/test_llm_agent_integration.py に集約されており、
examples/llm_agent_server.py をサブプロセス起動して公開 API のみで
end-to-end (発見・ストリーミング・中断・並行呼び出し) を検証する。
ライセンス
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file agent_hub_kit-0.1.1.tar.gz.
File metadata
- Download URL: agent_hub_kit-0.1.1.tar.gz
- Upload date:
- Size: 26.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b682fdb06b4b5ee2bd13865a2b202a74a5666d12c0a62475c22dbcf26dc23a85
|
|
| MD5 |
ac19f221e5843ea3bd692f10e5720d7a
|
|
| BLAKE2b-256 |
562d34be6cbf6077cbcc2db2ae0a0bee756c698b733464f09bea276c2d6fcabc
|
File details
Details for the file agent_hub_kit-0.1.1-py3-none-any.whl.
File metadata
- Download URL: agent_hub_kit-0.1.1-py3-none-any.whl
- Upload date:
- Size: 32.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
07302e3d967aea91df148380f28bb609723386de0f913ad63780c7987778c18d
|
|
| MD5 |
a867482e22b42a4406bda2453ac6aeff
|
|
| BLAKE2b-256 |
c127ac6a24a3ad3986cec364e2b76bcde3bcaac8b57fb8575fcb4bd813f9327f
|