
nbatch


Lightweight batch processing utilities for the ndev-kit ecosystem.

nbatch provides a foundation for batch processing operations. It's designed to work seamlessly with napari plugins but has no napari or Qt dependencies.

Features

  • @batch decorator - Transform single-item functions into batch-capable functions
  • BatchContext - Track progress through batch operations
  • BatchRunner - Orchestrate batch operations with threading, progress callbacks, and cancellation
  • discover_files() - Flexible file discovery with natural sorting (like file explorers)
  • batch_logger - Scoped logging for batch operations with headers/footers
  • Minimal dependencies - Only requires natsort for natural file ordering
  • Optional napari integration - Uses napari's threading when available, falls back to standard threads

Installation

pip install nbatch

For development:

pip install -e . --group dev

Quick Start

Basic Batch Processing

The @batch decorator transforms a function that processes a single item into one that handles both single items and batches:

from pathlib import Path
from nbatch import batch

@batch
def process_image(path: Path) -> str:
    # Your processing logic here
    return path.stem.upper()

# Single item - returns result directly
result = process_image(Path("image.tif"))
# Returns: "IMAGE"

# List of items - returns generator
results = process_image([Path("a.tif"), Path("b.tif")])
list(results)
# Returns: ["A", "B"]

# Directory - discovers files and returns generator
results = process_image(Path("/data/images"))
# Processes all files in directory
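Under the hood, a decorator like this dispatches on the type of its first argument. A minimal stdlib-only sketch of that dispatch logic (illustrative only, not nbatch's actual implementation; nbatch also applies natural sorting and glob patterns):

```python
from functools import wraps
from pathlib import Path

def simple_batch(func):
    """Sketch: dispatch on single item vs. list vs. directory."""
    @wraps(func)
    def wrapper(source, **kwargs):
        if isinstance(source, Path) and source.is_dir():
            items = sorted(source.iterdir())   # nbatch uses natural sorting here
        elif isinstance(source, (list, tuple)):
            items = source
        else:
            return func(source, **kwargs)      # single item: return result directly
        return (func(item, **kwargs) for item in items)  # batch: lazy generator
    return wrapper

@simple_batch
def stem_upper(path: Path) -> str:
    return path.stem.upper()

print(stem_upper(Path("image.tif")))                     # IMAGE
print(list(stem_upper([Path("a.tif"), Path("b.tif")])))  # ['A', 'B']
```

Returning a generator for batches keeps memory flat: items are processed one at a time as the caller iterates.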

Progress Tracking

Use with_context=True to get progress information:

@batch(with_context=True)
def process_image(path: Path) -> str:
    return path.stem

for result, ctx in process_image(files):
    print(f"{ctx.progress:.0%} complete: {result}")
    # 10% complete: image1
    # 20% complete: image2
    # ...

The BatchContext provides:

  • ctx.index - Zero-based index of current item
  • ctx.total - Total number of items
  • ctx.item - The current item being processed
  • ctx.progress - Progress as fraction (0.0 to 1.0)
  • ctx.is_first / ctx.is_last - Boolean flags

Error Handling

Control how errors are handled with on_error:

# 'raise' (default) - Re-raise exceptions immediately
@batch(on_error='raise')
def strict_process(path): ...

# 'continue' - Log error and yield None for failed items
@batch(on_error='continue')
def lenient_process(path): ...
# Results: ["good", None, "ok"]

# 'skip' - Log error and skip failed items entirely
@batch(on_error='skip')
def skip_errors(path): ...
# Results: ["good", "ok"]
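The three policies boil down to a try/except around each item. A stdlib sketch of the semantics (illustrative, not nbatch's code):

```python
def run_with_policy(func, items, on_error="raise"):
    """Yield results, applying 'raise'/'continue'/'skip' semantics per item."""
    for item in items:
        try:
            yield func(item)
        except Exception:
            if on_error == "raise":
                raise            # fail fast on the first error
            if on_error == "continue":
                yield None       # placeholder keeps results aligned with inputs
            # "skip": yield nothing for failed items

items = ["1", "bad", "3"]
print(list(run_with_policy(int, items, on_error="continue")))  # [1, None, 3]
print(list(run_with_policy(int, items, on_error="skip")))      # [1, 3]
```

Note the trade-off: 'continue' preserves the input-to-output correspondence (useful when pairing results back to files), while 'skip' yields only successes.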

File Discovery

Control which files are processed:

# Custom glob patterns
@batch(patterns='*.tif')
def process_tiffs(path): ...

# Multiple patterns
@batch(patterns=['*.tif', '*.tiff', '*.png'])
def process_images(path): ...

# Non-recursive (top-level only)
@batch(recursive=False)
def process_top_level(path): ...

Or use discover_files() directly:

from nbatch import discover_files

# From directory with patterns
files = discover_files("/data/images", patterns=["*.tif", "*.png"])

# From explicit list
files = discover_files([path1, path2, path3])
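Natural sorting means image2.tif sorts before image10.tif, as in a file explorer. nbatch delegates this to natsort; an equivalent stdlib key function, for illustration only:

```python
import re

def natural_key(name: str):
    """Split digit runs out and compare them as ints, so 'image2' < 'image10'."""
    return [int(part) if part.isdigit() else part.lower()
            for part in re.split(r"(\d+)", name)]

names = ["image10.tif", "image2.tif", "image1.tif"]
print(sorted(names))                   # lexicographic: image10 before image2
print(sorted(names, key=natural_key))  # natural: image1, image2, image10
```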

Logging

Use batch_logger for structured logging. By default, it outputs to the console (stderr). Optionally log to a file:

from nbatch import batch, batch_logger

@batch(with_context=True)
def process(path):
    return path.stem

# Console only (default)
with batch_logger() as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

# With file logging (appends by default)
with batch_logger(log_file="output/process.log", header={"Files": 100}) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")
        # Or use log.info(), log.warning(), log.error()

# File only (no console output)
with batch_logger(log_file="output/quiet.log", console=False) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

Log file output:

============================================================
Batch processing started at 2025-01-29 10:30:00
------------------------------------------------------------
Files: 100
============================================================
2025-01-29 10:30:01 - INFO - [1/100] image1.tif - Processed: image1
2025-01-29 10:30:02 - INFO - [2/100] image2.tif - Processed: image2
...
============================================================
Batch processing completed at 2025-01-29 10:35:00
============================================================
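The header/footer framing can be reproduced with a small context manager; a stdlib sketch of the idea (not nbatch's implementation, which also handles console output, levels, and per-item [i/total] prefixes):

```python
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path

@contextmanager
def scoped_log(log_file, header=None):
    """Write a framed header on entry and a footer on exit (sketch)."""
    rule = "=" * 60
    with Path(log_file).open("a") as f:  # append, matching nbatch's default
        f.write(f"{rule}\nBatch processing started at "
                f"{datetime.now():%Y-%m-%d %H:%M:%S}\n{'-' * 60}\n")
        for key, value in (header or {}).items():
            f.write(f"{key}: {value}\n")
        f.write(f"{rule}\n")
        try:
            yield lambda msg: f.write(msg + "\n")
        finally:
            # footer is written even if the loop raises
            f.write(f"{rule}\nBatch processing completed at "
                    f"{datetime.now():%Y-%m-%d %H:%M:%S}\n{rule}\n")

with scoped_log("batch.log", header={"Files": 2}) as log:
    log("[1/2] a.tif - Processed: a")
    log("[2/2] b.tif - Processed: b")
```

Wrapping the footer in finally is the key detail: the log records a completion line even when processing aborts partway.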

Integration with napari

Using BatchRunner (Recommended)

BatchRunner provides clean orchestration for widgets with threading, progress callbacks, and cancellation:

from nbatch import batch, BatchRunner

# Define your processing function (pure, testable)
@batch(on_error='continue')
def process_image(path, model, output_dir):
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget class
class MyWidget:
    def __init__(self, viewer):
        self._viewer = viewer
        
        # Create runner once - reusable for all batches
        self.runner = BatchRunner(
            on_item_complete=self._on_item_complete,
            on_complete=self._on_batch_complete,
            on_error=self._on_item_error,
            on_cancel=self._on_cancelled,
        )
        
        self._run_button.clicked.connect(self.run_batch)
        self._cancel_button.clicked.connect(self.runner.cancel)
    
    def _on_item_complete(self, result, ctx):
        """Called after each item completes."""
        self._progress_bar.setValue(ctx.index + 1)
        # Optionally add result to viewer
        if result is not None:
            self._viewer.add_image(result, name=f"Result {ctx.index}")
    
    def _on_batch_complete(self):
        self._progress_bar.label = "Complete!"
    
    def _on_item_error(self, ctx, exception):
        self._progress_bar.label = f"Error on {ctx.item.name}"
    
    def _on_cancelled(self):
        self._progress_bar.label = "Cancelled"
    
    def run_batch(self):
        """Triggered by 'Run' button - just one line!"""
        self._progress_bar.max = len(self.files)
        self.runner.run(
            process_image,
            self.files,
            model=self.model,
            output_dir=self.output_dir,
            log_file=self.output_dir / "batch.log",
        )

Using @thread_worker directly

For more control, use napari's @thread_worker with the @batch decorator:

from napari.qt.threading import thread_worker
from nbatch import batch, batch_logger

@batch(with_context=True, on_error='continue')
def process_image(path, model, output_dir):
    # Your processing logic
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget
def run_batch(self):
    @thread_worker
    def _run():
        with batch_logger(log_file=self.output_dir / 'log.txt') as log:
            for result, ctx in process_image(
                self.input_dir,
                model=self.model,
                output_dir=self.output_dir,
            ):
                log(ctx, f"Processed: {ctx.item.name}")
                yield ctx  # Enables progress updates
    
    worker = _run()
    worker.yielded.connect(
        lambda ctx: self.progress_bar.setValue(int(ctx.progress * 100))
    )
    worker.start()

API Reference

@batch Decorator

@batch(
    on_error: Literal['raise', 'continue', 'skip'] = 'raise',
    with_context: bool = False,
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
)

BatchContext

@dataclass(frozen=True)
class BatchContext:
    index: int      # Zero-based index
    total: int      # Total items
    item: Any       # Current item
    
    @property
    def progress(self) -> float: ...  # (index + 1) / total
    @property
    def is_first(self) -> bool: ...   # index == 0
    @property
    def is_last(self) -> bool: ...    # index == total - 1

discover_files()

def discover_files(
    source: str | Path | Iterable[str | Path],
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
) -> list[Path]: ...

batch_logger

@contextmanager
def batch_logger(
    log_file: str | Path | None = None,  # Optional file path
    header: Mapping[str, object] | None = None,  # Metadata to write at start
    level: int = logging.INFO,
    console: bool = True,  # Output to stderr
    file_mode: Literal['w', 'a'] = 'a',  # Append by default
) -> Generator[BatchLogger, None, None]: ...

BatchRunner

class BatchRunner:
    def __init__(
        self,
        on_item_complete: Callable[[Any, BatchContext], None] | None = None,
        on_complete: Callable[[], None] | None = None,
        on_error: Callable[[BatchContext, Exception], None] | None = None,
        on_cancel: Callable[[], None] | None = None,
    ): ...
    
    def run(
        self,
        func: Callable,
        items: Any,
        *args,
        threaded: bool = True,
        log_file: str | Path | None = None,
        log_header: Mapping[str, object] | None = None,
        patterns: str | Sequence[str] = '*',
        recursive: bool = False,
        **kwargs,
    ) -> None: ...
    
    def cancel(self) -> None: ...
    
    @property
    def is_running(self) -> bool: ...
    
    @property
    def was_cancelled(self) -> bool: ...

Contributing

Contributions are welcome! Please ensure tests pass before submitting a pull request:

pytest --cov=src/nbatch

License

Distributed under the terms of the BSD-3 license.

Part of ndev-kit

nbatch is part of the ndev-kit ecosystem for no-code bioimage analysis in napari.
