stateful-data-processor

stateful-data-processor is a utility designed to handle large amounts of data incrementally. It allows you to process data step-by-step, saving progress to avoid data loss in case of interruptions or errors. The processor can be subclassed to implement custom data processing logic.

Features

  • Incrementally process large datasets.

  • Save the processing state to a file.

  • Resume processing and automatically skip already processed items.

  • Handle SIGINT and SIGTERM signals for graceful shutdown and state saving.

  • Easily subclass to implement custom data processing.

Problem

You have a large amount of data that you want to loop through and process incrementally. Processing takes time, and if an error occurs you do not want to lose all progress. You want to save the data to a file and continue processing from where you left off, and to interrupt the processing with a SIGINT signal without losing what has already been done. Finally, you want to subclass the processor, implement the process_data and process_item methods, and iterate through the items one by one.

Solution

The StatefulDataProcessor class processes data incrementally:

  • Incremental Processing: Process large amounts of data in a JSON file incrementally.

  • Data Storage: The data is stored in a dictionary, and the processor keeps track of the current step being processed.

  • Graceful Interruption: The processor can be interrupted with a SIGINT or SIGTERM signal, and the data will be saved to the file.

  • Subclassing: The processor is meant to be subclassed, and the process_item method should be implemented.

  • Item Processing: process_item is called with all arguments forwarded from run, except items, which is unpacked and iterated item by item.

  • Unique Labels: The data is stored in a dictionary keyed by unique labels corresponding to the items, so each item must be unique.

  • Customization: The process_data method can be overridden for more customized processing of the items; see the sketch below.
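
For example, here is a minimal, hypothetical sketch of a process_data override that sorts the items before iterating them. It assumes process_data receives the unpacked items plus the remaining arguments forwarded from run; check the base class for the exact signature.

import time
from stateful_data_processor.processor import StatefulDataProcessor

class SortedProcessor(StatefulDataProcessor):

    def process_data(self, items, *args, **kwargs):
        # Assumed signature: items plus the arguments forwarded from run.
        for item in sorted(items):
            if item in self.data:
                # Skip items already processed in an earlier, interrupted run.
                continue
            self.process_item(item, *args, **kwargs)

    def process_item(self, item, delay):
        # Example processing: square the item.
        self.data[item] = item ** 2
        time.sleep(delay)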

Usage

import time
from stateful_data_processor.file_rw import FileRW
from stateful_data_processor.processor import StatefulDataProcessor

class MyDataProcessor(StatefulDataProcessor):

    def process_item(self, item, delay):
        # Example processing: square the item and store it in the state dict
        self.data[item] = item ** 2
        time.sleep(delay)

# Example usage
file_rw = FileRW('data.json')
processor = MyDataProcessor(file_rw)

items_to_process = [1, 2, 3, 4, 5]
processor.run(items=items_to_process, delay=1.5)
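
After a complete run, the saved state holds each item's result. The exact on-disk layout depends on FileRW, but for this example data.json would conceptually contain something like the following (note that JSON object keys are strings):

{"1": 1, "2": 4, "3": 9, "4": 16, "5": 25}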

The processor will handle SIGINT and SIGTERM signals to save the current state before exiting. Run your processor, and use Ctrl+C to send a SIGINT signal. When you run again, the processing will pick up from where you left off. A logger is automatically created if you do not inject it into the constructor.
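
Continuing the example above, you can also inject your own logger. A minimal sketch, assuming the constructor accepts the logger as an argument after the FileRW instance (the parameter name here is an assumption; check the StatefulDataProcessor signature):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("my_processor")

# Assumption: the logger is injected through the constructor.
processor = MyDataProcessor(FileRW('data.json'), logger=logger)
processor.run(items=items_to_process, delay=1.5)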

Installation

You can install stateful-data-processor using pip:

pip install stateful-data-processor

Releasing

git tag x.y
tox
tox -e docs
tox -e build
tox -e publish -- --repository pypi --verbose

Note

This project has been set up using PyScaffold 4.5. For details and usage information on PyScaffold see https://pyscaffold.org/.
