Simple python multiprocessing manager
Project description
Simple multiprocessing management tool
Handles large amounts of data using Python multiprocessing.
Concept
The main idea is to use multiprocessing by building a source queue of items and handing those items to workers for processing.
Process management is implemented in mp_manager/manager.py, with help from mp_manager/process_helpers.py, which provides events, locks, shared values and stats, an exception-handler helper, and ExitStatus.
Workers are built from handler classes.
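The source-queue idea can be sketched with the standard library alone. This is a minimal illustration, not the mp_manager implementation: the names `handle`, `worker`, `run` and `SENTINEL` are hypothetical, and the use of one sentinel per worker to signal the end of the queue is an assumption made for the sketch.

```python
import multiprocessing as mp

SENTINEL = None  # hypothetical end-of-queue marker, one per worker


def handle(item):
    """Stand-in for real per-item work."""
    return item * item


def worker(source, results):
    """Pull items from the source queue until a sentinel arrives."""
    while True:
        item = source.get()
        if item is SENTINEL:
            break
        results.put((item, handle(item)))


def run(items, num_workers=2):
    """Fill the source queue, start the workers, and collect the results."""
    items = list(items)
    source, results = mp.Queue(), mp.Queue()
    for item in items:
        source.put(item)
    for _ in range(num_workers):
        source.put(SENTINEL)
    procs = [mp.Process(target=worker, args=(source, results))
             for _ in range(num_workers)]
    for proc in procs:
        proc.start()
    # Drain the results before joining so no worker blocks on a full pipe.
    collected = dict(results.get() for _ in items)
    for proc in procs:
        proc.join()
    return collected


if __name__ == "__main__":
    print(sorted(run(range(5)).items()))
```

Each worker exits as soon as it reads a sentinel, so the number of sentinels must match the number of workers.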
Handlers
Handlers must inherit from mp_manager.base_handler.BaseHandler and implement at least:
- the classmethod build_items(), which must return an iterable of items for the workers to handle
- _handle_item(), which handles the current item
They may also implement:
- log_item, which the manager uses as the label for the current item when logging (the sample below defines it as a property)
- _reset_stats(), which is used internally to reset the worker's stats dict for the current item
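The contract above can be illustrated without the library. In this sketch, `SketchBaseHandler` is a hypothetical stand-in for BaseHandler, and its `handle()` method reflects an assumed per-item flow (set the current item, reset stats, process); the real base class may order or name these steps differently.

```python
class SketchBaseHandler:
    """Hypothetical stand-in for BaseHandler; not the library's code."""

    def __init__(self):
        self.current_item = None
        self.stats = {}

    @classmethod
    def build_items(cls):
        raise NotImplementedError  # required hook

    def _handle_item(self):
        raise NotImplementedError  # required hook

    @property
    def log_item(self):
        # Optional hook: label for the current item in log output.
        return repr(self.current_item)

    def _reset_stats(self):
        # Optional hook: per-item stats start from a clean state.
        self.stats = {}

    def handle(self, item):
        """Assumed per-item flow: set the item, reset stats, process."""
        self.current_item = item
        self._reset_stats()
        self._handle_item()
        return self.stats


class WordLengthHandler(SketchBaseHandler):
    @classmethod
    def build_items(cls):
        return ["alpha", "beta", "gamma"]

    def _reset_stats(self):
        self.stats = {"chars": 0}

    def _handle_item(self):
        self.stats["chars"] += len(self.current_item)


handler = WordLengthHandler()
totals = [handler.handle(item)["chars"]
          for item in WordLengthHandler.build_items()]
```

Here the items are processed sequentially in one process; the point is only to show which hooks a handler supplies and when they might be called.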
Sample usage
#!/usr/bin/env python3.7
# -*- coding: utf-8 -*-
import math
import random
import time

from mp_manager.base_handler import BaseHandler
from mp_manager.process_helpers import Stats, ns_container
from mp_manager.utils import build_batches


class SampleHandler(BaseHandler):
    batch_size = 2

    @classmethod
    def build_items(cls, *args, **kwargs):
        total = 100
        batches_total = int(math.ceil(total / cls.batch_size))
        Stats.set_stats_val('items_total', total)
        Stats.set_stats_val('batches_total', batches_total)
        ns_container.set_shared_value('shared_data', cls._data_to_share(),
                                      True)
        for i, val in enumerate(build_batches(range(total), cls.batch_size)):
            yield i, val

    @property
    def log_item(self):
        return f'{self.current_item[0]} / {Stats.stats.batches_total}'

    @classmethod
    def _data_to_share(cls):
        return dict(somekey="some value")

    def _reset_stats(self):
        self.stats = {
            'items_processed': 0,
            'items_positive': 0,
            'items_negative': 0,
        }

    def _handle_item(self):
        num, items = self.current_item
        for item in items:
            self._process_number(item)
        Stats.inc_stats_vals(self.stats)
        self.logger.info(Stats.get_stats())

    def _process_number(self, item):
        time.sleep(random.random())
        value = math.factorial(item) / math.cos(item)
        self.stats['items_processed'] += 1
        if value > 0:
            self.stats['items_positive'] += 1
        else:
            self.stats['items_negative'] += 1


if __name__ == '__main__':
    import logging
    from mp_manager.manager import ImportManager

    logging.basicConfig(level=logging.DEBUG)
    status = ImportManager(SampleHandler, num_workers=8).run()
    print(status)
    exit(status.code)
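The sample computes batches_total as the ceiling of total / batch_size and then enumerates batches. The sketch below shows why those two numbers agree, using a hypothetical `chunked` helper; it only assumes that build_batches splits an iterable into consecutive groups of at most batch_size items, which may not match the helper's exact behavior or return type.

```python
import math


def chunked(values, size):
    """Yield consecutive lists of at most `size` values (hypothetical helper)."""
    batch = []
    for value in values:
        batch.append(value)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # final, shorter batch when the total is not a multiple of size
        yield batch


total, batch_size = 7, 2
batches = list(chunked(range(total), batch_size))
batches_total = math.ceil(total / batch_size)
```

With 7 items and a batch size of 2, there are three full batches plus one partial batch, which is exactly what the ceiling division counts.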
File details
Details for the file mp_manager-0.2-py3-none-any.whl.
File metadata
- Download URL: mp_manager-0.2-py3-none-any.whl
- Upload date:
- Size: 7.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.18.4 setuptools/40.4.3 requests-toolbelt/0.9.1 tqdm/4.32.1 CPython/2.7.15rc1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fef727e05e05541632fb029e8004a39d732c59714ecb2559a6597a1fcbf02254 |
| MD5 | 20a0e5f50763b482e79a2f58034e32e9 |
| BLAKE2b-256 | c7b25668a7753640a15bcc7c2c998fa679842459bc349b0a4eff1f4b713e42c0 |
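A downloaded file can be checked against the SHA256 digest listed above. This is a minimal sketch using the standard library's hashlib; the function names `sha256_of` and `matches_digest` are illustrative and not part of mp_manager.

```python
import hashlib


def sha256_of(path, chunk_size=65536):
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_digest(path, expected_hex):
    """Compare a file's digest with an expected hex string, ignoring case."""
    return sha256_of(path) == expected_hex.strip().lower()
```

For example, `matches_digest("mp_manager-0.2-py3-none-any.whl", "fef727e05e05541632fb029e8004a39d732c59714ecb2559a6597a1fcbf02254")` should return True for an intact download, assuming the file sits in the current directory.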