A process pool that allows workers to maintain state across tasks.
Project description
The default ProcessPoolExecutor makes it hard to maintain stateful workers,
especially workers with expensive setup (e.g., workers that each load a model into GPU memory).
This library lets you create a pool of stateful workers (spawn once) and run tasks on them in parallel across processes (execute many).
+-----------------------+ +----------------------+
| Main Process | | Process Pool |
| +--------+ +--------+ | | +------------------+ |
| | Thread | | Thread | | --- Task --> | | Worker (Process) | |
| | 1 | | 2 | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ | | +------------------+ |
| ... ... | | | Worker (Process) | |
| +--------+ +--------+ | | +------------------+ |
| | Thread | | Thread | | --- Task --> | ... |
| | N-1 | | N | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ | | | Worker (Process) | |
| | | +------------------+ |
+-----------------------+ +----------------------+
Installation:
pip install stateful-pool
The following example shows how to define a worker class, spawn workers (each assigned several GPU IDs), and execute tasks on them:
```python
from stateful_pool import SPool, SWorker
import time, random

# Will run in another process.
class SquareWorker(SWorker):
    gpu_ids: list[int]

    def spawn(self, gpu_ids: list[int]):
        self.gpu_ids = gpu_ids
        return f"Worker initialized on GPU: {self.gpu_ids}"

    def execute(self, value):
        time.sleep(random.uniform(0.1, 1.0))
        return f"[Execute] Square of {value} is {value * value} (computed on GPU: {self.gpu_ids})"

if __name__ == "__main__":
    with SPool(SquareWorker, queue_size=100) as pool:
        # Spawn a worker; the return value can be captured.
        s = pool.spawn(gpu_ids=[0, 1])
        print(s)

        # Submit a single task and wait for the result.
        r = pool.execute(100)
        print(r)
```
The example above calls pool.execute only once, which doesn't demonstrate the pool's parallelism.
In practice, you would likely submit tasks in a non-blocking manner via the submit_* counterparts:
```python
with SPool(SquareWorker) as pool:
    spawn_futures = [pool.submit_spawn(gpu_ids=[i, i + 1]) for i in range(0, 4, 2)]
    for f in spawn_futures:
        print(f.result())

    execute_futures = [pool.submit_execute(i) for i in range(4)]
    for f in execute_futures:
        print(f.result())
```
Download files
Source Distribution
Built Distribution
File details
Details for the file stateful_pool-0.3.1.tar.gz.
File metadata
- Download URL: stateful_pool-0.3.1.tar.gz
- Upload date:
- Size: 4.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.10.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3f483dc09fc0e289a612a6c80742cb548b49ade3b28972742b36725229c0d13b |
| MD5 | 63ac3f0bb13e0fb34851805090194ccb |
| BLAKE2b-256 | e4cf5be8829779dc2d8e6a41ddec7ceb0d8c04b63026a55eb21955410aa72baa |
File details
Details for the file stateful_pool-0.3.1-py3-none-any.whl.
File metadata
- Download URL: stateful_pool-0.3.1-py3-none-any.whl
- Upload date:
- Size: 4.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.10.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | de9b199e87534e70875bf11ea4d18de17289c09011e4c03f8365297421c6dcbe |
| MD5 | f71519737124b4a540142f0daed40e3a |
| BLAKE2b-256 | fcdf8deb6ccc0432adc42075a1275324eca77b6e4bfe932364868f7225211064 |