Offloading CPU intensive model scoring to a pool of workers

Project description

Use case / why

Say we have an ML model that is CPU hungry and takes time to score, and we want to serve it through an API / Web Sockets while:

  • keeping overall application complexity low (no dedicated service to serve ML models)
  • minimising latency
  • maximising throughput (the number of requests a single Cloud Run instance / container can handle)
  • avoiding blocking calls, which would stall coroutines / web sockets

A typical simple API serving ML model(s) instantiates the models right in the main process, so the models consume the API process's RAM and CPU cycles for scoring. This directly hurts the API's latency and throughput. It would help to move model scoring away from the API process and onto other cores. This would let us:

  • use CPU resources more efficiently (loading several cores instead of a single one, provided the container has more than one core at its disposal)
  • avoid hurting API performance by moving model scoring, which requires memory and CPU, away from the API process
  • decrease latency
  • increase throughput

On top of that, the solution seems to integrate well with the Cloud Run autoscaling strategy. When the load is low, the workers serving the models are pretty much idle. As the load increases, the workers get busy, which in turn increases overall Cloud Run instance CPU usage. Once it reaches a certain threshold, Cloud Run will spin up another instance to handle the load.


How to use / Examples

  • Instantiating MLPool

In order to use MLPool, for every ML model a user wants to run on the pool, they need to provide a callable that loads the model into memory and prepares it. Under the hood, MLPool ensures that every worker in the pool, each running in a dedicated process, loads the model(s) into memory once, when it starts.

Consider this function, which loads a text classification model, as an example:

def load_text_classifier(weights_path: str) -> TextClassifier:
    return TextClassifier(weights_path)

When instantiating MLPool, a user needs to pass the parameter models_to_load, which is a dictionary where the keys are model names MLPool will serve, and the values are the callables loading the models (example function above).

In addition, the user can specify the number of workers they want, the TTL of scoring results, the maximum number of jobs that can be queued on the pool, etc.

if __name__ == "__main__":
    with MLPool(
        models_to_load={
            "text_classifier": partial(load_text_classifier, "text_classification.pt"),
            "any_other_model": load_any_other_model_callable
        },
        nb_workers=5,
    ) as pool:
        ...

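! IMPORTANT: always instantiate MLPool under if __name__ == "__main__": The workers run in separate processes, and depending on the multiprocessing start method the main module may be imported again in each of them.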

  • Scoring on MLPool

MLPool gives the user full control over what gets executed on the pool, so in order to score a model on the pool, the user needs to provide a callable such as:

def score_text_classifier(model: TextClassifier, text):
    return model.classify_text(text)

where the first parameter is the model, followed by anything else the user wants to pass.

To schedule execution of the function above on the pool, the user can call .create_job() or the asyncio-friendly .create_job_async(), passing:

  • score_model_function - a function to run on the pool, which accepts a model as the first argument
  • model_name - the name of the model to be passed to the function as the first argument; must be one of the keys of models_to_load provided when instantiating MLPool
  • args, kwargs - any other arguments to pass to the function

job_id = pool.create_job(
    score_model_function=score_text_classifier,
    model_name="text_classifier",
    kwargs={
        # Adjacent string literals are concatenated, so no stray
        # indentation ends up inside the text
        "text": (
            "MEMPHIS, Tenn. – Four days ago, Jon Rahm was "
            "enduring the season’s worst weather conditions on Sunday at The "
            "Open on his way to a closing 75 at Royal Portrush, which "
            "considering the wind and the rain was a respectable showing"
        )
    },
)
print(pool.get_result(job_id))

To get the execution results back, the user can call .get_result() or the asyncio-friendly .get_result_async(), passing the id of the job.
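
To illustrate why the asyncio-friendly variants matter, here is a minimal standard-library sketch (not MLPool's internals) of awaiting a result computed in another process while the event loop keeps running other coroutines. It assumes loop.run_in_executor with a ProcessPoolExecutor; slow_score, heartbeat and score_without_blocking are hypothetical names.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def slow_score(n):
    # Stand-in for CPU-hungry scoring: a busy loop that holds a core.
    total = 0
    for i in range(n):
        total += i * i
    return total


async def heartbeat(ticks, n_ticks):
    # A coroutine that should keep running while scoring is in progress.
    for _ in range(n_ticks):
        ticks.append(1)
        await asyncio.sleep(0.01)


async def score_without_blocking(executor, n):
    loop = asyncio.get_running_loop()
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks, 5))
    # The event loop stays free while the executor does the work.
    result = await loop.run_in_executor(executor, slow_score, n)
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        print(asyncio.run(score_without_blocking(executor, 2_000_000)))
```

Calling slow_score directly inside the coroutine would instead hold the event loop for the whole computation, and the heartbeat (or a web socket handler) would not get a chance to run until it finished.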

  • Cancelling

Once a job has been created, it is possible to cancel model scoring using .cancel_job(), provided the job hasn't started running yet. This is convenient if, say, your web socket client disconnects.
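
The "only if it hasn't run yet" rule is the same one that standard-library futures follow, which makes it easy to demonstrate. The sketch below is an analogy, not MLPool's .cancel_job(): it uses a single-threaded ThreadPoolExecutor so that exactly one job is running and one is queued. demo_cancel and long_job are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    started = threading.Event()
    release = threading.Event()

    def long_job():
        started.set()            # signal that this job is now running
        release.wait(timeout=5)  # occupy the only worker until released
        return "done"

    with ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(long_job)
        started.wait(timeout=5)
        queued = executor.submit(lambda: "never runs")

        cancelled_queued = queued.cancel()    # still waiting in the queue
        cancelled_running = running.cancel()  # already executing

        release.set()  # let the running job finish
        return cancelled_queued, cancelled_running, running.result()


if __name__ == "__main__":
    print(demo_cancel())
```

The queued job is cancelled, while the job that is already executing cannot be and runs to completion.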


End to end example:

Say, we have two models we want to serve.

from functools import partial

from fastapi import FastAPI
import pydantic
import uvicorn

from ml_pool import MLPool
from ml_pool.logger import get_logger
from examples.models import HungryIris, HungryDiabetesClassifier


logger = get_logger("api")

app = FastAPI()


# ------------------- functions a user has to provide ------------------------
def load_iris(model_path: str) -> HungryIris:
    # TODO: Loading and preparing ML model goes here

    return HungryIris(model_path)


def score_iris(model: HungryIris, features):
    # TODO: Feature engineering etc goes here

    return model.predict(features)


def load_diabetes_classifier(model_path: str) -> HungryDiabetesClassifier:
    # TODO: Loading and preparing ML model goes here

    return HungryDiabetesClassifier(model_path)


def score_diabetes_classifier(model: HungryDiabetesClassifier, features):
    # TODO: Feature engineering etc goes here

    return model.predict(features)


# ------------------------------- schemas ------------------------------------
class Request(pydantic.BaseModel):
    features: list[float]


class Response(pydantic.BaseModel):
    prediction: int


# ------------------------------- endpoints ----------------------------------
@app.post("/iris")
async def serve_iris(request: Request) -> Response:
    features = request.features
    job_id = await pool.create_job_async(
        score_model_function=score_iris,
        model_name="iris",
        kwargs={"features": features},
    )
    result = await pool.get_result_async(job_id)
    return Response(prediction=result)


@app.post("/diabetes")
async def serve_diabetes_classifier(request: Request) -> Response:
    features = request.features
    job_id = await pool.create_job_async(
        score_model_function=score_diabetes_classifier,
        model_name="diabetes_classifier",
        args=(features,),
    )
    result = await pool.get_result_async(job_id)
    return Response(prediction=result)


if __name__ == "__main__":
    with MLPool(
        models_to_load={
            "iris": partial(load_iris, "../models/iris_xgb.json"),
            "diabetes_classifier": partial(
                load_diabetes_classifier, "../models/diabetes_xgb.json"
            ),
        },
        nb_workers=5,
    ) as pool:
        uvicorn.run(app, workers=1)

Gotchas:

  • MLPool comes with significant overhead from IPC in Python (pickling/unpickling, etc.), so if your model is light and takes very little time to run (~milliseconds), introducing MLPool will only slow things down. In that case it might make sense to score the model directly in the API process. If heavy feature engineering work accompanies scoring, offloading could still pay off; it depends on the workload.

  • ! It is common to spin up multiple instances of the API app inside a container using tools such as gunicorn. Be careful when using MLPool in such a configuration, as the app instances and the pool workers compete for the same cores: you could overload the CPU and constantly trigger Cloud Run to scale up. In our case each container runs only a single instance of the API app; spinning up more instances within the same container won't help, as the bottleneck is the CPU-hungry model scoring.
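
To get a rough feel for the first gotcha, one can compare the cost of serialising a request payload with the cost of scoring it in-process. The sketch below is only an approximation: it times a pickle round trip and ignores the queue/pipe transfer and scheduling that real IPC adds, so the true overhead is higher. All function names are hypothetical, and the timings depend on the machine.

```python
import pickle
import time


def pickle_round_trip_seconds(payload, repeats=1000):
    """Average time to pickle and unpickle `payload` once."""
    start = time.perf_counter()
    for _ in range(repeats):
        restored = pickle.loads(pickle.dumps(payload))
    elapsed = time.perf_counter() - start
    assert restored == payload  # sanity check: the round trip is lossless
    return elapsed / repeats


def call_seconds(function, argument, repeats=1000):
    """Average time of calling `function(argument)` in-process."""
    start = time.perf_counter()
    for _ in range(repeats):
        function(argument)
    return (time.perf_counter() - start) / repeats


def light_model(features):
    # Hypothetical "light" model: a weighted sum of the features.
    return sum(0.5 * x for x in features)


if __name__ == "__main__":
    features = [float(i) for i in range(100)]
    print("pickle round trip:", pickle_round_trip_seconds(features))
    print("in-process scoring:", call_seconds(light_model, features))
```

If the two numbers are of the same order of magnitude, offloading is unlikely to help; a pool starts to pay off when scoring takes far longer than moving the data.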


Known issues:

  • If a worker dies while it is processing a job, the caller will wait for the result indefinitely!

  • Worker monitoring threads run too rarely, so it is possible to create a new job after workers have already failed (the window is ~0.2 sec)

  • DO NOT terminate workers abruptly: a worker that dies with locks acquired might corrupt the queue and the shared dict. Try stopping workers gracefully first, and terminate only those that don't join within the timeout
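
Until the first issue above is fixed, a caller can at least bound how long it waits. The sketch below is a caller-side workaround using asyncio.wait_for, not a feature of MLPool. In a real application the awaitable would presumably be the coroutine returned by pool.get_result_async(job_id); here stand-in coroutines are used so that the example runs on its own. result_or_default, never_finishes and finishes_quickly are hypothetical names.

```python
import asyncio


async def result_or_default(awaitable, timeout, default=None):
    """Wait for `awaitable` at most `timeout` seconds, else return `default`."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        return default


async def never_finishes():
    # Stand-in for waiting on a job whose worker has died.
    await asyncio.Event().wait()


async def finishes_quickly():
    # Stand-in for a job that completes normally.
    await asyncio.sleep(0.01)
    return 1


if __name__ == "__main__":
    print(asyncio.run(result_or_default(never_finishes(), 0.1, "timed out")))
    print(asyncio.run(result_or_default(finishes_quickly(), 1.0)))
```

Note that asyncio.wait_for only stops the waiting; the job itself may still be sitting in the queue, so it may be worth cancelling it as well after a timeout.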


TODO:

  • Release as a package

  • Test with your WS project

  • Test with proper CPU hungry model


Brainstorming (maybe TODO / extras):
