Smart thread/process/interpreter pool implementation.
SmartPool
SmartPool is a Python library that provides intelligent resource-aware pooling mechanisms for parallel computing. It automatically manages CPU and GPU resources to optimize performance while preventing resource exhaustion.
Features
- Multiple Pool Types: ProcessPool, ThreadPool, and InterpreterPool for different use cases
- Intuitive API Design: Usage nearly identical to `concurrent.futures` pools
- Automatic Resource Management: Monitors and manages CPU cores, memory, and GPU resources
- Hardware-Aware Scheduling: Automatically detects system resources and schedules tasks accordingly
- PyTorch Integration: Support for PyTorch multiprocessing with tensor sharing to avoid serialization
- Training Hot Migration: Moves CPU training tasks to GPU when `best_device()` changes
- InferSessionPool: Thread-safe ONNX Runtime session pool for concurrent inference
Installation
pip install pysmartpool
Examples
Basic Usage
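```python
from smartpool import ProcessPool

def expensive_computation(x):
    # Placeholder workload; define task functions at module level so that
    # "spawn"-started worker processes can import them
    return sum(i * i for i in range(x))

if __name__ == "__main__":
    arguments = [10_000, 20_000, 30_000]
    # Create a process pool that automatically manages system resources
    with ProcessPool() as pool:
        # Submit tasks, passing positional arguments as a tuple via `args`
        futures = [pool.submit(expensive_computation, args=(arg,)) for arg in arguments]
        results = [future.result() for future in futures]  # Get results
```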
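The main difference from the standard library in this example is how arguments are passed. `concurrent.futures` executors take them positionally, as in `submit(fn, arg)`, whereas SmartPool's `submit` takes an `args` tuple and an optional `kwargs` dict.

For comparison, here is a sketch of the same pattern using only the standard library's `ThreadPoolExecutor` (`ProcessPoolExecutor.submit` has the same signature). It performs no resource management, and `square` is a hypothetical stand-in for `expensive_computation`.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    # Hypothetical stand-in for expensive_computation
    return x * x


def run(arguments):
    with ThreadPoolExecutor() as pool:
        # Standard library style: submit(fn, arg), not submit(fn, args=(arg,))
        futures = [pool.submit(square, arg) for arg in arguments]
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(run([1, 2, 3, 4]))  # [1, 4, 9, 16]
```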
Resource-Aware Task Scheduling
```python
import os
from smartpool import ProcessPool, DataSize, Resource

# Tasks can specify their resource requirements
def memory_intensive_task(data):
    # Your computation here (placeholder: sum the input)
    processed_data = sum(data)
    return processed_data

if __name__ == "__main__":
    large_dataset = list(range(1_000_000))  # placeholder input
    with ProcessPool(use_torch=True) as pool:
        # Pool automatically schedules tasks based on available resources
        future = pool.submit(
            memory_intensive_task,
            args=(large_dataset,),
            # Requirements if the task runs on CPU
            cpu_mode_res=Resource(
                cpu_cores_in_python=os.cpu_count(),
                cpu_mem=1*DataSize.GB,
            ),
            # Requirements if the task runs on GPU
            gpu_mode_res=Resource(
                cpu_cores_in_python=1,
                cpu_mem=500*DataSize.MB,
                gpu_cores=1024,  # Request 1024 CUDA cores (NOT a percentage)
                gpu_mem=1*DataSize.GB,  # Request 1 GB GPU memory
            ),
        )
        result = future.result()
```
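SmartPool does not document its exact scheduling policy. As a rough mental model only, the hypothetical sketch below shows memory-based admission. A task starts only when its declared memory fits in the remaining budget, and that memory is returned when the task finishes. `MemoryBudget` and `run_with_budget` are illustrative names and are not part of SmartPool's API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MB = 1024 * 1024


class MemoryBudget:
    """Hypothetical admission controller: a task starts only if its memory fits."""

    def __init__(self, total_bytes: int):
        self.total = total_bytes
        self.free = total_bytes
        self.peak = 0  # highest amount reserved at once, for illustration
        self._cond = threading.Condition()

    def acquire(self, nbytes: int) -> None:
        if nbytes > self.total:
            raise ValueError("request can never fit into the budget")
        with self._cond:
            # Block until enough memory has been released by finished tasks
            self._cond.wait_for(lambda: self.free >= nbytes)
            self.free -= nbytes
            self.peak = max(self.peak, self.total - self.free)

    def release(self, nbytes: int) -> None:
        with self._cond:
            self.free += nbytes
            self._cond.notify_all()


def run_with_budget(budget: MemoryBudget, func, arg, nbytes: int):
    # Reserve the declared memory, run the task, and always give it back
    budget.acquire(nbytes)
    try:
        return func(arg)
    finally:
        budget.release(nbytes)


if __name__ == "__main__":
    budget = MemoryBudget(total_bytes=1024 * MB)
    with ThreadPoolExecutor(max_workers=8) as pool:
        # Each task declares 400 MB, so at most two hold a reservation at once
        futures = [pool.submit(run_with_budget, budget, sum, range(n), 400 * MB) for n in range(8)]
        print([f.result() for f in futures])  # [0, 0, 1, 3, 6, 10, 15, 21]
    print(budget.peak <= 800 * MB)  # True
```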
PyTorch Training Hot Migration from CPU to GPU
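SmartPool can migrate a running training task from CPU to GPU when a better device becomes available. The task polls `best_device()` and moves the model and optimizer whenever the returned device changes: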
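```python
# Complete setup for training with optimizer migration
from smartpool import (
    limit_num_single_thread,
    best_device,
    move_optimizer_to,
    ProcessPool,
)
# Critical: call before importing torch/numpy
limit_num_single_thread()
import torch

def training_task(model, optimizer, data_loader, epochs):
    device = best_device()  # <-- get best suitable device at init time
    model.to(device)
    move_optimizer_to(optimizer, device)
    old_device = device
    loss_fn = torch.nn.MSELoss()
    for epoch in range(epochs):
        for x, y in data_loader:
            device = best_device()  # <-- get best suitable device at each batch
            if old_device != device:
                model.to(device)  # move model to new device
                move_optimizer_to(optimizer, device)  # move optimizer state to new device
                old_device = device
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(x), y)
            loss.backward()
            optimizer.step()
    # Return CPU copies of the trained weights
    return {k: v.cpu() for k, v in model.state_dict().items()}

if __name__ == "__main__":
    model = torch.nn.Linear(16, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    data = [(torch.randn(32, 16), torch.randn(32, 1)) for _ in range(10)]
    with ProcessPool(use_torch=True) as pool:
        future = pool.submit(training_task, args=(model, optimizer, data, 5))
        state_dict = future.result()
```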
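The migration pattern reduces to one rule: compare the device returned at each step with the previous one, and move state only when it changes. Below is a torch-free sketch of that control flow. `best_device` here is a hypothetical callable standing in for SmartPool's `best_device()`, and `move_state` stands in for `model.to(...)` plus `move_optimizer_to(...)`.

```python
from typing import Callable, Iterable, List


def train_with_migration(
    batches: Iterable[int],
    best_device: Callable[[], str],
    move_state: Callable[[str], None],
) -> List[str]:
    """Process batches, moving model/optimizer state only when the device changes."""
    device = best_device()  # device chosen at init time
    move_state(device)
    old_device = device
    used = []
    for _batch in batches:
        device = best_device()  # re-check before every batch
        if device != old_device:
            move_state(device)  # would be model.to(device) + move_optimizer_to(...)
            old_device = device
        used.append(device)  # the batch would be moved to `device` and trained here
    return used


if __name__ == "__main__":
    # Hypothetical device answers: a GPU becomes the best choice on the fourth check
    answers = iter(["cpu", "cpu", "cpu", "cuda:0", "cuda:0", "cuda:0"])
    moves = []
    used = train_with_migration(range(5), lambda: next(answers), moves.append)
    print(used)   # ['cpu', 'cpu', 'cuda:0', 'cuda:0', 'cuda:0']
    print(moves)  # ['cpu', 'cuda:0']
```

Tracking `old_device` keeps the expensive model and optimizer transfer off the per-batch path. Only the batch itself is moved every step.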
See more examples at smartpool-examples.
API
ProcessPool
Each worker runs as a separate process with its own GIL. Suitable for CPU-intensive tasks.
```python
from __future__ import annotations

import concurrent.futures
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union

from smartpool import Resource


class ProcessPool:
    def __init__(
        self, max_workers:int=0,
        process_name_prefix:str="ProcessPool.worker:",
        mp_context:str="spawn",
        initializer:Optional[Callable[..., Any]]=None,
        initargs:Tuple[Any, ...]=(),
        initkwargs:Optional[Dict[str, Any]]=None,
        *,
        max_tasks_per_child:Optional[int]=None,
        use_torch:bool=False
    ):
        """
        Initializes a new ProcessPool instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If 0 or not given, as many worker
                processes will be created as the machine has processors.
            process_name_prefix: Prefix for the names of worker processes.
            mp_context: Process start method, one of 'fork', 'spawn' or 'forkserver'.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            initkwargs: A dictionary of keyword arguments to pass to the initializer.
            max_tasks_per_child: The maximum number of tasks a worker process
                can complete before it exits and is replaced with a fresh
                worker process. The default of None means a worker process
                lives as long as the pool. Requires a start method other
                than 'fork'.
            use_torch: Whether to use PyTorch multiprocessing with tensor
                sharing and GPU device support.
        """

    def submit(
        self, func:Callable[..., Any],
        args:Optional[Tuple[Any, ...]]=None,
        kwargs:Optional[Dict[str, Any]]=None,
        cpu_mode_res: Optional[Resource] = None,
        gpu_mode_res: Optional[Resource] = None,
        use_torch: Optional[bool] = None
    )->concurrent.futures.Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as func(*args, **kwargs) and
        returns a Future instance representing the execution of the callable.

        Args:
            func: The callable to execute.
            args: A tuple of positional arguments to pass to the callable.
            kwargs: A dictionary of keyword arguments to pass to the callable.
            cpu_mode_res: Resource requirements when running on CPU.
            gpu_mode_res: Resource requirements when running on GPU.
            use_torch: Whether to enable PyTorch tensor sharing and device migration.

        Returns:
            A concurrent.futures.Future representing the given call.
        """

    def map(
        self, func:Callable[..., Any],
        iterable:Iterable[Any],
        cpu_mode_res:Optional[Union[Resource, Iterable[Resource]]]=None,
        gpu_mode_res:Optional[Union[Resource, Iterable[Resource]]]=None,
        timeout:Optional[Union[float, int]]=None,
        chunksize:int=1
    )->Iterable[Any]:
        """
        Returns an iterator equivalent to map(func, iterable).

        Args:
            func: A callable that takes one argument (an item of iterable).
            iterable: An iterable whose items will be passed to func.
            cpu_mode_res: Resource requirements (or iterable of resources) for CPU mode.
            gpu_mode_res: Resource requirements (or iterable of resources) for GPU mode.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterable will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, iterable).

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If func raises for any item.
        """

    def starmap(
        self, func:Callable[..., Any],
        args_iterables:Iterable[Tuple[Any, ...]],
        cpu_mode_res:Union[Resource, Iterable[Resource]] = Resource(cpu_cores_in_python=1),
        gpu_mode_res:Union[Resource, Iterable[Resource], None]=None,
        timeout:Optional[Union[float, int]]=None,
        chunksize:int=1
    )->Iterable[Any]:
        """
        Like `map()`, but each element of `args_iterables` is itself an
        iterable whose items are unpacked as arguments: with `func` and the
        element (a, b), the call becomes func(a, b).
        """

    def shutdown(self, wait:bool=True, *, cancel_futures:bool=False)->None:
        """
        Cleans up the resources associated with the pool.

        It is safe to call this method several times. However, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                pool have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """

    def __enter__(self)->ProcessPool: ...
    def __exit__(self, exc_type, exc_val, exc_tb)->None: ...
```
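The following standard-library sketch makes the `chunksize` and `starmap` semantics concrete. It is not SmartPool's implementation, and `chunked_starmap` is a hypothetical helper. It works in four steps:

1. Chop the argument tuples into chunks of `chunksize`.
2. Submit each chunk as one task.
3. Unpack each tuple into `func(*args)`.
4. Yield results in input order.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List, Tuple


def _run_chunk(func: Callable[..., Any], chunk: List[Tuple[Any, ...]]) -> List[Any]:
    # One task handles a whole chunk; each tuple is unpacked: (a, b) -> func(a, b)
    return [func(*args) for args in chunk]


def chunked_starmap(
    executor: Executor,
    func: Callable[..., Any],
    args_iterables: Iterable[Tuple[Any, ...]],
    chunksize: int = 1,
) -> Iterator[Any]:
    if chunksize < 1:
        raise ValueError("chunksize must be >= 1")
    it = iter(args_iterables)
    futures = []
    # Chop the input into chunks of `chunksize` and submit every chunk eagerly
    while chunk := list(islice(it, chunksize)):
        futures.append(executor.submit(_run_chunk, func, chunk))
    # Yield results lazily, in input order, flattening the per-chunk lists
    return (result for future in futures for result in future.result())


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = chunked_starmap(executor, pow, [(2, 3), (3, 2), (10, 0), (5, 1)], chunksize=3)
        print(list(results))  # [8, 9, 1, 5]
```

Larger chunks mean fewer submissions and less per-task overhead, at the cost of coarser load balancing across workers.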
ThreadPool
Each worker runs as a thread. Suitable for IO-intensive tasks.
```python
from typing import Any, Callable, Dict, Optional, Tuple


class ThreadPool:
    def __init__(
        self, max_workers:int=0,
        thread_name_prefix:str="ThreadPool.worker:",
        initializer:Optional[Callable[..., Any]]=None,
        initargs:Tuple[Any, ...]=(),
        initkwargs:Optional[Dict[str, Any]]=None,
        *,
        max_tasks_per_child:Optional[int]=None,
        use_torch:bool=False
    ):
        """
        Initializes a new ThreadPool instance.
        Same arguments as ProcessPool, except that thread_name_prefix replaces
        process_name_prefix and there is no mp_context.
        """

    # The methods below have the same parameters and behavior as in ProcessPool
    def submit(self, *args, **kwargs): ...
    def map(self, *args, **kwargs): ...
    def starmap(self, *args, **kwargs): ...
    def shutdown(self, *args, **kwargs): ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...
```
InterpreterPool (Python 3.14+)
Each worker runs as a thread inside its own isolated interpreter with a separate GIL. Suitable for CPU-intensive tasks.
Creating and destroying workers, and switching between tasks, has less overhead than with ProcessPool.
However, numpy and torch are not supported.
```python
from typing import Any, Callable, Dict, Optional, Tuple


class InterpreterPool:
    def __init__(
        self, max_workers:int=0,
        initializer:Optional[Callable[..., Any]]=None,
        initargs:Tuple[Any, ...]=(),
        initkwargs:Optional[Dict[str, Any]]=None,
        *,
        max_tasks_per_child:Optional[int]=None,
        use_torch:bool=False
    ):
        """
        Initializes a new InterpreterPool instance.
        Same arguments as ProcessPool, except that there is no
        process_name_prefix and no mp_context.
        """

    # The methods below have the same parameters and behavior as in ProcessPool
    def submit(self, *args, **kwargs): ...
    def map(self, *args, **kwargs): ...
    def starmap(self, *args, **kwargs): ...
    def shutdown(self, *args, **kwargs): ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...
```
InferSessionPool
Each worker runs as a thread with a dedicated ONNX Runtime InferenceSession.
Suitable for concurrent inference on CPU, CUDA, or DML devices.
```python
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional, Tuple

from smartpool import Resource, ThreadPool


class InferSessionPool(ThreadPool):
    def __init__(
        self, max_workers:int=0,
        thread_name_prefix:str="InferSessionPool.worker:",
        initializer:Optional[Callable[..., Any]]=None,
        initargs:Tuple[Any, ...]=(),
        initkwargs:Optional[Dict[str, Any]]=None,
        *,
        max_tasks_per_child:Optional[int]=None,
    ):
        """
        Initializes a new InferSessionPool instance.
        Same arguments as ThreadPool, except that there is no use_torch.
        """

    def submit(
        self, model_path:str,
        args:Optional[Tuple[Any, ...]]=None,
        kwargs:Optional[Dict[str, Any]]=None,
        cpu_mode_res: Optional[Resource] = None,
        gpu_mode_res: Optional[Resource] = None,
    )->Future:
        """
        Submits an ONNX model for inference with the given input tensors.

        Args:
            model_path: Path to the .onnx model file.
            args: Input tensors to pass to the model.
            kwargs: Additional keyword arguments.
            cpu_mode_res: Resource requirements when running on CPU.
            gpu_mode_res: Resource requirements when running on GPU.

        Returns:
            A concurrent.futures.Future representing the inference result.
        """

    # The methods below have the same parameters and behavior as in ThreadPool
    def map(self, *args, **kwargs): ...
    def starmap(self, *args, **kwargs): ...
    def shutdown(self, *args, **kwargs): ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...
```
Resource & DataSize
Resource describes the CPU/GPU resources a task requires. DataSize provides
human-readable memory size constants.
```python
from typing import Optional


class Resource:
    def __init__(
        self, *,
        cpu_cores: float = 1,
        cpu_cores_in_python: Optional[float] = None,
        cpu_cores_out_of_python: float = 0,
        cpu_mem: int = 0,
        gpu_cores: float = 0,
        gpu_mem: int = 0,
    ):
        """
        Args:
            cpu_cores: Shorthand for cpu_cores_in_python (used when
                cpu_cores_in_python is not given).
            cpu_cores_in_python: Number of CPU cores consumed inside the
                Python process (e.g. by numpy/torch).
            cpu_cores_out_of_python: CPU cores consumed outside Python
                (e.g. by subprocesses).
            cpu_mem: CPU memory required in bytes.
            gpu_cores: Number of GPU cores (CUDA cores) required.
            gpu_mem: GPU memory required in bytes.
        """

    @property
    def cpu_cores(self) -> float:
        """Sum of cpu_cores_in_python and cpu_cores_out_of_python."""


class DataSize:
    B = 1
    KB = 1024 * B
    MB = 1024 * KB
    GB = 1024 * MB
    TB = 1024 * GB
```
Example:
```python
from smartpool import Resource, DataSize

# Request 2 CPU cores + 1 GB RAM + 1024 CUDA cores + 2 GB GPU memory
res = Resource(
    cpu_cores_in_python=2,
    cpu_mem=1 * DataSize.GB,
    gpu_cores=1024,
    gpu_mem=2 * DataSize.GB,
)
```
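The sketch below is a worked check of the `DataSize` constants and of how the documented CPU-core arguments combine. `DataSize` is copied from the listing above. `ResourceSketch` is a hypothetical re-implementation written from the docstrings, not SmartPool's code. In particular, using `cpu_cores` as the fallback for `cpu_cores_in_python` is an assumption based on that description.

```python
from typing import Optional


class DataSize:
    B = 1
    KB = 1024 * B
    MB = 1024 * KB
    GB = 1024 * MB
    TB = 1024 * GB


class ResourceSketch:
    """Hypothetical mirror of the documented Resource semantics."""

    def __init__(self, *, cpu_cores: float = 1,
                 cpu_cores_in_python: Optional[float] = None,
                 cpu_cores_out_of_python: float = 0,
                 cpu_mem: int = 0, gpu_cores: float = 0, gpu_mem: int = 0):
        # Assumption: cpu_cores is only used when cpu_cores_in_python is not given
        self.cpu_cores_in_python = cpu_cores if cpu_cores_in_python is None else cpu_cores_in_python
        self.cpu_cores_out_of_python = cpu_cores_out_of_python
        self.cpu_mem, self.gpu_cores, self.gpu_mem = cpu_mem, gpu_cores, gpu_mem

    @property
    def cpu_cores(self) -> float:
        # Total CPU demand: cores used inside plus outside the Python process
        return self.cpu_cores_in_python + self.cpu_cores_out_of_python


if __name__ == "__main__":
    print(1 * DataSize.GB)    # 1073741824 bytes
    print(500 * DataSize.MB)  # 524288000 bytes
    print(ResourceSketch().cpu_cores)  # 1
    print(ResourceSketch(cpu_cores_in_python=2, cpu_cores_out_of_python=1).cpu_cores)  # 3
```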
License
MIT License
Download files
File details
Details for the file pysmartpool-0.1.6.tar.gz.
File metadata
- Download URL: pysmartpool-0.1.6.tar.gz
- Upload date:
- Size: 27.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 888a5455aaf4267860910e2838961f08b391ddb72fe1af7475704a805efc4013 |
| MD5 | a747c15137f9e6593995507b9ca792bd |
| BLAKE2b-256 | 32d09e472b6aafe608515b5d6e2b9c1e8911f6def2f983b62d8d4f7053b559c9 |
File details
Details for the file pysmartpool-0.1.6-py3-none-any.whl.
File metadata
- Download URL: pysmartpool-0.1.6-py3-none-any.whl
- Upload date:
- Size: 37.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ca5fdbb2d66081627af84da5bd9e6cafb377c1caf2091400bba4b306f5b9f764 |
| MD5 | 5882fa779e1b40ae71bfb39c1afc37b8 |
| BLAKE2b-256 | 0ae86cf2fce2507f3369ed48281ca9d40caf98471188f8e8dd8f6831e61b798f |