Flexible batch processing utilities

nbatch

Lightweight batch processing utilities for the ndev-kit ecosystem.

nbatch provides a foundation for batch processing operations. It's designed to work seamlessly with napari plugins but has no napari or Qt dependencies.

Features

  • @batch decorator - Transform single-item functions into batch-capable functions
  • BatchContext - Track progress through batch operations
  • BatchRunner - Orchestrate batch operations with threading, progress callbacks, and cancellation
  • discover_files() - Flexible file discovery with natural sorting (like file explorers)
  • batch_logger - Scoped logging for batch operations with headers/footers
  • Minimal dependencies - Only requires natsort for natural file ordering
  • Optional napari integration - Uses napari's threading when available, falls back to standard threads

Installation

pip install nbatch

For development:

pip install -e . --group dev

Quick Start

Basic Batch Processing

The @batch decorator transforms a function that processes a single item into one that handles both single items and batches:

from pathlib import Path
from nbatch import batch

@batch
def process_image(path: Path) -> str:
    # Your processing logic here
    return path.stem.upper()

# Single item - returns result directly
result = process_image(Path("image.tif"))
# Returns: "IMAGE"

# List of items - returns generator
results = process_image([Path("a.tif"), Path("b.tif")])
list(results)
# Returns: ["A", "B"]

# Directory - discovers files and returns generator
results = process_image(Path("/data/images"))
# Processes all files in directory
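The dispatch rule above (single item processed directly, iterable or directory turned into a generator) can be pictured with a minimal stdlib sketch. `batch_sketch` and `stem_upper` are illustrative names, and this is not nbatch's implementation, which also handles patterns, error modes, and context:

```python
from functools import wraps
from pathlib import Path

def batch_sketch(func):
    # Sketch of the dispatch rule: a directory expands to its files,
    # an iterable yields a generator, a single item is processed directly.
    @wraps(func)
    def wrapper(source):
        if isinstance(source, Path) and source.is_dir():
            source = sorted(source.iterdir())
        if isinstance(source, (list, tuple)):
            return (func(item) for item in source)
        return func(source)
    return wrapper

@batch_sketch
def stem_upper(path: Path) -> str:
    return path.stem.upper()

print(stem_upper(Path("image.tif")))                     # IMAGE
print(list(stem_upper([Path("a.tif"), Path("b.tif")])))  # ['A', 'B']
```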

Progress Tracking

Use with_context=True to get progress information:

@batch(with_context=True)
def process_image(path: Path) -> str:
    return path.stem

for result, ctx in process_image(files):
    print(f"{ctx.progress:.0%} complete: {result}")
    # 10% complete: image1
    # 20% complete: image2
    # ...

The BatchContext provides:

  • ctx.index - Zero-based index of current item
  • ctx.total - Total number of items
  • ctx.item - The current item being processed
  • ctx.progress - Progress as fraction (0.0 to 1.0)
  • ctx.is_first / ctx.is_last - Boolean flags
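The derived properties follow directly from `index` and `total`. A minimal stand-in (`Ctx` is illustrative, not the library's class) behaves like this:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ctx:
    # Stand-in mirroring the documented BatchContext fields (illustrative).
    index: int
    total: int
    item: Any

    @property
    def progress(self) -> float:
        return (self.index + 1) / self.total

    @property
    def is_first(self) -> bool:
        return self.index == 0

    @property
    def is_last(self) -> bool:
        return self.index == self.total - 1

ctx = Ctx(index=0, total=4, item="a.tif")
print(ctx.progress, ctx.is_first, ctx.is_last)  # 0.25 True False
```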

Error Handling

Control how errors are handled with on_error:

# 'raise' (default) - Re-raise exceptions immediately
@batch(on_error='raise')
def strict_process(path): ...

# 'continue' - Log error and yield None for failed items
@batch(on_error='continue')
def lenient_process(path): ...
# Results: ["good", None, "ok"]

# 'skip' - Log error and skip failed items entirely
@batch(on_error='skip')
def skip_errors(path): ...
# Results: ["good", "ok"]
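The three modes can be pictured as a loop like the following. This is a sketch of the documented semantics, not nbatch's implementation (`run_with_policy` and `flaky` are hypothetical names):

```python
def run_with_policy(func, items, on_error="raise"):
    # Sketch of the documented on_error semantics: 'raise' re-raises,
    # 'continue' records None for the failed item, 'skip' drops it.
    results = []
    for item in items:
        try:
            results.append(func(item))
        except Exception:
            if on_error == "raise":
                raise
            if on_error == "continue":
                results.append(None)
            # on_error == "skip": fall through without recording anything
    return results

def flaky(x):
    if x == "bad":
        raise ValueError(x)
    return x

print(run_with_policy(flaky, ["good", "bad", "ok"], on_error="continue"))  # ['good', None, 'ok']
print(run_with_policy(flaky, ["good", "bad", "ok"], on_error="skip"))      # ['good', 'ok']
```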

File Discovery

Control which files are processed:

# Custom glob patterns
@batch(patterns='*.tif')
def process_tiffs(path): ...

# Multiple patterns
@batch(patterns=['*.tif', '*.tiff', '*.png'])
def process_images(path): ...

# Non-recursive (top-level only)
@batch(recursive=False)
def process_top_level(path): ...

Or use discover_files() directly:

from nbatch import discover_files

# From directory with patterns
files = discover_files("/data/images", patterns=["*.tif", "*.png"])

# From explicit list
files = discover_files([path1, path2, path3])
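discover_files() orders results naturally via natsort, so image2.tif sorts before image10.tif. A stdlib approximation of that ordering (`natural_key` is a hypothetical helper, not part of nbatch):

```python
import re

def natural_key(name: str):
    # Split into digit and non-digit runs so numeric parts compare as
    # integers, approximating natsort's default ordering (illustrative).
    return [int(tok) if tok.isdigit() else tok.lower()
            for tok in re.split(r"(\d+)", name)]

names = ["image10.tif", "image2.tif", "image1.tif"]
print(sorted(names))                   # lexicographic: image10 before image2
print(sorted(names, key=natural_key))  # ['image1.tif', 'image2.tif', 'image10.tif']
```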

Logging

Use batch_logger for structured logging. By default, it outputs to the console (stderr). Optionally log to a file:

from nbatch import batch, batch_logger

@batch(with_context=True)
def process(path):
    return path.stem

# Console only (default)
with batch_logger() as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

# With file logging (appends by default)
with batch_logger(log_file="output/process.log", header={"Files": 100}) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")
        # Or use log.info(), log.warning(), log.error()

# File only (no console output)
with batch_logger(log_file="output/quiet.log", console=False) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

Log file output:

============================================================
Batch processing started at 2025-01-29 10:30:00
------------------------------------------------------------
Files: 100
============================================================
2025-01-29 10:30:01 - INFO - [1/100] image1.tif - Processed: image1
2025-01-29 10:30:02 - INFO - [2/100] image2.tif - Processed: image2
...
============================================================
Batch processing completed at 2025-01-29 10:35:00
============================================================
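The scoped header/footer pattern can be pictured with a plain context manager that attaches a handler on entry and detaches it on exit. This is only a sketch (`scoped_logger` is a hypothetical name); nbatch's batch_logger additionally formats BatchContext prefixes and supports the file and console options shown above:

```python
import logging
import sys
from contextlib import contextmanager

@contextmanager
def scoped_logger(name: str = "batch.sketch"):
    # Attach a handler on entry, emit header/footer lines, and detach on
    # exit, so logging is confined to the with-block (illustrative only).
    logger = logging.getLogger(name)
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info("=" * 60)
    logger.info("Batch processing started")
    try:
        yield logger
    finally:
        logger.info("Batch processing completed")
        logger.info("=" * 60)
        logger.removeHandler(handler)

with scoped_logger() as log:
    log.info("Processed: image1")
```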

Integration with napari

Using BatchRunner (Recommended)

BatchRunner provides clean orchestration for widgets with threading, progress callbacks, and cancellation:

from nbatch import batch, BatchRunner

# Define your processing function (pure, testable)
@batch(on_error='continue')
def process_image(path, model, output_dir):
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget class
class MyWidget:
    def __init__(self, viewer):
        self._viewer = viewer
        
        # Create runner once - reusable for all batches
        self.runner = BatchRunner(
            on_item_complete=self._on_item_complete,
            on_complete=self._on_batch_complete,
            on_error=self._on_item_error,
            on_cancel=self._on_cancelled,
        )
        
        self._run_button.clicked.connect(self.run_batch)
        self._cancel_button.clicked.connect(self.runner.cancel)
    
    def _on_item_complete(self, result, ctx):
        """Called after each item completes."""
        self._progress_bar.value = ctx.index + 1
        # Optionally add result to viewer
        if result is not None:
            self._viewer.add_image(result, name=f"Result {ctx.index}")
    
    def _on_batch_complete(self):
        self._progress_bar.label = "Complete!"
    
    def _on_item_error(self, ctx, exception):
        self._progress_bar.label = f"Error on {ctx.item.name}"
    
    def _on_cancelled(self):
        self._progress_bar.label = "Cancelled"
    
    def run_batch(self):
        """Triggered by 'Run' button - just one line!"""
        self._progress_bar.max = len(self.files)
        self.runner.run(
            process_image,
            self.files,
            model=self.model,
            output_dir=self.output_dir,
            log_file=self.output_dir / "batch.log",
        )

Using @thread_worker directly

For more control, use napari's @thread_worker with the @batch decorator:

from napari.qt.threading import thread_worker
from nbatch import batch, batch_logger

@batch(with_context=True, on_error='continue')
def process_image(path, model, output_dir):
    # Your processing logic
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget
def run_batch(self):
    @thread_worker
    def _run():
        with batch_logger(log_file=self.output_dir / 'log.txt') as log:
            for result, ctx in process_image(
                self.input_dir,
                model=self.model,
                output_dir=self.output_dir,
            ):
                log(ctx, f"Processed: {ctx.item.name}")
                yield ctx  # Enables progress updates
    
    worker = _run()
    worker.yielded.connect(
        lambda ctx: self.progress_bar.setValue(int(ctx.progress * 100))
    )
    worker.start()

API Reference

@batch Decorator

@batch(
    on_error: Literal['raise', 'continue', 'skip'] = 'raise',
    with_context: bool = False,
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
)

BatchContext

@dataclass(frozen=True)
class BatchContext:
    index: int      # Zero-based index
    total: int      # Total items
    item: Any       # Current item
    
    @property
    def progress(self) -> float: ...  # (index + 1) / total
    @property
    def is_first(self) -> bool: ...   # index == 0
    @property
    def is_last(self) -> bool: ...    # index == total - 1

discover_files()

def discover_files(
    source: str | Path | Iterable[str | Path],
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
) -> list[Path]: ...

batch_logger

@contextmanager
def batch_logger(
    log_file: str | Path | None = None,  # Optional file path
    header: Mapping[str, object] | None = None,  # Metadata to write at start
    level: int = logging.INFO,
    console: bool = True,  # Output to stderr
    file_mode: Literal['w', 'a'] = 'a',  # Append by default
) -> Generator[BatchLogger, None, None]: ...

BatchRunner

class BatchRunner:
    def __init__(
        self,
        on_item_complete: Callable[[Any, BatchContext], None] | None = None,
        on_complete: Callable[[], None] | None = None,
        on_error: Callable[[BatchContext, Exception], None] | None = None,
        on_cancel: Callable[[], None] | None = None,
    ): ...
    
    def run(
        self,
        func: Callable,
        items: Any,
        *args,
        threaded: bool = True,
        log_file: str | Path | None = None,
        log_header: Mapping[str, object] | None = None,
        patterns: str | Sequence[str] = '*',
        recursive: bool = False,
        **kwargs,
    ) -> None: ...
    
    def cancel(self) -> None: ...
    
    @property
    def is_running(self) -> bool: ...
    
    @property
    def was_cancelled(self) -> bool: ...
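The cancellation pattern BatchRunner exposes (a worker that checks a flag between items and stops early) can be sketched with stdlib threading. `RunnerSketch` is an illustrative stand-in, not nbatch's implementation, which also wires up logging, file discovery, and napari's thread workers:

```python
import threading

class RunnerSketch:
    # Sketch of the cancellation pattern: the worker checks an Event
    # between items and stops early when cancel() sets it (illustrative).
    def __init__(self, on_item_complete=None, on_cancel=None):
        self._cancel = threading.Event()
        self._on_item = on_item_complete
        self._on_cancel = on_cancel
        self._thread = None

    def run(self, func, items, threaded=True):
        def _work():
            for i, item in enumerate(items):
                if self._cancel.is_set():
                    if self._on_cancel:
                        self._on_cancel()
                    return
                result = func(item)
                if self._on_item:
                    self._on_item(result, i)
        if threaded:
            self._thread = threading.Thread(target=_work)
            self._thread.start()
        else:
            _work()

    def cancel(self):
        self._cancel.set()

done = []
runner = RunnerSketch(on_item_complete=lambda res, i: done.append(res))
runner.run(str.upper, ["a", "b"], threaded=False)
print(done)  # ['A', 'B']
```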

Contributing

Contributions are welcome! Please ensure tests pass before submitting a pull request:

pytest --cov=src/nbatch

License

Distributed under the terms of the BSD-3 license.

Part of ndev-kit

nbatch is part of the ndev-kit ecosystem for no-code bioimage analysis in napari.
