
A process pool that allows workers to maintain state across tasks.

Project description

The standard concurrent.futures.ProcessPoolExecutor makes it hard to maintain stateful workers (e.g., workers that each keep a model loaded in GPU memory). This library provides a simple interface for creating a pool of stateful workers that execute tasks in parallel across multiple processes.

+-----------------------+              +----------------------+
|     Main Process      |              |     Process Pool     |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | -- Task ---> | | Worker (Process) | |
| |   1    | |   2    | |              | +------------------+ |
| +--------+ +--------+ |              | +------------------+ |
|      ...     ...      | <--- Res --- | | Worker (Process) | |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | |              |         ...          |
| |  N-1   | |   N    | |              | +------------------+ |
| +--------+ +--------+ |              | | Worker (Process) | |
|                       |              | +------------------+ |
+-----------------------+              +----------------------+

The following is a simple example of using the library to create a pool of workers, each maintaining its own state (in this case, a GPU ID). A task is submitted to the pool, and the result is retrieved in a blocking manner. In practice, you would likely submit tasks from separate threads to avoid blocking.

from stateful_pool import SPool, SWorker
import time, random

# optional type annotations are:
# [spawn return, execute argument, execute return]
class SquareWorker(SWorker[str, int, str]):
    gpu_id: int

    def spawn(self, gpu_id):
        self.gpu_id = gpu_id
        return f"Worker initialized on GPU {gpu_id}"
    
    def execute(self, task):
        time.sleep(random.uniform(0.1, 1.0))
        return f"[Execute] Square of {task} is {task * task} (computed on GPU {self.gpu_id})"

if __name__ == "__main__":

    with SPool(SquareWorker, queue_size=100) as pool:
        s1 = pool.spawn(gpu_id=1)
        s2 = pool.spawn(gpu_id=2)
        print(f"{s1}, {s2}")

        res = pool.execute(100)
        print(res)

Download files

Download the file for your platform.

Source Distribution

stateful_pool-0.1.0.tar.gz (3.7 kB)

Uploaded Source

Built Distribution


stateful_pool-0.1.0-py3-none-any.whl (4.3 kB)

Uploaded Python 3

File details

Details for the file stateful_pool-0.1.0.tar.gz.

File metadata

  • Download URL: stateful_pool-0.1.0.tar.gz
  • Size: 3.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.1.0.tar.gz
Algorithm Hash digest
SHA256 b6f58fbf665325832df2556f7d7872c4d85935ee2b2e205b4ddb335679ba933d
MD5 14910fdc6eb98fb2e6b53cf68031710f
BLAKE2b-256 7a2622db0d8303329bd49acbfe2aac089f9ae4c4c3c48d82a76236dc55448a49


File details

Details for the file stateful_pool-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: stateful_pool-0.1.0-py3-none-any.whl
  • Size: 4.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d2bba44561c05bb207ca79ac2c90d0fd55f6a0fd153e4df9ee542a7c7a1b66f5
MD5 ffa8403586ac6288dcecf7ad16fd02b2
BLAKE2b-256 58837fc60e9be4d35b853ea01af80122906ae3705faaedddb6a0d973917d4e1e

