nbatch

Lightweight batch processing utilities for the ndev-kit ecosystem.

nbatch provides a foundation for batch processing operations. It's designed to work seamlessly with napari plugins but has no napari or Qt dependencies.

Features

  • @batch decorator - Transform single-item functions into batch-capable functions
  • BatchContext - Track progress through batch operations
  • BatchRunner - Orchestrate batch operations with threading, progress callbacks, and cancellation
  • discover_files() - Flexible file discovery with natural sorting (like file explorers)
  • batch_logger - Scoped logging for batch operations with headers/footers
  • Minimal dependencies - Only requires natsort for natural file ordering
  • Optional napari integration - Uses napari's threading when available, falls back to standard threads
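The optional napari integration typically follows the standard optional-dependency pattern; a hypothetical sketch of how such a fallback can look (illustrative only, not nbatch's actual code):

```python
import threading

# Detect napari's thread_worker if installed; otherwise note its absence
# and provide a plain-threading fallback. (Hypothetical sketch.)
try:
    from napari.qt.threading import thread_worker  # noqa: F401
    USING_NAPARI = True
except ImportError:
    USING_NAPARI = False

def run_in_thread(func, *args, **kwargs):
    """Run func on a standard daemon thread and return the thread."""
    t = threading.Thread(target=func, args=args, kwargs=kwargs, daemon=True)
    t.start()
    return t
```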

Installation

pip install nbatch

For development (dependency groups require pip 25.1 or newer):

pip install -e . --group dev

Quick Start

Basic Batch Processing

The @batch decorator transforms a function that processes a single item into one that handles both single items and batches:

from pathlib import Path
from nbatch import batch

@batch
def process_image(path: Path) -> str:
    # Your processing logic here
    return path.stem.upper()

# Single item - returns result directly
result = process_image(Path("image.tif"))
# Returns: "IMAGE"

# List of items - returns generator
results = process_image([Path("a.tif"), Path("b.tif")])
list(results)
# Returns: ["A", "B"]

# Directory - discovers files and returns generator
results = process_image(Path("/data/images"))
# Processes all files in directory

Progress Tracking

Use with_context=True to get progress information:

@batch(with_context=True)
def process_image(path: Path) -> str:
    return path.stem

for result, ctx in process_image(files):
    print(f"{ctx.progress:.0%} complete: {result}")
    # 10% complete: image1
    # 20% complete: image2
    # ...

The BatchContext provides:

  • ctx.index - Zero-based index of current item
  • ctx.total - Total number of items
  • ctx.item - The current item being processed
  • ctx.progress - Completed fraction, (index + 1) / total
  • ctx.is_first / ctx.is_last - Boolean flags

Error Handling

Control how errors are handled with on_error:

# 'raise' (default) - Re-raise exceptions immediately
@batch(on_error='raise')
def strict_process(path): ...

# 'continue' - Log error and yield None for failed items
@batch(on_error='continue')
def lenient_process(path): ...
# Results: ["good", None, "ok"]

# 'skip' - Log error and skip failed items entirely
@batch(on_error='skip')
def skip_errors(path): ...
# Results: ["good", "ok"]

File Discovery

Control which files are processed:

# Custom glob patterns
@batch(patterns='*.tif')
def process_tiffs(path): ...

# Multiple patterns
@batch(patterns=['*.tif', '*.tiff', '*.png'])
def process_images(path): ...

# Non-recursive (top-level only)
@batch(recursive=False)
def process_top_level(path): ...

Or use discover_files() directly:

from nbatch import discover_files

# From directory with patterns
files = discover_files("/data/images", patterns=["*.tif", "*.png"])

# From explicit list
files = discover_files([path1, path2, path3])

Logging

Use batch_logger for structured logging. By default, it outputs to the console (stderr). Optionally log to a file:

from nbatch import batch, batch_logger

@batch(with_context=True)
def process(path):
    return path.stem

# Console only (default)
with batch_logger() as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

# With file logging (appends by default)
with batch_logger(log_file="output/process.log", header={"Files": 100}) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")
        # Or use log.info(), log.warning(), log.error()

# File only (no console output)
with batch_logger(log_file="output/quiet.log", console=False) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

Log file output:

============================================================
Batch processing started at 2025-01-29 10:30:00
------------------------------------------------------------
Files: 100
============================================================
2025-01-29 10:30:01 - INFO - [1/100] image1.tif - Processed: image1
2025-01-29 10:30:02 - INFO - [2/100] image2.tif - Processed: image2
...
============================================================
Batch processing completed at 2025-01-29 10:35:00
============================================================

Integration with napari

Using BatchRunner (Recommended)

BatchRunner provides clean orchestration for widgets with threading, progress callbacks, and cancellation:

from nbatch import batch, BatchRunner

# Define your processing function (pure, testable)
@batch(on_error='continue')
def process_image(path, model, output_dir):
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget class
class MyWidget:
    def __init__(self, viewer):
        self._viewer = viewer
        
        # Create runner once - reusable for all batches
        self.runner = BatchRunner(
            on_start=self._on_batch_start,
            on_item_complete=self._on_item_complete,
            on_complete=self._on_batch_complete,
            on_error=self._on_item_error,
            on_cancel=self._on_cancelled,
        )
        
        self._run_button.clicked.connect(self.run_batch)
        self._cancel_button.clicked.connect(self.runner.cancel)
    
    def _on_batch_start(self, total):
        """Called when batch starts with total item count."""
        self._progress_bar.setValue(0)
        self._progress_bar.setMaximum(total)
    
    def _on_item_complete(self, result, ctx):
        """Called after each item completes."""
        self._progress_bar.setValue(ctx.index + 1)
        # Optionally add result to viewer
        if result is not None:
            self._viewer.add_image(result, name=f"Result {ctx.index}")
    
    def _on_batch_complete(self):
        errors = self.runner.error_count
        if errors > 0:
            self._progress_bar.label = f"Done with {errors} errors"
        else:
            self._progress_bar.label = "Complete!"
    
    def _on_item_error(self, ctx, exception):
        self._progress_bar.label = f"Error on {ctx.item.name}"
    
    def _on_cancelled(self):
        self._progress_bar.label = "Cancelled"
    
    def run_batch(self):
        """Triggered by 'Run' button - just one line!"""
        self.runner.run(
            process_image,
            self.files,
            model=self.model,
            output_dir=self.output_dir,
            log_file=self.output_dir / "batch.log",
        )

Using @thread_worker directly

For more control, use napari's @thread_worker with the @batch decorator:

from napari.qt.threading import thread_worker
from nbatch import batch, batch_logger

@batch(with_context=True, on_error='continue')
def process_image(path, model, output_dir):
    # Your processing logic
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget
def run_batch(self):
    @thread_worker
    def _run():
        with batch_logger(log_file=self.output_dir / 'log.txt') as log:
            for result, ctx in process_image(
                self.input_dir,
                model=self.model,
                output_dir=self.output_dir,
            ):
                log(ctx, f"Processed: {ctx.item.name}")
                yield ctx  # Enables progress updates
    
    worker = _run()
    worker.yielded.connect(
        lambda ctx: self.progress_bar.setValue(int(ctx.progress * 100))
    )
    worker.start()

API Reference

@batch Decorator

@batch(
    on_error: Literal['raise', 'continue', 'skip'] = 'raise',
    with_context: bool = False,
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
)
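The dispatch behavior shown in Quick Start (single item returns a result, a list returns a generator) can be sketched with a simplified stand-alone decorator; nbatch's real decorator additionally handles directories, contexts, and error policies:

```python
from functools import wraps
from pathlib import Path

def mini_batch(func):
    """Simplified sketch of @batch dispatch: lists/tuples yield a lazy
    generator, single items return the result directly. Illustrative only."""
    @wraps(func)
    def wrapper(items, *args, **kwargs):
        if isinstance(items, (list, tuple)):
            return (func(item, *args, **kwargs) for item in items)
        return func(items, *args, **kwargs)  # single item: call through
    return wrapper

@mini_batch
def stem_upper(path: Path) -> str:
    return path.stem.upper()
```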

BatchContext

@dataclass(frozen=True)
class BatchContext:
    index: int      # Zero-based index
    total: int      # Total items
    item: Any       # Current item
    
    @property
    def progress(self) -> float: ...  # (index + 1) / total
    @property
    def is_first(self) -> bool: ...   # index == 0
    @property
    def is_last(self) -> bool: ...    # index == total - 1
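The elided property bodies follow directly from the inline comments; a runnable reconstruction (nbatch's actual class may carry additional members):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class BatchContext:
    """Reconstruction of the documented context dataclass."""
    index: int   # zero-based index of current item
    total: int   # total number of items
    item: Any    # the current item being processed

    @property
    def progress(self) -> float:
        return (self.index + 1) / self.total

    @property
    def is_first(self) -> bool:
        return self.index == 0

    @property
    def is_last(self) -> bool:
        return self.index == self.total - 1

ctx = BatchContext(index=0, total=4, item="a.tif")
```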

discover_files()

def discover_files(
    source: str | Path | Iterable[str | Path],
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
) -> list[Path]: ...
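"Natural sorting" means file2.tif sorts before file10.tif, as in a file explorer. nbatch uses natsort for this; a rough stdlib approximation of such a sort key, for illustration:

```python
import re

def natural_key(name: str):
    """Split a name into text and integer chunks so embedded numbers
    compare numerically. Rough stdlib stand-in for natsort's default key."""
    return [int(part) if part.isdigit() else part.lower()
            for part in re.split(r"(\d+)", name)]

names = ["file10.tif", "file2.tif", "file1.tif"]
ordered = sorted(names, key=natural_key)
# natural order: file1.tif, file2.tif, file10.tif
```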

batch_logger

@contextmanager
def batch_logger(
    log_file: str | Path | None = None,  # Optional file path
    header: Mapping[str, object] | None = None,  # Metadata to write at start
    level: int = logging.INFO,
    console: bool = True,  # Output to stderr
    file_mode: Literal['w', 'a'] = 'a',  # Append by default
) -> Generator[BatchLogger, None, None]: ...
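The header/footer behavior shown in the Logging section can be approximated with a stdlib context manager; a simplified stand-in, not nbatch's implementation:

```python
import logging
from contextlib import contextmanager
from datetime import datetime

@contextmanager
def mini_batch_logger(log_file, header=None):
    """Write a header, yield a logger, write a footer on exit.
    Illustrative sketch only; batch_logger itself does more."""
    logger = logging.getLogger("mini_batch")
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(log_file, mode="a")  # append by default
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    logger.addHandler(handler)
    logger.info("Batch processing started at %s", datetime.now())
    for key, value in (header or {}).items():
        logger.info("%s: %s", key, value)
    try:
        yield logger
    finally:
        logger.info("Batch processing completed at %s", datetime.now())
        logger.removeHandler(handler)
        handler.close()
```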

BatchRunner

class BatchRunner:
    def __init__(
        self,
        on_start: Callable[[int], None] | None = None,
        on_item_complete: Callable[[Any, BatchContext], None] | None = None,
        on_complete: Callable[[], None] | None = None,
        on_error: Callable[[BatchContext, Exception], None] | None = None,
        on_cancel: Callable[[], None] | None = None,
    ): ...
    
    def run(
        self,
        func: Callable,
        items: Any,
        *args,
        threaded: bool = True,
        log_file: str | Path | None = None,
        log_header: Mapping[str, object] | None = None,
        patterns: str | Sequence[str] = '*',
        recursive: bool = False,
        **kwargs,  # Passed to func!
    ) -> None: ...
    
    def cancel(self) -> None: ...
    
    @property
    def is_running(self) -> bool: ...
    
    @property
    def was_cancelled(self) -> bool: ...
    
    @property
    def error_count(self) -> int: ...  # Errors in current/last batch
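The callback flow above can be sketched synchronously (omitting threading, cancellation, and file discovery); a minimal illustrative stand-in, not BatchRunner itself:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ctx:  # minimal stand-in for BatchContext
    index: int
    total: int
    item: Any

class MiniRunner:
    """Synchronous sketch of the BatchRunner callback flow."""
    def __init__(self, on_start=None, on_item_complete=None,
                 on_complete=None, on_error=None):
        self.on_start = on_start
        self.on_item_complete = on_item_complete
        self.on_complete = on_complete
        self.on_error = on_error
        self.error_count = 0

    def run(self, func, items, **kwargs):
        self.error_count = 0
        if self.on_start:
            self.on_start(len(items))          # report total up front
        for i, item in enumerate(items):
            ctx = Ctx(i, len(items), item)
            try:
                result = func(item, **kwargs)
            except Exception as exc:           # count and report failures
                self.error_count += 1
                if self.on_error:
                    self.on_error(ctx, exc)
            else:
                if self.on_item_complete:
                    self.on_item_complete(result, ctx)
        if self.on_complete:
            self.on_complete()
```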

Contributing

Contributions are welcome! Please ensure tests pass before submitting a pull request:

pytest --cov=src/nbatch

License

Distributed under the terms of the BSD-3 license.

Part of ndev-kit

nbatch is part of the ndev-kit ecosystem for no-code bioimage analysis in napari.
