Skip to main content

Add your description here

Project description

llm-graph-kit

LLM ベースのエージェントの動作内容をグラフ形式で記述・実行する軽量ライブラリです。 ノードとエッジでワークフローを宣言し、逐次実行・条件分岐・ストリーミングイベントの送出・Mermaid 可視化に対応します。

できること

  • ノードと有向エッジでワークフローを宣言的に組み立てる
  • 条件分岐(state のキー値 / 任意の関数 / Enum)でルートを動的に切り替える
  • ノードからの yield イベントが run() の出力にそのまま流れる(LLM のトークンストリーム等を呼び出し側へ伝搬しやすい)
  • TypedDict でステートのキーを宣言し、未宣言キーの書き込みを実行時に検出
  • ノードで発生した例外を自動で捕捉し、エラーイベントとして yield しつつ実行を継続
  • get_graph_mermaid() でグラフ構造を Mermaid 文字列として出力

インストール

uv add llm-graph-kit

開発する場合:

uv venv
source .venv/bin/activate
uv sync

クイックスタート

from typing import TypedDict, Optional
from llm_graph_kit import LLMGraph, NodeState


class State(TypedDict, total=False):
    input: str
    plan: Optional[str]
    output: Optional[str]
    decision: str


def plan(state: NodeState):
    yield {"type": "log", "content": f"plan for: {state['input']}"}
    return {"plan": "draft a response"}


def execute(state: NodeState):
    yield {"type": "log", "content": "executing..."}
    return {"output": f"done ({state['plan']})"}


def check(state: NodeState):
    return {"decision": "complete" if state.get("output") else "retry"}


workflow = LLMGraph(state_schema=State)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("check", check)

