Skip to main content

Interrupt nodes for pydantic-graph

Project description

pedantic-graph-interrupt

Run interruptible Pydantic Graphs that can be interrupted and resumed.

Table of Contents

Use cases

  • LLM AI chatbots implemented using pydantic-ai that need to obtain user input outside of the graph.
  • Document processing workflows where an external approval or human involvement is required.
  • Graphs that can run for a very long time, so their execution must be broken into smaller continuous chunks. Most often each chunk is executed in a background worker, when the graph is ready to resume its run.
  • And more...

Installation

pip install pydantic-graph-interrupt

Concepts

  • InterruptNode is a special node type that interrupts the graph execution.
  • proceed() is an alternative to graph.run(). It can be used to both start the graph run from scratch or resume it after an interruption.

Usage

Define graph with interrupt nodes

"""graph.py"""
from dataclasses import dataclass, field
from pydantic_graph import BaseNode, End, GraphRunContext
from pydantic_graph_interrupt import InterruptibleGraph, InterruptNode, Required

@dataclass
class MyState:
    user_name: str | None = None
    messages: list[str] = field(default_factory=list)

@dataclass
class Greet(BaseNode[MyState]):
    async def run(self, ctx: GraphRunContext[MyState]) -> "WaitForName":
        ctx.state.messages.append("Hello, what is your name?")
        return WaitForName()

@dataclass  # ↓ This is an interrupt node
class WaitForName(InterruptNode[MyState]):
    # ↓ This is a required field that must be provided to resume the graph
    user_name: Annotated[str | None, Required] = None

    async def run(self, ctx: GraphRunContext[MyState]) -> "Goodbye":
        ctx.state.user_name = self.user_name
        return Goodbye(user_name=self.user_name)

@dataclass
class Goodbye(BaseNode[MyState]):
    user_name: str

    async def run(self, ctx: GraphRunContext[MyState]) -> End:
        ctx.state.messages.append(f"Goodbye, {self.user_name}!")
        return End(ctx.state)

my_graph = InterruptibleGraph(nodes=[Greet, WaitForName, Goodbye])

Initialize persistence

When you just start, you have to initialize the persistence. This defines the starting node of the graph and sets correct types inside the persistence.

"""steps.py"""
from pathlib import Path
from pydantic_graph.persistence.file import FileStatePersistence
from .graph import MyState, my_graph, Greet

async def start():
    persistence = FileStatePersistence(Path("offline_state.json"))
    state = MyState()
    await my_graph.initialize(Greet(), persistence=persistence, state=state)

Start the graph

"""main.py"""
from .steps import start
await start()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pydantic_graph_interrupt-0.1.0.tar.gz (15.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pydantic_graph_interrupt-0.1.0-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file pydantic_graph_interrupt-0.1.0.tar.gz.

File metadata

File hashes

Hashes for pydantic_graph_interrupt-0.1.0.tar.gz
Algorithm Hash digest
SHA256 fd065c197b4e3f327da49b55809e30980734f71ac7f5ac17fb87d597a3e9ac48
MD5 0bd86dcf268c731cdc4d7c0f23056394
BLAKE2b-256 2e67e4c86337f0da3b02fba0c0a0f59343e95d58bde308cddd9fe21bb8e57c34

See more details on using hashes here.

File details

Details for the file pydantic_graph_interrupt-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for pydantic_graph_interrupt-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4b46de6466244f83d8cbbc9c20ce6cfb584d7b80c7c2601a28b380f264f476b5
MD5 bf5f2882942397e645ff6ef947e3ac92
BLAKE2b-256 48ade34e61e86db8ab28d6106575fa663fac7887feba2159273d7916e26c526f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page