stateful-data-processor

Resumable, checkpointed item processing with graceful interrupts — subclass and go.

A tiny utility for long-running, restart-safe loops: process items, persist state, resume exactly where you stopped after an interruption or an exception, and handle SIGINT/SIGTERM cleanly.

  • Install: pip install stateful-data-processor
  • Why: Skip rework after crashes/interrupts; keep logic in a single subclass.
  • Good for: Batch jobs, ETL steps, scraping, “process a big list with restarts”.

Quick start (60 seconds)

import time
from stateful_data_processor.file_rw import FileRW
from stateful_data_processor.processor import StatefulDataProcessor

class MyDataProcessor(StatefulDataProcessor):

    def process_item(self, item, iteration_index: int, delay: float):
        """item and iteration_index are supplied automatically by the framework;
        using iteration_index is optional. Extra keyword arguments (here: delay)
        are forwarded from run().
        """
        self.data[item] = item ** 2  # Example processing: square the item
        time.sleep(delay)            # Simulate long processing time

# Example usage
file_rw = FileRW('data.json')
processor = MyDataProcessor(file_rw)

items_to_process = [1, 2, 3, 4, 5]
processor.run(items=items_to_process, delay=1.5) # Ctrl+C anytime; rerun to resume.
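
If the run is interrupted (Ctrl+C, SIGTERM, a crash, or an exception), just run the same script again: items already stored in data.json are skipped and only the remaining ones are processed. Below is a minimal sketch of resuming and inspecting the state; reading data.json with the stdlib json module is an assumption about the on-disk format (FileRW is described as persisting the data dictionary to the file).

import json

# Re-run with the same file: already-processed items are skipped.
processor = MyDataProcessor(file_rw)
processor.run(items=items_to_process, delay=1.5)

# In-memory results after the run, e.g. {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
print(processor.data)

# Peek at the persisted state (assumes plain JSON on disk;
# JSON object keys are stored as strings).
with open('data.json') as fh:
    print(json.load(fh))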

stateful-data-processor is a utility designed to handle large amounts of data incrementally. It allows you to process data step-by-step, saving progress to avoid data loss in case of interruptions or errors. The processor can be subclassed to implement custom data processing logic.

Features

  • Incremental & resumable — process large datasets in chunks and pick up exactly where you left off.
  • State persisted to disk — saves progress to a file so restarts are fast and reliable.
  • Graceful shutdown — handles SIGINT/SIGTERM (e.g., Ctrl+C) and saves state before exiting.
  • Crash-safe — catches exceptions, saves current progress, and lets you restart without losing work.
  • Automatic logging — a logger is created for you if you don’t inject one.
  • Skip completed work — automatically avoids already processed items on restart.
  • Easy to extend — subclass to implement custom processing logic.
  • Reprocess cached items — optionally revisit items already stored to explore alternative processing strategies.

Problem

Processing massive datasets is slow, brittle, and easy to interrupt. You need a way to:

  • Iterate through items one-by-one and save progress to disk as you go.
  • Resume exactly where you left off after crashes, timeouts, restarts, or upgrades.
  • Gracefully interrupt with SIGINT/SIGTERM (e.g., Ctrl+C) and persist state before exiting.
  • Subclass cleanly to provide your own process_data and process_item logic.
  • Avoid rework by skipping already-processed items—or intentionally reprocess cached items to explore alternatives.

In short: incremental processing with safety, resumability, and extensibility built in.

Solution

StatefulDataProcessor provides a resilient, incremental pipeline for large datasets:

  • Incremental processing: Iterate through big inputs in manageable chunks (e.g., from a JSON source) without starting over.
  • Persistent state: Progress and results are stored in a dictionary on disk; the processor tracks the current position.
  • Graceful interruption: Handles SIGINT/SIGTERM (e.g., Ctrl+C) and saves state before exiting.
  • Subclass-first design: Implement your own logic by overriding process_item (required) and process_data (optional).
  • Per-item execution: run(**kwargs) forwards all arguments to process_item, iterating over items and processing one at a time.
  • Unique keys: Results are keyed by each item’s unique label, so items must be unique.
  • Customizable workflow: Override process_data to pre/post-process items, filter, batch, or enrich as needed (see the sketch after this list).
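
For orientation, here is a minimal sketch of that customizable workflow: overriding process_data to filter items before the per-item loop. The exact process_data signature is an assumption (the sketch assumes it receives the items and forwards any extra arguments on to process_item); check the base class in stateful_data_processor.processor before copying.

from stateful_data_processor.processor import StatefulDataProcessor

class EvenOnlyProcessor(StatefulDataProcessor):

    def process_data(self, items, *args, **kwargs):
        # Pre-processing hook: keep only even items.
        # Assumption: the base implementation iterates the items and calls
        # process_item for each one; the real signature may differ.
        even_items = [i for i in items if i % 2 == 0]
        super().process_data(even_items, *args, **kwargs)

    def process_item(self, item, iteration_index: int):
        self.data[item] = item * 10  # Example processing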

Usage

Example usage in a large project:

  • alphaspread analysis of nasdaq symbols
  • filter ranging stocks
  • xtb to yfinance symbol conversion

Example: Passing extra arguments via a subclass

Sometimes your per-item logic needs more than just the item itself (e.g., a pre-fetched blob, metadata, cached JSON). You can add any keyword args you want to your processor’s process_item signature, and then supply them from a subclass.

processors.py

# processors.py
from typing import Any, Optional
from stateful_data_processor.processor import StatefulDataProcessor

class GenericAnalyzer(StatefulDataProcessor):
    """
    Parent processor that expects an extra kwarg: `payload`.
    In a real project this could be HTML, JSON, text, bytes, etc.
    """

    def process_item(self, item: str, payload: Optional[Any], iteration_index: int):
        # Use the extra arg however you like
        self.logger.info(f"[{iteration_index}] Processing {item}; has_payload={payload is not None}")

        # Do minimal work for demonstration purposes: store something derived from the payload
        result = {
            "item": item,
            "payload_preview": str(payload)[:40],  # keep it tiny for docs
            "payload_length": len(str(payload)) if payload is not None else 0,
        }

        # Persist per-item result; the base class handles saving/resuming
        self.data[item] = result

run_example.py

# run_example.py
from datetime import date
from typing import Any, List

from stateful_data_processor.file_rw import JsonFileRW
from processors import GenericAnalyzer

def build_payload(item: str) -> Any:
    """
    Stand-in for I/O or computation (e.g., HTTP GET, DB read, cache lookup).
    Keep it simple for the README.
    """
    return f"payload for {item}"

class UrlAnalyzer(GenericAnalyzer):
    """
    Child processor that *adds* the extra argument (`payload`)
    and forwards it to the parent via super().
    """
    def process_item(self, item: str, iteration_index: int):
        payload = build_payload(item)
        # Forward both the original item and the extra kwarg to the parent
        super().process_item(item=item, payload=payload, iteration_index=iteration_index)

if __name__ == "__main__":
    items: List[str] = ["AAPL", "MSFT", "GOOGL", "NVDA"]

    # Results are saved incrementally; reruns resume from where you stopped.
    out_file = JsonFileRW(f"./demo-analysis-{date.today()}.json")

    analyzer = UrlAnalyzer(json_file_writer=out_file)
    analyzer.run(items=items)

    # Access in-memory results if needed
    data = analyzer.data
    print(f"Processed {len(data)} items. Output file: {out_file.path}")

What this demonstrates

  • The parent class GenericAnalyzer defines process_item(self, item, payload, iteration_index); here, payload is the extra kwarg (in a real project it could be a parsed HTML soup, a database row, or cached JSON).

  • The child class UrlAnalyzer overrides process_item(self, item, iteration_index), computes the extra data (payload = build_payload(item)), and then forwards it with:

    super().process_item(item=item, payload=payload, iteration_index=iteration_index)
    
  • You can add any keyword arguments you need to the parent’s process_item (e.g., html, row, features, raw_bytes, context, etc.), and supply them from subclasses that know how to build/fetch them.

  • All the usual benefits still apply: incremental processing, state persisted to disk, resumability after crashes or Ctrl+C, and a simple subclassing model.
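
For reference, the per-item record stored by the example is easy to predict from the code above (the values follow directly from build_payload and GenericAnalyzer.process_item):

analyzer.data["AAPL"]
# {'item': 'AAPL',
#  'payload_preview': 'payload for AAPL',
#  'payload_length': 16}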

Releasing

git tag x.y
tox
tox -e docs
tox -e build
tox -e publish -- --repository pypi --verbose

Note

This project has been set up using PyScaffold 4.5. For details and usage information on PyScaffold see https://pyscaffold.org/.

Download files

Source Distribution

stateful_data_processor-3.4.1.tar.gz (27.8 kB)

Built Distribution

stateful_data_processor-3.4.1-py3-none-any.whl (10.6 kB)

File details

Details for the file stateful_data_processor-3.4.1.tar.gz.

File metadata

  • Filename: stateful_data_processor-3.4.1.tar.gz
  • Size: 27.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing: No
  • Uploaded via: twine/6.2.0 CPython/3.12.11

File hashes

  • SHA256: 2efbc8f024a269f664c774f3f37b5904c57f2abd5bdcc26576fd27d7c10f8cc9
  • MD5: 44c3160a5a0f12e8c347b5251dcb6933
  • BLAKE2b-256: 5643e989e0396a74c175cb26221d406ccdbb3b4f6337c6c87cb527186862b18f

File details

Details for the file stateful_data_processor-3.4.1-py3-none-any.whl.

File hashes

  • SHA256: 4676a52077752fd7e360090ca4eb6924b4317ff5d06fca38cd75c8492c3a2959
  • MD5: b1ba7be7e060995012a94b9a0d97bab1
  • BLAKE2b-256: 200f31c846a1a5fff21a8c28f376ca3a03e5931eb2db8545f971f1987fa3352b
