Resumable execution for Python. One decorator. Zero retry loops.
safe-state
You wrote a Python script that loops through 10,000 things: sending welcome emails, downloading files, calling an API for each user in your database, resizing images, scraping URLs. Somewhere around item 6,432 the network blips, a rate limit kicks in, or someone unplugs your laptop. Everything dies, and you have no idea what was done and what wasn't.
The usual fix is a thicket of try/except blocks, manual retry loops, a "last
processed ID" column in some side database, and a --resume-from CLI flag.
safe-state deletes all of that:
```python
from safe_state import safe_state

@safe_state
def send_welcome_emails(users, mailer):
    for user in users:
        mailer.send(user.email, "Welcome!", render_template(user))

send_welcome_emails(load_users(), open_mailer())
# Crashes at user 6,432? Just run the script again. It skips the first 6,431
# and picks up at 6,432. No code changes needed.
```
What makes this hard (and why most checkpointing tools don't actually work)
Python's built-in pickle can serialize dictionaries, lists, integers, and most
plain objects. It cannot serialize:
- Open network sockets
- Live database connections (sqlite3, psycopg2, pymongo)
- Open file handles
- requests.Session objects with active TCP keep-alives
- Any object holding a C-level resource
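You can see this failure mode with nothing but the standard library. The snippet below is an illustration, not part of safe-state: it tries to pickle a few live objects of its own choosing and collects the errors. The exact messages differ between Python versions, but each attempt raises TypeError, while plain data round-trips without complaint.

```python
import os
import pickle
import socket
import sqlite3
import threading

# Live objects that each wrap an OS- or C-level resource.
live = {
    "sqlite3.Connection": sqlite3.connect(":memory:"),
    "socket.socket": socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    "open file handle": open(os.devnull, "rb"),
    "threading.Lock": threading.Lock(),
}

failures = {}
for label, obj in live.items():
    try:
        pickle.dumps(obj)
    except TypeError as exc:  # e.g. "cannot pickle 'socket' object"
        failures[label] = str(exc)

# Plain data is fine.
assert pickle.loads(pickle.dumps({"ids": [1, 2, 3]})) == {"ids": [1, 2, 3]}

for label, message in failures.items():
    print(f"{label}: {message}")

# Release the resources opened for the demo.
live["sqlite3.Connection"].close()
live["socket.socket"].close()
live["open file handle"].close()
```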
So a naive "just pickle everything" checkpointer crashes the moment your script
holds anything useful. safe-state solves this with a reconnect registry:
when it finds a live object, it serializes a small metadata record describing
how to recreate the object, then rebuilds a fresh one on resume.
Built-in handlers ship for sqlite3.Connection, socket.socket,
requests.Session, and file handles. Custom types are a five-line
register_reconnector() call away.
Install
```shell
pip install safe-state
```
Requires Python 3.9+ and dill (the only runtime dependency; pickle isn't
powerful enough on its own).
How it works
@safe_state does three things to the function it wraps:
- Intercepts the iterable argument (the first positional argument by default). The function still sees a normal iterable, but safe-state is silently tracking which items have completed.
- Persists progress after every item (or every N items; this is configurable) to a .safestate file on disk via an atomic write.
- Captures locals on failure. When an exception escapes the function, safe-state walks the traceback, grabs the local variables from the failing frame, freezes them with dill plus the reconnect registry, and writes them to the checkpoint. The exception then re-raises as normal; safe-state never silently swallows errors.
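The failure-capture step can be sketched with the standard library alone. This is not safe-state's source: it uses plain pickle instead of dill plus the reconnect registry, and the decorator name on_failure_freeze and its sink callback are hypothetical. It finds the wrapped function's own frame in the traceback, pickles whatever locals it can, hands them to sink, and then re-raises.

```python
import functools
import pickle

def on_failure_freeze(sink):
    """Hypothetical decorator: on an exception, pass fn's pickled locals to sink, then re-raise."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                frozen = {}
                tb = exc.__traceback__
                # Walk the traceback until we reach the wrapped function's own frame.
                while tb is not None and tb.tb_frame.f_code is not fn.__code__:
                    tb = tb.tb_next
                if tb is not None:
                    for name, value in tb.tb_frame.f_locals.items():
                        try:
                            frozen[name] = pickle.dumps(value)
                        except Exception:
                            pass  # unpicklable local: this sketch simply skips it
                sink(frozen)
                raise  # never swallow the original error
        return wrapper
    return decorate

saved = []

@on_failure_freeze(saved.append)
def job(items):
    total = 0
    for i, x in enumerate(items):
        total += 10 // x  # ZeroDivisionError when x == 0
    return total

try:
    job([1, 2, 0, 5])
except ZeroDivisionError:
    pass

locals_at_failure = {k: pickle.loads(v) for k, v in saved[0].items()}
print(locals_at_failure["i"], locals_at_failure["total"])  # 2 15
```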
On the next invocation with the same job name, the checkpoint is loaded, already-completed indices are skipped, and the iteration resumes from where it stopped.
On successful completion, the checkpoint file is deleted.
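The resume cycle (skip completed indices, persist after each item with an atomic write, delete the file on success) can be sketched in plain Python. This is an illustration under stated assumptions, not safe-state's implementation: the helpers atomic_write_text and run_resumable are hypothetical, and progress is stored as a JSON list of indices rather than a dill checkpoint.

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_write_text(path: Path, text: str) -> None:
    """Write to a temp file in the same directory, then atomically swap it into place."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(text)
        os.replace(tmp, path)  # readers see the old file or the new one, never half of one
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise

def run_resumable(items, checkpoint: Path, process) -> None:
    """Hypothetical helper: call process(item) for every item not finished on a previous run."""
    done = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()
    for index, item in enumerate(items):
        if index in done:
            continue  # completed on an earlier run
        process(item)
        done.add(index)
        atomic_write_text(checkpoint, json.dumps(sorted(done)))
    checkpoint.unlink(missing_ok=True)  # success: clear the checkpoint

ckpt = Path(tempfile.mkdtemp()) / "demo.safestate"
processed = []
armed = [True]

def work(x):
    if x == 3 and armed[0]:
        armed[0] = False
        raise RuntimeError("network blip")  # simulated crash on run 1
    processed.append(x)

items = [0, 1, 2, 3, 4]
try:
    run_resumable(items, ckpt, work)  # run 1: items 0-2 done, dies on item 3
except RuntimeError:
    pass
run_resumable(items, ckpt, work)  # run 2: skips 0-2, finishes 3 and 4
print(processed)      # [0, 1, 2, 3, 4]
print(ckpt.exists())  # False
```

Note that item 3, which failed partway through run 1, is processed again from its start on run 2: the item boundary is the unit of resumption.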
Full example: downloading 500 images
```python
import os

import requests
from safe_state import safe_state

@safe_state(name="image-scrape", verbose=True)
def download_all(urls, session):
    for url in urls:
        filename = url.split("/")[-1]
        response = session.get(url, timeout=10)
        response.raise_for_status()
        with open(f"downloads/{filename}", "wb") as f:
            f.write(response.content)

if __name__ == "__main__":
    os.makedirs("downloads", exist_ok=True)  # make sure the output directory exists
    with open("urls.txt") as f:
        urls = f.read().splitlines()
    download_all(urls, requests.Session())
```
Run 1: the connection times out on file 234 (index 233):
```
[safe_state] starting fresh job 'image-scrape'
[safe_state] 'image-scrape' failed at item 233:
ConnectionError: HTTPSConnectionPool... Read timed out.
Progress 233/500 saved to .safe_state/image-scrape.safestate
Traceback (most recent call last): ...
```
Run 2: same command, no flags, no edits:
```
[safe_state] resuming 'image-scrape': 233/500 done (run #2)
[safe_state] skip index 0 (done)
...
[safe_state] skip index 232 (done)
# resumes at item 233, completes through 499
[✓] Job complete. Checkpoint cleared.
```
More use cases
Anything that loops through a batch of work benefits from this:
```python
from PIL import Image  # Pillow, used by the thumbnail example
from safe_state import safe_state

# expensive_computation and normalize stand in for your own code.

# Bulk database backfill
@safe_state(name="backfill-2026")
def backfill(user_ids, conn):
    for uid in user_ids:
        new_value = expensive_computation(uid)
        conn.execute("UPDATE users SET score = ? WHERE id = ?", (new_value, uid))
        conn.commit()  # commit per item so a finished item stays finished

# Processing a giant CSV
@safe_state(name="csv-cleanup")
def clean_rows(rows, output_writer):
    for row in rows:
        cleaned = normalize(row)
        output_writer.writerow(cleaned)

# Calling an API for every record
@safe_state(name="enrich-leads", save_every=10)
def enrich(leads, api_client):
    for lead in leads:
        data = api_client.lookup(lead.email)
        lead.enriched_data = data
        lead.save()

# Resizing thousands of images
@safe_state(name="thumbnails")
def make_thumbs(image_paths):
    for path in image_paths:
        img = Image.open(path)
        img.thumbnail((256, 256))
        img.save(path.replace(".jpg", "_thumb.jpg"))
```
In every case, if the script crashes partway, you just rerun it. No retry logic, no progress columns, no resume flags.
API
@safe_state
```python
@safe_state(
    name=None,                # job identifier; defaults to fn.__qualname__
    state_dir=".safe_state",  # checkpoint directory
    iterable_arg=0,           # which arg is the iterable (int index or kwarg name)
    save_every=1,             # persist every N completed items
    store_results=False,      # also store each item's value (must be serializable)
    keep_on_success=False,    # keep checkpoint after successful completion
    verbose=False,            # print progress to stderr
    auto_iterate=True,        # set False for manual checkpoint() mode
)
```
The decorator works with or without parentheses:
```python
@safe_state  # equivalent to @safe_state()
def f(items): ...

@safe_state(name="custom")
def g(items): ...
```
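Supporting both spellings is a common decorator pattern. The sketch below shows one way to do it; flexible and its job_name attribute are hypothetical names, not safe-state's internals. A bare @flexible receives the function as its first positional argument, while @flexible(...) is called with keyword arguments only and returns the real decorator.

```python
import functools

def flexible(fn=None, *, name=None):
    """Hypothetical decorator usable as @flexible or @flexible(name=...)."""
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        # Mirror the documented default: the job name falls back to __qualname__.
        wrapper.job_name = name or func.__qualname__
        return wrapper

    if fn is not None:  # bare @flexible: we were handed the function directly
        return decorate(fn)
    return decorate     # @flexible(...): return the decorator to apply next

@flexible
def f(items):
    return len(items)

@flexible(name="custom")
def g(items):
    return sum(items)

print(f.job_name, g.job_name)  # f custom
```

Making name keyword-only is what keeps the two cases unambiguous: the only positional argument the decorator can ever receive is the function itself.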
Inspecting checkpoints
Every decorated function exposes three helpers:
```python
@safe_state
def my_job(items): ...

my_job.peek_checkpoint()   # -> Checkpoint object, or None
my_job.clear_checkpoint()  # -> deletes the .safestate file
my_job.checkpoint_path     # -> Path to the .safestate file
```
A Checkpoint object holds:
- completed_indices: set[int]
- total_items: int | None
- last_failure: dict | None (exception type, message, traceback, index)
- frozen_state: bytes | None (dill-serialized locals from the failing frame)
- run_count: int
- progress() -> dict (human-readable summary)
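To make the field list concrete, here is a stand-in dataclass with the same fields. It is not the library's Checkpoint class: the keys returned by progress() and the next_index() helper are assumptions made for this sketch.

```python
import itertools
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class CheckpointSketch:
    """Stand-in mirroring the documented Checkpoint fields (not the real class)."""
    completed_indices: set[int] = field(default_factory=set)
    total_items: Optional[int] = None
    last_failure: Optional[dict] = None   # exception type, message, traceback, index
    frozen_state: Optional[bytes] = None  # serialized locals from the failing frame
    run_count: int = 0

    def next_index(self) -> int:
        """First index not yet completed, i.e. where a resumed run would start."""
        return next(i for i in itertools.count() if i not in self.completed_indices)

    def progress(self) -> dict:
        done = len(self.completed_indices)
        percent = round(100 * done / self.total_items, 1) if self.total_items else None
        return {"done": done, "total": self.total_items, "percent": percent, "runs": self.run_count}

# Shaped like the image-scrape example: 233 of 500 items done after one failed run.
cp = CheckpointSketch(completed_indices=set(range(233)), total_items=500, run_count=1)
print(cp.next_index())  # 233
print(cp.progress())    # {'done': 233, 'total': 500, 'percent': 46.6, 'runs': 1}
```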
Reconnect registry
Built-in handlers cover sqlite3.Connection, socket.socket,
requests.Session, and io.IOBase (file handles). To add your own:
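```python
from safe_state import register_reconnector

class MyApiClient:
    def __init__(self, host, token):
        self.host = host
        self.token = token
        self.session = open_some_session(host, token)  # placeholder for your live connection

register_reconnector(
    MyApiClient,
    # extract: the plain data needed to rebuild a client later
    extract=lambda c: {"host": c.host, "token": c.token},
    # reconnect: build a fresh, live client from that data on resume
    reconnect=lambda meta: MyApiClient(meta["host"], meta["token"]),
)
```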
That's it — any MyApiClient instance held in your function's locals will now
survive checkpoint/restore.
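The standard library already offers hooks for this extract/reconnect idea: pickle.Pickler.persistent_id and pickle.Unpickler.persistent_load let you swap an object for a small record at save time and rebuild it at load time. The sketch below uses those hooks with plain pickle rather than dill; the register, freeze, and thaw functions and the ApiClient class are hypothetical, and this is an illustration of the technique, not safe-state's actual code.

```python
import io
import pickle
import threading

# Hypothetical registry: type -> (extract, reconnect).
_RECONNECTORS = {}

def register(cls, extract, reconnect):
    _RECONNECTORS[cls] = (extract, reconnect)

def _type_key(cls) -> str:
    return f"{cls.__module__}.{cls.__qualname__}"

class _Freezer(pickle.Pickler):
    def persistent_id(self, obj):
        entry = _RECONNECTORS.get(type(obj))
        if entry is None:
            return None  # not registered: pickle it normally
        extract, _ = entry
        # Store a (type key, metadata) record instead of the live object.
        return (_type_key(type(obj)), extract(obj))

class _Thawer(pickle.Unpickler):
    def persistent_load(self, pid):
        type_key, meta = pid
        for cls, (_, reconnect) in _RECONNECTORS.items():
            if _type_key(cls) == type_key:
                return reconnect(meta)  # build a fresh live object
        raise pickle.UnpicklingError(f"no reconnector registered for {type_key}")

def freeze(obj) -> bytes:
    buf = io.BytesIO()
    _Freezer(buf).dump(obj)
    return buf.getvalue()

def thaw(data: bytes):
    return _Thawer(io.BytesIO(data)).load()

class ApiClient:
    """Stand-in for MyApiClient: the lock makes plain pickling fail."""
    def __init__(self, host, token):
        self.host = host
        self.token = token
        self._conn_lock = threading.Lock()

register(
    ApiClient,
    extract=lambda c: {"host": c.host, "token": c.token},
    reconnect=lambda meta: ApiClient(meta["host"], meta["token"]),
)

frame_locals = {"done": 41, "client": ApiClient("api.example.com", "t0k3n")}
restored = thaw(freeze(frame_locals))
print(restored["done"], restored["client"].host)  # 41 api.example.com
```

Calling pickle.dumps(frame_locals) directly would raise TypeError because of the lock; routing the client through the registry stores only its host and token and builds a new client (with a new lock) on load.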
Manual checkpointing (advanced)
If your function doesn't fit the "loop over items" mould — e.g. it processes a
graph or a single very long task — set auto_iterate=False and call
checkpoint() manually:
```python
from safe_state import safe_state, checkpoint

@safe_state(auto_iterate=False)
def big_job(graph):
    visited = set()
    for node in graph.walk():
        process(node)
        visited.add(node.id)
        checkpoint(visited=visited)  # freeze progress here
```
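Manual mode comes down to saving named state and reading it back on the next run. Here is a rough sketch of that pattern for a graph walk; the StateFile class and walk function are hypothetical, and plain pickle with an atomic rename stands in for safe-state's own checkpoint() and storage.

```python
import os
import pickle
import tempfile
from pathlib import Path

class StateFile:
    """Hypothetical store for named progress values (not safe-state's API)."""
    def __init__(self, path: Path):
        self.path = path

    def load(self) -> dict:
        if not self.path.exists():
            return {}
        with self.path.open("rb") as f:
            return pickle.load(f)

    def save(self, **state) -> None:
        tmp = self.path.with_name(self.path.name + ".tmp")
        with tmp.open("wb") as f:
            pickle.dump(state, f)
        os.replace(tmp, self.path)  # atomic swap: never a half-written file

    def clear(self) -> None:
        self.path.unlink(missing_ok=True)

def walk(node_ids, store: StateFile, process) -> None:
    visited = store.load().get("visited", set())  # resume from the last saved set
    for node_id in node_ids:
        if node_id in visited:
            continue
        process(node_id)
        visited.add(node_id)
        store.save(visited=visited)  # plays the role of checkpoint(visited=visited)
    store.clear()

store = StateFile(Path(tempfile.mkdtemp()) / "graph.state")
processed = []
fail_on = {"c"}

def process(node_id):
    if node_id in fail_on:
        fail_on.discard(node_id)  # fail only on the first attempt
        raise RuntimeError("simulated crash")
    processed.append(node_id)

nodes = ["a", "b", "c", "d"]
try:
    walk(nodes, store, process)  # run 1: a and b done, crashes on c
except RuntimeError:
    pass
walk(nodes, store, process)      # run 2: skips a and b
print(processed)  # ['a', 'b', 'c', 'd']
```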
What safe-state is not
- Not a distributed task queue. For multi-machine job dispatch use Celery, Dramatiq, or RQ. safe-state solves the much smaller problem of "this one process crashed; let me rerun the same script and resume."
- Not a transaction manager. If your work involves multi-step database state that needs rollback, use real transactions. safe-state checkpoints at iteration boundaries: an item is either complete or it isn't, and an item that fails partway runs again from its start on resume.
- Not magic. It doesn't freeze CPython frames mid-instruction. The iteration boundary is the resume granularity. If a single item's work is itself a long pipeline, decompose it into smaller items.
Performance
The default save_every=1 writes a checkpoint after every iteration. For most
real workloads (network calls, DB writes) this adds well under a millisecond per
item, which is negligible next to the work itself. If your inner loop is
microsecond-scale, raise save_every to batch progress flushes:
```python
@safe_state(save_every=100)
def fast_loop(items):
    for item in items:
        cheap_in_memory_work(item)
```
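To get a feel for the trade-off, this toy loop counts and times progress flushes at two batch sizes. It is a hypothetical stand-in (run_batched writes a small JSON file with an atomic rename and skips fsync), not a benchmark of safe-state itself; timings depend on your disk and OS.

```python
import json
import os
import tempfile
import time
from pathlib import Path

def run_batched(items, checkpoint: Path, save_every: int) -> int:
    """Hypothetical loop that flushes progress every save_every items; returns the flush count."""
    flushes = 0
    for done, _item in enumerate(items, start=1):
        # cheap in-memory work would go here
        if done % save_every == 0 or done == len(items):
            tmp = checkpoint.with_name(checkpoint.name + ".tmp")
            tmp.write_text(json.dumps({"done": done}))
            os.replace(tmp, checkpoint)  # atomic swap of the progress file
            flushes += 1
    return flushes

ckpt = Path(tempfile.mkdtemp()) / "bench.safestate"
items = list(range(5_000))
for n in (1, 100):
    start = time.perf_counter()
    flushes = run_batched(items, ckpt, save_every=n)
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"save_every={n}: {flushes} flushes in {elapsed_ms:.1f} ms")
```

With 5,000 items this performs 5,000 flushes at save_every=1 and 50 at save_every=100. The price, in this sketch, is that a hard kill can lose the record of up to save_every - 1 completed items, which would then run again.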
License
MIT. See LICENSE.
Contributing
Issues and pull requests welcome. Run the test suite with:
```shell
pip install -e ".[dev]"
pytest
```
Download files
File details
Details for the file safe_state-0.1.1.tar.gz.
File metadata
- Download URL: safe_state-0.1.1.tar.gz
- Upload date:
- Size: 28.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6889992c3aaeb6a401b9352f57b1541dba5c08f31e408e167c49a9927f889d76 |
| MD5 | 38022a013f7af3fbcaac128976839530 |
| BLAKE2b-256 | 7a421c4d72f8c4ca70753d4f32b4d6967b86ad788511bb7ca89084c2d5da3ae9 |
File details
Details for the file safe_state-0.1.1-py3-none-any.whl.
File metadata
- Download URL: safe_state-0.1.1-py3-none-any.whl
- Upload date:
- Size: 21.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 99bff2acf78e084fb4f5477a73bbdd5b9b2f4d6fd2866eaa4a806710e5943440 |
| MD5 | 9fb950860239f9a4aea40aa5dbaab03b |
| BLAKE2b-256 | 4b8c55a2564e51b1460845f2a0ad55f741d69d8cae3f2854d4808be699739bd2 |