Wrapper on iterable for automatic caching, resuming, retrying, and multiprocessing
Project description
Wraps an iterable to support automatic resumption after interruption, retrying, and multiprocessing.
The code is tested on Linux.
APIs
iterate_wrapper
def iterate_wrapper(
    func: Callable[Concatenate[IO, DataType, dict[str, Any], ParamTypes], ReturnType],
    data: Iterable[DataType],
    output: str | IO | None = None,
    restart=False,
    retry=5,
    on_error: Literal["raise", "continue"] = "raise",
    num_workers=1,
    bar=True,
    flush=True,
    total_items: int | None = None,
    run_name=__name__,
    envs: list[dict[str, str]] = [],
    vars_factory: Callable[[], dict[str, Any]] = lambda: {},
    *args: ParamTypes.args,
    **kwargs: ParamTypes.kwargs,
) -> Sequence[ReturnType] | None:
    """Wrapper on a processor (func) and an iterable (data) to support multiprocessing, retrying and automatic resuming.

    Args:
        func: The processor function. It should accept one of the following argument patterns: (data item); (output stream, data item); or (output stream, data item, vars). Additional arguments (*args and **kwargs) may follow; pass them to the wrapper and they are forwarded to func. Within func, the output stream can be used to save results in real time. See `vars_factory` for the usage of `vars`.
        data: The data to be processed, either an iterable or a sequence. Each item in data is passed to func in turn.
        output: The output stream. It can be a file path, a file object or None. If None, no output is written.
        restart: Whether to restart from the beginning instead of resuming.
        retry: The number of retries for each data item.
        on_error: The action to take when func raises an exception: "raise" or "continue".
        num_workers: The number of workers to use. If set to 1, func runs in the main process.
        bar: Whether to show a progress bar (requires the tqdm package).
        flush: Whether to flush the output stream after each data item is processed.
        total_items: The total number of items in data. It is required when data is not a sequence.
        run_name: The name of the run. It is used to construct the checkpoint file path.
        envs: Additional environment variables for each worker. These are set before the worker processes are spawned.
        vars_factory: A callable that returns a dictionary of variables to be passed to func. The factory is called in each process after it is spawned and before the processing loop starts. Plain variables that need no per-process construction can be passed via *args and **kwargs instead.
        *args: Additional positional arguments to be passed to func.
        **kwargs: Additional keyword arguments to be passed to func.

    Returns:
        A list of return values from func.
    """
IterateWrapper
class IterateWrapper(Generic[DataType]):
    def __init__(
        self,
        *data: Iterable[DataType],
        mode: Literal["product", "zip"] = "product",
        restart=False,
        bar=0,
        total_items: int | None = None,
        convert_type=list,
        run_name=__name__,
    ):
        """
        wrap one or more iterables to resume automatically after an interruption. Unlike iterate_wrapper, it does not retry and is limited to sequences.
        Args:
            data: iterables to be wrapped
            mode: how to combine the iterables. 'product' means Cartesian product, 'zip' means zip()
            restart: whether to restart from the beginning
            bar: the position of the progress bar. -1 means no bar
            total_items: total number of items to be iterated
            convert_type: the type the data is converted to
            run_name: name of the run, used to identify the checkpoint and output files
        """
Examples
iterate_wrapper
from typing import IO, Callable
from iterwrap import iterate_wrapper

def square(f_io: IO, item: int, fn: Callable[[float], float]):
    # Apply fn to the item and write the result to the output stream, one per line
    result = fn(item)
    f_io.write(f"{result}\n")

data = list(range(10))
num_workers = 3
iterate_wrapper(
    square,
    data,
    output="output.txt",
    num_workers=num_workers,
    fn=lambda x: x * x,  # forwarded to square via **kwargs
)
with open("output.txt") as f:
    print(f.read())  # the squares 0, 1, 4, 9, ..., 81, one per line
IterateWrapper
Used the same way as tqdm.tqdm: wrap an iterable and loop over it.
from iterwrap import IterateWrapper

data = [1, 2, 3]
results = []
for i in IterateWrapper(data):
    results.append(i * i)
print(results)  # [1, 4, 9]
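`IterateWrapper` resumes automatically after an interruption, using `run_name` to identify its checkpoint. One way to get that behavior is a generator that records how many items the loop body has finished. The sketch below assumes a JSON checkpoint file and a hypothetical `resumable` function; it is not iterwrap's actual checkpoint format.

```python
import json
import os
import tempfile
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def resumable(data: Iterable[T], checkpoint: str, restart: bool = False) -> Iterator[T]:
    """Hypothetical sketch: yield items, skipping those finished in a previous run."""
    done = 0
    if not restart and os.path.exists(checkpoint):
        with open(checkpoint) as f:
            done = json.load(f)["done"]
    for index, item in enumerate(data):
        if index < done:
            continue  # already processed before the interruption
        yield item
        # Reached only when the caller asks for the next item,
        # i.e. after its loop body finished with this one.
        with open(checkpoint, "w") as f:
            json.dump({"done": index + 1}, f)


ckpt = os.path.join(tempfile.mkdtemp(), "run.ckpt")
data = [1, 2, 3, 4]
first = []
try:
    for x in resumable(data, ckpt):
        if x == 3:
            raise RuntimeError("simulated interruption")
        first.append(x * x)
except RuntimeError:
    pass
second = [x * x for x in resumable(data, ckpt)]  # resumes at item 3
print(first, second)  # [1, 4] [9, 16]
```

Writing the checkpoint after every item limits repeated work to the single interrupted item, at the cost of one small file write per iteration.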
Download files
Source Distribution
iterwrap-0.1.9.tar.gz (20.6 kB)
Built Distribution
iterwrap-0.1.9-py3-none-any.whl (20.9 kB)
File details
Details for the file iterwrap-0.1.9.tar.gz.
File metadata
- Download URL: iterwrap-0.1.9.tar.gz
- Upload date:
- Size: 20.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5bc8fb628012b71e2bad2f6001ac1fc54acd07a61c62a266f1519177bc9978af |
| MD5 | 7e3c36606915a50498da8e0c0614a7ff |
| BLAKE2b-256 | d3c81354a0e9e7c89e7601c2506cb30e0e867b5d6a01e21d36805d64bad9576f |
File details
Details for the file iterwrap-0.1.9-py3-none-any.whl.
File metadata
- Download URL: iterwrap-0.1.9-py3-none-any.whl
- Upload date:
- Size: 20.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 73b4b6ab91a4b18e0e76775e1b59760fcc1497f0db6515c2b83bf3a7c8cc4cf3 |
| MD5 | ac766f6c5c151404aeb5f98ba56f0a78 |
| BLAKE2b-256 | 5d4ab3be210d5715ac4ca837fd9013d4aaadf1699a96aaff3c3e0b44cb546db9 |