pypbars
The pypbars module provides a convenient way to display progress bars for concurrent asyncio or multiprocessing Pool processes. The ProgressBars class is a subclass of list2term, which displays a list of lines to the terminal, and it uses progress1bar to render each line as a progress bar.
Installation
pip install pypbars
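To give a feel for the basic usage pattern before the fuller examples below, here is a minimal sketch; the identifiers and messages are illustrative, and the message conventions are explained in example1:

from pypbars import ProgressBars

lookup = ['worker1', 'worker2']  # one entry per progress bar
with ProgressBars(lookup=lookup) as logger:
    # each message routes to the bar whose identifier prefixes it
    logger.write('worker1->processing total of 3 items')
    for count in range(3):
        logger.write(f'worker1->processed {count}')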
example1 - ProgressBars with asyncio

Create ProgressBars using a lookup list of unique values; these identifiers are used to look up the index of the appropriate ProgressBar to update. The convention is for the worker function to include logger.write calls containing the identifier and a message that determine when and how the respective progress bar is updated. This example uses the default regex dict, but the caller can specify their own, so long as it contains regular expressions for detecting when total, count, and the optional alias are set.
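For illustration, a custom dict might look like the sketch below; the exact patterns and the keyword argument name are assumptions on my part, but the required keys (total, count, alias) follow from the description above:

workers = ['worker1', 'worker2']
regex = {
    'total': r'^processing total of (?P<value>\d+) items$',  # how to detect the bar's total
    'count': r'^processed .*$',                              # how to detect a count increment
    'alias': r'^worker is (?P<value>.*)$',                   # optional alias shown on the bar
}
# passing the dict via a 'regex' keyword is an assumption
progress_bars = ProgressBars(lookup=workers, regex=regex)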
Code

import asyncio
import random
from faker import Faker
from pypbars import ProgressBars

async def do_work(worker, logger=None):
    logger.write(f'{worker}->worker is {worker}')
    total = random.randint(10, 65)
    logger.write(f'{worker}->processing total of {total} items')
    for count in range(total):
        # mimic an IO-bound process
        await asyncio.sleep(.1)
        logger.write(f'{worker}->processed {count}')
    return total

async def run(workers):
    with ProgressBars(lookup=workers, show_prefix=False, show_fraction=False) as logger:
        doers = (do_work(worker, logger=logger) for worker in workers)
        return await asyncio.gather(*doers)

def main():
    workers = [Faker().user_name() for _ in range(10)]
    print(f'Total of {len(workers)} workers working concurrently')
    results = asyncio.run(run(workers))
    print(f'The {len(workers)} workers processed a total of {sum(results)} items')

if __name__ == '__main__':
    main()
example2 - ProgressBars with multiprocessing Pool

This example demonstrates how pypbars can be used to display progress bars from processes executing in a multiprocessing Pool. The list2term.multiprocessing module contains a pool_map method that fully abstracts the required multiprocessing constructs: you simply pass it the function to execute, an iterable containing the arguments to pass to each process, and an instance of ProgressBars. The method executes the functions asynchronously, updates the progress bars accordingly, and returns a multiprocessing.pool.AsyncResult object. Each progress bar in the terminal represents a background worker process.
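For reference, a minimal sketch of that call shape, using a hypothetical worker function do_task:

from pypbars import ProgressBars
from list2term.multiprocessing import pool_map

def do_task(start, stop, logger):
    # pool_map passes the shared queue as the trailing argument
    logger.write(f'{start}:{stop}->processing total of {stop - start} items')
    for number in range(start, stop):
        logger.write(f'{start}:{stop}->processed {number}')
    return stop - start

if __name__ == '__main__':
    iterable = [(0, 50), (50, 100)]
    progress_bars = ProgressBars(lookup=['0:50', '50:100'], show_prefix=False, show_fraction=False)
    results = pool_map(do_task, iterable, context=progress_bars)  # returns multiprocessing.pool.AsyncResult
    print(sum(results.get()))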
If you do not wish to use the abstraction, the list2term.multiprocessing module contains helper classes that facilitate communication between the worker processes and the main process; the QueueManager provides a way to create a LinesQueue that can be shared between processes, as sketched below. Refer to example2b for how the helper methods can be used.
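A rough sketch of the manual setup, mirroring how these helpers are used in example3 below:

from multiprocessing import get_context
from list2term.multiprocessing import LinesQueue, QueueManager

QueueManager.register('LinesQueue', LinesQueue)

if __name__ == '__main__':
    with QueueManager() as manager:
        # a managed LinesQueue proxy that worker processes can write to
        queue = manager.LinesQueue(ctx=get_context())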
Note that the function being executed must accept a LinesQueue object and write messages via its write method; this is the mechanism by which messages are sent from the worker processes to the main process, and it is the main process that displays the messages on the terminal. The messages must be written using the format {identifier}->{message}, where {identifier} is a string that uniquely identifies a process, defined via the lookup argument to ProgressBars.
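For example, inside a worker whose identifier is worker1 (one of the values passed via lookup):

queue.write('worker1->processing total of 100 items')
queue.write('worker1->processed 42')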
Code

import time
from pypbars import ProgressBars
from list2term.multiprocessing import pool_map
from list2term.multiprocessing import CONCURRENCY

def is_prime(num):
    if num < 2:
        return False
    for i in range(2, num):
        if num % i == 0:
            return False
    return True

def count_primes(start, stop, logger):
    workerid = f'{start}:{stop}'
    logger.write(f'{workerid}->worker is {workerid}')
    logger.write(f'{workerid}->processing total of {stop - start} items')
    primes = 0
    for number in range(start, stop):
        if is_prime(number):
            primes += 1
        logger.write(f'{workerid}->processed {number}')
    logger.write(f'{workerid}->{workerid} processing complete')
    return primes

def main(number):
    step = int(number / CONCURRENCY)
    iterable = [(index, index + step) for index in range(0, number, step)]
    lookup = [':'.join(map(str, item)) for item in iterable]
    progress_bars = ProgressBars(lookup=lookup, show_prefix=False, show_fraction=False, use_color=True)
    # print to screen with progress bars context
    results = pool_map(count_primes, iterable, context=progress_bars)
    # print to screen without progress bars context
    # results = pool_map(count_primes, iterable)
    # do not print to screen
    # results = pool_map(count_primes, iterable, print_status=False)
    return sum(results.get())

if __name__ == '__main__':
    start = time.perf_counter()
    number = 50_000
    result = main(number)
    stop = time.perf_counter()
    print(f"Finished in {round(stop - start, 2)} seconds\nTotal number of primes between 0-{number}: {result}")
example3 - resettable ProgressBars with multiprocessing Pool and Queue

This example demonstrates how pypbars can be used to display progress bars from a small set of processes executing in a multiprocessing Pool, working on a large amount of data defined in a shared work Queue. The workers pop work off the queue and process it until no work remains. Since each worker processes multiple sets, its ProgressBar is reset every time the worker begins work on a new set. The ProgressBar maintains the number of iterations it has completed.
Code

import time
import random
import logging
from multiprocessing import Pool, Queue, get_context
from queue import Empty
import names
from faker import Faker
from list2term.multiprocessing import LinesQueue
from list2term.multiprocessing import QueueManager
from pypbars import ProgressBars

logger = logging.getLogger(__name__)

def prepare_queue(queue):
    for _ in range(55):
        queue.put({'total': random.randint(100, 150)})

def do_work(worker_id, total, logger):
    logger.write(f'{worker_id}->worker is {names.get_full_name()}')
    logger.write(f'{worker_id}->processing total of {total} items')
    for index in range(total):
        # simulate work by sleeping
        time.sleep(random.choice([.001, .003, .008]))
        logger.write(f'{worker_id}->processed {index}')
    return total

def run_q(worker_id, queue, logger):
    result = 0
    while True:
        try:
            total = queue.get(timeout=1)['total']
            result += do_work(worker_id, total, logger)
            logger.write(f'{worker_id}->reset')
        except Empty:
            break
    return result

def main(processes):
    QueueManager.register('LinesQueue', LinesQueue)
    QueueManager.register('Queue', Queue)
    with QueueManager() as manager:
        queue = manager.LinesQueue(ctx=get_context())
        data_queue = manager.Queue()
        prepare_queue(data_queue)
        with Pool(processes) as pool:
            print(f">> Adding {data_queue.qsize()} sets into a data queue that {processes} workers will work from until empty")
            process_data = [(Faker().name(), data_queue, queue) for index in range(processes)]
            results = pool.starmap_async(run_q, process_data)
            lookup = [f'{data[0]}' for data in process_data]
            with ProgressBars(lookup=lookup, show_prefix=False, show_fraction=False, use_color=True, show_duration=True, clear_alias=True) as lines:
                while True:
                    try:
                        item = queue.get(timeout=.1)
                        if item.endswith('->reset'):
                            index, message = lines.get_index_message(item)
                            lines[index].reset(clear_alias=False)
                        else:
                            lines.write(item)
                    except Empty:
                        if results.ready():
                            for index, _ in enumerate(lines):
                                lines[index].complete = True
                            break
    return sum(results.get())

if __name__ == '__main__':
    processes = 3
    results = main(processes)
    print(f">> {processes} workers processed a total of {results} items")
Development
Clone the repository and ensure the latest version of Docker is installed on your development server.
Build the Docker image:
docker image build \
-t pypbars:latest .
Run the Docker container:
docker container run \
--rm \
-it \
-v $PWD:/code \
pypbars:latest \
bash
Execute the build:
pyb -X