nbatch
Lightweight batch processing utilities for the ndev-kit ecosystem.
nbatch provides a foundation for batch processing operations. It's designed to work seamlessly with napari plugins but has no napari or Qt dependencies.
Features
- @batch decorator - Transform single-item functions into batch-capable functions
- BatchContext - Track progress through batch operations
- BatchRunner - Orchestrate batch operations with threading, progress callbacks, and cancellation
- discover_files() - Flexible file discovery with natural sorting (like file explorers)
- batch_logger - Scoped logging for batch operations with headers/footers
- Minimal dependencies - Only requires natsort for natural file ordering
- Optional napari integration - Uses napari's threading when available, falls back to standard threads
Installation
pip install nbatch
For development:
pip install -e . --group dev
Quick Start
Basic Batch Processing
The @batch decorator transforms a function that processes a single item into one that handles both single items and batches:
from pathlib import Path
from nbatch import batch
@batch
def process_image(path: Path) -> str:
    # Your processing logic here
    return path.stem.upper()
# Single item - returns result directly
result = process_image(Path("image.tif"))
# Returns: "IMAGE"
# List of items - returns generator
results = process_image([Path("a.tif"), Path("b.tif")])
list(results)
# Returns: ["A", "B"]
# Directory - discovers files and returns generator
results = process_image(Path("/data/images"))
# Processes all files in directory
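The dispatch behavior above (single item, list, or directory) can be sketched with a simplified decorator. This is an illustration of the pattern only, not nbatch's actual implementation; the name simple_batch is hypothetical:

```python
from functools import wraps
from pathlib import Path

def simple_batch(func):
    """Toy batch decorator: dispatch on the type of the first argument."""
    @wraps(func)
    def wrapper(source, *args, **kwargs):
        if isinstance(source, Path) and source.is_dir():
            items = sorted(source.iterdir())          # directory -> discover files
        elif isinstance(source, (list, tuple)):
            items = source                            # explicit list of items
        else:
            return func(source, *args, **kwargs)      # single item -> direct result
        return (func(item, *args, **kwargs) for item in items)  # batch -> generator
    return wrapper

@simple_batch
def stem_upper(path: Path) -> str:
    return path.stem.upper()

print(stem_upper(Path("image.tif")))                     # "IMAGE"
print(list(stem_upper([Path("a.tif"), Path("b.tif")])))  # ["A", "B"]
```

Returning a generator for batches keeps memory flat for large directories, which is likely why nbatch yields results rather than collecting them.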
Progress Tracking
Use with_context=True to get progress information:
@batch(with_context=True)
def process_image(path: Path) -> str:
    return path.stem

for result, ctx in process_image(files):
    print(f"{ctx.progress:.0%} complete: {result}")
# 10% complete: image1
# 20% complete: image2
# ...
The BatchContext provides:
- ctx.index - Zero-based index of current item
- ctx.total - Total number of items
- ctx.item - The current item being processed
- ctx.progress - Progress as fraction (0.0 to 1.0)
- ctx.is_first / ctx.is_last - Boolean flags
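These fields can be mirrored with a small frozen dataclass; the sketch below (class name Ctx is a stand-in, the real class is nbatch's BatchContext) shows how the derived properties follow from index and total:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ctx:
    """Minimal stand-in for nbatch's BatchContext."""
    index: int   # zero-based position in the batch
    total: int   # total number of items
    item: Any    # the item currently being processed

    @property
    def progress(self) -> float:
        return (self.index + 1) / self.total  # reaches 1.0 on the last item

    @property
    def is_first(self) -> bool:
        return self.index == 0

    @property
    def is_last(self) -> bool:
        return self.index == self.total - 1

ctx = Ctx(index=1, total=10, item="image2.tif")
print(f"{ctx.progress:.0%}")  # 20%
```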
Error Handling
Control how errors are handled with on_error:
# 'raise' (default) - Re-raise exceptions immediately
@batch(on_error='raise')
def strict_process(path): ...
# 'continue' - Log error and yield None for failed items
@batch(on_error='continue')
def lenient_process(path): ...
# Results: ["good", None, "ok"]
# 'skip' - Log error and skip failed items entirely
@batch(on_error='skip')
def skip_errors(path): ...
# Results: ["good", "ok"]
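The three policies boil down to a small dispatch inside the item loop. Here is a hedged sketch of the idea as a plain function (not the library's code; run_with_policy and shaky are illustrative names):

```python
def run_with_policy(func, items, on_error="raise"):
    """Illustrate 'raise' / 'continue' / 'skip' semantics over a batch."""
    results = []
    for item in items:
        try:
            results.append(func(item))
        except Exception:
            if on_error == "raise":
                raise                    # fail fast on the first error
            if on_error == "continue":
                results.append(None)     # keep a placeholder for the failure
            # 'skip': drop the failed item entirely and keep going
    return results

def shaky(x):
    if x == "bad":
        raise ValueError(x)
    return x.upper()

print(run_with_policy(shaky, ["good", "bad", "ok"], on_error="continue"))
# ['GOOD', None, 'OK']
print(run_with_policy(shaky, ["good", "bad", "ok"], on_error="skip"))
# ['GOOD', 'OK']
```

The practical difference: 'continue' preserves positional alignment between inputs and outputs (useful when pairing results back to files), while 'skip' gives a compact result list.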
File Discovery
Control which files are processed:
# Custom glob patterns
@batch(patterns='*.tif')
def process_tiffs(path): ...
# Multiple patterns
@batch(patterns=['*.tif', '*.tiff', '*.png'])
def process_images(path): ...
# Non-recursive (top-level only)
@batch(recursive=False)
def process_top_level(path): ...
Or use discover_files() directly:
from nbatch import discover_files
# From directory with patterns
files = discover_files("/data/images", patterns=["*.tif", "*.png"])
# From explicit list
files = discover_files([path1, path2, path3])
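Natural sorting (image2 before image10) is the behavior discover_files delegates to natsort for. The effect can be approximated standalone with a regex-based sort key, shown here purely to illustrate what "natural sorting" means:

```python
import re

def natural_key(name: str):
    """Split a name into text/number chunks so '2' sorts before '10'."""
    return [int(part) if part.isdigit() else part.lower()
            for part in re.split(r"(\d+)", name)]

names = ["image10.tif", "image2.tif", "image1.tif"]
print(sorted(names))                   # lexicographic: image1, image10, image2
print(sorted(names, key=natural_key))  # natural: image1, image2, image10
```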
Logging
Use batch_logger for structured logging. By default, it outputs to the console (stderr). Optionally log to a file:
from nbatch import batch, batch_logger
@batch(with_context=True)
def process(path):
    return path.stem
# Console only (default)
with batch_logger() as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

# With file logging (appends by default)
with batch_logger(log_file="output/process.log", header={"Files": 100}) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")
        # Or use log.info(), log.warning(), log.error()

# File only (no console output)
with batch_logger(log_file="output/quiet.log", console=False) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")
Log file output:
============================================================
Batch processing started at 2025-01-29 10:30:00
------------------------------------------------------------
Files: 100
============================================================
2025-01-29 10:30:01 - INFO - [1/100] image1.tif - Processed: image1
2025-01-29 10:30:02 - INFO - [2/100] image2.tif - Processed: image2
...
============================================================
Batch processing completed at 2025-01-29 10:35:00
============================================================
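The header/footer framing shown above can be sketched as a context manager that writes a banner on entry and exit. This is a simplified stand-in written against an in-memory stream, not batch_logger itself; framed_log is a hypothetical name:

```python
import io
from contextlib import contextmanager
from datetime import datetime

@contextmanager
def framed_log(stream, header=None):
    """Write a banner header on entry and a footer on exit."""
    bar = "=" * 60
    stream.write(f"{bar}\nBatch processing started at {datetime.now():%Y-%m-%d %H:%M:%S}\n")
    for key, value in (header or {}).items():
        stream.write(f"{key}: {value}\n")  # metadata lines, e.g. "Files: 100"
    stream.write(bar + "\n")
    try:
        yield lambda msg: stream.write(msg + "\n")
    finally:
        # Footer is written even if the batch raises, thanks to finally
        stream.write(f"{bar}\nBatch processing completed at {datetime.now():%Y-%m-%d %H:%M:%S}\n{bar}\n")

buf = io.StringIO()
with framed_log(buf, header={"Files": 2}) as log:
    log("[1/2] a.tif - Processed: a")
    log("[2/2] b.tif - Processed: b")
print(buf.getvalue())
```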
Integration with napari
Using BatchRunner (Recommended)
BatchRunner provides clean orchestration for widgets with threading, progress callbacks, and cancellation:
from nbatch import batch, BatchRunner
# Define your processing function (pure, testable)
@batch(on_error='continue')
def process_image(path, model, output_dir):
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result
# In your widget class
class MyWidget:
    def __init__(self, viewer):
        self._viewer = viewer
        # Create runner once - reusable for all batches
        self.runner = BatchRunner(
            on_start=self._on_batch_start,
            on_item_complete=self._on_item_complete,
            on_complete=self._on_batch_complete,
            on_error=self._on_item_error,
            on_cancel=self._on_cancelled,
        )
        self._run_button.clicked.connect(self.run_batch)
        self._cancel_button.clicked.connect(self.runner.cancel)

    def _on_batch_start(self, total):
        """Called when batch starts with total item count."""
        self._progress_bar.setValue(0)
        self._progress_bar.setMaximum(total)

    def _on_item_complete(self, result, ctx):
        """Called after each item completes."""
        self._progress_bar.setValue(ctx.index + 1)
        # Optionally add result to viewer
        if result is not None:
            self._viewer.add_image(result, name=f"Result {ctx.index}")

    def _on_batch_complete(self):
        errors = self.runner.error_count
        if errors > 0:
            self._progress_bar.label = f"Done with {errors} errors"
        else:
            self._progress_bar.label = "Complete!"

    def _on_item_error(self, ctx, exception):
        self._progress_bar.label = f"Error on {ctx.item.name}"

    def _on_cancelled(self):
        self._progress_bar.label = "Cancelled"

    def run_batch(self):
        """Triggered by 'Run' button - just one line!"""
        self.runner.run(
            process_image,
            self.files,
            model=self.model,
            output_dir=self.output_dir,
            log_file=self.output_dir / "batch.log",
        )
Using @thread_worker directly
For more control, use napari's @thread_worker with the @batch decorator:
from napari.qt.threading import thread_worker
from nbatch import batch, batch_logger
@batch(with_context=True, on_error='continue')
def process_image(path, model, output_dir):
    # Your processing logic
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result
# In your widget
def run_batch(self):
    @thread_worker
    def _run():
        with batch_logger(log_file=self.output_dir / 'log.txt') as log:
            for result, ctx in process_image(
                self.input_dir,
                model=self.model,
                output_dir=self.output_dir,
            ):
                log(ctx, f"Processed: {ctx.item.name}")
                yield ctx  # Enables progress updates

    worker = _run()
    worker.yielded.connect(
        lambda ctx: self.progress_bar.setValue(int(ctx.progress * 100))
    )
    worker.start()
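In either approach, cancellation amounts to checking a flag between items; the worker never interrupts an item mid-flight. Outside napari, the same pattern can be built on threading.Event. This is a generic sketch of the idea, not BatchRunner's internals:

```python
import threading

def run_until_cancelled(items, process, cancel_event):
    """Process items until done or until cancel_event is set."""
    results = []
    for item in items:
        if cancel_event.is_set():
            break                      # stop cleanly between items
        results.append(process(item))
    return results

cancel = threading.Event()

def process(x):
    if x == 2:
        cancel.set()                   # simulate a 'Cancel' click mid-batch
    return x * 10

print(run_until_cancelled([1, 2, 3, 4], process, cancel))  # [10, 20]
```

Checking between items rather than killing the thread means a partially written output file is never left behind by a cancel.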
API Reference
@batch Decorator
@batch(
    on_error: Literal['raise', 'continue', 'skip'] = 'raise',
    with_context: bool = False,
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
)
BatchContext
@dataclass(frozen=True)
class BatchContext:
    index: int   # Zero-based index
    total: int   # Total items
    item: Any    # Current item

    @property
    def progress(self) -> float: ...  # (index + 1) / total

    @property
    def is_first(self) -> bool: ...   # index == 0

    @property
    def is_last(self) -> bool: ...    # index == total - 1
discover_files()
def discover_files(
    source: str | Path | Iterable[str | Path],
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
) -> list[Path]: ...
batch_logger
@contextmanager
def batch_logger(
    log_file: str | Path | None = None,          # Optional file path
    header: Mapping[str, object] | None = None,  # Metadata to write at start
    level: int = logging.INFO,
    console: bool = True,                        # Output to stderr
    file_mode: Literal['w', 'a'] = 'a',          # Append by default
) -> Generator[BatchLogger, None, None]: ...
BatchRunner
class BatchRunner:
    def __init__(
        self,
        on_start: Callable[[int], None] | None = None,
        on_item_complete: Callable[[Any, BatchContext], None] | None = None,
        on_complete: Callable[[], None] | None = None,
        on_error: Callable[[BatchContext, Exception], None] | None = None,
        on_cancel: Callable[[], None] | None = None,
    ): ...

    def run(
        self,
        func: Callable,
        items: Any,
        *args,
        threaded: bool = True,
        log_file: str | Path | None = None,
        log_header: Mapping[str, object] | None = None,
        patterns: str | Sequence[str] = '*',
        recursive: bool = False,
        **kwargs,  # Passed to func!
    ) -> None: ...

    def cancel(self) -> None: ...

    @property
    def is_running(self) -> bool: ...

    @property
    def was_cancelled(self) -> bool: ...

    @property
    def error_count(self) -> int: ...  # Errors in current/last batch
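The callback flow of the synchronous path (threaded=False) can be skeletonized as below. This is an illustration only; the real BatchRunner also handles threading, file discovery, and logging, and MiniRunner is a hypothetical name:

```python
from typing import Any, Callable, Iterable

class MiniRunner:
    """Callback-driven batch loop mirroring BatchRunner's interface shape."""
    def __init__(self, on_start=None, on_item_complete=None,
                 on_complete=None, on_error=None):
        self._on_start = on_start
        self._on_item_complete = on_item_complete
        self._on_complete = on_complete
        self._on_error = on_error
        self.error_count = 0

    def run(self, func: Callable, items: Iterable[Any], **kwargs) -> None:
        items = list(items)
        self.error_count = 0
        if self._on_start:
            self._on_start(len(items))   # report total before work begins
        for index, item in enumerate(items):
            try:
                result = func(item, **kwargs)
            except Exception as exc:
                self.error_count += 1
                if self._on_error:
                    self._on_error(item, exc)
                continue
            if self._on_item_complete:
                self._on_item_complete(result, index)
        if self._on_complete:
            self._on_complete()

events = []
runner = MiniRunner(
    on_start=lambda total: events.append(f"start:{total}"),
    on_item_complete=lambda result, i: events.append(f"done:{result}"),
    on_complete=lambda: events.append("complete"),
)
runner.run(str.upper, ["a", "b"])
print(events)  # ['start:2', 'done:A', 'done:B', 'complete']
```

The ordering guarantee (on_start once, then one callback per item, then exactly one terminal callback) is what makes progress-bar wiring in widgets straightforward.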
Contributing
Contributions are welcome! Please ensure tests pass before submitting a pull request:
pytest --cov=src/nbatch
License
Distributed under the terms of the BSD-3 license.
Part of ndev-kit
nbatch is part of the ndev-kit ecosystem for no-code bioimage analysis in napari.