Skip to main content

Provides a convenient way to display progress bars for concurrent asyncio or multiprocessing Pool processes.

Project description

pypbars

build Code Grade coverage vulnerabilities PyPI version python

The pypbars module provides a convenient way to display progress bars for concurrent asyncio or multiprocessing Pool processes. The pypbars class is a subclass of l2term that displays a list to the terminal, and uses progress1bar to render the progress bar.

Installation

pip install pypbars

example1 - ProgressBars with asyncio

Create ProgressBars using a lookup list containing unique values, these identifiers will be used to get the index of the appropriate ProgressBar to be updated. The convention is for the function to include logger.write calls containing the identifier and a message for when and how the respective progress bar should be updated. In this example the default regex dict is used but the caller can specify their own, so long as it contains regular expressions for how to detect when total, count and optional alias are set.

Code
import asyncio
import random
from faker import Faker
from pypbars import ProgressBars

async def do_work(worker, logger=None):
    logger.write(f'{worker}->worker is {worker}')
    total = random.randint(10, 65)
    logger.write(f'{worker}->processing total of {total} items')
    for count in range(total):
        # mimic an IO-bound process
        await asyncio.sleep(.1)
        logger.write(f'{worker}->processed {count}')
    return total

async def run(workers):
    with ProgressBars(lookup=workers, show_prefix=False, show_fraction=False) as logger:
        doers = (do_work(worker, logger=logger) for worker in workers)
        return await asyncio.gather(*doers)

def main():
    workers = [Faker().user_name() for _ in range(10)]
    print(f'Total of {len(workers)} workers working concurrently')
    results = asyncio.run(run(workers))
    print(f'The {len(workers)} workers processed a total of {sum(results)} items')

if __name__ == '__main__':
    main()

example1

example2 - ProgressBars with multiprocessing Pool

This example demonstrates how pypbars can be used to display progress bars from processes executing in a multiprocessing Pool. The parent l2term.multiprocessing module contains helper classes that define a LinesQueue as well as a QueueManager to facilitate communication between worker processes and the main process. In this example, we leverage a Pool of workers to compute the number of prime numbers in a given number range. The worker processes are passed a queue that they write messages to, while the main process reads messages from the queue, interprets the message and updates the ProgressBar respectively. Note that each line represents a background worker process.

Code
import time
from multiprocessing import Pool
from multiprocessing import get_context
from multiprocessing import cpu_count
from list2term.multiprocessing import LinesQueue
from list2term.multiprocessing import QueueManager
from queue import Empty
from pypbars import ProgressBars

CONCURRENCY = cpu_count()

def is_prime(num):
    if num == 1:
        return False
    for i in range(2, num):
        if (num % i) == 0:
            return False
    else:
        return True

def count_primes(start, stop, logger):
    workerid = f'{start}:{stop}'
    logger.write(f'{workerid}->worker is {workerid}')
    logger.write(f'{workerid}->processing total of {stop - start} items')
    primes = 0
    for number in range(start, stop):
        if is_prime(number):
            primes += 1
        logger.write(f'{workerid}->processed {number}')
    return primes

def main(number):
    step = int(number / CONCURRENCY)
    QueueManager.register('LinesQueue', LinesQueue)
    with QueueManager() as manager:
        queue = manager.LinesQueue(ctx=get_context())
        with Pool(CONCURRENCY) as pool:
            process_data = [(index, index + step, queue) for index in range(0, number, step)]
            results = pool.starmap_async(count_primes, process_data)
            lookup = [f'{data[0]}:{data[1]}' for data in process_data]
            with ProgressBars(lookup=lookup, show_prefix=False, show_fraction=False, use_color=True) as lines:
                while True:
                    try:
                        lines.write(queue.get(timeout=.1))
                    except Empty:
                        if results.ready():
                            break
    return sum(results.get())

if __name__ == '__main__':
    start = time.perf_counter()
    number = 50_000
    result = main(number)
    stop = time.perf_counter()
    print(f"Finished in {round(stop - start, 2)} seconds\nTotal number of primes between 0-{number}: {result}")

example2

Development

Clone the repository and ensure the latest version of Docker is installed on your development server.

Build the Docker image:

docker image build \
-t \
pypbars:latest .

Run the Docker container:

docker container run \
--rm \
-it \
-v $PWD:/code \
pypbars:latest \
bash

Execute the build:

pyb -X

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pypbars-0.1.6.tar.gz (5.4 kB view hashes)

Uploaded Source

Built Distribution

pypbars-0.1.6-py3-none-any.whl (5.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page