
A process pool that allows workers to maintain state across tasks.

Project description

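The standard concurrent.futures.ProcessPoolExecutor makes it hard to maintain stateful workers (e.g., workers that each keep a model loaded in GPU memory). Tasks are plain callables, so per-process state has to live in module-level globals set up by an initializer function, and that initializer receives the same arguments in every worker. This library provides a simple interface for creating a pool of stateful workers that execute tasks in parallel across multiple processes.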
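For comparison, here is a minimal sketch of the usual standard-library workaround. It is not how stateful_pool works. It uses the real `initializer`/`initargs` parameters of `ProcessPoolExecutor` and passes a `multiprocessing.Queue` of IDs so that each worker can claim a distinct GPU ID. The names `_state`, `_init_worker`, `square` and `run` are hypothetical. The per-worker state ends up in a global dictionary rather than on a worker object, and the distinct per-worker arguments need a side channel (the queue).

```python
import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor

# Hypothetical per-process state: each worker process has its own copy of
# this module-level dict, filled in once by the initializer.
_state = {}


def _init_worker(id_queue):
    # Runs once per worker process; every worker receives the same initargs,
    # so a shared queue is used to hand out a distinct ID to each one.
    _state["gpu_id"] = id_queue.get()


def square(value):
    # Tasks can reach the per-worker state only through the global.
    return value * value, _state["gpu_id"], os.getpid()


def run(values, gpu_ids):
    id_queue = mp.Queue()
    for gpu_id in gpu_ids:
        id_queue.put(gpu_id)
    # One worker per ID, so no initializer ever waits on an empty queue.
    with ProcessPoolExecutor(
        max_workers=len(gpu_ids),
        initializer=_init_worker,
        initargs=(id_queue,),
    ) as pool:
        return list(pool.map(square, values))


if __name__ == "__main__":
    for result in run(range(5), gpu_ids=[1, 2]):
        print(result)  # (square, gpu_id of the worker, worker pid)
```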

+-----------------------+              +----------------------+
|     Main Process      |              |     Process Pool     |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | --- Task --> | | Worker (Process) | |
| |   1    | |   2    | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ |              | +------------------+ |
|      ...     ...      |              | | Worker (Process) | |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | --- Task --> |         ...          |
| |  N-1   | |   N    | | <-- Resp --- | +------------------+ |
| +--------+ +--------+ |              | | Worker (Process) | |
|                       |              | +------------------+ |
+-----------------------+              +----------------------+
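The flow in the diagram (many threads in the main process sending tasks to a pool of long-lived worker processes and receiving responses) can be sketched with only the standard library. This is a hypothetical illustration of the pattern, not stateful_pool's actual implementation. `MiniPool`, `SquareState` and `_worker_loop` are invented names. Each worker process builds its state once and then serves tasks from a shared queue. A collector thread in the main process routes each response to the `Future` of the thread that submitted it.

```python
import itertools
import multiprocessing as mp
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class SquareState:
    """Hypothetical per-worker state, built once per process (e.g. a loaded model)."""

    def __init__(self, gpu_id):
        self.gpu_id = gpu_id

    def __call__(self, value):
        return value * value, self.gpu_id


def _worker_loop(state_factory, init_arg, tasks, results):
    state = state_factory(init_arg)  # expensive setup runs once per process
    while True:
        item = tasks.get()
        if item is None:  # shutdown sentinel
            break
        task_id, payload = item
        try:
            results.put((task_id, True, state(payload)))
        except Exception as exc:  # send errors back instead of killing the worker
            results.put((task_id, False, exc))


class MiniPool:
    def __init__(self, state_factory, init_args):
        self._tasks, self._results = mp.Queue(), mp.Queue()
        # One process per init argument; all of them share one task queue,
        # so whichever worker is idle picks up the next task.
        self._procs = [
            mp.Process(target=_worker_loop,
                       args=(state_factory, arg, self._tasks, self._results))
            for arg in init_args
        ]
        for p in self._procs:
            p.start()
        self._ids = itertools.count()
        self._pending = {}
        self._lock = threading.Lock()
        self._collector = threading.Thread(target=self._collect, daemon=True)
        self._collector.start()

    def _collect(self):
        # Match each response to the Future created by submit().
        while True:
            item = self._results.get()
            if item is None:
                break
            task_id, ok, value = item
            with self._lock:
                fut = self._pending.pop(task_id)
            if ok:
                fut.set_result(value)
            else:
                fut.set_exception(value)

    def submit(self, payload):
        # Thread-safe: any thread in the main process may call this.
        fut = Future()
        with self._lock:
            task_id = next(self._ids)
            self._pending[task_id] = fut
        self._tasks.put((task_id, payload))
        return fut

    def close(self):
        # Workers finish queued tasks before reaching their sentinels.
        for _ in self._procs:
            self._tasks.put(None)
        for p in self._procs:
            p.join()
        self._results.put(None)  # stop the collector after all responses
        self._collector.join()


if __name__ == "__main__":
    pool = MiniPool(SquareState, init_args=[1, 2])
    try:
        # Several main-process threads share one pool, as in the diagram.
        with ThreadPoolExecutor(max_workers=4) as threads:
            futures = [threads.submit(lambda v=v: pool.submit(v).result())
                       for v in range(8)]
            for f in futures:
                print(f.result())  # (square, gpu_id of the worker)
    finally:
        pool.close()
```

Task IDs, rather than one response queue per thread, keep the worker side simple. Workers never need to know which thread asked. This sketch does not recover from a worker process crashing; a pending task's future would then never complete.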

Installation:

pip install stateful-pool

The following example creates a pool of workers, each maintaining its own state (in this case, a GPU ID). A task is submitted to the pool and its result is retrieved in a blocking manner. In practice, you would likely submit tasks from separate threads so that the caller is not blocked.

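import random
import time

from stateful_pool import SPool, SWorker

class SquareWorker(SWorker):
    gpu_id: int

    # Runs once when a worker is spawned; per-worker state is kept on self.
    def spawn(self, gpu_id):
        self.gpu_id = gpu_id
        return f"Worker initialized on GPU {gpu_id}"

    # Runs for each task and can use the state set up in spawn().
    def execute(self, value):
        time.sleep(random.uniform(0.1, 1.0))  # simulate variable work time
        return f"[Execute] Square of {value} is {value * value} (computed on GPU {self.gpu_id})"

if __name__ == "__main__":

    with SPool(SquareWorker, queue_size=100) as pool:
        # Spawn two workers, each with its own GPU ID.
        s1 = pool.spawn(gpu_id=1)
        s2 = pool.spawn(gpu_id=2)
        print(f"{s1}, {s2}")

        # Submit one task and block until its result is available.
        res = pool.execute(100)
        print(res)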



Download files

Download the file for your platform.

Source Distribution

stateful_pool-0.2.0.tar.gz (3.7 kB)

Uploaded Source

Built Distribution


stateful_pool-0.2.0-py3-none-any.whl (4.3 kB)

Uploaded Python 3

File details

Details for the file stateful_pool-0.2.0.tar.gz.

File metadata

  • Download URL: stateful_pool-0.2.0.tar.gz
  • Upload date:
  • Size: 3.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.2.0.tar.gz
Algorithm     Hash digest
SHA256        afe1b02eaef70a1c974a9c4f872c4ba6e55088194665df5640e2e983d3fd394f
MD5           ff09249338f528bda438b65fbd7269dd
BLAKE2b-256   13c036012a815d54cf127007e14e6930ccafcc20a2a907c6b6f907885cdc214a


File details

Details for the file stateful_pool-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: stateful_pool-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 4.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.5

File hashes

Hashes for stateful_pool-0.2.0-py3-none-any.whl
Algorithm     Hash digest
SHA256        77585e5f2c9d26ba81d0e685a1348d0fe27a4f960de38965e45d19388731fb9b
MD5           f2a3a0b632e868ee69bb82ba77914aa7
BLAKE2b-256   5d6880ae52c78e3ccaf7a2a50b3ec696a14a77b01076f663634a9d00781b7117
