langgraph-ephemeral-checkpointer

TTL-based thread retention for LangGraph checkpointers. Automatically expire and delete old conversation threads based on idle time or absolute age, and trim per-thread checkpoint history by count, all without touching your application's hot path.



Why

LangGraph checkpointers persist every conversation state to storage. Without a retention policy, threads accumulate indefinitely. This library runs as a sidecar — it never sits in the hot path of your graph — and periodically sweeps expired threads according to rules you define.


Installation

pip install langgraph-ephemeral-checkpointer

Backend extras (install only what you use):

pip install "langgraph-ephemeral-checkpointer[sqlite]"    # SqliteSaver
pip install "langgraph-ephemeral-checkpointer[postgres]"  # PostgresSaver

Python 3.11+ is required.


Quick Start

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

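# In current langgraph-checkpoint-sqlite releases, from_conn_string is a
# context manager; the saver is only valid inside the with block.
with SqliteSaver.from_conn_string("threads.db") as checkpointer:
    policy = TTLPolicy(idle_ttl_seconds=3600)  # delete threads idle for > 1 hour
    sweeper = TTLSweeper(checkpointer, policy)

    result = sweeper.sweep()
    print(f"Deleted {len(result.deleted_thread_ids)} thread(s)")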

TTLPolicy

TTLPolicy defines when a thread is considered expired. At least one rule is required.

from langgraph_ephemeral_checkpointer import TTLPolicy

# Idle TTL — delete threads with no activity for 2 hours
policy = TTLPolicy(idle_ttl_seconds=7200)

# Hard age TTL — delete threads older than 7 days regardless of activity
policy = TTLPolicy(hard_age_ttl_seconds=604800)

# Checkpoint count limit — keep only the 20 most recent checkpoints per thread
policy = TTLPolicy(max_checkpoints_per_thread=20)

# Combine rules: a thread expires if either TTL rule matches;
# the checkpoint limit trims history on threads that survive
policy = TTLPolicy(
    idle_ttl_seconds=3600,
    hard_age_ttl_seconds=86400,
    max_checkpoints_per_thread=50,
)

Parameter                   Type        Description
idle_ttl_seconds            int | None  Expire threads with no checkpoint activity for this many seconds
hard_age_ttl_seconds        int | None  Expire threads whose first checkpoint is older than this many seconds
max_checkpoints_per_thread  int | None  Trim oldest checkpoints so each thread keeps at most this many

TTLPolicy is a frozen dataclass — instances are immutable.
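To make the rule semantics concrete, the sketch below evaluates the two TTL rules against a thread's timestamps. It is not the library's implementation: `PolicySketch`, `ThreadTimes`, and `expiry_reason` are hypothetical stand-ins, and checking the hard-age rule before the idle rule is an assumption. The returned strings follow the reason values documented for on_before_delete.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PolicySketch:
    """Hypothetical stand-in mirroring TTLPolicy's two TTL fields."""
    idle_ttl_seconds: int | None = None
    hard_age_ttl_seconds: int | None = None


@dataclass(frozen=True)
class ThreadTimes:
    """Hypothetical per-thread timestamps, in Unix seconds."""
    first_checkpoint_at: float
    last_checkpoint_at: float


def expiry_reason(policy: PolicySketch, times: ThreadTimes, now: float) -> str | None:
    """Return the name of the first matching rule, or None if the thread is live."""
    if (
        policy.hard_age_ttl_seconds is not None
        and now - times.first_checkpoint_at > policy.hard_age_ttl_seconds
    ):
        return "hard_age_ttl"
    if (
        policy.idle_ttl_seconds is not None
        and now - times.last_checkpoint_at > policy.idle_ttl_seconds
    ):
        return "idle_ttl"
    return None


policy = PolicySketch(idle_ttl_seconds=3600, hard_age_ttl_seconds=86400)
now = 1_000_000.0
recent = ThreadTimes(first_checkpoint_at=now - 7200, last_checkpoint_at=now - 60)
idle = ThreadTimes(first_checkpoint_at=now - 7200, last_checkpoint_at=now - 5400)
old = ThreadTimes(first_checkpoint_at=now - 90000, last_checkpoint_at=now - 60)
```

Under these assumptions, `recent` stays live, `idle` matches the idle rule, and `old` matches the hard-age rule even though it was active a minute ago.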


TTLSweeper

TTLSweeper is the main entry point. It takes a checkpointer and a policy, scans the stored threads, and deletes or prunes the ones the policy marks as expired.

TTLSweeper(
    checkpointer,           # any LangGraph BaseCheckpointSaver
    policy,                 # TTLPolicy
    *,
    policy_resolver=None,   # per-thread overrides (see below)
    enable_coordination=False,  # PostgreSQL advisory locks
    safe_delete=True,       # re-verify timestamps before deleting
    on_before_delete=None,  # callback before each deletion
    on_sweep_complete=None, # callback after each sweep
)

Running a sweep

# synchronous
result = sweeper.sweep()

# asynchronous
result = await sweeper.asweep()

Both return a SweepResult.

Background loop

For always-on applications, run a background sweep loop that fires on an interval:

import asyncio
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

async def main():
    async with AsyncSqliteSaver.from_conn_string("threads.db") as checkpointer:
        policy = TTLPolicy(idle_ttl_seconds=3600)
        sweeper = TTLSweeper(checkpointer, policy)

        # sweep every 5 minutes; keep a reference so the task is not garbage-collected
        sweep_task = asyncio.create_task(sweeper.start(interval_seconds=300))

        # ... your application runs here ...

        await sweeper.stop()

asyncio.run(main())
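The start/stop pattern can be approximated in plain asyncio. The sketch below is a hypothetical stand-in: `IntervalLoop` is not part of the library, and how TTLSweeper implements its loop internally is not stated in this document. It shows an interval loop that wakes early when asked to stop instead of sleeping out the full interval.

```python
import asyncio
from typing import Awaitable, Callable


class IntervalLoop:
    """Hypothetical helper: call `job` every `interval_seconds` until stopped."""

    def __init__(self, job: Callable[[], Awaitable[None]]) -> None:
        self._job = job
        self._stop = asyncio.Event()

    async def start(self, interval_seconds: float) -> None:
        while not self._stop.is_set():
            await self._job()
            try:
                # Sleep for the interval, but wake immediately if stop() is called.
                await asyncio.wait_for(self._stop.wait(), timeout=interval_seconds)
            except asyncio.TimeoutError:
                pass  # interval elapsed, run the job again

    async def stop(self) -> None:
        self._stop.set()


async def demo() -> int:
    runs = 0

    async def fake_sweep() -> None:
        nonlocal runs
        runs += 1

    loop = IntervalLoop(fake_sweep)
    task = asyncio.create_task(loop.start(interval_seconds=0.01))  # keep a reference
    await asyncio.sleep(0.05)
    await loop.stop()
    await task  # the loop exits once it observes the stop event
    return runs


run_count = asyncio.run(demo())
```

Awaiting the task after `stop()` is a design choice in this sketch: it guarantees the last job has finished before the surrounding resources are closed.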

Dry run

Preview what would be deleted without actually deleting anything:

result = sweeper.sweep(dry_run=True)
print(f"Would delete: {result.deleted_thread_ids}")

on_before_delete is skipped during dry runs. on_sweep_complete still fires.

SweepResult

Every sweep returns a SweepResult:

result = sweeper.sweep()

result.deleted_thread_ids      # list[str] — IDs of deleted threads
result.pruned_checkpoint_count # int — checkpoints removed by max_checkpoints pruning
result.active_thread_count     # int — threads still alive after this sweep
result.sweep_duration_seconds  # float — wall-clock time for the sweep
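As an illustration of consuming these fields, the sketch below flattens a result into numeric metrics. `SweepResultSketch` is a stand-in that copies the four documented fields so the example runs without the library installed; `summarize` and the metric names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class SweepResultSketch:
    """Stand-in with the same four fields listed above for SweepResult."""
    deleted_thread_ids: list[str] = field(default_factory=list)
    pruned_checkpoint_count: int = 0
    active_thread_count: int = 0
    sweep_duration_seconds: float = 0.0


def summarize(result: SweepResultSketch) -> dict[str, float]:
    """Flatten a result into numeric metrics (hypothetical metric names)."""
    deleted = len(result.deleted_thread_ids)
    # active_thread_count is measured after the sweep, so add the deletions back
    total_before = deleted + result.active_thread_count
    return {
        "threads_deleted": deleted,
        "threads_active": result.active_thread_count,
        "checkpoints_pruned": result.pruned_checkpoint_count,
        "deleted_fraction": deleted / total_before if total_before else 0.0,
        "duration_seconds": result.sweep_duration_seconds,
    }


metrics = summarize(
    SweepResultSketch(
        deleted_thread_ids=["t-1", "t-2"],
        pruned_checkpoint_count=5,
        active_thread_count=6,
        sweep_duration_seconds=0.25,
    )
)
```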

Per-thread policy overrides

Supply a policy_resolver to apply different rules to individual threads. The resolver receives a thread_id and returns either a custom TTLPolicy or a PolicyOverride.

from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper
from langgraph_ephemeral_checkpointer.types import PolicyOverride

default_policy = TTLPolicy(idle_ttl_seconds=3600)

vip_policy = TTLPolicy(idle_ttl_seconds=604800)  # VIP threads last 7 days

def resolver(thread_id: str):
    if thread_id.startswith("vip:"):
        return vip_policy
    if thread_id.startswith("system:"):
        return PolicyOverride.EXEMPT  # never expire
    return PolicyOverride.USE_DEFAULT

sweeper = TTLSweeper(checkpointer, default_policy, policy_resolver=resolver)

Return value                Behaviour
TTLPolicy                   Use this policy for the thread instead of the global one
PolicyOverride.USE_DEFAULT  Apply the sweeper's global policy
PolicyOverride.EXEMPT       Never expire this thread

If the resolver raises an exception, it is logged and the global policy is used as a fallback. The resolver is called exactly once per thread per sweep.
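The resolution rules above, including the fallback when the resolver raises, can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `OverrideSketch` stands in for PolicyOverride, `effective_policy` is a hypothetical helper, and policies are represented by plain strings to keep the example self-contained.

```python
from __future__ import annotations

import enum
import logging
from typing import Callable

logger = logging.getLogger("resolver_sketch")


class OverrideSketch(enum.Enum):
    """Stand-in for PolicyOverride with the same two members."""
    USE_DEFAULT = "use_default"
    EXEMPT = "exempt"


def effective_policy(
    thread_id: str,
    default_policy: object,
    resolver: Callable[[str], object] | None,
) -> object | None:
    """Return the policy to apply, or None when the thread is exempt."""
    if resolver is None:
        return default_policy
    try:
        outcome = resolver(thread_id)  # called once per thread
    except Exception:
        logger.exception("policy resolver failed for %r; using default", thread_id)
        return default_policy
    if outcome is OverrideSketch.EXEMPT:
        return None
    if outcome is OverrideSketch.USE_DEFAULT:
        return default_policy
    return outcome  # a custom policy object


def resolver(thread_id: str) -> object:
    if thread_id.startswith("vip:"):
        return "vip-policy"
    if thread_id.startswith("system:"):
        return OverrideSketch.EXEMPT
    if thread_id.startswith("broken:"):
        raise RuntimeError("lookup failed")
    return OverrideSketch.USE_DEFAULT
```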


Callbacks

on_before_delete

Called before each thread is deleted. Return False to skip the deletion.

def on_before_delete(thread_id: str, policy: TTLPolicy, reason: str) -> bool:
    print(f"Deleting {thread_id!r} (reason: {reason})")
    # return False to abort this specific deletion
    return True

sweeper = TTLSweeper(checkpointer, policy, on_before_delete=on_before_delete)

The reason parameter is either "idle_ttl" or "hard_age_ttl".

on_sweep_complete

Called once after every sweep cycle with the final SweepResult.

import logging

logger = logging.getLogger(__name__)

def on_sweep_complete(result: SweepResult) -> None:
    logger.info(
        "sweep complete",
        extra={
            "deleted": len(result.deleted_thread_ids),
            "active": result.active_thread_count,
            "duration_s": result.sweep_duration_seconds,
        },
    )

sweeper = TTLSweeper(checkpointer, policy, on_sweep_complete=on_sweep_complete)

Exceptions raised inside either callback are caught, logged, and do not interrupt the sweep.
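As one possible use of the veto, the sketch below keeps certain threads from being deleted for idleness while still letting the hard age limit apply. The prefix convention and the function name are hypothetical; only the three-argument signature and the two reason strings come from this document.

```python
from typing import Any

PROTECTED_PREFIXES = ("audit:", "legal-hold:")  # hypothetical naming convention


def protect_flagged_threads(thread_id: str, policy: Any, reason: str) -> bool:
    """Veto idle-based deletion of protected threads; allow everything else."""
    if thread_id.startswith(PROTECTED_PREFIXES) and reason == "idle_ttl":
        return False  # skip: protected threads are allowed to sit idle
    return True  # let the deletion proceed
```

A callback like this would be passed as `on_before_delete=protect_flagged_threads` when constructing the sweeper.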


Safe delete

By default (safe_delete=True), the sweeper re-reads each thread's latest checkpoint immediately before deleting it and compares timestamps. If the thread received a new checkpoint between the scan and the delete — meaning it became active again — the deletion is skipped.

# disable if you don't need the extra safety check (slightly faster)
sweeper = TTLSweeper(checkpointer, policy, safe_delete=False)
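The re-verification step can be pictured as a compare-before-delete check. The sketch below is a simplified model, not the library's code: `safe_to_delete` is a hypothetical helper, timestamps are plain floats, and a dict stands in for the checkpoint store.

```python
from __future__ import annotations

from typing import Callable


def safe_to_delete(
    thread_id: str,
    scanned_last_activity: float,
    read_latest_activity: Callable[[str], float | None],
) -> bool:
    """Re-read the thread's latest timestamp and confirm it has not moved."""
    latest = read_latest_activity(thread_id)
    if latest is None:
        return False  # thread already gone, nothing left to delete
    # A newer timestamp means the thread became active after the scan.
    return latest <= scanned_last_activity


latest_activity = {"t-1": 100.0, "t-2": 100.0}  # thread_id -> last checkpoint time
scan = dict(latest_activity)                    # snapshot taken during the scan
latest_activity["t-2"] = 250.0                  # t-2 gets a new checkpoint afterwards
```

Here `t-1` is still safe to delete, while `t-2` is skipped because its latest timestamp is newer than the scanned one.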

Multi-instance coordination

If you run multiple application instances sharing a single PostgreSQL checkpointer, you can enable advisory locks so only one instance sweeps at a time:

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

async with AsyncPostgresSaver.from_conn_string(dsn) as checkpointer:
    sweeper = TTLSweeper(
        checkpointer,
        TTLPolicy(idle_ttl_seconds=3600),
        enable_coordination=True,
    )
    result = await sweeper.asweep()

When enable_coordination=True and the backend is PostgreSQL, the sweeper acquires a session-level advisory lock before scanning. If another instance holds the lock, the sweep is skipped and a SweepResult with an empty deleted_thread_ids list is returned. PostgreSQL releases the lock automatically when the database connection closes, so a crashed instance never blocks the others permanently.

This option is a no-op for non-PostgreSQL backends (a warning is logged).


Backends

The sweeper automatically selects the most efficient strategy for your checkpointer.

Checkpointer        Strategy               Notes
InMemorySaver       MemoryStrategy         Reads storage dict directly; extracts timestamps from UUIDv6 checkpoint IDs
SqliteSaver         SqliteStrategy         Single GROUP BY query for all threads
AsyncSqliteSaver    AsyncSqliteStrategy    Async variant of the above
PostgresSaver       PostgresStrategy       GROUP BY query; advisory lock support
AsyncPostgresSaver  AsyncPostgresStrategy  Async variant; advisory lock support
Any other           GenericStrategy        Iterates via .list() / .alist(); works with any checkpointer

max_checkpoints_per_thread pruning is supported by every strategy except GenericStrategy.
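The MemoryStrategy note mentions reading timestamps out of UUIDv6 checkpoint IDs. The sketch below shows how such a timestamp can be decoded with the standard library, following the UUIDv6 field layout in RFC 9562. It is not the library's code: `uuid6_unix_seconds` and `make_uuid6` are hypothetical helpers, and `make_uuid6` exists only to produce a test value with zeroed clock and node bits.

```python
import uuid

# 100 ns ticks between the Gregorian epoch (1582-10-15) and the Unix epoch.
GREGORIAN_TO_UNIX_TICKS = 0x01B21DD213814000


def uuid6_unix_seconds(checkpoint_id: str) -> float:
    """Decode the embedded timestamp of a UUIDv6 string into Unix seconds."""
    value = uuid.UUID(checkpoint_id)
    if value.version != 6:
        raise ValueError(f"not a UUIDv6: {checkpoint_id}")
    time_high_mid = value.int >> 80           # top 48 bits of the timestamp
    time_low = (value.int >> 64) & 0x0FFF     # low 12 bits, after the version nibble
    ticks = (time_high_mid << 12) | time_low  # 60-bit count of 100 ns intervals
    return (ticks - GREGORIAN_TO_UNIX_TICKS) / 10_000_000


def make_uuid6(unix_seconds: int) -> str:
    """Build a UUIDv6 for a whole-second timestamp (test helper only)."""
    ticks = unix_seconds * 10_000_000 + GREGORIAN_TO_UNIX_TICKS
    as_int = (
        ((ticks >> 12) << 80)       # time_high and time_mid
        | (6 << 76)                 # version nibble
        | ((ticks & 0x0FFF) << 64)  # time_low
        | (0b10 << 62)              # RFC variant bits
    )
    return str(uuid.UUID(int=as_int))
```

Because UUIDv6 stores the most significant timestamp bits first, the IDs also sort chronologically as strings, which is what makes them usable as an activity clock.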


CLI

The package ships a CLI for one-off inspection and sweeps without writing any code.

python -m langgraph_ephemeral_checkpointer <command> [options]

status — inspect threads

Show all threads with their age, idle time, checkpoint count, and whether they are expired under the given policy.

# in-memory backend (empty on fresh start, useful for testing)
python -m langgraph_ephemeral_checkpointer status \
  --backend memory \
  --idle-ttl 3600

# SQLite
python -m langgraph_ephemeral_checkpointer status \
  --backend sqlite \
  --dsn threads.db \
  --idle-ttl 3600 \
  --hard-age-ttl 86400

# PostgreSQL
python -m langgraph_ephemeral_checkpointer status \
  --backend postgres \
  --dsn "postgresql://user:pass@localhost/mydb" \
  --idle-ttl 7200

sweep — delete expired threads

# dry run first — see what would be deleted
python -m langgraph_ephemeral_checkpointer sweep \
  --backend sqlite \
  --dsn threads.db \
  --idle-ttl 3600 \
  --dry-run

# live run
python -m langgraph_ephemeral_checkpointer sweep \
  --backend sqlite \
  --dsn threads.db \
  --idle-ttl 3600

Options

Option             Description
--backend          memory, sqlite, or postgres (required)
--dsn              File path or connection string (required for sqlite/postgres)
--idle-ttl         Idle TTL in seconds
--hard-age-ttl     Hard age TTL in seconds
--max-checkpoints  Max checkpoints per thread
--dry-run          Preview deletions without executing (sweep only)

At least one of --idle-ttl, --hard-age-ttl, or --max-checkpoints must be provided.
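For callers that drive the CLI from a scheduler or a deployment script, the sketch below assembles the argument list for the sweep command and applies the same requirements listed above before anything is executed. `build_sweep_command` is a hypothetical helper; it uses only the flags documented in this section and does not reflect how the CLI validates its own input.

```python
import sys
from typing import Optional


def build_sweep_command(
    backend: str,
    dsn: Optional[str] = None,
    idle_ttl: Optional[int] = None,
    hard_age_ttl: Optional[int] = None,
    max_checkpoints: Optional[int] = None,
    dry_run: bool = False,
) -> list[str]:
    """Assemble an argv list for the documented `sweep` subcommand."""
    if backend not in ("memory", "sqlite", "postgres"):
        raise ValueError(f"unknown backend: {backend}")
    if backend != "memory" and not dsn:
        raise ValueError("--dsn is required for sqlite and postgres")
    if idle_ttl is None and hard_age_ttl is None and max_checkpoints is None:
        raise ValueError("provide at least one TTL or checkpoint limit")

    argv = [sys.executable, "-m", "langgraph_ephemeral_checkpointer", "sweep",
            "--backend", backend]
    if dsn:
        argv += ["--dsn", dsn]
    if idle_ttl is not None:
        argv += ["--idle-ttl", str(idle_ttl)]
    if hard_age_ttl is not None:
        argv += ["--hard-age-ttl", str(hard_age_ttl)]
    if max_checkpoints is not None:
        argv += ["--max-checkpoints", str(max_checkpoints)]
    if dry_run:
        argv.append("--dry-run")
    return argv


cmd = build_sweep_command("sqlite", dsn="threads.db", idle_ttl=3600, dry_run=True)
```

The resulting list can be handed to `subprocess.run(cmd, check=True)`; building a list rather than a shell string avoids quoting problems with connection strings.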


API reference

TTLPolicy

@dataclass(frozen=True)
class TTLPolicy:
    idle_ttl_seconds: int | None = None
    hard_age_ttl_seconds: int | None = None
    max_checkpoints_per_thread: int | None = None

TTLSweeper

class TTLSweeper:
    def __init__(
        self,
        checkpointer: BaseCheckpointSaver,
        policy: TTLPolicy,
        *,
        policy_resolver: PolicyResolver | None = None,
        enable_coordination: bool = False,
        safe_delete: bool = True,
        on_before_delete: OnBeforeDelete | None = None,
        on_sweep_complete: OnSweepComplete | None = None,
    ) -> None: ...

    def sweep(self, *, dry_run: bool = False) -> SweepResult: ...
    async def asweep(self, *, dry_run: bool = False) -> SweepResult: ...
    async def start(self, interval_seconds: int = 300) -> None: ...
    async def stop(self) -> None: ...

SweepResult

@dataclass
class SweepResult:
    deleted_thread_ids: list[str]
    pruned_checkpoint_count: int
    active_thread_count: int
    sweep_duration_seconds: float

PolicyOverride

class PolicyOverride(enum.Enum):
    USE_DEFAULT = "use_default"
    EXEMPT      = "exempt"

Callable types

PolicyResolver  = Callable[[str], TTLPolicy | PolicyOverride]
OnBeforeDelete  = Callable[[str, TTLPolicy, str], bool]
OnSweepComplete = Callable[[SweepResult], None]

Download files

Source Distribution

langgraph_ephemeral_checkpointer-0.1.0rc2.tar.gz (23.7 kB view details)

File details

Details for the file langgraph_ephemeral_checkpointer-0.1.0rc2.tar.gz.

Hashes for langgraph_ephemeral_checkpointer-0.1.0rc2.tar.gz
Algorithm Hash digest
SHA256 d4aff6d1ab994b108ff696ac596ce0cf0b7282fe8690ee9407d22252ca523fa2
MD5 6b23e88772e9490102ca978c7a679bbe
BLAKE2b-256 42d52606312f5c74be7d0e3998a4bfe63bb35c2b0e9b3f9a30a7558ec3ba7fab

Provenance

The following attestation bundles were made for langgraph_ephemeral_checkpointer-0.1.0rc2.tar.gz:

Publisher: release.yml on 35C4n0r/langgraph_ephemeral_checkpointer

File details

Details for the file langgraph_ephemeral_checkpointer-0.1.0rc2-py3-none-any.whl.

Hashes for langgraph_ephemeral_checkpointer-0.1.0rc2-py3-none-any.whl
Algorithm Hash digest
SHA256 0fc97b89a5054b44586ae13986bde410f7390b10bfb63cde1486e1bd2231e168
MD5 24d5cb63ecf889216bf87d56da914e8c
BLAKE2b-256 d992411917532d583d304ac9dfa19986f867a96072146198a31bdde38b7df3df

Provenance

The following attestation bundles were made for langgraph_ephemeral_checkpointer-0.1.0rc2-py3-none-any.whl:

Publisher: release.yml on 35C4n0r/langgraph_ephemeral_checkpointer
