
A process pool that allows workers to maintain state across tasks.

Project description

The standard library's ProcessPoolExecutor makes it hard to maintain stateful workers, especially workers with expensive setup (e.g., each worker holding a model loaded in GPU memory).

This library lets you create a pool of stateful workers (spawn once) to run tasks in parallel across processes (execute many).

+-----------------------+              +----------------------+
|     Main Process      |              |     Process Pool     |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | --- Task --> | | Worker (Process) | |
| |   1    | |   2    | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ |              | +------------------+ |
|      ...     ...      |              | | Worker (Process) | |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | --- Task --> |         ...          |
| |  N-1   | |   N    | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ |              | | Worker (Process) | |
|                       |              | +------------------+ |
+-----------------------+              +----------------------+

Installation:

pip install stateful-pool

The following example shows how to define a worker class, spawn a worker (assigned several GPU IDs), and execute tasks on it.

from stateful_pool import SPool, SWorker
import time, random

# will run in another process
class SquareWorker(SWorker):
    gpu_ids: list[int]

    def spawn(self, gpu_ids: list[int]):
        self.gpu_ids = gpu_ids
        return f"Worker initialized on GPU: {self.gpu_ids}"
    
    def execute(self, value):
        time.sleep(random.uniform(0.1, 1.0))
        return f"[Execute] Square of {value} is {value * value} (computed on GPU: {self.gpu_ids})"

if __name__ == "__main__":

    with SPool(SquareWorker, queue_size=100) as pool:
        # spawn a worker, return value can be captured
        s = pool.spawn(gpu_ids=[0, 1])
        print(f"{s}")

        # submit a single task and wait for result
        r = pool.execute(100)
        print(r)

The example above calls pool.execute only once, so it does not exercise the pool's parallelism. In practice, you would submit tasks in a non-blocking manner via the submit_* counterparts:

with SPool(SquareWorker) as pool:
    spawn_futures = [pool.submit_spawn(gpu_ids=[i, i+1]) for i in range(0, 4, 2)]
    for f in spawn_futures:
        print(f.result())
    
    execute_futures = [pool.submit_execute(i) for i in range(4)]
    for f in execute_futures:
        print(f.result())
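If the futures returned by the submit_* methods behave like standard concurrent.futures.Future objects (an assumption, not stated above), results can also be consumed in completion order rather than submission order. A stdlib sketch of that pattern, using a thread pool as a stand-in for SPool:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random

def slow_square(value):
    # Stand-in for SquareWorker.execute: random delay, then square.
    time.sleep(random.uniform(0.01, 0.05))
    return value * value

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(slow_square, i) for i in range(4)]
    # as_completed yields each future as it finishes, not in submission order.
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # [0, 1, 4, 9]
```

This is useful when tasks have uneven runtimes and you want to process each result as soon as its worker is done.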



Download files


Source Distribution

stateful_pool-0.3.1.tar.gz (4.1 kB, source)

Built Distribution


stateful_pool-0.3.1-py3-none-any.whl (4.7 kB, Python 3)

File details

Details for the file stateful_pool-0.3.1.tar.gz.

File metadata

  • Download URL: stateful_pool-0.3.1.tar.gz
  • Upload date:
  • Size: 4.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.3.1.tar.gz

  • SHA256: 3f483dc09fc0e289a612a6c80742cb548b49ade3b28972742b36725229c0d13b
  • MD5: 63ac3f0bb13e0fb34851805090194ccb
  • BLAKE2b-256: e4cf5be8829779dc2d8e6a41ddec7ceb0d8c04b63026a55eb21955410aa72baa


File details

Details for the file stateful_pool-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: stateful_pool-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 4.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.3.1-py3-none-any.whl

  • SHA256: de9b199e87534e70875bf11ea4d18de17289c09011e4c03f8365297421c6dcbe
  • MD5: f71519737124b4a540142f0daed40e3a
  • BLAKE2b-256: fcdf8deb6ccc0432adc42075a1275324eca77b6e4bfe932364868f7225211064

