
Parallel processing with progress bars using joblib and tqdm


job-tqdflex


A Python library for parallel processing with progress bars, combining joblib (job) and tqdm (tqd) with the flexibility (flex) of chunked processing for memory efficiency.

Features

  • Memory efficient - supports generators and iterators
  • Context manager support - automatic cleanup of resources
  • Easy parallel processing with automatic chunking for optimal performance
  • Error handling - failures are caught and reported with detailed logging
  • Custom logging support - compatible with loguru and standard Python logging

Installation

pip install job-tqdflex

Quick Start

from job_tqdflex import ParallelApplier
import time

def slow_square(x):
    time.sleep(0.1)  # simulate a slow computation
    return x ** 2

data = range(20)

# Create and run parallel applier
applier = ParallelApplier(slow_square, data, n_jobs=4)
results = applier()

print(results)  # [0, 1, 4, 9, 16, 25, ...]

Usage Examples

Basic Usage

from job_tqdflex import ParallelApplier

def process_item(item):
    # Your processing logic here
    return item * 2

data = [1, 2, 3, 4, 5]
applier = ParallelApplier(process_item, data)
results = applier()

With Additional Arguments

def power_function(base, exponent=2):
    return base ** exponent

data = [1, 2, 3, 4, 5]
applier = ParallelApplier(power_function, data)
results = applier(exponent=3)  # [1, 8, 27, 64, 125]

Using functools.partial for Complex Arguments

from functools import partial

def complex_function(item, multiplier, offset=0):
    return item * multiplier + offset

# Pre-configure the function
configured_func = partial(complex_function, multiplier=3, offset=10)

data = [1, 2, 3, 4, 5]
applier = ParallelApplier(configured_func, data)
results = applier()  # [13, 16, 19, 22, 25]

Working with Generators

def data_generator():
    for i in range(1000):
        yield i

def expensive_computation(x):
    return sum(range(x))

# Works seamlessly with generators
applier = ParallelApplier(expensive_computation, data_generator(), n_jobs=8)
results = applier()

Context Manager Usage

def process_data(item):
    return item ** 2

data = range(100)

# Automatic resource cleanup
with ParallelApplier(process_data, data, n_jobs=4) as applier:
    results = applier()

Different Backends

# For CPU-bound tasks (default)
applier = ParallelApplier(cpu_intensive_func, data, backend="loky")

# For I/O-bound tasks
applier = ParallelApplier(io_bound_func, data, backend="threading")

# For other use cases
applier = ParallelApplier(some_func, data, backend="multiprocessing")

Custom Progress Bar Settings

# Disable progress bar
applier = ParallelApplier(func, data, show_progress=False)

# Custom chunk size for memory management
applier = ParallelApplier(func, large_dataset, chunk_size=100)

# Custom progress bar description (default: "Applying {func_name} to chunks")
applier = ParallelApplier(func, data, custom_desc="Processing...")

Using the Low-Level tqdm_joblib Context Manager

from job_tqdflex import tqdm_joblib
from joblib import Parallel, delayed
from tqdm import tqdm
import time

def slow_function(x):
    time.sleep(0.1)
    return x ** 2

# Direct integration with joblib
with tqdm_joblib(tqdm(total=10, desc="Processing")) as progress_bar:
    results = Parallel(n_jobs=4)(delayed(slow_function)(i) for i in range(10))

Configuration Options

ParallelApplier Parameters

  • func: The function to apply to each item
  • iterable: Input data (list, generator, or any iterable)
  • show_progress: Whether to show progress bars (default: True)
  • n_jobs: Number of parallel jobs (default: 8, use -1 for all cores)
  • backend: Parallelization backend ("loky", "threading", or "multiprocessing")
  • chunk_size: Size of chunks to process (default: auto-calculated)
  • custom_desc: Custom description for the progress bar (default: None, uses "Applying {func_name} to chunks")
  • logger: Optional custom logger instance (supports standard logging and loguru)

Performance Tips

  1. Choose the right backend:

    • "loky" (default): Best for CPU-bound tasks
    • "threading": Good for I/O-bound tasks
    • "multiprocessing": For CPU-bound tasks with shared memory concerns
  2. Optimize chunk size:

    • Larger chunks reduce overhead but increase memory usage
    • Smaller chunks provide better load balancing
    • Auto-calculation usually works well
  3. Use generators for large datasets:

    def large_data_generator():
        for i in range(1_000_000):
            yield expensive_data_loader(i)
    
    applier = ParallelApplier(process_func, large_data_generator())
    
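The chunk-size trade-off in tip 2 can be illustrated with a plain-Python sketch. The `chunked` helper below is hypothetical and for illustration only; it is not the library's internal implementation:

```python
from itertools import islice

def chunked(iterable, chunk_size):
    """Yield successive lists of up to chunk_size items (illustrative helper)."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk

# Larger chunks mean fewer dispatches (less overhead, more memory per worker);
# smaller chunks mean more dispatches but finer-grained load balancing.
chunks = list(chunked(range(10), 4))
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Each chunk is what a worker receives in one dispatch, which is why chunk size directly controls the overhead/memory balance described above.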

Error Handling

The library provides comprehensive error handling:

def potentially_failing_function(x):
    if x == 42:
        raise ValueError("The answer to everything!")
    return x * 2

try:
    applier = ParallelApplier(potentially_failing_function, range(100))
    results = applier()
except RuntimeError as e:
    print(f"Parallel processing failed: {e}")
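If you would rather capture per-item failures than abort the whole run, one common pattern (plain Python, independent of this library's API) is to wrap the worker function so it returns a status tuple instead of raising:

```python
def safe(func):
    """Wrap func so per-item failures are captured instead of raised."""
    def wrapper(x):
        try:
            return ("ok", func(x))
        except Exception as exc:
            return ("error", f"{type(exc).__name__}: {exc}")
    return wrapper

def potentially_failing_function(x):
    if x == 42:
        raise ValueError("The answer to everything!")
    return x * 2

# Every item yields a result; failures are recorded, not fatal.
results = [safe(potentially_failing_function)(i) for i in (41, 42, 43)]
```

The wrapped function can then be passed to `ParallelApplier` like any other callable, and you can filter the results for `"error"` entries afterwards.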

Logging

Standard Python Logging

Enable debug logging to monitor performance:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("joblib_tqdm")

# Your parallel processing code here

Custom Logger Support (including Loguru)

The library supports custom logger instances, including loguru:

# With loguru (if installed)
from loguru import logger as loguru_logger

def process_item(x):
    return x ** 2

data = range(100)

# Use loguru for all internal logging
applier = ParallelApplier(process_item, data, logger=loguru_logger)
results = applier()

# Or with tqdm_joblib context manager
from job_tqdflex import tqdm_joblib
from joblib import Parallel, delayed
from tqdm import tqdm
with tqdm_joblib(tqdm(total=100, desc="Processing"), logger=loguru_logger) as pbar:
    results = Parallel(n_jobs=4)(delayed(process_item)(i) for i in data)

# Or with a standard logging custom logger
import logging

custom_logger = logging.getLogger("my_custom_logger")
custom_logger.setLevel(logging.INFO)

applier = ParallelApplier(process_item, data, logger=custom_logger)
results = applier()

Note: Loguru is not a required dependency. It's included in the [dev] optional dependencies for testing purposes. You can use any logger object that has debug() and error() methods.
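Since, per the note above, any object with `debug()` and `error()` methods is accepted, a minimal duck-typed logger can be sketched in plain Python (the `ListLogger` class is a hypothetical example, not part of the library):

```python
class ListLogger:
    """Minimal duck-typed logger: any object with debug() and error() works."""

    def __init__(self):
        self.records = []

    def debug(self, msg, *args):
        # Mimic stdlib logging's %-style lazy formatting.
        self.records.append(("DEBUG", msg % args if args else msg))

    def error(self, msg, *args):
        self.records.append(("ERROR", msg % args if args else msg))

log = ListLogger()
log.debug("processed %d items", 5)
log.error("failed on item %d", 42)
```

Assuming the interface described in the note, an instance like `log` could be passed as `logger=log` to `ParallelApplier`, which is handy for capturing internal log messages in tests.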

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under CC BY-SA 4.0 - see the LICENSE file for details.

Attribution

This project includes code based on the tqdm_joblib implementation by Louis Abraham, which is distributed under CC BY-SA 4.0. The original implementation was inspired by a Stack Overflow solution for integrating tqdm with joblib's parallel processing.

Acknowledgments

  • Built on top of the excellent joblib library
  • Progress bars provided by tqdm
  • Based on the original tqdm_joblib by Louis Abraham
  • Inspired by the need for simple parallel processing with progress tracking and custom logging support

Changelog

0.1.0 (2025)

  • Initial release
  • Basic parallel processing with progress bars
  • Support for multiple backends (loky, threading, multiprocessing)
  • Generator and iterator support
  • Context manager support
  • Custom logger support (compatible with loguru and standard logging)
  • Comprehensive test suite including loguru integration tests
  • Memory efficient chunking with auto-calculated chunk sizes
