
A Python multiprocessing library for data processing with pipelines.

Project description

datamultiproc: A Python multiprocessing data pipeline library

Create Processors, compose them into pipelines, and process your data using Python's multiprocessing library.

Concepts

Sample

A Sample is a data object that is passed through the pipeline. A custom sample can be implemented simply by inheriting from BaseSample and adding the desired data fields. Note that a def __init__(self, ...) method is not required: BaseSample is built on top of Pydantic's BaseModel and inherits its functionality. For more information, see Pydantic.

Data fields can be of any type and can also be Optional. The latter is especially handy for fields that are only filled in during execution of the pipeline.

import numpy as np
from datamultiproc import BaseSample
from typing import Optional


class CustomSample(BaseSample):
    data: int
    optional_data: Optional[str] = None
    array: np.ndarray

BaseSample has two special fields (id and processing_history). The former has to be set when a Sample is instantiated; the latter is appended to during execution of the pipeline to keep track of all processing steps.

class BaseSample(Aggregate):
    processing_history: List[Tuple[str, str]] = []
    id: str
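
For example, a CustomSample from above could be instantiated as follows (the field values are purely illustrative). processing_history starts out empty and is filled in while the pipeline runs:

sample = CustomSample(
    id="sample-001",            # must be provided at instantiation
    data=42,
    array=np.zeros(3),
)
print(sample.processing_history)  # [] before any processing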

Processor

A Processor carries out an actual processing step on a Sample. To implement a custom Processor, simply inherit from the Processor class and implement the process() method.

# Processor without arguments
class CustomProcessor(Processor):
    def process(self, sample: Sample) -> Sample:
        # do something with sample, e.g.
        sample.data = sample.data + 1
        return sample

    
# Processor with arguments
class CustomProcessor(Processor):
    step_size: int
    _hidden_arg: int = 1

    def process(self, sample: Sample) -> Sample:
        # do something with sample, e.g.
        sample.data = sample.data + self.step_size + self._hidden_arg
        return sample
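
A Processor can also be applied outside of a pipeline by calling its process() method directly. The following is a minimal sketch using the classes defined above (the sample values are only illustrative):

processor = CustomProcessor(step_size=2)
sample = CustomSample(id="s1", data=0, array=np.zeros(1))
sample = processor.process(sample)
# sample.data is now 0 + step_size + _hidden_arg = 3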

Compose

Compose chains multiple Processors together. They are executed in the order in which they are passed to the Compose constructor.

process = Compose(
    CustomProcessor(),
    CustomProcessor(step_size=2),
    CustomProcessor(step_size=3),
)

samples = [...] # some list of samples

processed_samples = [process(s) for s in samples]

Cache and Save

Cache and Save are special Processors that can be used to store intermediate results. Cache stores Samples as '.pickle' files after the processing step it wraps. The files are stored inside a subfolder of the specified cache_dir folder; the subfolder is named after the wrapped Processor class. If the pipeline is executed again, Cache first checks whether the file already exists and, if so, loads the Samples from the file instead of executing the wrapped Processor again.

Save works similarly, but does not load samples from file if they already exist.

Cache is especially useful for storing intermediate results that are expensive to compute. Save is useful for storing the final results of a pipeline.

process = Compose(
    Cache(
        processor=CustomProcessor(),
        cache_dir="path/to/cache"
    ),
    CustomProcessor(step_size=2),
    CustomProcessor(step_size=3),
    Save(save_to_dir="path/to/save"),
)

Multiprocessing

The library can be used with Python's multiprocessing library, as follows. A full working example can be found in example.py.

from multiprocessing import Process, Queue
from queue import Empty
from typing import Callable

# Compose, Save, ProcessingError and the processors are assumed to be
# importable from datamultiproc, analogous to BaseSample above
from datamultiproc import Compose, ProcessingError, Save

# number of processor cores of your machine
NUM_PROCESSES = 8

# list of samples
samples = [...]

process = Compose(
    SomeProcessor(),
    AnotherProcessor(),
    Save(save_to_dir="path/to/save"),
)

# put the samples into a multiprocessing Queue
samples_queue = Queue()
for s in samples:
    samples_queue.put(s)
    

def do_processing(process: Callable, queue: Queue):
    # queue.empty() is not reliable with multiple consumers, so drain the
    # queue with get_nowait() and stop once it raises Empty
    while True:
        try:
            sample = queue.get_nowait()
        except Empty:
            break
        try:
            sample = process(sample)
        except ProcessingError:
            print(f"failed processing: {sample.id}")
    
processes = []
for _ in range(NUM_PROCESSES):
    p = Process(target=do_processing, args=(process, samples_queue))
    p.start()
    processes.append(p)

for p in processes:
    p.join()
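
The workers above rely on Save to persist their output; the processed samples themselves are discarded. If they are needed back in the parent process, a second Queue can be used to collect them. The following is a minimal sketch under that assumption (starting again from a freshly filled samples_queue); it is not part of the library:

results_queue = Queue()

def do_processing_collect(process: Callable, in_queue: Queue, out_queue: Queue):
    while True:
        try:
            sample = in_queue.get_nowait()
        except Empty:
            break
        try:
            out_queue.put(process(sample))
        except ProcessingError:
            out_queue.put(None)  # placeholder keeps the result count equal to the sample count

workers = [
    Process(target=do_processing_collect, args=(process, samples_queue, results_queue))
    for _ in range(NUM_PROCESSES)
]
for w in workers:
    w.start()

# drain the results queue before joining, so workers are never blocked
# on a full queue buffer while the parent waits in join()
results = [results_queue.get() for _ in range(len(samples))]
processed_samples = [r for r in results if r is not None]

for w in workers:
    w.join()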



Download files

Download the file for your platform.

Source Distribution

datamultiproc-0.1.3.tar.gz (7.0 kB)

Uploaded Source

Built Distribution


datamultiproc-0.1.3-py3-none-any.whl (7.9 kB)

Uploaded Python 3

File details

Details for the file datamultiproc-0.1.3.tar.gz.

File metadata

  • Download URL: datamultiproc-0.1.3.tar.gz
  • Upload date:
  • Size: 7.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.13 CPython/3.9.16 Darwin/22.2.0

File hashes

Hashes for datamultiproc-0.1.3.tar.gz
  • SHA256: 7e9c35fa6dddeddf8098358a69da8b771dd8986d0496a404d207de7a088c6fde
  • MD5: b35e475a4323ec52cd9b8d8e5ee894bc
  • BLAKE2b-256: 17475c4f793044d2f5e0fe6998af9e52f5fcee44f1815df51c5744c520f803dd


File details

Details for the file datamultiproc-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: datamultiproc-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 7.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.13 CPython/3.9.16 Darwin/22.2.0

File hashes

Hashes for datamultiproc-0.1.3-py3-none-any.whl
  • SHA256: a5076029cb199dd58b583c24b2ee160e1f7b14cb902768f799688d599a906403
  • MD5: 7cef1258dc8c9ce7714c17461beb8ee0
  • BLAKE2b-256: f6aa13e2a65446da4b2801e2bff37ec90b6cafd5d2bd3249dac7136077fa7eee

