MCP ベースのマルチエージェント開発キット: ストリーミング対応の MCP クライアント / サーバーをまとめた統合ライブラリ
Project description
agent-hub-kit
MCP (Model Context Protocol) ベースの マルチエージェント開発キット。 MCP クライアントと MCP サーバーの両方を、トークン単位ストリーミング対応 で簡潔に書けるようにする 1 つのパッケージ。複数のエージェントリポジトリ から同じプロトコルで呼び合えるようにすることを目的とする。
特徴
- トークン単位ストリーミング:
notifications/progress.messageに JSON エンベロープを載せてtext/think/log/image/stl/video/emotionなどを多重化。LLM のトークンを 1 文字ずつ流せる - 複数 MCP サーバー集約: 1 つの
AgentHubClientで複数の MCP サーバーへ 並列接続し、ツール名でルーティング (AsyncExitStackで安全に管理) - 中断 (cancellation):
async forをbreakするだけで MCP のnotifications/cancelledが自動送出される薄いラッパ - 画像入力規約: ツール引数
images: list[str]を data URI 形式で渡す 慣習を統一 - 標準準拠: 中身は MCP SDK (
mcpパッケージ) のラッパで、独自プロトコル なし。他の MCP クライアント/サーバーとも互換
インストール
uv add agent-hub-kit
# または
pip install agent-hub-kit
未リリース版を使う場合:
uv add "agent-hub-kit @ git+https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git"
要件: Python 3.13+。主要な依存は mcp, httpx, pydantic, llm-graph-kit。
アーキテクチャ
┌────────────────────────────────────────┐
│ AgentHubClient │
│ (複数 MCP サーバーを集約) │
└──────┬──────────────┬──────────────┬──┘
│ MCP/HTTP │ MCP/HTTP │ MCP/HTTP
▼ ▼ ▼
┌──────────────────────────────────────────┐
│ 各 MCP サーバー │
│ EventStreamer で progress notification │
│ としてトークン単位ストリーム配信 │
└──────────────────────────────────────────┘
| 役割 | 主要 API | MCP プロトコル上の立ち位置 |
|---|---|---|
| MCP クライアント (呼び出し側) | AgentHubClient |
client |
| MCP サーバー (応答側) | EventStreamer / stream_text_iter |
server |
クイックスタート
サーバーとクライアントを 1 ファイルずつ作って動かす最小例。
1. サーバー (my_server.py)
"""自前 MCP サーバーの最小例。"""
import asyncio
from mcp.server.fastmcp import Context, FastMCP
from agent_hub_kit import EventStreamer
mcp = FastMCP("my-app")
@mcp.tool()
async def echo(text: str, ctx: Context) -> str:
"""入力テキストを 1 文字ずつ text イベントとしてストリーム配信。"""
stream = EventStreamer(ctx, node="my-app")
await stream.log(f"受信: {len(text)} 文字")
for ch in text:
await stream.text(ch)
await asyncio.sleep(0.02)
return text
if __name__ == "__main__":
mcp.settings.host = "127.0.0.1"
mcp.settings.port = 8001
mcp.run(transport="streamable-http")
2. クライアント (my_client.py)
"""自前 MCP サーバーを呼び出すクライアント。"""
import asyncio
from agent_hub_kit import AgentHubClient
async def main():
async with AgentHubClient() as client:
await client.connect_http("my_app", "http://127.0.0.1:8001/mcp")
async for ev in client.call_tool_stream("echo", {"text": "Hello"}):
if ev.type == "text":
print(ev.content, end="", flush=True)
elif ev.type == "log":
print(f"\n[log] {ev.content}")
elif ev.type == "result":
print(f"\n[result] {ev.content!r}")
asyncio.run(main())
3. 実行 (2 ターミナル)
# ターミナル A
python my_server.py
# ターミナル B
python my_client.py
Hello が 1 文字ずつストリームで届き、最後に [result] 'Hello' が表示
される。
API リファレンス
公開 API はすべて from agent_hub_kit import ... で取得可能:
from agent_hub_kit import (
AgentHubClient, # MCP クライアント
ToolRef, # tools/list で発見した 1 ツールの情報
StreamEvent, # ストリーム配信される 1 イベント
ENVELOPE_VERSION, # JSON エンベロープのバージョン
encode_envelope, # サーバー側で内部利用 (通常は EventStreamer 経由)
decode_envelope, # クライアント側で内部利用 (通常は自動)
EventStreamer, # サーバー側のストリーミング送出ヘルパー
stream_text_iter, # テキストイテレータを text イベントとして流すヘルパー
)
AgentHubClient
複数 MCP サーバーへの接続を束ねる async context manager。
async with AgentHubClient() as client:
refs = await client.connect_http("server_a", "http://localhost:8001/mcp")
...
| メソッド | 説明 |
|---|---|
connect_http(server_name: str, url: str) -> list[ToolRef] |
Streamable HTTP で 1 サーバーに接続し、tools/list 結果を返す |
connect_all_http(endpoints: Mapping[str, str]) -> list[ToolRef] |
複数エンドポイントを一括接続。{"name": "http://.../mcp", ...} |
list_tools() -> dict[str, list[ToolRef]] |
接続中の全サーバーから tools/list を取り直す。{server: [ToolRef]} |
find_tool(tool_name: str) -> ToolRef | None |
キャッシュ済みのツール情報を返す |
call_tool_stream(tool_name, arguments, *, server=None, queue_maxsize=256) -> AsyncIterator[StreamEvent] |
ツールを呼び出し、StreamEvent を逐次 yield する async generator |
call_tool_stream() は中断 (cancellation) も簡単:
async for ev in client.call_tool_stream("echo", {"text": "..."}):
if ev.type == "text":
print(ev.content)
if should_stop():
break # ← MCP の notifications/cancelled が自動送出される
EventStreamer
MCP サーバー側でツール内から使うストリーミング送出ヘルパー。FastMCP の
Context をラップする。
@mcp.tool()
async def my_tool(user_request: str, ctx: Context) -> str:
stream = EventStreamer(ctx, node="my-agent")
await stream.log("開始")
for chunk in run_llm(user_request):
await stream.text(chunk)
return final_text
| メソッド | 配信される StreamEvent.type |
用途 |
|---|---|---|
text(chunk: str) |
text |
LLM トークン等のテキスト断片 |
think(chunk: str) |
think |
思考プロセス (UI でグレー表示等) |
log(message: str) |
log |
進捗ログ |
image(payload: dict) |
image |
画像 (URL / base64) |
stl(payload: dict) |
stl |
STL モデル情報 |
video(payload: dict) |
video |
動画情報 |
emotion(payload: dict) |
emotion_data |
感情パラメータ |
error(message: str) |
error |
ツール失敗扱いにせず継続したい時のエラー |
send(event_type: str, content: Any) |
任意 | 上記以外の独自種別を送る場合の汎用 API |
stream_text_iter(stream, chunks)
テキストイテレータをまとめて text イベントとして配信し、連結結果を返す ヘルパー。同期/非同期どちらのイテレータも受け付ける。
full_text = await stream_text_iter(stream, llm.stream(prompt))
StreamEvent
クライアントが受信するイベント 1 件。call_tool_stream() が yield する型。
@dataclass(slots=True)
class StreamEvent:
type: str # "text" / "think" / "log" / ... / "result" / "error" / "cancelled"
content: Any = None # 種別依存ペイロード
node: str | None = None # 送信元サーバー / エージェント名
raw: dict | None = None # 元の JSON エンベロープ (デバッグ用)
イベント種別一覧
クライアントが StreamEvent.type で受け取りうる値:
type |
内容 | 発生源 |
|---|---|---|
text |
LLM トークン等のテキスト断片 | stream.text() |
think |
思考プロセス | stream.think() |
log |
進捗ログ | stream.log() |
image |
画像 | stream.image() |
stl |
STL モデル | stream.stl() |
video |
動画 | stream.video() |
emotion_data |
感情パラメータ | stream.emotion() |
result |
ツールの最終戻り値 | call_tool_stream が call_tool 完了時に 1 度だけ yield |
error |
エラー | stream.error() / call_tool 例外 / ツール側 isError |
cancelled |
キャンセル完了 | クライアント側で中断した時 |
エンベロープ形式
notifications/progress.message に載せる JSON 形式:
{
"v": 1,
"type": "text",
"content": "あ",
"node": "my-agent"
}
v:ENVELOPE_VERSION(現在 1)type: 上記イベント種別のいずれか、または任意文字列content: 種別依存ペイロード (text は str、image は dict 等)node: 送信元の識別子 (任意。UI で吹き出し色分け等に使う想定)
JSON ではない通常文字列が来た場合は type="text" として解釈される
(前方互換)。
パターン集
複数の MCP サーバーをひとつのクライアントから呼ぶ
async with AgentHubClient() as client:
await client.connect_all_http({
"topology": "http://localhost:8001/mcp",
"simulation": "http://localhost:8002/mcp",
"rendering": "http://localhost:8003/mcp",
})
# ツール名で自動ルーティング (どの server に居るかは tools/list で発見済み)
async for ev in client.call_tool_stream("optimize_beam", {"user_request": "..."}):
handle(ev)
並列でツール呼び出し
async def call(label):
async for ev in client.call_tool_stream("ask", {"user_request": label}):
if ev.type == "result":
return ev.content
return ""
async with AgentHubClient() as client:
await client.connect_http("agent", "http://localhost:8001/mcp")
a, b = await asyncio.gather(call("質問 A"), call("質問 B"))
中断 (cancellation)
async for ev in client.call_tool_stream("long_running", args):
if ev.type == "text":
print(ev.content, end="")
if user_pressed_stop():
break # ← MCP の notifications/cancelled が自動で送出される
画像を入力として渡す
サーバー側はツール引数に images: list[str] | None = None を取り、data URI
形式の base64 文字列リストを受け取る規約。
# クライアント側
import base64
with open("input.png", "rb") as f:
b64 = base64.b64encode(f.read()).decode()
images = [f"data:image/png;base64,{b64}"]
async for ev in client.call_tool_stream(
"analyze_image",
{"user_request": "この画像を説明して", "images": images},
):
...
# サーバー側
@mcp.tool()
async def analyze_image(
user_request: str,
ctx: Context,
images: list[str] | None = None,
) -> str:
stream = EventStreamer(ctx, node="vision-agent")
if images:
for uri in images:
_, b64 = uri.split(",", 1)
img_bytes = base64.b64decode(b64)
# ... VLM 等に渡す
...
エラーハンドリング
async for ev in client.call_tool_stream("flaky_tool", args):
if ev.type == "error":
# ツール内で stream.error() されたか、ツールが isError=True で完了した
log.warning(f"server error: {ev.content}")
elif ev.type == "result":
...
接続自体が失敗する場合は connect_http が例外を投げる:
try:
await client.connect_http("server", url)
except Exception as e:
log.error(f"接続失敗: {e!r}")
llm-graph-kit と組み合わせて LLM エージェントを書く
llm-graph-kit で宣言した
グラフベースのエージェントを MCP サーバーとして公開するパターン。
llm-graph-kit のイベント ({"type": "answer_text", ...} 等) を
EventStreamer のメソッドにマップする:
from mcp.server.fastmcp import Context, FastMCP
from llm_graph_kit import LLMGraph, NodeState
from agent_hub_kit import EventStreamer
class MyAgent:
def __init__(self, llm):
self.llm = llm
def build_graph(self) -> LLMGraph:
g = LLMGraph(state_schema=...)
g.add_node("answer", self._answer)
...
return g
def _answer(self, state: NodeState):
for chunk in self.llm.respond(...):
yield {"type": "answer_text", "content": chunk}
return {"answer": "..."}
def run(self, question: str):
yield from self.build_graph().run({"question": question})
mcp = FastMCP("my-llm-agent")
agent = MyAgent(llm=...)
@mcp.tool()
async def ask(user_request: str, ctx: Context) -> str:
stream = EventStreamer(ctx, node="my-llm-agent")
answer = ""
for event in agent.run(user_request):
if event["type"] == "answer_text":
answer += event["content"]
await stream.text(event["content"])
elif event["type"] == "log":
await stream.log(event["content"])
return answer
完全な動作例は本リポジトリの examples/llm_agent_server.py (DummyLLM 版)
および examples/llm_agent_server_mlx.py (実 LLM 版) を参照。
サーバーを auto-reload で開発する
mcp.run(transport="streamable-http") の代わりに ASGI アプリを取り出して
uvicorn を使えば --reload が利用できる:
# my_server_reload.py
from my_server import mcp
if __name__ != "__main__":
app = mcp.streamable_http_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"my_server_reload:app",
host="127.0.0.1", port=8001,
reload=True,
reload_dirs=["."],
)
python my_server_reload.py # .py を編集すると自動再起動
LLM ロードが重い場合は、再起動コストとのトレードオフがあるので、軽量 モデルで開発するか、--reload を使わず手動再起動が現実的な場合も多い。
リポジトリ内の実行サンプル
本リポジトリをクローンした場合は examples/ に動作確認用のサンプルが
入っている。uv sync 後に以下で動く:
| サンプル | 起動コマンド | LLM | OS |
|---|---|---|---|
examples/llm_agent_server.py |
uv run python examples/llm_agent_server.py |
DummyLLM | 不問 |
examples/llm_agent_server_mlx.py |
uv run python examples/llm_agent_server_mlx.py |
augllm.MlxLLM |
macOS + MLX |
examples/llm_agent_server_reload.py |
uv run python examples/llm_agent_server_reload.py |
DummyLLM (+ --reload) | 不問 |
examples/llm_agent_server_mlx_reload.py |
uv run python examples/llm_agent_server_mlx_reload.py |
MlxLLM (+ --reload) | macOS + MLX |
クライアントはどのサーバーに対しても共通:
uv run python main.py # 標準 (内部で examples/llm_agent_client.py を実行)
# または
uv run python examples/llm_agent_client.py
ポート切替は環境変数 LLM_AGENT_PORT (デフォルト 8201) で。サーバーと
クライアントで同じ値を指定する。
なぜ MCP の素の SDK ではなく本キットなのか
MCP SDK (mcp パッケージ)
だけでも client/server の双方向通信は可能だが、マルチエージェント開発
では以下が頻出のため、本キットでカプセル化している:
- トークン単位ストリーミング:
notifications/progress.messageに JSON エンベロープを載せて多種類のイベントを多重化 - 複数サーバー集約: 1 つのクライアントから複数の MCP サーバーへ 並列接続し、ツール名で自動ルーティング
- 中断 (cancellation):
async forをbreakするだけでnotifications/cancelledが自動送出される - 画像入力規約: tool 引数
images: list[str]を data URI 形式で 渡す慣習を統一
開発
git clone https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git
cd agent-hub-kit
uv sync
uv run pytest -v
テストは tests/test_llm_agent_integration.py に集約されており、
examples/llm_agent_server.py をサブプロセス起動して公開 API のみで
end-to-end (発見・ストリーミング・中断・並行呼び出し) を検証する。
ライセンス
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file agent_hub_kit-0.1.0.tar.gz.
File metadata
- Download URL: agent_hub_kit-0.1.0.tar.gz
- Upload date:
- Size: 13.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8421c35fe325c51c760f21093e232f610c7458337953cfa710a6a865b8bb9850
|
|
| MD5 |
f88af6990e6f43cd5fe2209d30c5b483
|
|
| BLAKE2b-256 |
90bd11626cf7332d6ca23f6f50249e81014e7f31adbfbf240b9ea46a33732ea5
|
File details
Details for the file agent_hub_kit-0.1.0-py3-none-any.whl.
File metadata
- Download URL: agent_hub_kit-0.1.0-py3-none-any.whl
- Upload date:
- Size: 15.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8bd91649c998f9b3d74fbc0e3d0a34f73c747614457f64e75783733c0d8818c6
|
|
| MD5 |
9301ef729b6318a65038cbc82cd39d30
|
|
| BLAKE2b-256 |
f678ae79bdd00d55720bfb1459d2ae23f6e8f3faf2bff4e01e965cad17bf240b
|