workflow.add_edge(LLMGraph.START, "plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "check")
workflow.add_conditional_edge(
    "check", "decision",
    {"retry": "plan", "complete": LLMGraph.END},
)

print(workflow.get_graph_mermaid())   # Mermaid グラフを出力

gen = workflow.run({"input": "hello"})
try:
    while True:
        event = next(gen)
        print("event:", event)
except StopIteration as e:
    final_state = e.value
    print("final:", final_state)

API

from llm_graph_kit import LLMGraph, NodeState, NodeFunc で公開シンボルを得られます。

LLMGraph(state_schema=None)

グラフを作成する。

  • state_schema: ステートのキーを宣言する TypedDict(または __annotations__ を持つクラス)。新規コードでは必ず指定してください。
  • 定数 LLMGraph.START / LLMGraph.END は擬似ノードを表す。
  • 予約キー __errors__ はライブラリが管理するためユーザーは書き込めない(後述)。

add_node(name, func)

ノードを 1 つ登録する。name はユニークな文字列。func は次の「ノード関数」のいずれか。

add_edge(from_node, to_node)

from_node から to_node への単一エッジを張る。 from_nodeLLMGraph.START を渡すとエントリポイントが設定される。 to_nodeLLMGraph.END を渡すと終端へ。

g.add_edge(LLMGraph.START, "first_node")
g.add_edge("a", "b")
g.add_edge("last_node", LLMGraph.END)

add_conditional_edge(from_node, condition, path_map)

from_node の出力に応じて遷移先を分岐する。

  • condition:
    • 文字列: state[condition] の値をシグナルに使う
    • callable: state を受け取り、文字列または Enum を返す関数
  • path_map: シグナル値 → 次のノード名 の辞書。LLMGraph.END も値として使える
  • path_map に該当キーがなければ自動で END へ遷移
# (1) state のキーで分岐
g.add_conditional_edge("check", "decision",
                       {"retry": "plan", "complete": LLMGraph.END})

# (2) 関数で分岐
g.add_conditional_edge(
    "check",
    lambda s: "ok" if s["score"] >= 80 else "ng",
    {"ok": "publish", "ng": "revise"},
)

# (3) Enum で分岐(path_map のキーにもメンバーを置ける)
from enum import Enum
class Decision(Enum):
    RETRY = "retry"
    DONE = "done"

g.add_conditional_edge("check", "decision",
                       {Decision.RETRY: "plan", Decision.DONE: LLMGraph.END})

run(initial_state, max_steps=100)

グラフを実行する。ジェネレータを返す。

  • initial_state: 初期 state。スキーマで宣言したキーのみ書き込み可
  • max_steps: 実行できるノードの上限(既定 100)。サイクルで上限超過時は RuntimeError
  • ノードから yield された任意の値はそのままジェネレータの出力に流れる
  • すべての遷移が終わると最終 state(dict)が StopIteration.value として返る
gen = workflow.run({"input": "x"})
try:
    while True:
        event = next(gen)
        # event を表示・配信する
except StopIteration as e:
    final_state = e.value

シンプルに全イベントを使い捨てるなら:

for event in workflow.run({"input": "x"}):
    print(event)

get_graph_mermaid()

グラフ構造を Mermaid 文字列で返す。コード/CLI からの貼り付けや LLM 出力での可視化向け。

print(workflow.get_graph_mermaid())

ノード関数の書き方

ノードは「state を受け取り、state 更新(dict)または None を返す関数」です。yield してジェネレータにすることで、実行中のイベントをストリーミングできます。

通常関数

def my_node(state: NodeState):
    return {"key": "value"}   # ← state にマージされる

None を返してもよい(state 更新なし)。

ジェネレータ(ストリーミング)

def my_node(state: NodeState):
    yield {"type": "log", "content": "started"}
    yield {"type": "answer_text", "content": "chunk..."}
    return {"key": "value"}   # 最終 state 更新

yield した値はそのまま run() の出力に出ます。形式は任意で、ユーザーが好きに決められます(例: {"type": "log", ...} / {"type": "answer_text", ...} など)。

ノード内例外

ノード内で例外が発生すると、ライブラリが自動で捕捉して以下を行います:

  1. state["__errors__"] リストにエラーメッセージを追加(このキーはライブラリが管理)
  2. {"type": "error", "agent": <node_name>, "content": <message>} を yield
  3. グラフは止まらず、次の遷移ルールに進む(戻り値 None として扱う)

これにより、リトライループ等でエラーを引きずらないグラフが書けます。

State の取り扱い

  • 初期 state は内部で deepcopy され、呼び出し側のオブジェクトを変更しません
  • 各ノードの戻り値 dict は state.update(...) で既存 state にマージされます(同名キーは上書き)
  • __errors__ は予約キー。initial_state でもノード戻り値でも書き込めません(読み取りは自由)

サンプル 1: シンプルなカウンタ(LLM 非依存)

リポジトリの example_with_schema.py と同じものです。3 回ループしてから終端ノードに進むグラフです。

"""
state_schema を使ったサンプル(LLM 非依存)。

TypedDict でステートのキーを宣言することで:
  - 実行時に未宣言キーへの書き込みを検出して即時に ValueError を発生
  - mypy / IDE で補完と型チェックが効く

実行:
    python example_with_schema.py
"""
from typing import TypedDict, Optional, List

from llm_graph_kit import LLMGraph, NodeState


# ---------------------------------------------------------------------------
# 1. ステートのスキーマを TypedDict で宣言
# ---------------------------------------------------------------------------
class CounterState(TypedDict, total=False):
    """カウンタグラフのステート定義。total=False で全フィールドを任意扱いに。"""
    input: str
    count: int
    history: List[str]
    decision: str            # 条件分岐で参照するキー
    final_message: Optional[str]


# ---------------------------------------------------------------------------
# 2. ノード関数
# ---------------------------------------------------------------------------
def start_node(state: NodeState):
    return {
        "count": 0,
        "history": [f"started with input={state['input']}"],
    }


def increment_node(state: NodeState):
    new_count = state["count"] + 1
    history = state["history"] + [f"tick {new_count}"]
    decision = "stop" if new_count >= 3 else "continue"
    return {
        "count": new_count,
        "history": history,
        "decision": decision,
    }


def finish_node(state: NodeState):
    return {
        "history": state["history"] + ["done"],
        "final_message": f"completed after {state['count']} ticks",
    }


# ---------------------------------------------------------------------------
# 3. グラフ構築
# ---------------------------------------------------------------------------
def build_graph() -> LLMGraph:
    g = LLMGraph(state_schema=CounterState)

    g.add_node("start", start_node)
    g.add_node("increment", increment_node)
    g.add_node("finish", finish_node)

    g.add_edge(LLMGraph.START, "start")
    g.add_edge("start", "increment")
    g.add_conditional_edge(
        "increment",
        "decision",
        {"continue": "increment", "stop": "finish"},
    )
    g.add_edge("finish", LLMGraph.END)
    return g


# ---------------------------------------------------------------------------
# 4. 実行
# ---------------------------------------------------------------------------
def main():
    g = build_graph()

    # run() はジェネレータを返す。最終 state は StopIteration.value で受け取る
    gen = g.run({"input": "hello"})
    final_state = None
    try:
        while True:
            event = next(gen)
            print("event:", event)
    except StopIteration as e:
        final_state = e.value

    print("\n--- final state ---")
    for k, v in final_state.items():
        print(f"  {k}: {v}")


if __name__ == "__main__":
    main()

実行例(このサンプルはノードが yield しないため、中間イベントはなく最終 state のみが得られます):

--- final state ---
  input: hello
  __errors__: []
  count: 3
  history: ['started with input=hello', 'tick 1', 'tick 2', 'tick 3', 'done']
  decision: stop
  final_message: completed after 3 ticks

サンプル 2: LLM を使うエージェント

リポジトリの example_with_llm.py と同じものです。 「質問に答える → 答えをチェック → 短ければやり直す」という 2 段のグラフを構築します。LLM のストリーミングチャンクを yield でそのまま呼び出し側へ流す、エージェント実装の典型構成です。

構成:

  • LLM は外側で生成してエージェントへ注入する(依存注入)
  • グラフ構築を build_graph() メソッドに分離(再利用・テストしやすい)
  • 各ノードはエージェントの private メソッド (_answer, _check など)
  • 公開エントリポイント run() は initial state の組み立て、Mermaid 図の通知、下位グラフからのイベント中継 (yield from) を担当
"""
LLM を使った最小サンプル(エージェント)。

質問に答えるノードと、答えをチェックするノードからなる 2 段グラフ。
チェックで「短すぎる」と判定されたらやり直す。最大 3 回でループを抜ける。

実行:
    python example_with_llm.py
"""
from typing import TypedDict

from augllm import MlxLLM

from llm_graph_kit import LLMGraph, NodeState


# ---------------------------------------------------------------------------
# ステートのスキーマ
# ---------------------------------------------------------------------------
class QAState(TypedDict, total=False):
    question: str
    answer: str
    attempts: int
    decision: str


# ---------------------------------------------------------------------------
# エージェントクラス
# ---------------------------------------------------------------------------
class QAAgent:
    """質問応答エージェント。LLM を保持し、グラフの定義と実行を提供する。"""

    # --------------------------------------------------
    # 初期化
    # --------------------------------------------------
    def __init__(self, llm) -> None:
        self.llm = llm

    # --------------------------------------------------
    # 公開エントリポイント
    # --------------------------------------------------
    def run(self, question: str):
        """質問を受け取り、グラフ実行中のイベントを呼び出し側へ yield する。"""
        graph = self.build_graph()

        # まずグラフ構造を log イベントとして流しておく(呼び出し側で可視化できる)
        yield {
            "type": "log",
            "node": "system",
            "content": f"Workflow Definition:\n{graph.get_graph_mermaid()}",
        }

        # 下位グラフのイベントをそのまま中継
        initial_state = {"question": question, "attempts": 0}
        yield from graph.run(initial_state)

    # --------------------------------------------------
    # グラフ定義
    # --------------------------------------------------
    def build_graph(self) -> LLMGraph:
        """このエージェントのワークフローを構築して返す。"""
        g = LLMGraph(state_schema=QAState)

        # ノード登録
        g.add_node(name="answer", func=self._answer)
        g.add_node(name="check", func=self._check)

        # エッジ定義
        g.add_edge(LLMGraph.START, "answer")
        g.add_edge("answer", "check")

        # 条件付きエッジ: state["decision"] によって遷移先を切り替える
        g.add_conditional_edge(
            "check",
            "decision",
            {
                "retry": "answer",
                "ok": LLMGraph.END,
            },
        )
        return g

    # --------------------------------------------------
    # ノード: LLM で回答を生成(ストリーミング)
    # --------------------------------------------------
    def _answer(self, state: NodeState):
        node_name = "answer"
        yield {"type": "log", "node": node_name, "content": "answering..."}

        system_prompt = "あなたは簡潔に答えるアシスタントです。"
        user_prompt = f"質問: {state['question']}"

        # LLM のチャンクをそのまま呼び出し側へ流す
        text = ""
        for chunk in self.llm.respond(
            system_prompt=system_prompt, user_text=user_prompt, stream=True
        ):
            text += chunk
            yield {
                "type": "answer_text",
                "node": node_name,
                "taskId": f"{node_name}-answer-text",
                "content": chunk,
            }

        # state を更新(回答本文と試行回数のインクリメント)
        return {"answer": text, "attempts": state.get("attempts", 0) + 1}

    # --------------------------------------------------
    # ノード: 回答の品質チェックと分岐シグナルの決定
    # --------------------------------------------------
    def _check(self, state: NodeState):
        node_name = "check"

        too_short = len(state["answer"]) < 30
        give_up = state["attempts"] >= 3
        decision = "retry" if (too_short and not give_up) else "ok"

        yield {
            "type": "log",
            "node": node_name,
            "content": (
                f"len={len(state['answer'])} attempts={state['attempts']}"
                f" -> {decision}"
            ),
        }
        return {"decision": decision}


# ---------------------------------------------------------------------------
# メインプログラム
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # 1. エージェントを構築
    LLM_PATH = "mlx-community/Qwen3.6-27B-4bit"
    llm = MlxLLM(model_path=LLM_PATH)
    agent = QAAgent(llm=llm)

    # 2. 入力
    question = "地球の半径は何 km?"
    print(f"Request: {question}\n")

    # 3. 実行(ストリーミングを受け取り、イベント種別ごとに表示先を変える)
    for event in agent.run(question=question):

        # LLM 出力のチャンク
        if event["type"] == "answer_text":
            print(event["content"], end="", flush=True)

        # ログ
        elif event["type"] == "log":
            print(f"\n[LOG] {event['node']}: {event['content']}")

        # ノード内例外(ライブラリが自動で yield する)
        elif event["type"] == "error":
            print(f"\n[ERROR] {event['agent']}: {event['content']}")

    print("\n\nProcess Completed.")

このサンプルで示しているパターン:

  • 依存注入: LLM は外側で生成してエージェントへ渡す。テスト時のモック化や LLM 実装の差し替えがしやすい
  • エージェントの境界: グラフとノード関数をひとつのクラスに集約し、外側からは agent.run(...) だけで呼べる
  • build_graph() の分離: グラフ構築を専用メソッドにすることで、テスト・可視化・サブグラフ化が容易
  • run() の責務: initial state の生成と yield from graph.run(...) による中継だけに留め、各ノードの処理は private メソッドに任せる
  • イベントの規約: type で種別を分け(log / answer_text / error 等)、node キーに発火元を入れる
  • __errors__ は触らない: ノードで例外が起きるとライブラリが自動で {"type": "error", "agent": ..., "content": ...} を流す
  • リトライの上限はドメイン側: attempts >= 3decision="ok" にして抜ける。max_steps はライブラリ側のセーフティネット

ライセンス / リポジトリ

https://github.com/ToPo-ToPo-ToPo/llm_graph

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

llm_graph_kit-1.1.1.tar.gz (11.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

llm_graph_kit-1.1.1-py3-none-any.whl (12.9 kB view details)

Uploaded Python 3

File details

Details for the file llm_graph_kit-1.1.1.tar.gz.

File metadata

  • Download URL: llm_graph_kit-1.1.1.tar.gz
  • Upload date:
  • Size: 11.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.10

File hashes

Hashes for llm_graph_kit-1.1.1.tar.gz
Algorithm Hash digest
SHA256 80fe379ad3b3d17467b1741caf8ef13ea9779b714a8ed78b51aa17af3d7ad5a7
MD5 ff6974cb818ff0ee8131bdf9d5030c9b
BLAKE2b-256 6d16615d6ff68515b4f04786a8a0e1dcb82c3c8eb3622d45e8c28451b086f921

See more details on using hashes here.

File details

Details for the file llm_graph_kit-1.1.1-py3-none-any.whl.

File metadata

  • Download URL: llm_graph_kit-1.1.1-py3-none-any.whl
  • Upload date:
  • Size: 12.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.10

File hashes

Hashes for llm_graph_kit-1.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 aeb715e3c4f2936329c5c16d8ff3d5011ebdc0136df4114aa42f40faf3bd5167
MD5 725c945890c49303c014f670bf72f723
BLAKE2b-256 7df054b3bb4bd54fca00e0b0aaff498190d572393163b390f23a75d5a070c424

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page