Forge
A lightweight ML inference server with dynamic batching, model hot-swapping, a multi-model registry and Prometheus metrics -- built from scratch in Python.
This is not model.predict() behind Flask. Forge implements the core mechanics of what production runtimes like TorchServe and Triton do, from first principles: request queuing, batch assembly, concurrent GPU scheduling and zero-downtime model replacement.
pip install forge-ml-serve
Why Forge?
Enterprise inference runtimes are built for large-scale cloud clusters. They require Docker, have massive dependency trees and take gigabytes of disk space.
Forge is designed for a different set of problems:
| | Forge | TorchServe | Triton |
|---|---|---|---|
| Setup | pip install | Docker + config files | Docker + model repository |
| Dependencies | FastAPI + PyTorch | JVM + PyTorch + TS libs | C++ runtime + CUDA toolkit |
| Lines of core code | ~800 | ~50k | ~200k |
| Local-first | Yes | No | No |
| Ideal for | Edge, local dev, private servers | Cloud production | Cloud production at scale |
Forge is not trying to replace Triton in a 10,000 QPS cloud deployment. It demonstrates that you understand what Triton does and can build the critical path yourself -- in clean, readable Python.
Features
| Feature | Description |
|---|---|
| Dynamic batching | Accumulates concurrent requests over a configurable time window then stacks them into a single tensor operation. Typically 4-8x throughput vs. sequential serving. |
| Backpressure queue | asyncio.Queue with a hard depth cap. Returns HTTP 503 immediately when saturated rather than blocking the event loop. |
| Model hot-swapping | Load a new checkpoint with zero downtime. In-flight requests finish on the old model; new requests use the new one. |
| Multi-model registry | Serve multiple models simultaneously at independent endpoints, each with its own queue and batching config. |
| Prometheus metrics | P50/P95/P99 latency histograms, batch size distribution, queue depth, timeout counters and swap duration. |
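The backpressure row can be illustrated with a short sketch. It is not Forge's actual RequestQueue: `BoundedRequestQueue`, `QueueSaturated` and `submit` are hypothetical names, and the only behaviour taken from the table is an asyncio.Queue with a hard depth cap that rejects immediately instead of waiting.

```python
import asyncio


class QueueSaturated(Exception):
    """Raised when the queue is at its depth cap; a handler would map this to HTTP 503."""


class BoundedRequestQueue:
    # Hypothetical stand-in for a backpressure queue; names are illustrative only.
    def __init__(self, max_queue_depth: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_depth)

    def submit(self, payload) -> asyncio.Future:
        """Enqueue without awaiting, so a full queue fails fast instead of blocking."""
        future = asyncio.get_running_loop().create_future()
        try:
            self._queue.put_nowait((payload, future))
        except asyncio.QueueFull:
            raise QueueSaturated("queue is full") from None
        return future

    def depth(self) -> int:
        return self._queue.qsize()


async def demo() -> list:
    queue = BoundedRequestQueue(max_queue_depth=2)
    outcomes = []
    for i in range(3):
        try:
            queue.submit([float(i)])
            outcomes.append("accepted")
        except QueueSaturated:
            outcomes.append("rejected (503)")
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)  # ['accepted', 'accepted', 'rejected (503)']
```

Using `put_nowait` rather than `await put()` is the design choice that matters here: the third caller learns about saturation at once and can retry elsewhere, while the event loop stays free.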
Architecture
            HTTP Request
                 |
                 v
+-------------------------------------+
|  FastAPI POST /v1/{model}/predict   |
+------------------+------------------+
                   |  asyncio.Future
                   v
+-------------------------------------+
|  RequestQueue (backpressure cap)    |
+------------------+------------------+
                   |  blocking get / nowait drain
                   v
+-------------------------------------+
|  BatchScheduler                     |
|  +-------------------------------+  |
|  | Collect for batch_window_ms   |  |
|  | OR until max_batch_size       |  |  <- whichever fires first
|  +--------------+----------------+  |
|                 |  run_in_executor  |
|  +--------------v----------------+  |
|  | torch.no_grad() forward pass  |  |
|  +--------------+----------------+  |
|                 |  scatter results  |
+-----------------|-------------------+
                  |
                  v
       Future.set_result(tensor)
                  |
                  v
       HTTP Response to caller
When 50 clients send requests at the same time, Forge does not run 50 separate inferences. It groups them into batches of up to max_batch_size, runs one forward pass per batch, then splits the output and returns each caller its own result. That is the core value.
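The collect, forward and scatter steps described above can be sketched in plain asyncio. This is an illustration under stated assumptions, not Forge's BatchScheduler: `collect_batch`, `batch_worker` and the list-based `forward` are hypothetical, and Python lists stand in for tensors so the example runs without PyTorch.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, batch_window_ms: float, max_batch_size: int) -> list:
    """Block for the first item, then gather more until the window closes or the batch is full."""
    batch = [await queue.get()]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + batch_window_ms / 1000.0
    while len(batch) < max_batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # window expired before the batch filled up
    return batch


def forward(inputs: list) -> list:
    # Stand-in for the model: doubles every value, called once per batch.
    return [[2.0 * value for value in row] for row in inputs]


async def batch_worker(queue: asyncio.Queue, batch_window_ms: float,
                       max_batch_size: int, batch_sizes: list) -> None:
    loop = asyncio.get_running_loop()
    while True:
        batch = await collect_batch(queue, batch_window_ms, max_batch_size)
        batch_sizes.append(len(batch))
        inputs = [payload for payload, _ in batch]
        # Run the blocking forward pass off the event loop.
        outputs = await loop.run_in_executor(None, forward, inputs)
        # Scatter: each caller's future receives its own row.
        for (_, future), output in zip(batch, outputs):
            future.set_result(output)


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    batch_sizes: list = []
    worker = asyncio.create_task(batch_worker(queue, 20.0, 4, batch_sizes))
    loop = asyncio.get_running_loop()
    futures = []
    for i in range(10):  # ten "concurrent" requests
        future = loop.create_future()
        queue.put_nowait(([float(i)], future))
        futures.append(future)
    results = await asyncio.gather(*futures)
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return results, batch_sizes


results, batch_sizes = asyncio.run(demo())
print(batch_sizes)  # [4, 4, 2]
print(results[:3])  # [[0.0], [2.0], [4.0]]
```

Ten requests with a cap of 4 yield three forward passes instead of ten. The last batch holds only two requests and is dispatched when the 20 ms window expires.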
Installation
From PyPI (recommended)
pip install forge-ml-serve
From source
git clone https://github.com/verz0/Forge.git
cd Forge
pip install -e ".[dev]"
Quickstart
1. Serve a dummy model (no GPU needed)
forge serve examples/serve_dummy.py
curl -X POST http://localhost:8000/v1/dummy/predict \
-H "Content-Type: application/json" \
-d '{"input": [1.0, 2.0, 3.0]}'
Response:
{
  "model": "dummy",
  "output": [2.0, 4.0, 6.0],
  "request_id": "forge",
  "latency_ms": 1.2
}
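The same request can be sent from Python. The sketch below uses only the standard library and assumes the endpoint path and JSON shape from the curl example; `build_predict_request` and `predict` are hypothetical helpers, not part of the forge package.

```python
import json
import urllib.request


def build_predict_request(base_url: str, model: str, values: list) -> urllib.request.Request:
    """Build the POST request that the curl example sends."""
    body = json.dumps({"input": values}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/{model}/predict",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(base_url: str, model: str, values: list, timeout_s: float = 5.0) -> dict:
    """Send the request and decode the JSON response (requires a running server)."""
    request = build_predict_request(base_url, model, values)
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return json.loads(response.read())


request = build_predict_request("http://localhost:8000", "dummy", [1.0, 2.0, 3.0])
print(request.get_method(), request.full_url)

# With the server from step 1 running:
# print(predict("http://localhost:8000", "dummy", [1.0, 2.0, 3.0])["output"])
```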
2. Serve ResNet-50
pip install torchvision
forge serve examples/serve_resnet.py
3. Check metrics (Prometheus format)
curl http://localhost:8000/metrics
4. Interactive API docs
Open http://localhost:8000/docs in your browser. FastAPI auto-generates a full Swagger UI for every registered model endpoint.
Tutorials
Tutorial 1: Serve a Custom PyTorch Model
Any nn.Module can be served through Forge. Write a config file with an async setup(registry) function that registers your model.
sentiment_server.py:
import torch
import torch.nn as nn
from forge import ModelConfig, ModelRegistry


class SentimentModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Linear(256, 3),  # negative, neutral, positive
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


async def setup(registry: ModelRegistry) -> None:
    model = SentimentModel()
    model.load_state_dict(torch.load("sentiment_weights.pt"))
    model.eval()
    config = ModelConfig(
        batch_window_ms=25.0,  # collect requests for 25ms then batch
        max_batch_size=32,
        max_queue_depth=256,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )
    await registry.register("sentiment", model, config)
Start the server:
forge serve sentiment_server.py
Send a request:
curl -X POST http://localhost:8000/v1/sentiment/predict \
-H "Content-Type: application/json" \
-d '{"input": [0.1, 0.2, 0.3, ...]}'
Response:
{
  "model": "sentiment",
  "output": [0.12, 0.03, 0.85],
  "request_id": "forge",
  "latency_ms": 4.21
}
Tutorial 2: Serve Multiple Models Simultaneously
Register any number of models in a single config file. Each model gets its own endpoint, queue and batching configuration.
multi_server.py:
import torch
import torch.nn as nn
from forge import ModelConfig, ModelRegistry


class ImageClassifier(nn.Module):
    def forward(self, x):
        return torch.softmax(x.mean(dim=-1, keepdim=True).expand(-1, 10), dim=-1)


class TextEmbedder(nn.Module):
    def forward(self, x):
        return x / x.norm(dim=-1, keepdim=True)


async def setup(registry: ModelRegistry) -> None:
    # Image classifier on GPU with larger batch window
    await registry.register("image-classifier", ImageClassifier(), ModelConfig(
        batch_window_ms=50.0,
        max_batch_size=16,
        device="cuda",
    ))
    # Text embedder on CPU with fast turnaround
    await registry.register("text-embedder", TextEmbedder(), ModelConfig(
        batch_window_ms=10.0,
        max_batch_size=64,
        device="cpu",
    ))
forge serve multi_server.py
Two independent endpoints are now live:
# Classify an image
curl -X POST http://localhost:8000/v1/image-classifier/predict \
-H "Content-Type: application/json" \
-d '{"input": [0.5, 0.3, ...]}'
# Generate an embedding
curl -X POST http://localhost:8000/v1/text-embedder/predict \
-H "Content-Type: application/json" \
-d '{"input": [0.1, 0.2, ...]}'
# See all registered models
curl http://localhost:8000/v1/models
Tutorial 3: Hot-Swap a Model Without Downtime
You have retrained your model overnight. Instead of restarting the server (which drops all in-flight requests), swap it live.
Step 1 -- Save the new model as TorchScript:
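import torch
# Assumes this script runs next to sentiment_server.py from Tutorial 1
from sentiment_server import SentimentModel

new_model = SentimentModel()
new_model.load_state_dict(torch.load("sentiment_v2.pt"))
new_model.eval()  # script the model in inference mode
scripted = torch.jit.script(new_model)
torch.jit.save(scripted, "sentiment_v2_scripted.pt")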
Step 2 -- Tell Forge to swap:
curl -X POST http://localhost:8000/v1/sentiment/reload \
-H "Content-Type: application/json" \
-d '{"model_path": "/path/to/sentiment_v2_scripted.pt"}'
Response:
{"status": "swapped", "model": "sentiment", "path": "/path/to/sentiment_v2_scripted.pt"}
Zero downtime. Requests that were already being processed finish on the old model. New requests immediately use the new one.
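One way to get this behaviour is an atomic reference swap: each batch takes a reference to the current model when it starts and keeps it until it finishes, so replacing the reference never interrupts work in progress. The sketch below is an assumption about the general technique, not Forge's ModelWorker; `SwappableModel`, `acquire` and `swap` are hypothetical names, and plain functions stand in for models.

```python
import threading


class SwappableModel:
    """Hypothetical holder for the currently active model."""

    def __init__(self, model) -> None:
        self._model = model
        self._lock = threading.Lock()

    def acquire(self):
        """Return the model a batch should use for its whole lifetime."""
        with self._lock:
            return self._model

    def swap(self, new_model):
        """Install new_model and return the old one so the caller can release it later."""
        with self._lock:
            old_model, self._model = self._model, new_model
        return old_model


def model_v1(batch):
    return [("v1", x) for x in batch]


def model_v2(batch):
    return [("v2", x) for x in batch]


holder = SwappableModel(model_v1)
in_flight = holder.acquire()   # a batch starts on v1
holder.swap(model_v2)          # the reload arrives mid-batch
print(in_flight([1]))          # [('v1', 1)]  the in-flight batch still finishes on v1
print(holder.acquire()([2]))   # [('v2', 2)]  the next batch picks up v2
```

The new checkpoint should be fully loaded before `swap` is called, so the lock is held only for the pointer exchange and requests never wait on disk I/O.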
Tutorial 4: Monitor with Prometheus
Forge exposes production-grade metrics at the /metrics endpoint in Prometheus text format.
curl http://localhost:8000/metrics
Tracked metrics include:
- Request latency -- P50, P95, P99 histograms per model
- Batch size distribution -- how effectively requests are being grouped
- Queue depth -- current backlog per model
- Timeout counters -- requests that exceeded the configured timeout
- Swap duration -- time taken for each model hot-swap operation
Connect this to a Prometheus scrape target and visualize in Grafana for real-time dashboards.
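For a quick check without a Prometheus server, the text format can be read directly. The sketch below is a minimal parser that assumes samples carry no trailing timestamp; the metric names in `SAMPLE` are invented for illustration and may not match what Forge actually exports.

```python
def parse_metrics(text: str) -> dict:
    """Map each 'name{labels}' series to its float value, skipping comments."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # blank, HELP and TYPE lines
            continue
        series, _, value = line.rpartition(" ")
        if not series:
            continue  # not a 'series value' pair
        samples[series] = float(value)
    return samples


# Hypothetical payload; real metric names may differ.
SAMPLE = """\
# HELP forge_queue_depth Pending requests per model
# TYPE forge_queue_depth gauge
forge_queue_depth{model="dummy"} 3
forge_request_timeouts_total{model="dummy"} 0
"""

metrics = parse_metrics(SAMPLE)
print(metrics['forge_queue_depth{model="dummy"}'])  # 3.0
```

Against a live server, the text would come from `urllib.request.urlopen("http://localhost:8000/metrics").read().decode()` in place of `SAMPLE`.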
Configuration Reference
from forge import ModelConfig

config = ModelConfig(
    batch_window_ms=50.0,    # Collect requests for 50ms before dispatching
    max_batch_size=32,       # Early flush if 32 requests accumulate first
    max_queue_depth=256,     # Return 503 if more than 256 requests are pending
    request_timeout_s=30.0,  # Fail requests waiting longer than 30s
    device="cuda",           # "cpu", "cuda", "cuda:0" or "mps"
    num_threads=4,           # PyTorch intraop threads (CPU only)
)
| Parameter | Default | Description |
|---|---|---|
| batch_window_ms | 50.0 | Time window in milliseconds to collect requests before dispatching a batch |
| max_batch_size | 32 | Maximum number of requests per batch. Flushes early if reached before the window expires |
| max_queue_depth | 256 | Maximum pending requests. Returns HTTP 503 when exceeded |
| request_timeout_s | 30.0 | Per-request timeout. Returns HTTP 504 on expiry |
| device | "cpu" | PyTorch device string for inference |
| num_threads | 4 | PyTorch intra-op thread count (relevant for CPU inference) |
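The request_timeout_s row can be pictured as a handler that waits on the request's future for a bounded time. This is a sketch of that idea with asyncio.wait_for, not Forge's route handler; `await_result` is a hypothetical helper and the status codes are returned as plain integers.

```python
import asyncio


async def await_result(future: asyncio.Future, request_timeout_s: float):
    """Wait for the batched result; report 504 if it does not arrive in time."""
    try:
        return 200, await asyncio.wait_for(future, timeout=request_timeout_s)
    except asyncio.TimeoutError:
        return 504, None


async def demo():
    loop = asyncio.get_running_loop()
    fast = loop.create_future()
    loop.call_later(0.01, fast.set_result, [2.0])  # resolved after 10 ms
    slow = loop.create_future()                    # never resolved
    return await await_result(fast, 0.5), await await_result(slow, 0.05)


fast_outcome, slow_outcome = asyncio.run(demo())
print(fast_outcome)  # (200, [2.0])
print(slow_outcome)  # (504, None)
```

On timeout, `asyncio.wait_for` also cancels the pending future, so a scheduler that checks `future.cancelled()` can skip work for callers that have already given up.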
API Reference
| Endpoint | Method | Description |
|---|---|---|
| /v1/{model}/predict | POST | Submit a tensor for batched inference |
| /v1/{model}/reload | POST | Hot-swap to a new TorchScript checkpoint |
| /v1/models | GET | List all registered models and their queue depths |
| /metrics | GET | Prometheus metrics in text format |
| /health | GET | Liveness probe with per-model readiness status |
| /docs | GET | Interactive Swagger API documentation |
Benchmark
# Start the server
forge serve examples/serve_dummy.py
# Run the sweep in another terminal
python benchmarks/bench_throughput.py --concurrency 1,5,10,25,50,100
# With chart output
python benchmarks/bench_throughput.py --plot
Sample results (CPU, dummy model, 128-float input):
| Concurrency | RPS | P50 (ms) | P95 (ms) | P99 (ms) |
|---|---|---|---|---|
| 1 | 420 | 2.1 | 2.8 | 3.1 |
| 10 | 1,850 | 4.9 | 8.2 | 11.4 |
| 50 | 3,200 | 14.8 | 28.3 | 41.7 |
| 100 | 3,400 | 28.1 | 52.6 | 71.2 |
At concurrency 50, batching yields roughly 7.6x the throughput of sequential serving (3,200 RPS vs. 420 RPS at concurrency 1). The cost is latency: P50 rises from 2.1 ms to about 15 ms, most of it spent waiting in the batch window.
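The throughput ratios can be recomputed from the sample table. The snippet below treats the concurrency-1 row as the sequential baseline, which is an assumption about how the 7.6x figure was derived.

```python
# (RPS, P50 latency in ms) per concurrency level, copied from the sample results.
RESULTS = {1: (420, 2.1), 10: (1850, 4.9), 50: (3200, 14.8), 100: (3400, 28.1)}


def summarize(results: dict) -> dict:
    """Return (speedup over concurrency 1, added P50 in ms) for each level."""
    baseline_rps, baseline_p50 = results[1]
    return {
        concurrency: (round(rps / baseline_rps, 1), round(p50 - baseline_p50, 1))
        for concurrency, (rps, p50) in results.items()
    }


summary = summarize(RESULTS)
for concurrency, (speedup, added_ms) in summary.items():
    print(f"concurrency {concurrency:>3}: {speedup}x throughput, +{added_ms} ms P50")
# concurrency   1: 1.0x throughput, +0.0 ms P50
# concurrency  10: 4.4x throughput, +2.8 ms P50
# concurrency  50: 7.6x throughput, +12.7 ms P50
# concurrency 100: 8.1x throughput, +26.0 ms P50
```

Throughput barely moves between concurrency 50 and 100 while P50 nearly doubles, which suggests the server is close to saturation at that point.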
Running Tests
pytest tests/ -v
Project Structure
forge/
  forge/
    config.py     # ModelConfig and ServerConfig dataclasses
    queue.py      # RequestQueue with backpressure and InferenceRequest
    batcher.py    # BatchScheduler -- the core batching engine
    worker.py     # ModelWorker with hot-swap protocol
    registry.py   # Multi-model ModelRegistry
    metrics.py    # Prometheus metric definitions
    server.py     # FastAPI application and route handlers
    cli.py        # forge serve CLI entry point
  tests/
  benchmarks/
  examples/
License
MIT