
Simple python multiprocessing manager

Project description

Simple multiprocessing management tool

Handles large amounts of data using Python multiprocessing.

Concept

The main idea is to use multiprocessing by building a source queue of items and handing those items to worker processes.
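A minimal sketch of that pattern using only the standard library, assuming a plain multiprocessing.Queue as the source queue and one None sentinel per worker to mark the end of input. The names worker, run and SENTINEL are hypothetical; this illustrates the idea and is not mp_manager's implementation.

```python
import multiprocessing as mp

SENTINEL = None  # marks the end of the source queue for one worker


def worker(source, results):
    """Take items from the source queue until the sentinel arrives."""
    while True:
        item = source.get()
        if item is SENTINEL:
            break
        results.put(item * item)  # stand-in for real per-item work


def run(items, num_workers=4):
    source, results = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(source, results))
               for _ in range(num_workers)]
    for proc in workers:
        proc.start()

    count = 0
    for item in items:  # fill the source queue
        source.put(item)
        count += 1
    for _ in workers:   # one sentinel per worker
        source.put(SENTINEL)

    # Drain results before join() so no worker is stuck flushing its queue
    collected = [results.get() for _ in range(count)]
    for proc in workers:
        proc.join()
    return sorted(collected)  # completion order is not deterministic


if __name__ == '__main__':
    print(run(range(10)))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Results are drained before join() because a process that has put items on a queue may not exit until those items are flushed to the underlying pipe, so joining first can deadlock.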

Multiprocessing management is implemented in mp_manager.manager (manager.py), with help from mp_manager.process_helpers (process_helpers.py), which provides events, locks, shared values and stats, an exception-handler helper, and ExitStatus.
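The kind of coordination these helpers provide can be sketched with plain multiprocessing primitives. This is an illustration, not the process_helpers API: the Event, Lock and Value below are standard-library objects, treating a negative number as a fatal error is made up for the demo, and returning 0 or 1 is only an assumed stand-in for ExitStatus.

```python
import multiprocessing as mp


def worker(numbers, processed, lock, stop):
    """Count handled numbers; stop early once any process sets the event."""
    for n in numbers:
        if stop.is_set():
            return
        if n < 0:        # hypothetical fatal error for this demo
            stop.set()   # ask every other worker to wind down
            return
        with lock:       # '+=' on a shared value is not atomic by itself
            processed.value += 1


def run(chunks):
    lock = mp.Lock()
    stop = mp.Event()
    processed = mp.Value('i', 0, lock=False)  # raw shared int, guarded by `lock`
    procs = [mp.Process(target=worker, args=(chunk, processed, lock, stop))
             for chunk in chunks]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    status = 1 if stop.is_set() else 0  # assumed stand-in for ExitStatus
    return processed.value, status


if __name__ == '__main__':
    print(run([[1, 2, 3], [4, 5]]))  # (5, 0)
```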

Workers are built from handler classes.

Handlers

Handlers must inherit from mp_manager.base_handler.BaseHandler and implement at least:

  1. build_items(), a classmethod that returns an iterable of the items for workers to handle
  2. _handle_item(), which handles the current item
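To make the required contract concrete, here is a hypothetical MiniBaseHandler built on abc that declares the same two hooks, plus a single-process loop that drives them. It assumes the current item is exposed as self.current_item (as in the sample usage); the constructor, the result attribute and run_sequentially are illustrative only and not part of BaseHandler.

```python
from abc import ABC, abstractmethod


class MiniBaseHandler(ABC):
    """Hypothetical stand-in for BaseHandler with only the required hooks."""

    def __init__(self, current_item):
        self.current_item = current_item

    @classmethod
    @abstractmethod
    def build_items(cls, *args, **kwargs):
        """Return an iterable of items for the workers to handle."""

    @abstractmethod
    def _handle_item(self):
        """Process self.current_item."""


class SquareHandler(MiniBaseHandler):
    @classmethod
    def build_items(cls, *args, **kwargs):
        return range(5)

    def _handle_item(self):
        # Hypothetical: keep the result on the instance so the demo can read it
        self.result = self.current_item ** 2


def run_sequentially(handler_cls):
    """Drive the hooks in one process; a manager would spread items over workers."""
    results = []
    for item in handler_cls.build_items():
        handler = handler_cls(item)
        handler._handle_item()
        results.append(handler.result)
    return results


print(run_sequentially(SquareHandler))  # [0, 1, 4, 9, 16]
```

Because both hooks are abstract, a subclass that forgets either one fails with TypeError at instantiation instead of partway through a run.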

They can also implement:

  1. log_item, which the manager uses as the source for its log messages (a property in the sample usage)
  2. _reset_stats(), used internally to reset the worker's per-item stats dict
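A sketch of how the optional hooks might fit together, assuming per-item stats are reset before each item and log_item is read afterwards. CountingHandler, handle() and the Counter-based totals are hypothetical; in the sample usage, per-item stats are passed to Stats.inc_stats_vals() instead.

```python
from collections import Counter


class CountingHandler:
    """Hypothetical handler showing the two optional hooks in isolation."""

    def __init__(self, batches_total):
        self.batches_total = batches_total
        self.current_item = None
        self.stats = {}

    def _reset_stats(self):
        # Fresh per-item counters so one item's numbers never leak into the next
        self.stats = {'items_processed': 0, 'items_even': 0}

    @property
    def log_item(self):
        # Short label for the current (index, batch) item, used in log lines
        return f'{self.current_item[0]} / {self.batches_total}'

    def handle(self, item):
        self.current_item = item
        self._reset_stats()
        for n in item[1]:
            self.stats['items_processed'] += 1
            if n % 2 == 0:
                self.stats['items_even'] += 1
        return self.stats


def run(batches):
    handler = CountingHandler(len(batches))
    totals = Counter()
    log_lines = []
    for item in enumerate(batches):
        totals.update(handler.handle(item))  # add this item's counters
        log_lines.append(handler.log_item)
    return totals, log_lines


totals, log_lines = run([[1, 2], [3, 4], [5]])
print(dict(totals))  # {'items_processed': 5, 'items_even': 2}
print(log_lines)     # ['0 / 3', '1 / 3', '2 / 3']
```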

Sample usage

#!/usr/bin/env python3.7
# -*- coding: utf-8 -*-

import math
import random
import time

from mp_manager.base_handler import BaseHandler
from mp_manager.process_helpers import Stats, ns_container
from mp_manager.utils import build_batches


class SampleHandler(BaseHandler):

    batch_size = 2

    @classmethod
    def build_items(cls, *args, **kwargs):
        total = 100
        batches_total = int(math.ceil(total / cls.batch_size))

        Stats.set_stats_val('items_total', total)
        Stats.set_stats_val('batches_total', batches_total)
        ns_container.set_shared_value('shared_data', cls._data_to_share(),
                                      True)
        for i, val in enumerate(build_batches(range(total), cls.batch_size)):
            yield i, val

    @property
    def log_item(self):
        return f'{self.current_item[0]} / {Stats.stats.batches_total}'

    @classmethod
    def _data_to_share(cls):
        return dict(somekey="some value")

    def _reset_stats(self):
        self.stats = {
            'items_processed': 0,
            'items_positive': 0,
            'items_negative': 0,
        }

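    def _handle_item(self):
        # current_item is one (batch_index, batch) pair yielded by build_items()
        _, items = self.current_item
        for item in items:
            self._process_number(item)
        # Add this item's counters to the shared stats, then log the totals
        Stats.inc_stats_vals(self.stats)
        self.logger.info(Stats.get_stats())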

    def _process_number(self, item):
        time.sleep(random.random())
        value = math.factorial(item) / math.cos(item)
        self.stats['items_processed'] += 1
        if value > 0:
            self.stats['items_positive'] += 1
        else:
            self.stats['items_negative'] += 1


if __name__ == '__main__':
    import logging
    import sys

    from mp_manager.manager import ImportManager

    logging.basicConfig(level=logging.DEBUG)
    status = ImportManager(SampleHandler, num_workers=8).run()

    print(status)
    sys.exit(status.code)
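The sample imports build_batches from mp_manager.utils and computes batches_total with math.ceil. Below is a hypothetical stdlib-only stand-in for both, useful for checking batch counts outside the package; the real build_batches may yield tuples or another sequence type rather than lists. On Python 3.12+, itertools.batched does the same job but yields tuples.

```python
from itertools import islice


def build_batches(iterable, batch_size):
    """Hypothetical stand-in: yield lists of up to batch_size items."""
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch  # the last batch may be shorter


def count_batches(total, batch_size):
    # Integer ceiling division: same result as math.ceil(total / batch_size)
    # for small totals, without float rounding for very large ones
    return -(-total // batch_size)


print(list(build_batches(range(5), 2)))  # [[0, 1], [2, 3], [4]]
print(count_batches(100, 2))             # 50
```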


Download files


mp-manager version 0.2 is available as a single wheel: mp_manager-0.2-py3-none-any.whl (7.3 kB, Python 3).
