langgraph-ephemeral-checkpointer
TTL-based thread retention for LangGraph checkpointers. Automatically expire and delete old conversation threads based on idle time, absolute age, or checkpoint count — without touching your application's hot path.
Table of Contents
- Why
- Installation
- Quick Start
- TTLPolicy
- TTLSweeper
- Per-thread policy overrides
- Callbacks
- Safe delete
- Multi-instance coordination
- Backends
- CLI
- API reference
Why
LangGraph checkpointers persist every conversation state to storage. Without a retention policy, threads accumulate indefinitely. This library runs as a sidecar — it never sits in the hot path of your graph — and periodically sweeps expired threads according to rules you define.
Installation
pip install langgraph-ephemeral-checkpointer
Backend extras (install only what you use):
pip install "langgraph-ephemeral-checkpointer[sqlite]" # SqliteSaver
pip install "langgraph-ephemeral-checkpointer[postgres]" # PostgresSaver
Python 3.11+ is required.
Quick Start
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper
# from_conn_string() is a context manager in current langgraph-checkpoint-sqlite releases
with SqliteSaver.from_conn_string("threads.db") as checkpointer:
    policy = TTLPolicy(idle_ttl_seconds=3600)  # delete threads idle for > 1 hour
    sweeper = TTLSweeper(checkpointer, policy)
    result = sweeper.sweep()
    print(f"Deleted {len(result.deleted_thread_ids)} thread(s)")
TTLPolicy
TTLPolicy defines when a thread is considered expired. At least one rule is required.
from langgraph_ephemeral_checkpointer import TTLPolicy
# Idle TTL — delete threads with no activity for 2 hours
policy = TTLPolicy(idle_ttl_seconds=7200)
# Hard age TTL — delete threads older than 7 days regardless of activity
policy = TTLPolicy(hard_age_ttl_seconds=604800)
# Checkpoint count limit — keep only the 20 most recent checkpoints per thread
policy = TTLPolicy(max_checkpoints_per_thread=20)
# Combine rules: a thread is expired if EITHER TTL rule matches;
# the checkpoint limit trims surviving threads
policy = TTLPolicy(
    idle_ttl_seconds=3600,
    hard_age_ttl_seconds=86400,
    max_checkpoints_per_thread=50,
)
| Parameter | Type | Description |
|---|---|---|
| idle_ttl_seconds | int \| None | Expire threads with no checkpoint activity for this many seconds |
| hard_age_ttl_seconds | int \| None | Expire threads whose first checkpoint is older than this many seconds |
| max_checkpoints_per_thread | int \| None | Trim oldest checkpoints so each thread keeps at most this many |
TTLPolicy is a frozen dataclass — instances are immutable.
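To make the two TTL rules concrete, the sketch below evaluates them against a thread's first and last checkpoint times. It is an illustration only: `ThreadTimes` and `expiry_reason` are hypothetical names, the order in which the rules are checked is an assumption, and the library's internal evaluation may differ. The returned strings mirror the reason values documented for on_before_delete.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ThreadTimes:
    """Hypothetical per-thread summary, in Unix seconds."""

    first_checkpoint_at: float
    last_checkpoint_at: float


def expiry_reason(
    times: ThreadTimes,
    now: float,
    idle_ttl_seconds: int | None = None,
    hard_age_ttl_seconds: int | None = None,
) -> str | None:
    """Return the name of the first matching rule, or None if the thread is live."""
    # Hard age is measured from the thread's first checkpoint.
    if hard_age_ttl_seconds is not None:
        if now - times.first_checkpoint_at > hard_age_ttl_seconds:
            return "hard_age_ttl"
    # Idle time is measured from the thread's most recent checkpoint.
    if idle_ttl_seconds is not None:
        if now - times.last_checkpoint_at > idle_ttl_seconds:
            return "idle_ttl"
    return None


now = 1_000_000.0
fresh = ThreadTimes(first_checkpoint_at=now - 600, last_checkpoint_at=now - 60)
idle = ThreadTimes(first_checkpoint_at=now - 7_200, last_checkpoint_at=now - 4_000)
old = ThreadTimes(first_checkpoint_at=now - 90_000, last_checkpoint_at=now - 10)

for name, times in [("fresh", fresh), ("idle", idle), ("old", old)]:
    reason = expiry_reason(times, now, idle_ttl_seconds=3600, hard_age_ttl_seconds=86_400)
    print(name, reason)
```

Note that the `old` thread expires even though it was active ten seconds ago, which is the point of a hard age limit.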
TTLSweeper
TTLSweeper is the main entry point. It takes a checkpointer and a policy, and handles the rest.
TTLSweeper(
    checkpointer,               # any LangGraph BaseCheckpointSaver
    policy,                     # TTLPolicy
    *,
    policy_resolver=None,       # per-thread overrides (see below)
    enable_coordination=False,  # PostgreSQL advisory locks
    safe_delete=True,           # re-verify timestamps before deleting
    on_before_delete=None,      # callback before each deletion
    on_sweep_complete=None,     # callback after each sweep
)
Running a sweep
# synchronous
result = sweeper.sweep()
# asynchronous
result = await sweeper.asweep()
Both return a SweepResult.
Background loop
For always-on applications, run a background sweep loop that fires on an interval:
import asyncio
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper
async def main():
    async with AsyncSqliteSaver.from_conn_string("threads.db") as checkpointer:
        policy = TTLPolicy(idle_ttl_seconds=3600)
        sweeper = TTLSweeper(checkpointer, policy)
        # sweep every 5 minutes; keep a reference so the task is not garbage-collected
        sweep_task = asyncio.create_task(sweeper.start(interval_seconds=300))
        # ... your application runs here ...
        await sweeper.stop()

asyncio.run(main())
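For readers curious how an interval loop with a prompt stop can be structured, here is a minimal, self-contained sketch. `IntervalLoop` is a hypothetical stand-in written for this illustration; it is not the library's implementation, and the real `start()` / `stop()` may behave differently.

```python
import asyncio


class IntervalLoop:
    """Hypothetical stand-in: run an async job repeatedly until stopped."""

    def __init__(self, job):
        self._job = job
        self._stop = asyncio.Event()

    async def start(self, interval_seconds: float) -> None:
        while not self._stop.is_set():
            await self._job()
            try:
                # Sleep for the interval, but wake early if stop() is called.
                await asyncio.wait_for(self._stop.wait(), timeout=interval_seconds)
            except asyncio.TimeoutError:
                pass  # interval elapsed without a stop request; run again

    async def stop(self) -> None:
        self._stop.set()


async def demo() -> int:
    runs = 0

    async def job() -> None:
        nonlocal runs
        runs += 1

    loop = IntervalLoop(job)
    task = asyncio.create_task(loop.start(interval_seconds=0.01))
    await asyncio.sleep(0.05)
    await loop.stop()
    await task  # the loop exits once the stop event is set
    return runs


runs = asyncio.run(demo())
print(f"job ran {runs} time(s)")
```

Waiting on an event with a timeout, rather than calling `asyncio.sleep` directly, lets shutdown take effect without waiting out the remainder of the interval.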
Dry run
Preview what would be deleted without actually deleting anything:
result = sweeper.sweep(dry_run=True)
print(f"Would delete: {result.deleted_thread_ids}")
on_before_delete is skipped during dry runs. on_sweep_complete still fires.
SweepResult
Every sweep returns a SweepResult:
result = sweeper.sweep()
result.deleted_thread_ids # list[str] — IDs of deleted threads
result.pruned_checkpoint_count # int — checkpoints removed by max_checkpoints pruning
result.active_thread_count # int — threads still alive after this sweep
result.sweep_duration_seconds # float — wall-clock time for the sweep
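The pruned_checkpoint_count field counts checkpoints trimmed by max_checkpoints_per_thread. The sketch below shows one way such trimming could be computed for a single thread. `split_for_pruning` is a hypothetical helper, and it assumes checkpoint IDs sort from oldest to newest as strings; the library's own pruning logic is not shown in this README.

```python
def split_for_pruning(
    checkpoint_ids: list[str], max_checkpoints: int
) -> tuple[list[str], list[str]]:
    """Return (kept, pruned), keeping only the newest max_checkpoints IDs."""
    if max_checkpoints < 1:
        raise ValueError("max_checkpoints must be at least 1")
    ordered = sorted(checkpoint_ids)  # assumption: IDs sort oldest to newest
    cut = max(len(ordered) - max_checkpoints, 0)
    return ordered[cut:], ordered[:cut]


ids = ["c03", "c01", "c05", "c02", "c04"]  # made-up IDs for illustration
kept, pruned = split_for_pruning(ids, max_checkpoints=2)
print("kept:", kept)
print("pruned:", pruned)
```

Summing `len(pruned)` over all threads would give a figure comparable to pruned_checkpoint_count.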
Per-thread policy overrides
Supply a policy_resolver to apply different rules to individual threads. The resolver receives a thread_id and returns either a custom TTLPolicy or a PolicyOverride.
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper
from langgraph_ephemeral_checkpointer.types import PolicyOverride
default_policy = TTLPolicy(idle_ttl_seconds=3600)
vip_policy = TTLPolicy(idle_ttl_seconds=604800) # VIP threads last 7 days
def resolver(thread_id: str):
    if thread_id.startswith("vip:"):
        return vip_policy
    if thread_id.startswith("system:"):
        return PolicyOverride.EXEMPT  # never expire
    return PolicyOverride.USE_DEFAULT
sweeper = TTLSweeper(checkpointer, default_policy, policy_resolver=resolver)
| Return value | Behaviour |
|---|---|
| TTLPolicy | Use this policy for the thread instead of the global one |
| PolicyOverride.USE_DEFAULT | Apply the sweeper's global policy |
| PolicyOverride.EXEMPT | Never expire this thread |
If the resolver raises an exception, it is logged and the global policy is used as a fallback. The resolver is called exactly once per thread per sweep.
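The resolution and fallback rules above can be sketched as a small function. `Policy` and `Override` are local stand-ins for TTLPolicy and PolicyOverride so the example runs without the package installed, and `effective_policy` is a hypothetical name; treat it as an illustration of the documented behaviour rather than the library's code.

```python
from __future__ import annotations

import enum
import logging
from dataclasses import dataclass
from typing import Callable

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class Policy:  # stand-in for TTLPolicy
    idle_ttl_seconds: int | None = None


class Override(enum.Enum):  # stand-in for PolicyOverride
    USE_DEFAULT = "use_default"
    EXEMPT = "exempt"


def effective_policy(
    thread_id: str,
    default: Policy,
    resolver: Callable[[str], Policy | Override] | None,
) -> Policy | None:
    """Return the policy to apply to a thread, or None if it is exempt."""
    if resolver is None:
        return default
    try:
        outcome = resolver(thread_id)
    except Exception:
        # Documented behaviour: log the error and fall back to the global policy.
        logger.exception("policy resolver failed for %r", thread_id)
        return default
    if outcome is Override.EXEMPT:
        return None
    if outcome is Override.USE_DEFAULT:
        return default
    return outcome


default = Policy(idle_ttl_seconds=3600)
vip = Policy(idle_ttl_seconds=604800)


def demo_resolver(thread_id: str) -> Policy | Override:
    if thread_id.startswith("vip:"):
        return vip
    if thread_id.startswith("system:"):
        return Override.EXEMPT
    if thread_id.startswith("broken:"):
        raise RuntimeError("lookup failed")
    return Override.USE_DEFAULT


for tid in ["vip:1", "system:cron", "broken:7", "user:42"]:
    print(tid, effective_policy(tid, default, demo_resolver))
```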
Callbacks
on_before_delete
Called before each thread is deleted. Return False to skip the deletion.
def on_before_delete(thread_id: str, policy: TTLPolicy, reason: str) -> bool:
    print(f"Deleting {thread_id!r} (reason: {reason})")
    # return False to abort this specific deletion
    return True
sweeper = TTLSweeper(checkpointer, policy, on_before_delete=on_before_delete)
The reason parameter is one of "idle_ttl" or "hard_age_ttl".
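As a further illustration, a callback can veto deletions selectively. The sketch below protects a fixed set of threads from idle expiry while still letting the hard age limit apply. The thread IDs and the `LEGAL_HOLD` set are made up for this example, and `policy` is annotated as `Any` only so the snippet runs without the package installed.

```python
from typing import Any

# Hypothetical thread IDs that must survive idle expiry.
LEGAL_HOLD = {"case-1042", "case-2210"}


def keep_held_threads(thread_id: str, policy: Any, reason: str) -> bool:
    """Return False to skip the deletion, True to let it proceed."""
    if thread_id in LEGAL_HOLD and reason == "idle_ttl":
        return False  # held threads are never removed for being idle
    return True


print(keep_held_threads("case-1042", None, "idle_ttl"))
print(keep_held_threads("case-1042", None, "hard_age_ttl"))
print(keep_held_threads("chat-7", None, "idle_ttl"))
```

Such a function would be passed as `on_before_delete=keep_held_threads` when constructing the sweeper.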
on_sweep_complete
Called once after every sweep cycle with the final SweepResult.
import logging
logger = logging.getLogger(__name__)
def on_sweep_complete(result: SweepResult) -> None:
    logger.info(
        "sweep complete",
        extra={
            "deleted": len(result.deleted_thread_ids),
            "active": result.active_thread_count,
            "duration_s": result.sweep_duration_seconds,
        },
    )
sweeper = TTLSweeper(checkpointer, policy, on_sweep_complete=on_sweep_complete)
Exceptions raised inside either callback are caught, logged, and do not interrupt the sweep.
Safe delete
By default (safe_delete=True), the sweeper re-reads each thread's latest checkpoint immediately before deleting it and compares timestamps. If the thread received a new checkpoint between the scan and the delete — meaning it became active again — the deletion is skipped.
# disable if you don't need the extra safety check (slightly faster)
sweeper = TTLSweeper(checkpointer, policy, safe_delete=False)
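The re-check idea can be shown with a plain dictionary standing in for the checkpoint store. This is a conceptual sketch with hypothetical names (`safe_delete_pass`, `latest`, `candidates`); the real sweeper works against the checkpointer backend and may compare timestamps differently.

```python
def safe_delete_pass(
    latest: dict[str, float], candidates: dict[str, float]
) -> list[str]:
    """Delete candidates whose latest timestamp is unchanged since the scan.

    latest     -- live store: thread_id -> timestamp of newest checkpoint
    candidates -- scan snapshot: thread_id -> timestamp seen during the scan
    """
    deleted = []
    for thread_id, seen_at in candidates.items():
        current = latest.get(thread_id)
        if current is None or current != seen_at:
            continue  # already gone, or active again since the scan
        del latest[thread_id]
        deleted.append(thread_id)
    return deleted


latest = {"a": 100.0, "b": 120.0}
candidates = dict(latest)  # scan phase: both threads look expired
latest["b"] = 999.0        # thread "b" receives a new checkpoint before deletion
deleted = safe_delete_pass(latest, candidates)
print("deleted:", deleted)
print("remaining:", latest)
```

The extra read per thread is the cost that safe_delete=False avoids.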
Multi-instance coordination
If you run multiple application instances sharing a single PostgreSQL checkpointer, you can enable advisory locks so only one instance sweeps at a time:
import asyncio
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

dsn = "postgresql://user:pass@localhost/mydb"

async def main():
    async with AsyncPostgresSaver.from_conn_string(dsn) as checkpointer:
        sweeper = TTLSweeper(
            checkpointer,
            TTLPolicy(idle_ttl_seconds=3600),
            enable_coordination=True,
        )
        result = await sweeper.asweep()

asyncio.run(main())
When enable_coordination=True and the backend is PostgreSQL, the sweeper acquires a session-level advisory lock before scanning. If another instance holds the lock, the sweep is skipped and a SweepResult with an empty deleted_thread_ids list is returned. Because the lock is released automatically when the database connection closes, a crashed instance never blocks the others permanently.
This option is a no-op for non-PostgreSQL backends (a warning is logged).
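PostgreSQL advisory locks are keyed by a 64-bit integer, so every instance must agree on the same key. The sketch below shows one way to derive a stable key from a name and the SQL that a non-blocking, session-level lock attempt would use. The key derivation, the lock name, and the `%s` placeholder style are assumptions made for illustration; the key this library actually uses is not documented here. `pg_try_advisory_lock` and `pg_advisory_unlock` are standard PostgreSQL functions.

```python
import hashlib
import struct


def advisory_lock_key(name: str) -> int:
    """Map a name to a signed 64-bit integer that is stable across processes."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    (key,) = struct.unpack(">q", digest[:8])  # first 8 bytes as a signed bigint
    return key


# Returns true if the lock was acquired, false if another session holds it.
TRY_LOCK_SQL = "SELECT pg_try_advisory_lock(%s)"
# Explicit release; the lock is also dropped when the session ends.
UNLOCK_SQL = "SELECT pg_advisory_unlock(%s)"

key = advisory_lock_key("ttl-sweeper")  # hypothetical lock name
print(TRY_LOCK_SQL, key)
```

Python's built-in `hash()` is unsuitable for this purpose because string hashes are randomized per process by default, so two instances would compute different keys.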
Backends
The sweeper automatically selects the most efficient strategy for your checkpointer.
| Checkpointer | Strategy | Notes |
|---|---|---|
| InMemorySaver | MemoryStrategy | Reads storage dict directly; extracts timestamps from UUIDv6 checkpoint IDs |
| SqliteSaver | SqliteStrategy | Single GROUP BY query for all threads |
| AsyncSqliteSaver | AsyncSqliteStrategy | Async variant of the above |
| PostgresSaver | PostgresStrategy | GROUP BY query; advisory lock support |
| AsyncPostgresSaver | AsyncPostgresStrategy | Async variant; advisory lock support |
| Any other | GenericStrategy | Iterates via .list() / .alist(); works with any checkpointer |
max_checkpoints_per_thread pruning is supported on all backends except GenericStrategy.
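The table mentions extracting timestamps from UUIDv6 checkpoint IDs. A UUIDv6 stores a 60-bit count of 100-nanosecond intervals since 1582-10-15 in its leading bits, split around the version nibble. The sketch below decodes that count into Unix seconds using only the standard library. Both function names are hypothetical, and this is not necessarily how MemoryStrategy does it.

```python
import uuid

# 100-ns intervals between the Gregorian epoch (1582-10-15) and the Unix epoch.
GREGORIAN_TO_UNIX_100NS = 0x01B21DD213814000


def uuid6_unix_seconds(value: str) -> float:
    """Decode the embedded timestamp of a UUIDv6 as Unix seconds."""
    u = uuid.UUID(value)
    if u.version != 6:
        raise ValueError(f"not a UUIDv6: {value}")
    upper_48 = u.int >> 80            # time_high and time_mid
    lower_12 = (u.int >> 64) & 0xFFF  # time_low, just below the version nibble
    ticks = (upper_48 << 12) | lower_12
    return (ticks - GREGORIAN_TO_UNIX_100NS) / 1e7


def uuid6_from_unix_seconds(seconds: int) -> uuid.UUID:
    """Demo helper: build a UUIDv6 carrying a whole-second timestamp."""
    ticks = seconds * 10_000_000 + GREGORIAN_TO_UNIX_100NS
    value = (
        ((ticks >> 12) << 80)      # upper 48 timestamp bits
        | (0x6 << 76)              # version nibble
        | ((ticks & 0xFFF) << 64)  # lower 12 timestamp bits
        | (0b10 << 62)             # RFC variant bits; remaining bits stay zero
    )
    return uuid.UUID(int=value)


sample = uuid6_from_unix_seconds(1_700_000_000)
print(sample, uuid6_unix_seconds(str(sample)))
```

Because the most significant timestamp bits come first, UUIDv6 values also sort chronologically as strings, which is what makes "oldest" and "newest" checkpoint lookups cheap.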
CLI
The package ships a CLI for one-off inspection and sweeps without writing any code.
python -m langgraph_ephemeral_checkpointer <command> [options]
status — inspect threads
Show all threads with their age, idle time, checkpoint count, and whether they are expired under the given policy.
# in-memory backend (empty on fresh start, useful for testing)
python -m langgraph_ephemeral_checkpointer status \
--backend memory \
--idle-ttl 3600
# SQLite
python -m langgraph_ephemeral_checkpointer status \
--backend sqlite \
--dsn threads.db \
--idle-ttl 3600 \
--hard-age-ttl 86400
# PostgreSQL
python -m langgraph_ephemeral_checkpointer status \
--backend postgres \
--dsn "postgresql://user:pass@localhost/mydb" \
--idle-ttl 7200
sweep — delete expired threads
# dry run first — see what would be deleted
python -m langgraph_ephemeral_checkpointer sweep \
--backend sqlite \
--dsn threads.db \
--idle-ttl 3600 \
--dry-run
# live run
python -m langgraph_ephemeral_checkpointer sweep \
--backend sqlite \
--dsn threads.db \
--idle-ttl 3600
Options
| Option | Description |
|---|---|
| --backend | memory, sqlite, or postgres (required) |
| --dsn | File path or connection string (required for sqlite/postgres) |
| --idle-ttl | Idle TTL in seconds |
| --hard-age-ttl | Hard age TTL in seconds |
| --max-checkpoints | Max checkpoints per thread |
| --dry-run | Preview deletions without executing (sweep only) |
At least one of --idle-ttl, --hard-age-ttl, or --max-checkpoints must be provided.
API reference
TTLPolicy
@dataclass(frozen=True)
class TTLPolicy:
    idle_ttl_seconds: int | None = None
    hard_age_ttl_seconds: int | None = None
    max_checkpoints_per_thread: int | None = None
TTLSweeper
class TTLSweeper:
    def __init__(
        self,
        checkpointer: BaseCheckpointSaver,
        policy: TTLPolicy,
        *,
        policy_resolver: PolicyResolver | None = None,
        enable_coordination: bool = False,
        safe_delete: bool = True,
        on_before_delete: OnBeforeDelete | None = None,
        on_sweep_complete: OnSweepComplete | None = None,
    ) -> None: ...
    def sweep(self, *, dry_run: bool = False) -> SweepResult: ...
    async def asweep(self, *, dry_run: bool = False) -> SweepResult: ...
    async def start(self, interval_seconds: int = 300) -> None: ...
    async def stop(self) -> None: ...
SweepResult
@dataclass
class SweepResult:
    deleted_thread_ids: list[str]
    pruned_checkpoint_count: int
    active_thread_count: int
    sweep_duration_seconds: float
PolicyOverride
class PolicyOverride(enum.Enum):
    USE_DEFAULT = "use_default"
    EXEMPT = "exempt"
Callable types
PolicyResolver = Callable[[str], TTLPolicy | PolicyOverride]
OnBeforeDelete = Callable[[str, TTLPolicy, str], bool]
OnSweepComplete = Callable[[SweepResult], None]