
Resumable execution for Python. One decorator. Zero retry loops.


safe-state



You wrote a Python script that loops through 10,000 things: sending welcome emails, downloading files, calling an API for each user in your database, resizing images, scraping URLs. Somewhere around item 6,432 the network blips, a rate limit kicks in, or someone unplugs your laptop. Everything dies. You have no idea what was done and what wasn't.

The usual fix is a thicket of try/except blocks, manual retry loops, a "last processed ID" column in some side database, and a --resume-from CLI flag. safe-state deletes all of that:

from safe_state import safe_state

@safe_state
def send_welcome_emails(users, mailer):
    for user in users:
        mailer.send(user.email, "Welcome!", render_template(user))

send_welcome_emails(load_users(), open_mailer())
# Crashes at user 6,432? Just run the script again. It skips the first 6,431
# and picks up at 6,432. No code changes needed.

What makes this hard (and why most checkpointing tools don't actually work)

Python's built-in pickle can serialize dictionaries, lists, integers, and most plain objects. It cannot serialize:

  • Open network sockets
  • Live database connections (sqlite3, psycopg2, pymongo)
  • Open file handles
  • requests.Session objects with active TCP keep-alives
  • Any object holding a C-level resource

So a naive "just pickle everything" checkpointer crashes the moment your script holds anything useful. safe-state solves this with a reconnect registry: when it finds a live object, it serializes a small metadata record describing how to recreate the object, then rebuilds a fresh one on resume.

Built-in handlers ship for sqlite3.Connection, socket.socket, requests.Session, and file handles. Custom types are a five-line register_reconnector() call away.
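The reconnect-registry idea fits in a few dozen lines of standard-library Python. The sketch below is illustrative only: `register`, `freeze`, `thaw` and `Placeholder` are hypothetical names, plain pickle stands in for dill, and only the top-level values of a dict are inspected. Each live object is replaced by a small metadata record before pickling, and a fresh object is built from that record on load.

```python
import pickle
import sqlite3
from typing import Any, Callable

# Hypothetical registry: "module.QualName" -> (class, extract, reconnect).
REGISTRY: dict[str, tuple[type, Callable[[Any], dict], Callable[[dict], Any]]] = {}


def register(cls: type, extract: Callable[[Any], dict], reconnect: Callable[[dict], Any]) -> None:
    REGISTRY[f"{cls.__module__}.{cls.__qualname__}"] = (cls, extract, reconnect)


class Placeholder:
    """Picklable record naming the handler that can rebuild this value."""

    def __init__(self, key: str, meta: dict) -> None:
        self.key = key
        self.meta = meta


def freeze(values: dict) -> bytes:
    """Replace registered live objects with placeholders, then pickle."""
    out = {}
    for name, value in values.items():
        for key, (cls, extract, _) in REGISTRY.items():
            if isinstance(value, cls):
                value = Placeholder(key, extract(value))
                break
        out[name] = value
    return pickle.dumps(out)


def thaw(blob: bytes) -> dict:
    """Unpickle, building a fresh object for every placeholder."""
    values = pickle.loads(blob)
    return {
        name: REGISTRY[v.key][2](v.meta) if isinstance(v, Placeholder) else v
        for name, v in values.items()
    }


# Example handler: remember a SQLite connection's file path, reconnect on thaw.
# (An in-memory database has no path, so its contents cannot be recovered this way.)
register(
    sqlite3.Connection,
    extract=lambda conn: {"path": conn.execute("PRAGMA database_list").fetchone()[2]},
    reconnect=lambda meta: sqlite3.connect(meta["path"]),
)
```

The SQLite handler records the file path reported by `PRAGMA database_list`. Thawing opens a new connection to the same file, so committed rows are visible again, but any uncommitted work from the old connection is gone.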


Install

pip install safe-state

Requires Python 3.9+ and dill (the only runtime dependency; pickle isn't powerful enough on its own).

Optional extras:

pip install "safe-state[redis]"   # adds Redis-backed shared state

How it works

@safe_state does three things to the function it wraps:

  1. Intercepts the iterable argument (the first positional argument by default; choose another with iterable_arg). The function still sees a normal iterable, but safe-state is silently tracking which items have completed.
  2. Persists progress after every item (or every N items, set with save_every) via a durable atomic write, to a .safestate file on disk by default or to Redis, SQL, or any custom backend.
  3. Captures locals on failure. When an exception escapes the function, safe-state walks the traceback, grabs the local variables from the failing frame, freezes them with dill plus the reconnect registry, and writes them to the checkpoint. The exception then re-raises as normal; safe-state never silently swallows errors.
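The traceback walk in step 3 can be illustrated with the standard library alone. In this sketch, `failing_frame_locals` follows `tb_next` down to the innermost frame and copies its `f_locals`, and `picklable_subset` keeps only what plain pickle can serialize. Dropping unpicklable values is a simplification: safe-state instead freezes them with dill plus the reconnect registry. All names here are hypothetical.

```python
import pickle
import threading


def failing_frame_locals(exc: BaseException) -> dict:
    """Return a copy of the locals of the innermost frame in exc's traceback."""
    tb = exc.__traceback__
    if tb is None:
        return {}
    while tb.tb_next is not None:  # walk down to the frame that raised
        tb = tb.tb_next
    return dict(tb.tb_frame.f_locals)


def picklable_subset(values: dict) -> dict:
    """Keep only the values that plain pickle can serialize."""
    kept = {}
    for name, value in values.items():
        try:
            pickle.dumps(value)
        except Exception:
            continue  # e.g. locks, sockets, open files
        kept[name] = value
    return kept


def work(items):
    guard = threading.Lock()  # deliberately unpicklable
    total = 0
    for i, x in enumerate(items):
        with guard:
            total += 10 // x  # raises ZeroDivisionError when x == 0
    return total


try:
    work([1, 2, 0, 4])
except ZeroDivisionError as exc:
    snapshot = picklable_subset(failing_frame_locals(exc))
    # snapshot == {"items": [1, 2, 0, 4], "total": 15, "i": 2, "x": 0}
```

The lock never reaches the snapshot; in safe-state, that is the kind of value the reconnect registry exists to handle.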

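On the next invocation with the same job name, the checkpoint is loaded, already-completed indices are skipped, and iteration resumes where it stopped. Because progress is tracked by index, the iterable must yield the same items in the same order on every run.

On successful completion, the checkpoint is deleted (unless keep_on_success=True).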
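The resume loop itself (track indices, persist after each item, skip on rerun, clear on success) can be sketched with a generator. `resumable` is a hypothetical decorator, not safe-state's implementation: it always wraps the first positional argument, saves to a JSON file after every item, and has no backends or frozen locals. The generator only marks an index done when the caller asks for the next item, which happens after the loop body for that item has finished.

```python
import functools
import json
import os
import tempfile
from pathlib import Path


def resumable(state_path):
    """Hypothetical mini version of the mechanism above (not safe-state's code)."""
    path = Path(state_path)

    def load_done() -> set:
        if path.exists():
            return set(json.loads(path.read_text())["done"])
        return set()

    def save_done(done: set) -> None:
        # Write a temp file next to the checkpoint, then swap it in with os.replace.
        fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump({"done": sorted(done)}, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(items, *args, **kwargs):
            done = load_done()

            def tracked():
                for index, item in enumerate(items):
                    if index in done:
                        continue  # finished on an earlier run
                    yield item
                    # Execution resumes here only when the caller asks for the
                    # next item, i.e. after the loop body for `item` completed.
                    done.add(index)
                    save_done(done)

            result = func(tracked(), *args, **kwargs)
            path.unlink(missing_ok=True)  # success: clear the checkpoint
            return result

        return wrapper

    return decorator
```

Because the sketch keys progress on position, reordering the input between runs would skip the wrong items.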


Full example: downloading 500 images

import os

import requests
from safe_state import safe_state

@safe_state(name="image-scrape", verbose=True)
def download_all(urls, session):
    for url in urls:
        filename = url.split("/")[-1]
        response = session.get(url, timeout=10)
        response.raise_for_status()  # an HTTP error stops the run before anything is written
        with open(f"downloads/{filename}", "wb") as f:
            f.write(response.content)

if __name__ == "__main__":
    os.makedirs("downloads", exist_ok=True)  # open() does not create missing directories
    with open("urls.txt") as f:
        urls = [line.strip() for line in f if line.strip()]  # ignore blank lines
    download_all(urls, requests.Session())

Run 1: the connection times out on the 234th file (index 233):

[safe_state] starting fresh job 'image-scrape'
[safe_state] 'image-scrape' failed at item 233:
  ConnectionError: HTTPSConnectionPool... Read timed out.
  Progress 233/500 saved to .safe_state/image-scrape.safestate
Traceback (most recent call last): ...

Run 2: same command, no flags, no edits:

[safe_state] resuming 'image-scrape': 233/500 done (run #2)
[safe_state] skip index 0 (done)
...
[safe_state] skip index 232 (done)
# resumes at item 233, completes through 499
[✓] Job complete. Checkpoint cleared.

Idempotent side effects (v0.2.0+)

A checkpoint only records that an iteration finished (say, "iteration 14 done"). If a crash happens mid-iteration, after the side effect (file written, email sent, row inserted) has started or even completed, that iteration is never marked done, so the next run executes it again and the side effect can happen twice.

Two helpers protect against this:

from pathlib import Path

import requests
from safe_state import safe_state, skip_if_exists, idempotent

# Pattern 1: one-line check inside the loop, for filesystem artifacts.
@safe_state
def download_all(urls, dest_dir: Path):
    for url in urls:
        target = dest_dir / url.split("/")[-1]
        if skip_if_exists(target):
            continue
        download(url, target)

# Pattern 2: declarative decorator on the side-effect function.
@idempotent(check=lambda url, dest: dest.exists() and dest)
def download(url, dest):
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    dest.write_bytes(response.content)
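skip_if_exists can only trust an existing file if that file is never left half-written. `dest.write_bytes(...)` writes in place, so a crash mid-write leaves a truncated file that the next run would skip. A common fix is to write to a temporary file in the same directory and rename it into place. `write_atomically` is a hypothetical helper, not part of safe-state:

```python
import os
import tempfile
from pathlib import Path


def write_atomically(dest: Path, data: bytes) -> None:
    """Write data so that dest is either absent or complete, never partial."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as dest, so os.replace is a rename rather than a cross-device copy.
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=f".{dest.name}.", suffix=".part")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # make the bytes durable before the rename
        os.replace(tmp_name, dest)  # atomic rename on POSIX filesystems
    except BaseException:
        try:
            os.unlink(tmp_name)  # leave no stray temp file behind
        except FileNotFoundError:
            pass
        raise
```

In Pattern 2, `dest.write_bytes(response.content)` would become `write_atomically(dest, response.content)`.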

For side effects with no natural artifact (an API call, a webhook), use IdempotencyCache to track which keys were processed:

from safe_state import safe_state, IdempotencyCache

cache = IdempotencyCache()

@safe_state
def send_messages(messages, client):
    for msg in messages:
        if cache.seen(msg.id):
            continue
        client.send(msg)
        cache.mark(msg.id)
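How IdempotencyCache stores its keys isn't described here. To illustrate the idea, here is a hypothetical `SeenKeys` class that keeps processed keys in a SQLite file so they survive a crash. The seen/mark method names mirror the example above, but this is not the library's class.

```python
import sqlite3


class SeenKeys:
    """Hypothetical persistent set of processed keys, backed by a SQLite file."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute("CREATE TABLE IF NOT EXISTS seen (key TEXT PRIMARY KEY)")
        self.conn.commit()

    def seen(self, key: object) -> bool:
        row = self.conn.execute("SELECT 1 FROM seen WHERE key = ?", (str(key),)).fetchone()
        return row is not None

    def mark(self, key: object) -> None:
        # INSERT OR IGNORE makes marking the same key twice harmless.
        self.conn.execute("INSERT OR IGNORE INTO seen (key) VALUES (?)", (str(key),))
        self.conn.commit()  # durable before the loop moves on

    def close(self) -> None:
        self.conn.close()
```

One gap remains with any cache of this kind: a crash after `client.send(msg)` but before `mark(msg.id)` re-sends that message on the next run. Closing it fully needs the receiver to deduplicate, for example through an idempotency key the API honors.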

Transactional iterations (v0.2.0+)

When an iteration does multiple writes that must succeed or fail together, wrap them in transaction(). On exception, every connection rolls back, the iteration is not marked complete, and the next run retries it cleanly.

from safe_state import safe_state, transaction

@safe_state
def process(items, db):
    for item in items:
        with transaction(db):
            db.execute("INSERT INTO ledger ...", item)
            db.execute("UPDATE balances ...", item)
            external_api_call(item)  # if this raises, both DB writes roll back

Works with any PEP 249 DB-API connection (sqlite3, psycopg, psycopg2, mysqlclient) and with SQLAlchemy connections. Pass multiple connections to commit or roll back together:

with transaction(primary_db, replica_db, cache_conn):
    ...

Note: this is not distributed two-phase commit (2PC). Each connection commits independently, so if the commit succeeds on one connection but fails on another, the first stays committed. For true cross-system atomicity, use a saga pattern.
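The behavior described above (roll back everything on error, otherwise commit each connection in turn) needs only the commit() and rollback() methods that PEP 249 requires of every connection. `multi_transaction` is a hypothetical name; this is a sketch, not safe-state's transaction().

```python
import contextlib
from typing import Iterator


@contextlib.contextmanager
def multi_transaction(*conns) -> Iterator[None]:
    """Hypothetical sketch of a transaction() helper over DB-API connections."""
    try:
        yield
    except BaseException:
        # Any failure inside the block undoes uncommitted work on every connection.
        for conn in conns:
            conn.rollback()
        raise
    # Success: commit one connection at a time. If a later commit fails, the
    # earlier ones are already committed, which is why this is not 2PC.
    for conn in conns:
        conn.commit()
```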


Pluggable storage backends (v0.2.0+)

The default FileBackend writes one .safestate file per job to disk. For containers, distributed workers, or environments without persistent local storage, swap it for one of these:

from safe_state import safe_state, FileBackend, MemoryBackend, RedisBackend, SQLBackend

# Local files (default).
@safe_state(state_dir=".safe_state")
def f(items): ...

# In-memory — useful for tests or short-lived workers.
@safe_state(backend=MemoryBackend())
def f(items): ...

# Redis — shared state across containers and workers.
# Requires: pip install safe-state[redis]
@safe_state(backend=RedisBackend.from_url("redis://localhost:6379/0"))
def f(items): ...

# Any SQL database via a DB-API connection.
import psycopg
conn = psycopg.connect("postgresql://user:pass@host/db")
@safe_state(backend=SQLBackend(conn, dialect="postgres"))
def f(items): ...

import sqlite3
conn = sqlite3.connect("checkpoints.db")
@safe_state(backend=SQLBackend(conn))   # sqlite is the default dialect
def f(items): ...

Custom backends are a four-method subclass — load, save, delete, exists. See safe_state/backends.py for the reference implementations.
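The four method names come from the sentence above; the argument and return types used here (a job-name string in, raw checkpoint bytes in and out) are assumptions, so check safe_state/backends.py for the real StateBackend interface before writing one. As a shape-only illustration, a thread-safe dict-backed class could look like this:

```python
import threading
from typing import Optional


class DictBackend:
    """Illustrative in-process backend with the four methods named above.

    The signatures are assumptions, not safe-state's StateBackend interface,
    and the class does not subclass anything from the library.
    """

    def __init__(self) -> None:
        self._store: dict[str, bytes] = {}
        self._lock = threading.Lock()  # several threads may share one backend

    def load(self, job_name: str) -> Optional[bytes]:
        with self._lock:
            return self._store.get(job_name)

    def save(self, job_name: str, data: bytes) -> None:
        with self._lock:
            self._store[job_name] = data

    def delete(self, job_name: str) -> None:
        with self._lock:
            self._store.pop(job_name, None)  # deleting a missing job is a no-op

    def exists(self, job_name: str) -> bool:
        with self._lock:
            return job_name in self._store
```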


More use cases

Anything that loops through a batch of work benefits from this:

# Bulk database backfill
@safe_state(name="backfill-2026")
def backfill(user_ids, conn):
    for uid in user_ids:
        new_value = expensive_computation(uid)
        conn.execute("UPDATE users SET score = ? WHERE id = ?", (new_value, uid))
        conn.commit()

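# Processing a giant CSV. Open the output file in append mode ("a"): rows
# finished before a crash are skipped on the rerun, so reopening it with "w"
# would erase them.
@safe_state(name="csv-cleanup")
def clean_rows(rows, output_writer):
    for row in rows:
        cleaned = normalize(row)
        output_writer.writerow(cleaned)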

# Calling an API for every record
@safe_state(name="enrich-leads", save_every=10)
def enrich(leads, api_client):
    for lead in leads:
        data = api_client.lookup(lead.email)
        lead.enriched_data = data
        lead.save()

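# Resizing thousands of images (requires Pillow)
from pathlib import Path
from PIL import Image

@safe_state(name="thumbnails")
def make_thumbs(image_paths):
    for path in image_paths:
        path = Path(path)
        with Image.open(path) as img:
            img.thumbnail((256, 256))
            # photo.jpg -> photo_thumb.jpg; never overwrites the source image
            img.save(path.with_name(f"{path.stem}_thumb{path.suffix}"))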

In every case, if the script crashes partway, you just rerun it. No retry logic, no progress columns, no resume flags.


API

@safe_state

@safe_state(
    name=None,                # job identifier; defaults to {module}.{qualname}.{path_hash}
    state_dir=".safe_state",  # checkpoint directory (when using FileBackend)
    backend=None,             # custom StateBackend (Redis, SQL, Memory, ...)
    iterable_arg=0,           # which arg is the iterable (int index or kwarg name)
    save_every=1,             # persist every N completed items
    store_results=False,      # also store each item's value (must be serializable)
    keep_on_success=False,    # keep checkpoint after successful completion
    verbose=False,            # print progress to stderr
    auto_iterate=True,        # set False for manual checkpoint() mode
)

Inspecting checkpoints

Every decorated function exposes:

my_job.peek_checkpoint()    # -> Checkpoint object, or None
my_job.clear_checkpoint()   # -> deletes the checkpoint
my_job.checkpoint_path      # -> Path to the .safestate file (None for non-file backends)
my_job.job_name             # -> the resolved job name

A Checkpoint holds completed_indices, total_items, last_failure, frozen_state (dill-serialized locals), run_count, and a progress() method.

Reconnect registry

Built-in handlers cover sqlite3.Connection, socket.socket, requests.Session, and io.IOBase. To add your own:

from safe_state import register_reconnector

register_reconnector(
    MyApiClient,
    extract=lambda c: {"host": c.host, "token": c.token},
    reconnect=lambda meta: MyApiClient(meta["host"], meta["token"]),
)

Manual checkpointing (advanced)

For work that isn't shaped like a loop over items, set auto_iterate=False and call checkpoint() manually:

from safe_state import safe_state, checkpoint

@safe_state(auto_iterate=False)
def big_job(graph):
    visited = set()
    for node in graph.walk():
        process(node)
        visited.add(node.id)
        checkpoint(visited=visited)
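In manual mode the unit of resume is whatever state you pass to checkpoint(), not an index. The stdlib sketch below shows that pattern with hypothetical helpers: `save_snapshot` pickles keyword state and atomically replaces the previous snapshot, and `walk` skips nodes recorded in the last snapshot before doing any work. It illustrates the technique and is not how safe-state's checkpoint() is implemented (plain pickle stands in for dill).

```python
import os
import pickle
import tempfile
from pathlib import Path


def save_snapshot(path: Path, **state) -> None:
    """Pickle keyword state and atomically replace the previous snapshot."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(state, f)
    os.replace(tmp, path)


def load_snapshot(path: Path) -> dict:
    """Return the last saved state, or an empty dict on a fresh start."""
    if not path.exists():
        return {}
    with path.open("rb") as f:
        return pickle.load(f)


def walk(nodes, path: Path, process) -> None:
    """Resume a traversal by skipping nodes recorded in the last snapshot."""
    visited = load_snapshot(path).get("visited", set())
    for node in nodes:
        if node in visited:
            continue
        process(node)
        visited.add(node)
        save_snapshot(path, visited=visited)  # analogous to checkpoint(visited=visited)
    path.unlink(missing_ok=True)  # finished: nothing left to resume
```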

What safe-state is not

  • Not a distributed task queue. For multi-machine job dispatch use Celery, Dramatiq, or RQ. safe-state solves the much smaller problem of "this one process crashed; let me rerun the same script and resume."
  • Not a transaction manager. transaction() wraps a single DB connection's commit/rollback; it's not a 2PC coordinator. For cross-system atomicity, use a saga pattern.
  • Not magic. It doesn't freeze CPython frames mid-instruction. The iteration boundary is the resume granularity. If a single item's work is itself a long pipeline, decompose it into smaller items.

License

MIT. See LICENSE.


Contributing

Issues and pull requests welcome. Run the test suite with:

pip install -e ".[dev]"
pytest

Test count by feature area:

  • Core resume + serialization: 36 tests
  • Storage backends (File, Memory, SQL): 11 tests
  • Idempotency utilities: 10 tests
  • Transaction manager: 9 tests
  • Total: 66 tests



Download files


Source Distribution

safe_state-0.2.0.tar.gz (39.2 kB)

Uploaded Source

Built Distribution


safe_state-0.2.0-py3-none-any.whl (29.8 kB)

Uploaded Python 3

File details

Details for the file safe_state-0.2.0.tar.gz.

File metadata

  • Download URL: safe_state-0.2.0.tar.gz
  • Upload date:
  • Size: 39.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.8

File hashes

Hashes for safe_state-0.2.0.tar.gz
Algorithm Hash digest
SHA256 e3a09ecfad6e6a5797f12963402107e2d8e9524657678d77d0c5aa555520e535
MD5 3f187d288b41ea3dacf7410d52ff48cc
BLAKE2b-256 97c3f5d8ec47819fa9b4cef38823e9a47814b032ab45f5af48b7bc1a41808c12


File details

Details for the file safe_state-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: safe_state-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 29.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.8

File hashes

Hashes for safe_state-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f7220ba85856840c83449c18b3749db40e4599ba1c08e761fa02805fe00caf28
MD5 d163456b9ce3599377b066b34fae4e52
BLAKE2b-256 2407b0d97d5178834586db655682242088d66f1fcc26c3cacdf755e0ed64dc1b

