Wrapper on an iterable for automatic caching, resuming, retrying, and multiprocessing
Project description
A wrapper on an iterable that supports automatic resuming after interruption, retrying, and multiprocessing.
APIs
iterate_wrapper
```python
def iterate_wrapper(
    func: Callable[Concatenate[IO, DataType, dict[str, Any], ParamTypes], ReturnType],
    data: Iterable[DataType],
    output: str | IO | None = None,
    restart=False,
    retry=5,
    on_error: Literal["raise", "continue"] = "raise",
    num_workers=1,
    bar=True,
    flush=True,
    total_items: int | None = None,
    run_name=__name__,
    envs: list[dict[str, str]] = [],
    vars_factory: Callable[[], dict[str, Any]] = lambda: {},
    *args: ParamTypes.args,
    **kwargs: ParamTypes.kwargs,
) -> Sequence[ReturnType] | None:
    """
    Wrapper on a processor (func) and iterable (data) to support multiprocessing, retrying and automatic resuming.
    Args:
        func: The processor function. It should accept three or more arguments: output stream, data item, vars, and additional args (*args and **kwargs, which should be passed to the wrapper). Within func, the output stream can be used to save data in real time.
        data: The data to be processed. It can be an iterable or a sequence. In each iteration, the data item in data will be passed to func.
        output: The output stream. It can be a file path, a file object or None. If None, no output will be written.
        restart: Whether to restart from the beginning.
        retry: The number of retries for processing each data item.
        on_error: The action to take when an exception is raised in func.
        num_workers: The number of workers to use. If set to 1, the processor will be run in the main process.
        bar: Whether to show a progress bar (package tqdm required).
        flush: Whether to flush the output stream after each data item is processed.
        total_items: The total number of items in data. It is required when data is not a sequence.
        run_name: The name of the run. It is used to construct the checkpoint file path.
        envs: Additional environment variables for each worker. This will be set before spawning new processes.
        vars_factory: A function that returns a dictionary of variables to be passed to func. The factory will be called after each process is spawned and before entering the loop. For plain vars, one can simply use closure or functools.partial to pass into func.
        *args: Additional positional arguments to be passed to func.
        **kwargs: Additional keyword arguments to be passed to func.
    Returns:
        A list of return values from func.
    """
```
IterateWrapper
```python
class IterateWrapper(Generic[DataType]):
    def __init__(
        self,
        *data: Iterable[DataType],
        mode: Literal["product", "zip"] = "product",
        restart=False,
        bar=0,
        total_items: int | None = None,
        convert_type=list,
        run_name=__name__,
    ):
        """
        Wrap one or more iterables to provide automatic resuming after interruption. Unlike iterate_wrapper, there is no retrying, and the data is limited to sequences.
        Args:
            data: iterables to be wrapped
            mode: how to combine iterables. 'product' means Cartesian product, 'zip' means zip()
            restart: whether to restart from the beginning
            bar: the position of the progress bar. -1 means no bar
            total_items: total items to be iterated
            convert_type: convert the data to this type
            run_name: name of the run to identify the checkpoint and output files
        """
```
Examples
iterate_wrapper
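```python
from typing import IO
from time import sleep

from iterwrap import iterate_wrapper  # assumed top-level export of the iterwrap package


def square(f_io: IO, item: int, vars: dict, sleep_time: float):
    sleep(sleep_time)
    result = item * item
    f_io.write(f"{result}\n")


# The guard is required when worker processes are started with the "spawn"
# method (the default on Windows and macOS), which re-imports this module.
if __name__ == "__main__":
    data = list(range(10))
    num_workers = 3
    iterate_wrapper(
        square,
        data,
        output="output.txt",
        num_workers=num_workers,
        sleep_time=1,  # forwarded to square() via **kwargs
    )
    with open("output.txt") as f:
        print(f.read())  # squares of 0..9 (0, 1, 4, ..., 81), one per line
```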
IterateWrapper
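Usage is the same as tqdm.tqdm: wrap the data and iterate over it.
```python
from iterwrap import IterateWrapper  # assumed top-level export of the iterwrap package

data = [1, 2, 3]
results = []
# The documented __init__ has no num_workers parameter, so none is passed here.
for i in IterateWrapper(data):
    results.append(i * i)
print(results)  # [1, 4, 9]
```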
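Resuming a plain `for` loop after an interruption can be pictured as a generator that records the index of the next unprocessed item in a checkpoint file. This is a minimal sketch, not IterateWrapper's implementation: `resumable`, the JSON layout, and the `next_index` key are assumptions, whereas the real class identifies its checkpoint through `run_name`.

```python
import json
import os
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def resumable(items: Sequence[T], checkpoint: str, restart: bool = False) -> Iterator[T]:
    """Hypothetical sketch: yield items and persist progress to a JSON checkpoint file."""
    start = 0
    if not restart and os.path.exists(checkpoint):
        with open(checkpoint) as f:
            start = json.load(f)["next_index"]
    for i in range(start, len(items)):
        yield items[i]
        # Execution resumes here only when the loop body for items[i] finished
        # and asked for the next item, so items[i] is recorded as done.
        with open(checkpoint, "w") as f:
            json.dump({"next_index": i + 1}, f)
```

If the loop body raises (or the process is killed) while handling an item, that item is yielded again on the next run, and `restart=True` ignores the checkpoint. An item after which the loop `break`s is also repeated, so in this design the loop body should tolerate running more than once for the same item.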
Download files
Source Distribution
iterwrap-0.1.7.tar.gz (19.8 kB)
Built Distribution
iterwrap-0.1.7-py3-none-any.whl (20.7 kB)
File details
Details for the file iterwrap-0.1.7.tar.gz.
File metadata
- Download URL: iterwrap-0.1.7.tar.gz
- Upload date:
- Size: 19.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.10.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 60a61c2caa249484cef49fb550caac0be49571298fda9a46adeb084fe603f242 |
| MD5 | b27d09f491812d712bbc13f8f882ada3 |
| BLAKE2b-256 | 7b0d5f79e6588828e76357c47f52043fd65ea81a44edbd7a1b18ac24d3cef550 |
File details
Details for the file iterwrap-0.1.7-py3-none-any.whl.
File metadata
- Download URL: iterwrap-0.1.7-py3-none-any.whl
- Upload date:
- Size: 20.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.10.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a184ae8c4e4ca5db6b92a624a6e7fe13fc43c106c4f825167ef88c242d0c52a4 |
| MD5 | 8ed68139351b3232396625989062d810 |
| BLAKE2b-256 | c4e5fb63efb2afaf164ca164ebe7d89a7e531d3feb38784e5d70141c26a973c0 |
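The SHA256 digests listed in the file hashes tables can be checked after downloading with Python's standard `hashlib`. `sha256_of` is a hypothetical helper name; the expected value below is the sdist digest copied from the table, and the usage line assumes the file was saved under its original name in the working directory.

```python
import hashlib


def sha256_of(path: str, chunk_size: int = 1 << 16) -> str:
    """Return the hex SHA256 digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel keeps calling f.read() until it returns b"" (EOF).
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# SHA256 of iterwrap-0.1.7.tar.gz as listed in the file hashes table.
EXPECTED_SDIST_SHA256 = "60a61c2caa249484cef49fb550caac0be49571298fda9a46adeb084fe603f242"

# Usage, assuming the sdist has been downloaded into the working directory:
# assert sha256_of("iterwrap-0.1.7.tar.gz") == EXPECTED_SDIST_SHA256
```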