A multi-processing enabled progress bar.
Project description
mppbar
The mppbar module provides a convenient way to scale execution of a function across multiple input values by distributing the input across a specified number of background processes. It displays the execution status of each background process using a progress bar; the MPpbar class is a subclass of MPmq. The main benefit of using mppbar
is that the target function requires only minimal modification, if any. The progress bar is set up and driven by inspecting the log messages generated by your function; implementing logging is therefore the only requirement placed on the target function.
Installation
pip install mppbar
MPpbar class
MPpbar(function, process_data=None, shared_data=None, processes_to_start=None, regex=None, fill=None)
function
- The function to execute. It should accept two positional arguments: the first is the dictionary created for the respective process (see process_data
below); the second is the shared dictionary sent to all processes (see shared_data
below).
process_data
- A list of dictionaries where each dictionary describes the input data that will be sent to the respective background process executing the function; the length of the list dictates the total number of processes that will be executed.
shared_data
- A dictionary containing arbitrary data that will be sent to all processes.
processes_to_start
- The number of processes to initially start; this represents the number of concurrent processes that will be running. If the total number of processes is greater than this number, execution is queued to ensure that this concurrency is maintained. Defaults to the length of the process_data
list.
regex
- A dictionary whose keys are total, count and alias, and whose values are regular expressions. The regular expressions are checked against the log messages generated by the executing function; when matched, they are used to assign the attributes of the respective progress bar. The total and count keys are required; the alias key is optional.
fill
- A dictionary whose values are integers that dictate the number of leading zeros the progress bar should add to the total, index and completed values; this is optional and should be used to format the progress bar's appearance. The supported keys are max_total, max_index and max_completed.
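To illustrate how the regex and fill options behave (this is an illustration, not MPpbar's internals), the snippet below applies the patterns used in the examples that follow to a sample log message, and shows the zero-padding effect that a fill value such as max_total=100 describes:

```python
import re

# The regex values are matched against log messages emitted by the target
# function; the named group 'value' supplies the progress bar attribute.
regex = {
    'total': r'^processing total of (?P<value>\d+)$',
    'count': r'^processed item \d+$',
    'alias': r'^processor is (?P<value>.*)$',
}
match = re.match(regex['total'], 'processing total of 12')
print(match.group('value'))  # '12' becomes the progress bar total

# fill={'max_total': 100} pads totals to the width of 100 (three digits),
# so bars with different totals line up:
width = len(str(100))
print(str(12).zfill(width))  # '012'
```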
execute(raise_if_error=False)
Start execution of each process's activity. If raise_if_error is set to True, an exception is raised if any function encountered an error during execution.
Examples
example1
Distribute work across multiple processes executing concurrently; each displays a progress bar showing its execution status.
Code
from mppbar import MPpbar
import time, names, random, logging

logger = logging.getLogger(__name__)

def do_work(data, *args):
    logger.debug(f'processor is {names.get_last_name()}')
    total = data['total']
    logger.debug(f'processing total of {total}')
    for index in range(total):
        time.sleep(random.choice([.1, .2, .4]))
        logger.debug(f'processed item {index}')
    return total

def main():
    process_data = [{'total': random.randint(8, 16)} for _ in range(6)]
    regex = {
        'total': r'^processing total of (?P<value>\d+)$',
        'count': r'^processed item \d+$',
        'alias': r'^processor is (?P<value>.*)$',
    }
    print('>> Processing items...')
    pbars = MPpbar(function=do_work, process_data=process_data, regex=regex, timeout=1)
    results = pbars.execute()
    print(f">> {len(process_data)} workers processed a total of {sum(results)} items")

if __name__ == '__main__':
    main()
example2
Distribute work across multiple processes, but with only a subset executing concurrently; each displays a progress bar showing its execution status. Useful if you can only afford to run a few background processes concurrently.
Code
from mppbar import MPpbar
import time, names, random, logging

logger = logging.getLogger(__name__)

def do_work(data, *args):
    logger.debug(f'processor is {names.get_last_name()}')
    total = data['total']
    logger.debug(f'processing total of {total}')
    for index in range(total):
        time.sleep(random.choice([.1, .2, .4]))
        logger.debug(f'processed item {index}')
    return total

def main():
    process_data = [{'total': random.randint(8, 16)} for _ in range(6)]
    regex = {
        'total': r'^processing total of (?P<value>\d+)$',
        'count': r'^processed item \d+$',
        'alias': r'^processor is (?P<value>.*)$',
    }
    fill = {
        'max_total': 100
    }
    print('>> Processing items...')
    pbars = MPpbar(function=do_work, process_data=process_data, regex=regex, fill=fill, processes_to_start=3, timeout=1)
    results = pbars.execute()
    print(f">> {len(process_data)} workers processed a total of {sum(results)} items")

if __name__ == '__main__':
    main()
example3
Distribute a lot of work across a small set of processes using a thread-safe queue; each process gets work off the queue until there is no more work, and each process reuses its progress bar to show its execution status. Useful if you have a lot of data to distribute across a small set of workers.
Code
from mppbar import MPpbar
import time, names, random, logging
from multiprocessing import Queue
from queue import Empty

logger = logging.getLogger(__name__)

def do_work(total):
    logger.debug(f'processor is {names.get_last_name()}')
    logger.debug(f'processing total of {total}')
    for index in range(total):
        time.sleep(random.choice([.001, .003, .005]))
        logger.debug(f'processed item {index}')
    return total

def prepare_queue():
    queue = Queue()
    for _ in range(100):
        queue.put({'total': random.randint(40, 99)})
    return queue

def run_q(data, *args):
    queue = data['queue']
    result = 0
    while True:
        try:
            total = queue.get(timeout=1)['total']
            result += do_work(total)
            logger.debug('reset-mppbar')
        except Empty:
            logger.debug('reset-mppbar-complete')
            break
    return result

def main():
    queue = prepare_queue()
    process_data = [{'queue': queue} for _ in range(3)]
    regex = {
        'total': r'^processing total of (?P<value>\d+)$',
        'count': r'^processed item \d+$',
        'alias': r'^processor is (?P<value>.*)$',
    }
    print('>> Processing items...')
    pbars = MPpbar(function=run_q, process_data=process_data, regex=regex, timeout=1)
    results = pbars.execute()
    print(f">> {len(process_data)} workers processed a total of {sum(results)} items")

if __name__ == '__main__':
    main()
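The queue-draining loop inside run_q is a general pattern that works independently of mppbar; stripped of the progress bar specifics, it looks like this (plain Python, no mppbar required):

```python
from multiprocessing import Queue
from queue import Empty

def drain(queue):
    # Pull work items until the queue yields nothing within the timeout window.
    processed = 0
    while True:
        try:
            item = queue.get(timeout=1)
        except Empty:
            break
        processed += item['total']
    return processed

queue = Queue()
for total in (3, 5):
    queue.put({'total': total})
print(drain(queue))  # 8
```

Because multiprocessing.Queue is process-safe, several workers can call drain on the same queue concurrently, which is exactly how example3 spreads 100 work items across 3 processes.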
Development
Clone the repository and ensure the latest version of Docker is installed on your development server.
Build the Docker image:
docker image build \
-t mppbar:latest .
Run the Docker container:
docker container run \
--rm \
-it \
-v $PWD:/code \
mppbar:latest \
/bin/bash
Execute the build:
pyb -X
File details
Details for the file mppbar-0.1.0.tar.gz.
File metadata
- Download URL: mppbar-0.1.0.tar.gz
- Upload date:
- Size: 6.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.13
File hashes
Algorithm | Hash digest
---|---
SHA256 | dce2053ce24a36a145c323bfc40bd64e998e44396535faf250b756be5a2a377a
MD5 | 1f6a155fb571c3ef827049ce70b21f55
BLAKE2b-256 | d6fb64e2ffac02ce641aaa0c62b45e51a69f8b0952f84c91eb8177f40bf76a26
File details
Details for the file mppbar-0.1.0-py3-none-any.whl.
File metadata
- Download URL: mppbar-0.1.0-py3-none-any.whl
- Upload date:
- Size: 5.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.13
File hashes
Algorithm | Hash digest
---|---
SHA256 | 479b5f03e60c6985024dc5278b415d9245b999be715c1cc3b7a5ccb52bb2eb57
MD5 | 0d904172a8a8747363c408b1b26154d8
BLAKE2b-256 | 2bc18427f62f58e08be14954bb5e21486af31e224567e5ae2707728db28f7cea