Inferential
Robotics-aware, multi-client inference orchestration on top of Ray Serve.
Inferential sits between your clients and your ML models. It receives observations over ZMQ, schedules inference requests using cadence-aware priority scoring, dispatches to Ray Serve, and streams results back — all with sub-millisecond transport overhead. Built for any scenario where multiple clients need concurrent access to shared models: robotics fleets, game agents, IoT devices, real-time ML pipelines.
Features
- ZMQ transport — ROUTER/DEALER sockets with automatic reconnection and zero-copy tensor payloads
- Pluggable schedulers — Deadline-aware (default), batch-optimized, priority-tiered, round-robin
- Cadence learning — EMA-based tracking of each client's request pattern to predict urgency
- Protobuf wire protocol — Typed tensor metadata (dtype, shape, encoding) with binary payload
- In-memory metrics — Ring-buffer storage with label filtering and percentile stats (p50/p95/p99)
- Flexible client IDs — String or integer, your choice
- Lightweight client SDK — No Ray dependency; just pyzmq, protobuf, and numpy
Install
# Client SDK only (robot side)
pip install inferential
# Server with Ray Serve
pip install inferential[server]
# Development
pip install inferential[dev]
Quick Start
Server
import asyncio
from inferential import Server
server = Server(bind="tcp://*:5555", models=["policy-v2"])
server.use_scheduler("deadline_aware")
@server.on_metric
def log_latency(name, value, labels):
    if name == "inference_latency_ms":
        print(f"Client {labels.get('client')}: {value:.1f}ms")
asyncio.run(server.run())
Client
import numpy as np
from inferential import Connection
conn = Connection(server="tcp://localhost:5555", client_id="agent-01", client_type="sensor")
model = conn.model("policy-v2", latency_budget_ms=30.0)
# Send observation
readings = np.array([0.0, -0.78, 0.0, -2.35, 0.0, 1.57, 0.78], dtype=np.float32)
model.observe(
    urgency=0.8,
    sensor_readings=readings,
    prompt="classify anomaly",  # strings go to metadata
)
# Get action
result = model.get_result(timeout_ms=50)
if result is not None:
    output = result["actions"]  # np.ndarray
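In a periodic control setting, the same two calls form a simple fixed-rate loop. A minimal sketch on top of the client above; read_sensors() and apply_actions() are hypothetical stand-ins for your own I/O, and the 50 Hz period is illustrative:
import time
import numpy as np

def read_sensors() -> np.ndarray:
    # Hypothetical stand-in for your sensor I/O
    return np.zeros(7, dtype=np.float32)

def apply_actions(actions: np.ndarray) -> None:
    # Hypothetical stand-in for your actuator I/O
    print("applying", actions)

# Fire an observation, poll briefly for the result, then hold the cadence.
while True:
    model.observe(urgency=0.5, sensor_readings=read_sensors())
    result = model.get_result(timeout_ms=20)
    if result is not None:
        apply_actions(result["actions"])
    time.sleep(0.02)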
Observation API
model.observe() accepts any combination of keyword arguments:
- np.ndarray values are serialized as tensors with full dtype/shape metadata
- str values are passed as metadata key-value pairs
- urgency (float, 0.0–1.0) — hints how time-critical this request is
- steps_remaining (int, optional) — remaining steps in the current trajectory
# Multi-modal observation
model.observe(
    urgency=0.5,
    steps_remaining=120,
    state_vector=np.zeros(7, dtype=np.float32),
    image=np.zeros((3, 224, 224), dtype=np.uint8),
    prompt="describe the scene",
)
Schedulers
Four built-in strategies, selectable via config or at runtime:
| Strategy | Description |
|---|---|
| deadline_aware | Weighted scoring: cadence, urgency, priority, age (default) |
| batch_optimized | Groups requests per model, flushes on size or time |
| priority_tiered | Strict priority tiers, FIFO within each tier |
| round_robin | Fair rotation across clients |
from inferential.scheduler.base import create_scheduler
from inferential.config.schema import SchedulingConfig
# Standalone usage — no server needed
scheduler = create_scheduler(SchedulingConfig(strategy="batch_optimized"))
scheduler.submit(request)
batch = scheduler.next_batch()
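For standalone use, a simple drain loop over this same interface might look like the sketch below; dispatch_to_model is a hypothetical stand-in for whatever actually executes each batch:
import time

def dispatch_to_model(batch):
    # Hypothetical stand-in for your own dispatch path (e.g. a Ray Serve handle)
    print(f"dispatching {len(batch)} requests")

while scheduler.queue_len() > 0:
    scheduler.tick()                 # periodic housekeeping hook from the Scheduler interface
    batch = scheduler.next_batch()
    if batch:
        dispatch_to_model(batch)
    else:
        time.sleep(0.001)            # nothing ready yet; back off briefly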
The server wraps this and lets you swap strategies at runtime:
server.use_scheduler("deadline_aware", cadence_weight=50.0, urgency_weight=30.0)
Custom Scoring Policy
Override how the deadline-aware scheduler scores requests:
from inferential import register_policy, InferenceRequest
@register_policy("safety_first")
def score(req: InferenceRequest) -> float:
if req.urgency > 0.9:
return 1000.0
return req.priority * 10.0
server.use_scheduler("deadline_aware", policy="safety_first")
Custom Scheduler
Implement the Scheduler interface and register it:
from inferential.scheduler.base import Scheduler, register_scheduler
@register_scheduler("my_scheduler")
class MyScheduler(Scheduler):
def __init__(self, config):
self._queue = []
def submit(self, request):
self._queue.append(request)
def next_batch(self):
batch, self._queue = self._queue[:8], self._queue[8:]
return batch
def tick(self):
pass
def status(self):
return {"queue_len": len(self._queue)}
def queue_len(self):
return len(self._queue)
server.use_scheduler("my_scheduler")
Metrics
# Query stats over a time window
stats = server.metrics.get_stats("inference_latency_ms", window_seconds=60)
print(f"p95 latency: {stats.p95:.1f}ms")
# Latest value
depth = server.metrics.get_latest("queue_depth")
# Full snapshot of all metrics
snapshot = server.metrics.snapshot()
Tracked metrics: inference_latency_ms, observation_staleness_ms, payload_size_bytes, queue_depth, queue_full_drops, dispatch_errors, dispatch_retries, requests_expired, active_clients, observation_errors.
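A periodic report over a few of these metrics can be built from the same calls. A sketch, assuming the stats object also exposes p50 and p99 attributes alongside the p95 shown above, and that get_stats returns None when no samples fall in the window:
for name in ["inference_latency_ms", "observation_staleness_ms", "payload_size_bytes"]:
    stats = server.metrics.get_stats(name, window_seconds=60)
    if stats is not None:  # guard assumed for empty windows
        print(f"{name}: p50={stats.p50:.1f} p95={stats.p95:.1f} p99={stats.p99:.1f}")

print("active clients:", server.metrics.get_latest("active_clients"))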
Queue Management
The scheduler queue supports TTL, overflow policies, and dispatch retry:
| Parameter | Default | Description |
|---|---|---|
| request_ttl_ms | 5000 | Drop queued requests older than this (avoids wasting GPU on stale data) |
| overflow_policy | "drop_oldest" | What to do when the queue is full: "drop_oldest" evicts the stalest request, "reject_newest" drops the incoming one |
| max_retries | 0 | Re-queue failed dispatches up to N times before dropping |
server.use_scheduler(
    "deadline_aware",
    request_ttl_ms=2000,            # 2s TTL
    overflow_policy="drop_oldest",  # keep fresh data, discard stale
    max_retries=2,                  # retry transient failures twice
)
Configuration
from inferential.config import InferentialConfig
config = InferentialConfig(
    transport={"bind": "tcp://*:5555", "recv_hwm": 2000},
    scheduling={
        "strategy": "deadline_aware",
        "max_queue_size": 500,
        "request_ttl_ms": 3000,
        "overflow_policy": "drop_oldest",
        "max_retries": 1,
    },
    clients={
        "defaults": {"latency_budget_ms": 50.0, "priority": 1},
        "known": [
            {"id": "agent-01", "model": "policy-v2", "latency_budget_ms": 30.0, "priority": 2},
        ],
        "accept_unknown": True,
    },
    response_tracking={"cadence_alpha": 0.3, "disconnect_timeout_s": 10.0},
    metrics={"ring_buffer_size": 10000},
)
server = Server(config=config)
Wire Protocol
Messages use protobuf serialization over ZMQ multipart frames:
[identity | "" | envelope (protobuf) | payload (binary tensors)]
- Observation (client -> server): client info, model ID, urgency, tensor descriptors, metadata
- ModelResponse (server -> client): response ID, inference latency, output tensors, metadata
Supported tensor dtypes: float16, float32, float64, bfloat16, uint8, int32, int64, bool.
Supported encodings: raw, jpeg, png.
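The sketch below illustrates only the frame layout from the client side. The envelope here is a placeholder byte string standing in for the serialized Observation protobuf, whose generated message types live inside the package:
import numpy as np
import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.DEALER)
sock.setsockopt(zmq.IDENTITY, b"agent-01")   # the ROUTER side prepends this identity frame
sock.connect("tcp://localhost:5555")

envelope = b"<serialized Observation protobuf>"      # placeholder, not a real message
payload = np.zeros(7, dtype=np.float32).tobytes()    # one raw-encoded tensor frame

# DEALER sends [ "" | envelope | payload ]; the ROUTER receives
# [ identity | "" | envelope | payload ] as described above.
sock.send_multipart([b"", envelope, payload])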
Development
# Generate protobuf code
make proto
# Run tests
make test
# Lint
make lint
License
Apache-2.0