nbatch

Lightweight batch processing utilities for the ndev-kit ecosystem.

nbatch provides a foundation for batch processing operations. It's designed to work seamlessly with napari plugins but has no napari or Qt dependencies.

Features

  • @batch decorator - Transform single-item functions into batch-capable functions
  • BatchContext - Track progress through batch operations
  • BatchRunner - Orchestrate batch operations with threading, progress callbacks, and cancellation
  • discover_files() - Flexible file discovery with natural sorting (like file explorers)
  • batch_logger - Scoped logging for batch operations with headers/footers
  • Minimal dependencies - Only requires natsort for natural file ordering
  • Optional napari integration - Uses napari's threading when available, falls back to standard threads

Installation

pip install nbatch

For development (the --group flag requires pip 25.1 or newer):

pip install -e . --group dev

Quick Start

Basic Batch Processing

The @batch decorator transforms a function that processes a single item into one that handles both single items and batches:

from pathlib import Path
from nbatch import batch

@batch
def process_image(path: Path) -> str:
    # Your processing logic here
    return path.stem.upper()

# Single item - returns result directly
result = process_image(Path("image.tif"))
# Returns: "IMAGE"

# List of items - returns generator
results = process_image([Path("a.tif"), Path("b.tif")])
list(results)
# Returns: ["A", "B"]

# Directory - discovers files and returns generator
results = process_image(Path("/data/images"))
# Processes all files in directory
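Conceptually, the dispatch can be pictured with a small stdlib-only sketch (an illustration of the idea, not nbatch's actual implementation; simple_batch and stem_upper are hypothetical names):

```python
import functools
from pathlib import Path

def simple_batch(func):
    """Illustrative stand-in for @batch: dispatch on the input's type."""
    @functools.wraps(func)
    def wrapper(source, *args, **kwargs):
        if isinstance(source, Path) and source.is_dir():
            items = sorted(source.iterdir())          # directory -> discover files
        elif isinstance(source, (list, tuple)):
            items = source                            # explicit batch
        else:
            return func(source, *args, **kwargs)      # single item -> direct result
        return (func(item, *args, **kwargs) for item in items)  # lazy generator
    return wrapper

@simple_batch
def stem_upper(path: Path) -> str:
    return path.stem.upper()

stem_upper(Path("image.tif"))                   # -> "IMAGE"
list(stem_upper([Path("a.tif"), Path("b.tif")]))  # -> ["A", "B"]
```

Returning a generator for the batch cases is what lets results stream in one at a time instead of blocking until the whole batch finishes.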

Progress Tracking

Use with_context=True to get progress information:

@batch(with_context=True)
def process_image(path: Path) -> str:
    return path.stem

for result, ctx in process_image(files):
    print(f"{ctx.progress:.0%} complete: {result}")
    # 10% complete: image1
    # 20% complete: image2
    # ...

The BatchContext provides:

  • ctx.index - Zero-based index of current item
  • ctx.total - Total number of items
  • ctx.item - The current item being processed
  • ctx.progress - Progress as fraction (0.0 to 1.0)
  • ctx.is_first / ctx.is_last - Boolean flags
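The context behaves like a small frozen dataclass; the self-contained sketch below shows how the derived properties follow from index and total (Ctx is a hypothetical stand-in; the real class signature appears in the API Reference below):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ctx:
    """Illustrative stand-in for nbatch's BatchContext."""
    index: int   # zero-based position of the current item
    total: int   # total number of items in the batch
    item: Any    # the item currently being processed

    @property
    def progress(self) -> float:
        return (self.index + 1) / self.total   # 1-based fraction, so the last item reads 100%

    @property
    def is_first(self) -> bool:
        return self.index == 0

    @property
    def is_last(self) -> bool:
        return self.index == self.total - 1

Ctx(index=1, total=4, item="b").progress  # -> 0.5
```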

Error Handling

Control how errors are handled with on_error:

# 'raise' (default) - Re-raise exceptions immediately
@batch(on_error='raise')
def strict_process(path): ...

# 'continue' - Log error and yield None for failed items
@batch(on_error='continue')
def lenient_process(path): ...
# Results: ["good", None, "ok"]

# 'skip' - Log error and skip failed items entirely
@batch(on_error='skip')
def skip_errors(path): ...
# Results: ["good", "ok"]
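The three policies amount to different branches of a try/except loop. The sketch below illustrates the semantics only, not nbatch internals (run_with_policy and parse are hypothetical names):

```python
def run_with_policy(func, items, on_error="raise"):
    """Sketch of the three on_error policies over a batch."""
    results = []
    for item in items:
        try:
            results.append(func(item))
        except Exception:
            if on_error == "raise":
                raise                     # fail fast on the first error
            if on_error == "continue":
                results.append(None)      # keep a placeholder for the failed item
            # "skip": drop the failed item entirely
    return results

def parse(x):
    return int(x)

run_with_policy(parse, ["1", "oops", "2"], on_error="continue")  # -> [1, None, 2]
run_with_policy(parse, ["1", "oops", "2"], on_error="skip")      # -> [1, 2]
```

'continue' preserves positional alignment between inputs and outputs, which matters if you zip results back against the file list; 'skip' gives a compact list of successes only.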

File Discovery

Control which files are processed:

# Custom glob patterns
@batch(patterns='*.tif')
def process_tiffs(path): ...

# Multiple patterns
@batch(patterns=['*.tif', '*.tiff', '*.png'])
def process_images(path): ...

# Non-recursive (top-level only)
@batch(recursive=False)
def process_top_level(path): ...

Or use discover_files() directly:

from nbatch import discover_files

# From directory with patterns
files = discover_files("/data/images", patterns=["*.tif", "*.png"])

# From explicit list
files = discover_files([path1, path2, path3])
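The natural sorting is what keeps img2.tif ahead of img10.tif, as a file explorer would. A stdlib approximation of the kind of key natsort provides (natural_key is a hypothetical helper, not part of the nbatch API):

```python
import re

def natural_key(name: str):
    """Split digit runs out of the name so numbers compare numerically."""
    return [int(tok) if tok.isdigit() else tok.lower()
            for tok in re.split(r"(\d+)", name)]

names = ["img10.tif", "img2.tif", "img1.tif"]
sorted(names)                   # plain lexicographic: ["img1.tif", "img10.tif", "img2.tif"]
sorted(names, key=natural_key)  # natural: ["img1.tif", "img2.tif", "img10.tif"]
```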

Logging

Use batch_logger for structured logging. By default, it outputs to the console (stderr). Optionally log to a file:

from nbatch import batch, batch_logger

@batch(with_context=True)
def process(path):
    return path.stem

# Console only (default)
with batch_logger() as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

# With file logging (appends by default)
with batch_logger(log_file="output/process.log", header={"Files": 100}) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")
        # Or use log.info(), log.warning(), log.error()

# File only (no console output)
with batch_logger(log_file="output/quiet.log", console=False) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

Log file output:

============================================================
Batch processing started at 2025-01-29 10:30:00
------------------------------------------------------------
Files: 100
============================================================
2025-01-29 10:30:01 - INFO - [1/100] image1.tif - Processed: image1
2025-01-29 10:30:02 - INFO - [2/100] image2.tif - Processed: image2
...
============================================================
Batch processing completed at 2025-01-29 10:35:00
============================================================
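For intuition, a scoped logger like this can be built from contextlib and the standard logging module. This is a simplified sketch, not batch_logger's real implementation; scoped_logger is a hypothetical name and its header/footer formatting differs from the actual output shown above:

```python
import logging
import sys
from contextlib import contextmanager

@contextmanager
def scoped_logger(log_file=None, header=None):
    """Sketch of a batch_logger-style scoped logger: attach a handler on
    entry, write header/footer lines, detach the handler on exit."""
    logger = logging.getLogger("sketch.batch")
    logger.setLevel(logging.INFO)
    handler = (logging.FileHandler(log_file, mode="a") if log_file
               else logging.StreamHandler(sys.stderr))
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    logger.addHandler(handler)
    try:
        logger.info("batch started")
        for key, value in (header or {}).items():
            logger.info("%s: %s", key, value)   # e.g. "Files: 100"
        yield logger
        logger.info("batch completed")
    finally:
        logger.removeHandler(handler)           # leave no handler behind
        handler.close()
```

Detaching the handler in finally is the key design point: logging state is scoped to the with block, so repeated batches never accumulate duplicate handlers.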

Integration with napari

Using BatchRunner (Recommended)

BatchRunner provides clean orchestration for widgets with threading, progress callbacks, and cancellation:

from nbatch import batch, BatchRunner

# Define your processing function (pure, testable)
@batch(on_error='continue')
def process_image(path, model, output_dir):
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget class
class MyWidget:
    def __init__(self, viewer):
        self._viewer = viewer
        
        # Create runner once - reusable for all batches
        self.runner = BatchRunner(
            on_start=self._on_batch_start,
            on_item_complete=self._on_item_complete,
            on_complete=self._on_batch_complete,
            on_error=self._on_item_error,
            on_cancel=self._on_cancelled,
        )
        
        self._run_button.clicked.connect(self.run_batch)
        self._cancel_button.clicked.connect(self.runner.cancel)
    
    def _on_batch_start(self, total):
        """Called when batch starts with total item count."""
        self._progress_bar.setValue(0)
        self._progress_bar.setMaximum(total)
    
    def _on_item_complete(self, result, ctx):
        """Called after each item completes."""
        self._progress_bar.setValue(ctx.index + 1)
        # Optionally add result to viewer
        if result is not None:
            self._viewer.add_image(result, name=f"Result {ctx.index}")
    
    def _on_batch_complete(self):
        errors = self.runner.error_count
        if errors > 0:
            self._progress_bar.label = f"Done with {errors} errors"
        else:
            self._progress_bar.label = "Complete!"
    
    def _on_item_error(self, ctx, exception):
        self._progress_bar.label = f"Error on {ctx.item.name}"
    
    def _on_cancelled(self):
        self._progress_bar.label = "Cancelled"
    
    def run_batch(self):
        """Triggered by 'Run' button - just one line!"""
        self.runner.run(
            process_image,
            self.files,
            model=self.model,
            output_dir=self.output_dir,
            log_file=self.output_dir / "batch.log",
        )

Using @thread_worker directly

For more control, use napari's @thread_worker with the @batch decorator:

from napari.qt.threading import thread_worker
from nbatch import batch, batch_logger

@batch(with_context=True, on_error='continue')
def process_image(path, model, output_dir):
    # Your processing logic
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget
def run_batch(self):
    @thread_worker
    def _run():
        with batch_logger(log_file=self.output_dir / 'log.txt') as log:
            for result, ctx in process_image(
                self.input_dir,
                model=self.model,
                output_dir=self.output_dir,
            ):
                log(ctx, f"Processed: {ctx.item.name}")
                yield ctx  # Enables progress updates
    
    worker = _run()
    worker.yielded.connect(
        lambda ctx: self.progress_bar.setValue(int(ctx.progress * 100))
    )
    worker.start()

API Reference

@batch Decorator

@batch(
    on_error: Literal['raise', 'continue', 'skip'] = 'raise',
    with_context: bool = False,
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
)

BatchContext

@dataclass(frozen=True)
class BatchContext:
    index: int      # Zero-based index
    total: int      # Total items
    item: Any       # Current item
    
    @property
    def progress(self) -> float: ...  # (index + 1) / total
    @property
    def is_first(self) -> bool: ...   # index == 0
    @property
    def is_last(self) -> bool: ...    # index == total - 1

discover_files()

def discover_files(
    source: str | Path | Iterable[str | Path],
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
) -> list[Path]: ...

batch_logger

@contextmanager
def batch_logger(
    log_file: str | Path | None = None,  # Optional file path
    header: Mapping[str, object] | None = None,  # Metadata to write at start
    level: int = logging.INFO,
    console: bool = True,  # Output to stderr
    file_mode: Literal['w', 'a'] = 'a',  # Append by default
) -> Generator[BatchLogger, None, None]: ...

BatchRunner

class BatchRunner:
    def __init__(
        self,
        on_start: Callable[[int], None] | None = None,
        on_item_complete: Callable[[Any, BatchContext], None] | None = None,
        on_complete: Callable[[], None] | None = None,
        on_error: Callable[[BatchContext, Exception], None] | None = None,
        on_cancel: Callable[[], None] | None = None,
    ): ...
    
    def run(
        self,
        func: Callable,
        items: Any,
        *args,
        threaded: bool = True,
        log_file: str | Path | None = None,
        log_header: Mapping[str, object] | None = None,
        patterns: str | Sequence[str] = '*',
        recursive: bool = False,
        **kwargs,  # Passed to func!
    ) -> None: ...
    
    def cancel(self) -> None: ...
    
    @property
    def is_running(self) -> bool: ...
    
    @property
    def was_cancelled(self) -> bool: ...
    
    @property
    def error_count(self) -> int: ...  # Errors in current/last batch
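To make the callback flow concrete, here is a toy synchronous analogue (MiniRunner is hypothetical; the real BatchRunner additionally runs the loop in a worker thread and uses napari's threading when available):

```python
import threading

class MiniRunner:
    """Toy synchronous sketch of BatchRunner's callback flow."""

    def __init__(self, on_start=None, on_item_complete=None,
                 on_complete=None, on_error=None, on_cancel=None):
        self._cb = dict(start=on_start, item=on_item_complete,
                        done=on_complete, error=on_error, cancel=on_cancel)
        self._cancel = threading.Event()
        self.error_count = 0

    def cancel(self):
        self._cancel.set()

    def run(self, func, items):
        items = list(items)
        self._cancel.clear()
        self.error_count = 0
        if self._cb["start"]:
            self._cb["start"](len(items))          # on_start(total)
        for index, item in enumerate(items):
            if self._cancel.is_set():
                if self._cb["cancel"]:
                    self._cb["cancel"]()           # on_cancel()
                return
            try:
                result = func(item)
            except Exception as exc:
                self.error_count += 1
                if self._cb["error"]:
                    self._cb["error"](item, exc)   # on_error(item, exception)
                continue
            if self._cb["item"]:
                self._cb["item"](result, index)    # on_item_complete(result, ...)
        if self._cb["done"]:
            self._cb["done"]()                     # on_complete()
```

Separating "what to do per item" (func) from "how to report it" (the callbacks) is what lets the same processing function serve scripts, tests, and widgets unchanged.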

Contributing

Contributions are welcome! Please ensure tests pass before submitting a pull request:

pytest --cov=src/nbatch

License

Distributed under the terms of the BSD-3 license.

Part of ndev-kit

nbatch is part of the ndev-kit ecosystem for no-code bioimage analysis in napari.

