Forge

A lightweight ML inference server with dynamic batching, model hot-swapping, a multi-model registry and Prometheus metrics -- built from scratch in Python.

This is not model.predict() behind Flask. Forge implements the core mechanics of what production runtimes like TorchServe and Triton do, from first principles: request queuing, batch assembly, concurrent GPU scheduling and zero-downtime model replacement.

pip install forge-ml-serve

Features

Feature               Description
--------------------  -----------
Dynamic batching      Accumulates concurrent requests over a configurable time window then stacks them into a single tensor operation. Typically 4-8x throughput vs. sequential serving.
Backpressure queue    asyncio.Queue with a hard depth cap. Returns HTTP 503 immediately when saturated rather than blocking the event loop.
Model hot-swapping    Load a new checkpoint with zero downtime. In-flight requests finish on the old model; new requests use the new one.
Multi-model registry  Serve multiple models simultaneously at independent endpoints, each with its own queue and batching config.
Prometheus metrics    P50/P95/P99 latency histograms, batch size distribution, queue depth, timeout counters and swap duration.
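
The backpressure row can be illustrated with a minimal sketch. It is not Forge's actual RequestQueue: the class and exception names (BoundedRequestQueue, QueueSaturated) are hypothetical, and only asyncio.Queue with a maxsize and the "reject immediately" behaviour come from the description above.

```python
import asyncio


class QueueSaturated(Exception):
    """Raised when the queue is full; an HTTP layer would map this to a 503."""


class BoundedRequestQueue:
    """Hypothetical stand-in for a request queue with a hard depth cap."""

    def __init__(self, max_depth: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_depth)

    def submit(self, item) -> None:
        # put_nowait never waits: it either enqueues or raises QueueFull,
        # so a saturated queue cannot stall the event loop.
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
            raise QueueSaturated(
                f"queue depth cap of {self._queue.maxsize} reached"
            ) from None

    def depth(self) -> int:
        return self._queue.qsize()


async def demo() -> tuple[int, bool]:
    queue = BoundedRequestQueue(max_depth=2)
    queue.submit("request-1")
    queue.submit("request-2")
    rejected = False
    try:
        queue.submit("request-3")  # exceeds the cap
    except QueueSaturated:
        rejected = True
    return queue.depth(), rejected


depth, rejected = asyncio.run(demo())
```

Rejecting at submit time keeps the failure cheap for the caller: a client sees the overload immediately and can retry or back off, instead of waiting in an unbounded backlog.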

Architecture

  HTTP Request
       |
       v
  +-------------------------------------+
  |  FastAPI  POST /v1/{model}/predict  |
  +------------------+------------------+
                     | asyncio.Future
                     v
  +-------------------------------------+
  |  RequestQueue  (backpressure cap)   |
  +------------------+------------------+
                     | blocking get / nowait drain
                     v
  +-------------------------------------+
  |  BatchScheduler                     |
  |  +-------------------------------+  |
  |  | Collect for batch_window_ms   |  |
  |  |   OR until max_batch_size     |  |  <- whichever fires first
  |  +--------------+----------------+  |
  |                 | run_in_executor   |
  |  +--------------v----------------+  |
  |  |  torch.no_grad() forward pass |  |
  |  +--------------+----------------+  |
  |                 | scatter results   |
  +-----------------|-------------------+
                    |
                    v
         Future.set_result(tensor)
                    |
                    v
         HTTP Response to caller

When 50 clients send requests at the same time, Forge does not run 50 separate inferences. It groups them into batches of up to your configured max_batch_size, runs one forward pass per batch, then splits the output and returns the individual results to each caller. That is the core value.
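
The flow in the diagram can be sketched in plain asyncio. This is an illustration under stated assumptions, not Forge's BatchScheduler: PendingRequest, collect_batch, serve_one_batch and the double_all stand-in for the forward pass are hypothetical names, and the drain loop uses asyncio.wait_for with a deadline rather than whatever the real implementation does.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class PendingRequest:
    payload: list[float]
    future: asyncio.Future  # resolved with this request's slice of the output


async def collect_batch(queue: asyncio.Queue, batch_window_ms: float,
                        max_batch_size: int) -> list[PendingRequest]:
    # Block until the first request arrives, then keep collecting until the
    # window expires or the batch is full, whichever happens first.
    batch = [await queue.get()]
    deadline = time.monotonic() + batch_window_ms / 1000.0
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch


def double_all(rows: list[list[float]]) -> list[list[float]]:
    # Stand-in for a batched model forward pass (assumption: doubles inputs).
    return [[2.0 * value for value in row] for row in rows]


async def serve_one_batch(queue: asyncio.Queue, batch_window_ms: float = 20.0,
                          max_batch_size: int = 32) -> int:
    batch = await collect_batch(queue, batch_window_ms, max_batch_size)
    loop = asyncio.get_running_loop()
    # Run the blocking "forward pass" off the event loop thread.
    outputs = await loop.run_in_executor(
        None, double_all, [request.payload for request in batch]
    )
    # Scatter: each caller's future receives only its own row.
    for request, output in zip(batch, outputs):
        request.future.set_result(output)
    return len(batch)


async def demo() -> tuple[int, list[list[float]]]:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    requests = [PendingRequest([float(i)], loop.create_future()) for i in range(5)]
    for request in requests:
        queue.put_nowait(request)
    size = await serve_one_batch(queue, max_batch_size=4)
    return size, [request.future.result() for request in requests[:size]]


batch_size, results = asyncio.run(demo())
```

With five queued requests and max_batch_size=4, the first batch takes four of them and the fifth waits for the next batch, which is why a burst larger than max_batch_size results in several forward passes rather than one.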


Installation

From PyPI (recommended)

pip install forge-ml-serve

From source

git clone https://github.com/verz0/Forge.git
cd Forge
pip install -e ".[dev]"

Quickstart

1. Serve a dummy model (no GPU needed)

forge serve examples/serve_dummy.py
curl -X POST http://localhost:8000/v1/dummy/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [1.0, 2.0, 3.0]}'

Response:

{
  "model": "dummy",
  "output": [2.0, 4.0, 6.0],
  "request_id": "forge",
  "latency_ms": 1.2
}

2. Serve ResNet-50

pip install torchvision
forge serve examples/serve_resnet.py

3. Check metrics (Prometheus format)

curl http://localhost:8000/metrics

4. Interactive API docs

Open http://localhost:8000/docs in your browser. FastAPI auto-generates a full Swagger UI for every registered model endpoint.


Tutorials

Tutorial 1: Serve a Custom PyTorch Model

Any nn.Module can be served through Forge. Write a config file with an async setup(registry) function that registers your model.

sentiment_server.py:

import torch
import torch.nn as nn
from forge import ModelConfig, ModelRegistry


class SentimentModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Linear(256, 3),  # negative, neutral, positive
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


async def setup(registry: ModelRegistry) -> None:
    model = SentimentModel()
    # map_location lets weights saved on a GPU load on a CPU-only machine
    model.load_state_dict(torch.load("sentiment_weights.pt", map_location="cpu"))
    model.eval()

    config = ModelConfig(
        batch_window_ms=25.0,   # collect requests for 25ms then batch
        max_batch_size=32,
        max_queue_depth=256,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    await registry.register("sentiment", model, config)

Start the server:

forge serve sentiment_server.py

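Send a request (the input must be a 768-element vector to match the model's first Linear layer; it is elided here):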

curl -X POST http://localhost:8000/v1/sentiment/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [0.1, 0.2, 0.3, ...]}'

Response:

{
  "model": "sentiment",
  "output": [0.12, 0.03, 0.85],
  "request_id": "forge",
  "latency_ms": 4.21
}
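
Because the curl example elides most of the 768 input values, a scripted client is often easier. The sketch below uses only the Python standard library and the endpoint shape shown above; build_predict_request is a hypothetical helper, and the all-zeros vector is a placeholder rather than a meaningful embedding.

```python
import json
import urllib.request


def build_predict_request(model: str, values: list[float],
                          base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build a POST request for /v1/{model}/predict with a JSON body."""
    body = json.dumps({"input": values}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/{model}/predict",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder input: 768 zeros, sized to match nn.Linear(768, 256).
request = build_predict_request("sentiment", [0.0] * 768)

# To send it, a Forge server must be running on localhost:8000:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```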

Tutorial 2: Serve Multiple Models Simultaneously

Register any number of models in a single config file. Each model gets its own endpoint, queue and batching configuration.

multi_server.py:

import torch
import torch.nn as nn
from forge import ModelConfig, ModelRegistry


class ImageClassifier(nn.Module):
    def forward(self, x):
        return torch.softmax(x.mean(dim=-1, keepdim=True).expand(-1, 10), dim=-1)


class TextEmbedder(nn.Module):
    def forward(self, x):
        return x / x.norm(dim=-1, keepdim=True)


async def setup(registry: ModelRegistry) -> None:
    # Image classifier on GPU with larger batch window
    await registry.register("image-classifier", ImageClassifier(), ModelConfig(
        batch_window_ms=50.0,
        max_batch_size=16,
        device="cuda",
    ))

    # Text embedder on CPU with fast turnaround
    await registry.register("text-embedder", TextEmbedder(), ModelConfig(
        batch_window_ms=10.0,
        max_batch_size=64,
        device="cpu",
    ))

Start the server:

forge serve multi_server.py

Two independent endpoints are now live:

# Classify an image
curl -X POST http://localhost:8000/v1/image-classifier/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [0.5, 0.3, ...]}'

# Generate an embedding
curl -X POST http://localhost:8000/v1/text-embedder/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [0.1, 0.2, ...]}'

# See all registered models
curl http://localhost:8000/v1/models

Tutorial 3: Hot-Swap a Model Without Downtime

You have retrained your model overnight. Instead of restarting the server (which drops all in-flight requests), swap it live.

Step 1 -- Save the new model as TorchScript:

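import torch

from sentiment_server import SentimentModel  # class defined in Tutorial 1

new_model = SentimentModel()
new_model.load_state_dict(torch.load("sentiment_v2.pt", map_location="cpu"))
new_model.eval()  # switch to inference mode before scripting

# A TorchScript file can be loaded without the Python class definition
scripted = torch.jit.script(new_model)
torch.jit.save(scripted, "sentiment_v2_scripted.pt")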

Step 2 -- Tell Forge to swap:

curl -X POST http://localhost:8000/v1/sentiment/reload \
     -H "Content-Type: application/json" \
     -d '{"model_path": "/path/to/sentiment_v2_scripted.pt"}'

Response:

{"status": "swapped", "model": "sentiment", "path": "/path/to/sentiment_v2_scripted.pt"}

Zero downtime. Requests that were already being processed finish on the old model. New requests immediately use the new one.
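
One common way to get this behaviour is to swap a single model reference and have each batch capture that reference once before it runs. The sketch below shows the idea with plain callables; SwappableModel and run_batch are hypothetical names, and this is not a description of Forge's ModelWorker internals.

```python
import threading


class SwappableModel:
    """Holds the current model behind a lock so it can be replaced atomically."""

    def __init__(self, model) -> None:
        self._model = model
        self._lock = threading.Lock()

    def current(self):
        with self._lock:
            return self._model

    def swap(self, new_model):
        # Replace the reference; batches that already captured the old model
        # keep using it until they finish.
        with self._lock:
            old_model, self._model = self._model, new_model
        return old_model


def run_batch(holder: SwappableModel, batch: list) -> list:
    model = holder.current()  # captured once, so a batch never mixes models
    return [model(item) for item in batch]


holder = SwappableModel(lambda x: x * 2)      # "old" model: doubles its input
in_flight_model = holder.current()            # a batch that has already started
holder.swap(lambda x: x * 3)                  # "new" model: triples its input

old_outputs = [in_flight_model(v) for v in [1, 2]]  # finishes on the old model
new_outputs = run_batch(holder, [1, 2])             # uses the new model
```

The old model is only released once nothing references it any more, so in-flight work is never interrupted by the swap.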


Tutorial 4: Monitor with Prometheus

Forge exposes production-grade metrics at the /metrics endpoint in Prometheus text format.

curl http://localhost:8000/metrics

Tracked metrics include:

  • Request latency -- P50, P95, P99 histograms per model
  • Batch size distribution -- how effectively requests are being grouped
  • Queue depth -- current backlog per model
  • Timeout counters -- requests that exceeded the configured timeout
  • Swap duration -- time taken for each model hot-swap operation

Connect this to a Prometheus scrape target and visualize in Grafana for real-time dashboards.
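
Percentiles are not stored directly in a Prometheus histogram; they are estimated from cumulative bucket counts. The sketch below shows that estimation with linear interpolation inside the matching bucket, similar in spirit to PromQL's histogram_quantile. The bucket bounds and counts are invented for illustration and do not come from Forge.

```python
def estimate_quantile(q: float, buckets: list[tuple[float, int]]) -> float:
    """Estimate a quantile from (upper_bound_ms, cumulative_count) pairs.

    Buckets must be sorted by upper bound, with cumulative counts.
    """
    total = buckets[-1][1]
    if total == 0:
        raise ValueError("histogram has no observations")
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for upper_bound, cumulative in buckets:
        if cumulative >= rank:
            in_bucket = cumulative - prev_count
            if in_bucket == 0:
                return upper_bound
            # Assume observations are spread evenly inside the bucket.
            fraction = (rank - prev_count) / in_bucket
            return prev_bound + (upper_bound - prev_bound) * fraction
        prev_bound, prev_count = upper_bound, cumulative
    return buckets[-1][0]


# Hypothetical latency histogram: 100 requests, bounds in milliseconds.
latency_buckets = [(5.0, 50), (10.0, 90), (25.0, 99), (50.0, 100)]

p50 = estimate_quantile(0.50, latency_buckets)
p95 = estimate_quantile(0.95, latency_buckets)
p99 = estimate_quantile(0.99, latency_buckets)
```

The estimate is only as fine-grained as the bucket boundaries, so buckets should be chosen around the latencies you care about.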


Configuration Reference

from forge import ModelConfig

config = ModelConfig(
    batch_window_ms=50.0,   # Collect requests for 50ms before dispatching
    max_batch_size=32,       # Early flush if 32 requests accumulate first
    max_queue_depth=256,     # Return 503 if more than 256 requests are pending
    request_timeout_s=30.0,  # Fail requests waiting longer than 30s
    device="cuda",           # "cpu", "cuda", "cuda:0" or "mps"
    num_threads=4,           # PyTorch intra-op threads (CPU only)
)

Parameter          Default  Description
-----------------  -------  -----------
batch_window_ms    50.0     Time window in milliseconds to collect requests before dispatching a batch
max_batch_size     32       Maximum number of requests per batch. Flushes early if reached before the window expires
max_queue_depth    256      Maximum pending requests. Returns HTTP 503 when exceeded
request_timeout_s  30.0     Per-request timeout. Returns HTTP 504 on expiry
device             "cpu"    PyTorch device string for inference
num_threads        4        PyTorch intra-op thread count (relevant for CPU inference)
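
The request_timeout_s row can be illustrated with asyncio.wait_for, which is one straightforward way to bound how long a handler waits on a result future. This is a sketch, not Forge's route handler: await_result is a hypothetical helper that returns a (status, payload) pair instead of building a real HTTP response.

```python
import asyncio


async def await_result(future: asyncio.Future, request_timeout_s: float):
    """Wait for a batched result; report 504 if it does not arrive in time."""
    try:
        result = await asyncio.wait_for(future, timeout=request_timeout_s)
        return 200, result
    except asyncio.TimeoutError:
        # wait_for cancels the future on timeout, so a late result is dropped.
        return 504, None


async def demo():
    loop = asyncio.get_running_loop()

    ready = loop.create_future()
    ready.set_result([2.0, 4.0, 6.0])   # result already available

    stuck = loop.create_future()        # never resolved in this demo

    fast = await await_result(ready, request_timeout_s=0.05)
    slow = await await_result(stuck, request_timeout_s=0.05)
    return fast, slow


fast_response, slow_response = asyncio.run(demo())
```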

API Reference

Endpoint             Method  Description
-------------------  ------  -----------
/v1/{model}/predict  POST    Submit a tensor for batched inference
/v1/{model}/reload   POST    Hot-swap to a new TorchScript checkpoint
/v1/models           GET     List all registered models and their queue depths
/metrics             GET     Prometheus metrics in text format
/health              GET     Liveness probe with per-model readiness status
/docs                GET     Interactive Swagger API documentation

Benchmark

# Start the server
forge serve examples/serve_dummy.py

# Run the sweep in another terminal
python benchmarks/bench_throughput.py --concurrency 1,5,10,25,50,100

# With chart output
python benchmarks/bench_throughput.py --plot

Sample results (CPU, dummy model, 128-float input):

Concurrency  RPS    P50 (ms)  P95 (ms)  P99 (ms)
-----------  -----  --------  --------  --------
1            420    2.1       2.8       3.1
10           1,850  4.9       8.2       11.4
50           3,200  14.8      28.3      41.7
100          3,400  28.1      52.6      71.2

At concurrency 50, batching yields roughly 7.6x the throughput of the concurrency-1 run (3,200 vs. 420 RPS), which stands in for a naive sequential server. The cost is latency: P50 rises from 2.1 ms to 14.8 ms, and the added time comes from the batch window.
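
The arithmetic behind these figures can be checked from the sample table. The sketch below computes the speedup over the concurrency-1 row and, as a sanity check, the mean latency implied by Little's law (concurrency = throughput x latency). That check assumes the benchmark clients are closed-loop, meaning each one sends its next request as soon as the previous response arrives; the benchmark script may behave differently.

```python
# Sample results from the table above: concurrency -> (RPS, P50 latency in ms)
results = {
    1: (420, 2.1),
    10: (1850, 4.9),
    50: (3200, 14.8),
    100: (3400, 28.1),
}

baseline_rps = results[1][0]

speedup = {}
implied_latency_ms = {}
for concurrency, (rps, p50_ms) in results.items():
    # Throughput relative to a single client sending one request at a time.
    speedup[concurrency] = rps / baseline_rps
    # Little's law: mean latency = concurrency / throughput.
    implied_latency_ms[concurrency] = concurrency / rps * 1000.0

for concurrency in results:
    print(
        f"concurrency={concurrency:>3}  "
        f"speedup={speedup[concurrency]:.1f}x  "
        f"implied mean latency={implied_latency_ms[concurrency]:.1f} ms  "
        f"reported P50={results[concurrency][1]} ms"
    )
```

At concurrency 50 the implied mean latency is close to the reported P50, which suggests the table is internally consistent. Throughput also flattens between concurrency 50 and 100 while latency roughly doubles, the usual sign that the server is saturated.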


Running Tests

pytest tests/ -v

Project Structure

forge/
  forge/
    config.py      # ModelConfig and ServerConfig dataclasses
    queue.py       # RequestQueue with backpressure and InferenceRequest
    batcher.py     # BatchScheduler -- the core batching engine
    worker.py      # ModelWorker with hot-swap protocol
    registry.py    # Multi-model ModelRegistry
    metrics.py     # Prometheus metric definitions
    server.py      # FastAPI application and route handlers
    cli.py         # forge serve CLI entry point
  tests/
  benchmarks/
  examples/

License

MIT
