Add your description here
Project description
llm-graph-kit
LLM ベースのエージェントの動作内容をグラフ形式で記述・実行する軽量ライブラリです。 ノードとエッジでワークフローを宣言し、逐次実行・条件分岐・ストリーミングイベントの送出・Mermaid 可視化に対応します。
できること
- ノードと有向エッジでワークフローを宣言的に組み立てる
- 条件分岐(
stateのキー値 / 任意の関数 /Enum)でルートを動的に切り替える - ノードからの
yieldイベントがrun()の出力にそのまま流れる(LLM のトークンストリーム等を呼び出し側へ伝搬しやすい) TypedDictでステートのキーを宣言し、未宣言キーの書き込みを実行時に検出- ノードで発生した例外を自動で捕捉し、エラーイベントとして yield しつつ実行を継続
get_graph_mermaid()でグラフ構造を Mermaid 文字列として出力
インストール
uv add llm-graph-kit
開発する場合:
uv venv
source .venv/bin/activate
uv sync
クイックスタート
from typing import TypedDict, Optional
from llm_graph_kit import LLMGraph, NodeState
class State(TypedDict, total=False):
input: str
plan: Optional[str]
output: Optional[str]
decision: str
def plan(state: NodeState):
yield {"type": "log", "content": f"plan for: {state['input']}"}
return {"plan": "draft a response"}
def execute(state: NodeState):
yield {"type": "log", "content": "executing..."}
return {"output": f"done ({state['plan']})"}
def check(state: NodeState):
return {"decision": "complete" if state.get("output") else "retry"}
workflow = LLMGraph(state_schema=State)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("check", check)
workflow.add_edge(LLMGraph.START, "plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "check")
workflow.add_conditional_edge(
"check", "decision",
{"retry": "plan", "complete": LLMGraph.END},
)
print(workflow.get_graph_mermaid()) # Mermaid グラフを出力
gen = workflow.run({"input": "hello"})
try:
while True:
event = next(gen)
print("event:", event)
except StopIteration as e:
final_state = e.value
print("final:", final_state)
API
from llm_graph_kit import LLMGraph, NodeState, NodeFunc で公開シンボルを得られます。
LLMGraph(state_schema=None)
グラフを作成する。
state_schema: ステートのキーを宣言するTypedDict(または__annotations__を持つクラス)。新規コードでは必ず指定してください。- 定数
LLMGraph.START/LLMGraph.ENDは擬似ノードを表す。 - 予約キー
__errors__はライブラリが管理するためユーザーは書き込めない(後述)。
add_node(name, func)
ノードを 1 つ登録する。name はユニークな文字列。func は次の「ノード関数」のいずれか。
add_edge(from_node, to_node)
from_node から to_node への単一エッジを張る。
from_node に LLMGraph.START を渡すとエントリポイントが設定される。
to_node に LLMGraph.END を渡すと終端へ。
g.add_edge(LLMGraph.START, "first_node")
g.add_edge("a", "b")
g.add_edge("last_node", LLMGraph.END)
add_conditional_edge(from_node, condition, path_map)
from_node の出力に応じて遷移先を分岐する。
condition:- 文字列:
state[condition]の値をシグナルに使う - callable:
stateを受け取り、文字列またはEnumを返す関数
- 文字列:
path_map: シグナル値 → 次のノード名 の辞書。LLMGraph.ENDも値として使えるpath_mapに該当キーがなければ自動でENDへ遷移
# (1) state のキーで分岐
g.add_conditional_edge("check", "decision",
{"retry": "plan", "complete": LLMGraph.END})
# (2) 関数で分岐
g.add_conditional_edge(
"check",
lambda s: "ok" if s["score"] >= 80 else "ng",
{"ok": "publish", "ng": "revise"},
)
# (3) Enum で分岐(path_map のキーにもメンバーを置ける)
from enum import Enum
class Decision(Enum):
RETRY = "retry"
DONE = "done"
g.add_conditional_edge("check", "decision",
{Decision.RETRY: "plan", Decision.DONE: LLMGraph.END})
run(initial_state, max_steps=100)
グラフを実行する。ジェネレータを返す。
initial_state: 初期 state。スキーマで宣言したキーのみ書き込み可max_steps: 実行できるノードの上限(既定 100)。サイクルで上限超過時はRuntimeError- ノードから
yieldされた任意の値はそのままジェネレータの出力に流れる - すべての遷移が終わると最終
state(dict)がStopIteration.valueとして返る
gen = workflow.run({"input": "x"})
try:
while True:
event = next(gen)
# event を表示・配信する
except StopIteration as e:
final_state = e.value
シンプルに全イベントを使い捨てるなら:
for event in workflow.run({"input": "x"}):
print(event)
get_graph_mermaid()
グラフ構造を Mermaid 文字列で返す。コード/CLI からの貼り付けや LLM 出力での可視化向け。
print(workflow.get_graph_mermaid())
ノード関数の書き方
ノードは「state を受け取り、state 更新(dict)または None を返す関数」です。yield してジェネレータにすることで、実行中のイベントをストリーミングできます。
通常関数
def my_node(state: NodeState):
return {"key": "value"} # ← state にマージされる
None を返してもよい(state 更新なし)。
ジェネレータ(ストリーミング)
def my_node(state: NodeState):
yield {"type": "log", "content": "started"}
yield {"type": "answer_text", "content": "chunk..."}
return {"key": "value"} # 最終 state 更新
yield した値はそのまま run() の出力に出ます。形式は任意で、ユーザーが好きに決められます(例: {"type": "log", ...} / {"type": "answer_text", ...} など)。
ノード内例外
ノード内で例外が発生すると、ライブラリが自動で捕捉して以下を行います:
state["__errors__"]リストにエラーメッセージを追加(このキーはライブラリが管理){"type": "error", "agent": <node_name>, "content": <message>}を yield- グラフは止まらず、次の遷移ルールに進む(戻り値 None として扱う)
これにより、リトライループ等でエラーを引きずらないグラフが書けます。
State の取り扱い
- 初期 state は内部で
deepcopyされ、呼び出し側のオブジェクトを変更しません - 各ノードの戻り値 dict は
state.update(...)で既存 state にマージされます(同名キーは上書き) __errors__は予約キー。initial_stateでもノード戻り値でも書き込めません(読み取りは自由)
サンプル 1: シンプルなカウンタ(LLM 非依存)
リポジトリの example_with_schema.py と同じものです。3 回ループしてから終端ノードに進むグラフです。
"""
state_schema を使ったサンプル(LLM 非依存)。
TypedDict でステートのキーを宣言することで:
- 実行時に未宣言キーへの書き込みを検出して即時に ValueError を発生
- mypy / IDE で補完と型チェックが効く
実行:
python example_with_schema.py
"""
from typing import TypedDict, Optional, List
from llm_graph_kit import LLMGraph, NodeState
# ---------------------------------------------------------------------------
# 1. ステートのスキーマを TypedDict で宣言
# ---------------------------------------------------------------------------
class CounterState(TypedDict, total=False):
"""カウンタグラフのステート定義。total=False で全フィールドを任意扱いに。"""
input: str
count: int
history: List[str]
decision: str # 条件分岐で参照するキー
final_message: Optional[str]
# ---------------------------------------------------------------------------
# 2. ノード関数
# ---------------------------------------------------------------------------
def start_node(state: NodeState):
return {
"count": 0,
"history": [f"started with input={state['input']}"],
}
def increment_node(state: NodeState):
new_count = state["count"] + 1
history = state["history"] + [f"tick {new_count}"]
decision = "stop" if new_count >= 3 else "continue"
return {
"count": new_count,
"history": history,
"decision": decision,
}
def finish_node(state: NodeState):
return {
"history": state["history"] + ["done"],
"final_message": f"completed after {state['count']} ticks",
}
# ---------------------------------------------------------------------------
# 3. グラフ構築
# ---------------------------------------------------------------------------
def build_graph() -> LLMGraph:
g = LLMGraph(state_schema=CounterState)
g.add_node("start", start_node)
g.add_node("increment", increment_node)
g.add_node("finish", finish_node)
g.add_edge(LLMGraph.START, "start")
g.add_edge("start", "increment")
g.add_conditional_edge(
"increment",
"decision",
{"continue": "increment", "stop": "finish"},
)
g.add_edge("finish", LLMGraph.END)
return g
# ---------------------------------------------------------------------------
# 4. 実行
# ---------------------------------------------------------------------------
def main():
g = build_graph()
# run() はジェネレータを返す。最終 state は StopIteration.value で受け取る
gen = g.run({"input": "hello"})
final_state = None
try:
while True:
event = next(gen)
print("event:", event)
except StopIteration as e:
final_state = e.value
print("\n--- final state ---")
for k, v in final_state.items():
print(f" {k}: {v}")
if __name__ == "__main__":
main()
実行例(このサンプルはノードが yield しないため、中間イベントはなく最終 state のみが得られます):
--- final state ---
input: hello
__errors__: []
count: 3
history: ['started with input=hello', 'tick 1', 'tick 2', 'tick 3', 'done']
decision: stop
final_message: completed after 3 ticks
サンプル 2: LLM を使うエージェント
リポジトリの example_with_llm.py と同じものです。
「質問に答える → 答えをチェック → 短ければやり直す」という 2 段のグラフを構築します。LLM のストリーミングチャンクを yield でそのまま呼び出し側へ流す、エージェント実装の典型構成です。
構成:
- LLM は外側で生成してエージェントへ注入する(依存注入)
- グラフ構築を
build_graph()メソッドに分離(再利用・テストしやすい) - 各ノードはエージェントの private メソッド (
_answer,_checkなど) - 公開エントリポイント
run()は initial state の組み立て、Mermaid 図の通知、下位グラフからのイベント中継 (yield from) を担当
"""
LLM を使った最小サンプル(エージェント)。
質問に答えるノードと、答えをチェックするノードからなる 2 段グラフ。
チェックで「短すぎる」と判定されたらやり直す。最大 3 回でループを抜ける。
実行:
python example_with_llm.py
"""
from typing import TypedDict
from augllm import MlxLLM
from llm_graph_kit import LLMGraph, NodeState
# ---------------------------------------------------------------------------
# ステートのスキーマ
# ---------------------------------------------------------------------------
class QAState(TypedDict, total=False):
question: str
answer: str
attempts: int
decision: str
# ---------------------------------------------------------------------------
# エージェントクラス
# ---------------------------------------------------------------------------
class QAAgent:
"""質問応答エージェント。LLM を保持し、グラフの定義と実行を提供する。"""
# --------------------------------------------------
# 初期化
# --------------------------------------------------
def __init__(self, llm) -> None:
self.llm = llm
# --------------------------------------------------
# 公開エントリポイント
# --------------------------------------------------
def run(self, question: str):
"""質問を受け取り、グラフ実行中のイベントを呼び出し側へ yield する。"""
graph = self.build_graph()
# まずグラフ構造を log イベントとして流しておく(呼び出し側で可視化できる)
yield {
"type": "log",
"node": "system",
"content": f"Workflow Definition:\n{graph.get_graph_mermaid()}",
}
# 下位グラフのイベントをそのまま中継
initial_state = {"question": question, "attempts": 0}
yield from graph.run(initial_state)
# --------------------------------------------------
# グラフ定義
# --------------------------------------------------
def build_graph(self) -> LLMGraph:
"""このエージェントのワークフローを構築して返す。"""
g = LLMGraph(state_schema=QAState)
# ノード登録
g.add_node(name="answer", func=self._answer)
g.add_node(name="check", func=self._check)
# エッジ定義
g.add_edge(LLMGraph.START, "answer")
g.add_edge("answer", "check")
# 条件付きエッジ: state["decision"] によって遷移先を切り替える
g.add_conditional_edge(
"check",
"decision",
{
"retry": "answer",
"ok": LLMGraph.END,
},
)
return g
# --------------------------------------------------
# ノード: LLM で回答を生成(ストリーミング)
# --------------------------------------------------
def _answer(self, state: NodeState):
node_name = "answer"
yield {"type": "log", "node": node_name, "content": "answering..."}
system_prompt = "あなたは簡潔に答えるアシスタントです。"
user_prompt = f"質問: {state['question']}"
# LLM のチャンクをそのまま呼び出し側へ流す
text = ""
for chunk in self.llm.respond(
system_prompt=system_prompt, user_text=user_prompt, stream=True
):
text += chunk
yield {
"type": "answer_text",
"node": node_name,
"taskId": f"{node_name}-answer-text",
"content": chunk,
}
# state を更新(回答本文と試行回数のインクリメント)
return {"answer": text, "attempts": state.get("attempts", 0) + 1}
# --------------------------------------------------
# ノード: 回答の品質チェックと分岐シグナルの決定
# --------------------------------------------------
def _check(self, state: NodeState):
node_name = "check"
too_short = len(state["answer"]) < 30
give_up = state["attempts"] >= 3
decision = "retry" if (too_short and not give_up) else "ok"
yield {
"type": "log",
"node": node_name,
"content": (
f"len={len(state['answer'])} attempts={state['attempts']}"
f" -> {decision}"
),
}
return {"decision": decision}
# ---------------------------------------------------------------------------
# メインプログラム
# ---------------------------------------------------------------------------
if __name__ == "__main__":
# 1. エージェントを構築
LLM_PATH = "mlx-community/Qwen3.6-27B-4bit"
llm = MlxLLM(model_path=LLM_PATH)
agent = QAAgent(llm=llm)
# 2. 入力
question = "地球の半径は何 km?"
print(f"Request: {question}\n")
# 3. 実行(ストリーミングを受け取り、イベント種別ごとに表示先を変える)
for event in agent.run(question=question):
# LLM 出力のチャンク
if event["type"] == "answer_text":
print(event["content"], end="", flush=True)
# ログ
elif event["type"] == "log":
print(f"\n[LOG] {event['node']}: {event['content']}")
# ノード内例外(ライブラリが自動で yield する)
elif event["type"] == "error":
print(f"\n[ERROR] {event['agent']}: {event['content']}")
print("\n\nProcess Completed.")
このサンプルで示しているパターン:
- 依存注入: LLM は外側で生成してエージェントへ渡す。テスト時のモック化や LLM 実装の差し替えがしやすい
- エージェントの境界: グラフとノード関数をひとつのクラスに集約し、外側からは
agent.run(...)だけで呼べる build_graph()の分離: グラフ構築を専用メソッドにすることで、テスト・可視化・サブグラフ化が容易run()の責務: initial state の生成とyield from graph.run(...)による中継だけに留め、各ノードの処理は private メソッドに任せる- イベントの規約:
typeで種別を分け(log/answer_text/error等)、nodeキーに発火元を入れる __errors__は触らない: ノードで例外が起きるとライブラリが自動で{"type": "error", "agent": ..., "content": ...}を流す- リトライの上限はドメイン側:
attempts >= 3でdecision="ok"にして抜ける。max_stepsはライブラリ側のセーフティネット
ライセンス / リポジトリ
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file llm_graph_kit-1.1.1.tar.gz.
File metadata
- Download URL: llm_graph_kit-1.1.1.tar.gz
- Upload date:
- Size: 11.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
80fe379ad3b3d17467b1741caf8ef13ea9779b714a8ed78b51aa17af3d7ad5a7
|
|
| MD5 |
ff6974cb818ff0ee8131bdf9d5030c9b
|
|
| BLAKE2b-256 |
6d16615d6ff68515b4f04786a8a0e1dcb82c3c8eb3622d45e8c28451b086f921
|
File details
Details for the file llm_graph_kit-1.1.1-py3-none-any.whl.
File metadata
- Download URL: llm_graph_kit-1.1.1-py3-none-any.whl
- Upload date:
- Size: 12.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
aeb715e3c4f2936329c5c16d8ff3d5011ebdc0136df4114aa42f40faf3bd5167
|
|
| MD5 |
725c945890c49303c014f670bf72f723
|
|
| BLAKE2b-256 |
7df054b3bb4bd54fca00e0b0aaff498190d572393163b390f23a75d5a070c424
|