stateful-data-processor
Resumable, checkpointed item processing with graceful interrupts — subclass and go.
A tiny utility for long-running, restart-safe loops: process items, persist state, resume exactly where you stopped, even after an exception, and handle SIGINT/SIGTERM cleanly.
- Install: pip install stateful-data-processor
- Why: Skip rework after crashes/interrupts; keep logic in a single subclass.
- Good for: Batch jobs, ETL steps, scraping, “process a big list with restarts”.
Quick start (60 seconds)
import time
from stateful_data_processor.file_rw import FileRW
from stateful_data_processor.processor import StatefulDataProcessor

class MyDataProcessor(StatefulDataProcessor):
    def process_item(self, item, iteration_index: int, delay: float):
        '''item and iteration_index are automatically supplied by the framework.
        iteration_index may or may not be used.
        '''
        self.data[item] = item ** 2  # Example processing: square the item
        time.sleep(delay)  # Simulate long processing time

# Example usage
file_rw = FileRW('data.json')
processor = MyDataProcessor(file_rw)
items_to_process = [1, 2, 3, 4, 5]
processor.run(items=items_to_process, delay=1.5)  # Ctrl+C anytime; rerun to resume.
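On a rerun, the processor reloads data.json and skips any item already present in self.data, so only unfinished work executes. To peek at the checkpoint between runs, you can read the state file directly. This is a minimal sketch, assuming the file is plain JSON (the .json extension suggests so, but the on-disk format is FileRW's concern):

import json

# Inspect the checkpoint between runs; keys are the already-processed items.
with open('data.json') as f:
    state = json.load(f)
print(f"{len(state)} items already processed: {sorted(state)}")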
stateful-data-processor is a utility designed to handle large amounts of data incrementally. It allows you to process data step-by-step, saving progress to avoid data loss in case of interruptions or errors. The processor can be subclassed to implement custom data processing logic.
Features
- Incremental & resumable — process large datasets in chunks and pick up exactly where you left off.
- State persisted to disk — saves progress to a file so restarts are fast and reliable.
- Graceful shutdown — handles SIGINT/SIGTERM (e.g., Ctrl+C) and saves state before exiting.
- Crash-safe — catches exceptions, saves current progress, and lets you restart without losing work.
- Automatic logging — a logger is created for you if you don’t inject one.
- Skip completed work — automatically avoids already processed items on restart (demonstrated in the sketch after this list).
- Easy to extend — subclass to implement custom processing logic.
- Reprocess cached items — optionally revisit items already stored to explore alternative processing strategies.
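To see the skip-completed behavior in action, run the same processor twice against one state file; on the second run, items found in the file are skipped instead of recomputed. A minimal sketch, assuming a fresh processor instance reloads existing state from the file it is given:

from stateful_data_processor.file_rw import FileRW
from stateful_data_processor.processor import StatefulDataProcessor

class Doubler(StatefulDataProcessor):
    def process_item(self, item, iteration_index: int):
        self.data[item] = item * 2  # result keyed by the item itself

file_rw = FileRW('doubled.json')

# First run: processes 1-3 and persists the results.
Doubler(file_rw).run(items=[1, 2, 3])

# Second run with the same file: 1-3 are already stored and are
# skipped; only 4 is new work.
Doubler(file_rw).run(items=[1, 2, 3, 4])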
Problem
Processing massive datasets is slow, brittle, and easy to interrupt. You need a way to:
- Iterate through items one-by-one and save progress to disk as you go.
- Resume exactly where you left off after crashes, timeouts, restarts, or upgrades.
- Gracefully interrupt with SIGINT/SIGTERM (e.g., Ctrl+C) and persist state before exiting.
- Subclass cleanly to provide your own process_data and process_item logic.
- Avoid rework by skipping already-processed items — or intentionally reprocess cached items to explore alternatives.
In short: incremental processing with safety, resumability, and extensibility built in.
Solution
StatefulDataProcessor provides a resilient, incremental pipeline for large datasets:
- Incremental processing: Iterate through big inputs in manageable chunks (e.g., from a JSON source) without starting over.
- Persistent state: Progress and results are stored in a dictionary on disk; the processor tracks the current position.
- Graceful interruption: Handles SIGINT/SIGTERM (e.g., Ctrl+C) and saves state before exiting.
- Subclass-first design: Implement your own logic by overriding process_item (required) and process_data (optional).
- Per-item execution: run(**kwargs) forwards all arguments to process_item, iterating over items and processing one at a time.
- Unique keys: Results are keyed by each item’s unique label, so items must be unique.
- Customizable workflow: Override process_data to pre/post-process items, filter, batch, or enrich as needed (see the sketch after this list).
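The process_data hook is the place for such pre/post-processing. Its exact signature is not shown in this README, so the sketch below rests on an assumption: that process_data receives the items plus forwarded kwargs, and that the base implementation loops over them calling process_item. Check the library source before relying on it.

from stateful_data_processor.processor import StatefulDataProcessor

class FilteringProcessor(StatefulDataProcessor):
    def process_item(self, item: str, iteration_index: int):
        self.data[item] = len(item)

    # Hypothetical override: the signature is assumed, not confirmed
    # by this README.
    def process_data(self, items, *args, **kwargs):
        # Pre-process: drop empty labels and deduplicate (items must be
        # unique, since results are keyed by item) before the base class
        # iterates and checkpoints each one.
        filtered = sorted({i for i in items if i})
        super().process_data(filtered, *args, **kwargs)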
Usage
Example usage in larger projects:
- alphaspread analysis of nasdaq symbols
- xtb to yfinance symbol conversion
Example: Passing extra arguments via a subclass
Sometimes your per-item logic needs more than just the item itself (e.g., a pre-fetched blob, metadata, cached JSON). You can add any keyword args you want to your processor’s process_item signature, and then supply them from a subclass.
processors.py
# processors.py
from typing import Any, Optional

from stateful_data_processor.processor import StatefulDataProcessor

class GenericAnalyzer(StatefulDataProcessor):
    """
    Parent processor that expects an extra kwarg: `payload`.
    In a real project this could be HTML, JSON, text, bytes, etc.
    """

    def process_item(self, item: str, payload: Optional[Any], iteration_index: int):
        # Use the extra arg however you like
        self.logger.info(f"[{iteration_index}] Processing {item}; has_payload={payload is not None}")

        # Do minimal "work" for demonstration purposes: store something derived from the payload
        result = {
            "item": item,
            "payload_preview": str(payload)[:40],  # keep it tiny for docs
            "payload_length": len(str(payload)) if payload is not None else 0,
        }

        # Persist per-item result; the base class handles saving/resuming
        self.data[item] = result
run_example.py
# run_example.py
from datetime import date
from typing import Any, List

from stateful_data_processor.file_rw import JsonFileRW
from processors import GenericAnalyzer

def build_payload(item: str) -> Any:
    """
    Stand-in for I/O or computation (e.g., HTTP GET, DB read, cache lookup).
    Keep it simple for the README.
    """
    return f"payload for {item}"

class UrlAnalyzer(GenericAnalyzer):
    """
    Child processor that *adds* the extra argument (`payload`)
    and forwards it to the parent via super().
    """

    def process_item(self, item: str, iteration_index: int):
        payload = build_payload(item)
        # Forward both the original item and the extra kwarg to the parent
        super().process_item(item=item, payload=payload, iteration_index=iteration_index)

if __name__ == "__main__":
    items: List[str] = ["AAPL", "MSFT", "GOOGL", "NVDA"]

    # Results are saved incrementally; reruns resume from where you stopped.
    out_file = JsonFileRW(f"./demo-analysis-{date.today()}.json")
    analyzer = UrlAnalyzer(json_file_writer=out_file)
    analyzer.run(items=items)

    # Access in-memory results if needed
    data = analyzer.data
    print(f"Processed {len(data)} items. Output file: {out_file.path}")
What this demonstrates
- The parent class GenericAnalyzer defines process_item(self, item, payload, iteration_index) — here, payload is an extra kwarg (analogous to the soup argument in an HTML-scraping processor such as GurufocusAnalyzer).
- The child class UrlAnalyzer overrides process_item(self, item, iteration_index), computes the extra data (payload = build_payload(item)), and then forwards it with super().process_item(item=item, payload=payload, iteration_index=iteration_index).
- You can add any keyword arguments you need to the parent’s process_item (e.g., html, row, features, raw_bytes, context), and supply them from subclasses that know how to build/fetch them.
- All the usual benefits still apply: incremental processing, state persisted to disk, resumability after crashes or Ctrl+C, and a simple subclassing model.
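If the extra value is the same for every item, a subclass is not strictly needed: as noted in the Solution section, run(**kwargs) forwards all keyword arguments to process_item. A minimal sketch, where tag is a hypothetical kwarg introduced for illustration:

from stateful_data_processor.file_rw import JsonFileRW
from stateful_data_processor.processor import StatefulDataProcessor

class Tagger(StatefulDataProcessor):
    def process_item(self, item: str, tag: str, iteration_index: int):
        # `tag` arrives via run(**kwargs); the same value is supplied
        # for every item.
        self.data[item] = f"{tag}:{item}"

tagger = Tagger(JsonFileRW('tags.json'))
tagger.run(items=['AAPL', 'MSFT'], tag='equity')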
Releasing
git tag x.y
tox
tox -e docs
tox -e build
tox -e publish -- --repository pypi --verbose
Note
This project has been set up using PyScaffold 4.5. For details and usage information on PyScaffold see https://pyscaffold.org/.