A process pool that allows workers to maintain state across tasks.

Reason this release was yanked:

API change

Project description

The default ProcessPoolExecutor makes it hard to maintain stateful workers, especially workers with expensive setup (e.g., each worker holding a model loaded in GPU memory).
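For comparison, the closest stdlib pattern is to smuggle per-process state through a module-level global populated by the pool's initializer. This is only an illustrative sketch using concurrent.futures; the names `_init`, `square`, and `main` are made up for the example and are not part of this library:

```python
from concurrent.futures import ProcessPoolExecutor

# Stdlib workaround: per-process state lives in a module-level global,
# populated once per worker by the pool's initializer.
_state = {}

def _init(gpu_ids):
    # Runs once in each worker process; stands in for expensive setup.
    _state["gpu_ids"] = gpu_ids

def square(value):
    # Each task reads the hidden global to find its worker's state.
    return value * value, _state["gpu_ids"]

def main():
    # Note: every worker receives the *same* initargs, so you cannot
    # hand each worker its own GPU IDs this way.
    with ProcessPoolExecutor(max_workers=2, initializer=_init,
                             initargs=([0, 1],)) as pool:
        return list(pool.map(square, range(4)))

if __name__ == "__main__":
    print(main())
```

This works, but the state is implicit and every worker gets identical initializer arguments, which is the gap a spawn-once, per-worker-arguments API is meant to fill.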

This library lets you create a pool of stateful workers (spawn once) to run tasks in parallel across processes (execute many).

+-----------------------+              +----------------------+
|     Main Process      |              |     Process Pool     |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | --- Task --> | | Worker (Process) | |
| |   1    | |   2    | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ |              | +------------------+ |
|      ...     ...      |              | | Worker (Process) | |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | --- Task --> |         ...          |
| |  N-1   | |   N    | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ |              | | Worker (Process) | |
|                       |              | +------------------+ |
+-----------------------+              +----------------------+

Installation:

pip install stateful-pool

The following example shows how to define a worker class, spawn workers (each assigned a pair of GPU IDs), and execute tasks on them.

from stateful_pool import SPool, SWorker
import time, random

# will run in another process
class SquareWorker(SWorker):
    gpu_ids: list[int]

    def spawn(self, gpu_ids: list[int]):
        self.gpu_ids = gpu_ids
        return f"Worker initialized on GPU: {self.gpu_ids}"
    
    def execute(self, value):
        time.sleep(random.uniform(0.1, 1.0))
        return f"[Execute] Square of {value} is {value * value} (computed on GPU: {self.gpu_ids})"

if __name__ == "__main__":

    with SPool(SquareWorker, queue_size=100) as pool:
        # return value from spawn can be captured
        s1 = pool.spawn(gpu_ids=[0, 1])
        s2 = pool.spawn(gpu_ids=[2, 3])
        print(f"{s1}, {s2}")

        # submit a single task and wait for result
        res = pool.execute(100)
        print(res)

The example above calls pool.execute only once, which doesn't exercise the pool's parallelism. In practice, you would likely submit tasks in a non-blocking manner:

from concurrent.futures import ThreadPoolExecutor

with SPool(SquareWorker, thread_pool=ThreadPoolExecutor()) as pool:
    spawn_futures = [pool.spawn_future(gpu_ids=[i, i+1]) for i in range(0, 4, 2)]
    for f in spawn_futures:
        print(f.result())
    
    execute_futures = [pool.execute_future(i) for i in range(4)]
    for f in execute_futures:
        print(f.result())
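To see what concurrent submission buys you, here is a stdlib-only timing sketch (plain ThreadPoolExecutor, not stateful_pool; `slow_square` is an illustrative stand-in for a task that mostly waits on a worker). Four 0.2-second tasks finish in roughly the time of one when submitted up front and collected as they complete:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(value, delay=0.2):
    # Stand-in for a task that spends most of its time waiting on a worker.
    time.sleep(delay)
    return value * value

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as tpe:
    # Fan out all tasks first, then collect results as they finish.
    futures = [tpe.submit(slow_square, i) for i in range(4)]
    results = sorted(f.result() for f in as_completed(futures))
elapsed = time.perf_counter() - start

print(results)  # [0, 1, 4, 9]
print(f"{elapsed:.2f}s")  # roughly 0.2s, not 4 x 0.2s
```

The same fan-out-then-collect shape applies to the futures returned by spawn_future and execute_future above.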

Project details


Download files

Download the file for your platform.

Source Distribution

stateful_pool-0.3.0.tar.gz (4.1 kB)

Uploaded Source

Built Distribution


stateful_pool-0.3.0-py3-none-any.whl (4.7 kB)

Uploaded Python 3

File details

Details for the file stateful_pool-0.3.0.tar.gz.

File metadata

  • Download URL: stateful_pool-0.3.0.tar.gz
  • Upload date:
  • Size: 4.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.3.0.tar.gz
  • SHA256: bdf4c8e7be362dea6307a1e7b3227eb07fa5490e0731405e5a3b24f203e1d44f
  • MD5: 6f98c867fa27cc776d6138646dd54396
  • BLAKE2b-256: bc8fff87a04770d9929c8b3fcb3d2fd6f0327e383938aaca032bd3586594c566


File details

Details for the file stateful_pool-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: stateful_pool-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 4.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.3.0-py3-none-any.whl
  • SHA256: 3909cd8e7bf9acec806761c3c92282a780a068547dbedd1860a3e5da63b106a1
  • MD5: dd53c0247997a92b5d527619f3c531e7
  • BLAKE2b-256: 6b3723d479257ffb7bef17f35b47130079968b88c8e9e446bb3bc68399eac240

