Robotics-aware inference orchestration on top of Ray Serve

Inferential

Multi-client inference orchestration on top of Ray Serve.

Inferential sits between your clients and your ML models. It receives observations over ZMQ, schedules inference requests using cadence-aware priority scoring, dispatches to Ray Serve, and streams results back — all with sub-millisecond transport overhead. Built for any scenario where multiple clients need concurrent access to shared models: robotics fleets, game agents, IoT devices, real-time ML pipelines.

Features

  • ZMQ transport — ROUTER/DEALER sockets with automatic reconnection and zero-copy tensor payloads
  • Pluggable schedulers — Deadline-aware (default), batch-optimized, priority-tiered, round-robin
  • Cadence learning — EMA-based tracking of each client's request pattern to predict urgency
  • Protobuf wire protocol — Typed tensor metadata (dtype, shape, encoding) with binary payload
  • In-memory metrics — Ring-buffer storage with label filtering and percentile stats (p50/p95/p99)
  • Flexible client IDs — String or integer, your choice
  • Lightweight client SDK — No Ray dependency; just pyzmq, protobuf, and numpy

Install

# Client SDK only (robot side)
pip install inferential

# Server with Ray Serve (quotes keep shells such as zsh from expanding the brackets)
pip install "inferential[server]"

# Development
pip install "inferential[dev]"

Quick Start

Server

import asyncio
from inferential import Server

server = Server(bind="tcp://*:5555", models=["policy-v2"])
server.use_scheduler("deadline_aware")

@server.on_metric
def log_latency(name, value, labels):
    if name == "inference_latency_ms":
        print(f"Client {labels.get('client')}: {value:.1f}ms")

asyncio.run(server.run())

Client

import numpy as np
from inferential import Connection

conn = Connection(server="tcp://localhost:5555", client_id="agent-01", client_type="sensor")
model = conn.model("policy-v2", latency_budget_ms=30.0)

# Send observation
readings = np.array([0.0, -0.78, 0.0, -2.35, 0.0, 1.57, 0.78], dtype=np.float32)
model.observe(
    urgency=0.8,
    sensor_readings=readings,
    prompt="classify anomaly",  # strings go to metadata
)

# Get action
result = model.get_result(timeout_ms=50)
if result is not None:
    output = result["actions"]  # np.ndarray

Observation API

model.observe() accepts any combination of keyword arguments:

  • np.ndarray values are serialized as tensors with full dtype/shape metadata
  • str values are passed as metadata key-value pairs
  • urgency (float, 0.0–1.0) — hints how time-critical this request is
  • steps_remaining (int, optional) — remaining steps in the current trajectory

# Multi-modal observation
model.observe(
    urgency=0.5,
    steps_remaining=120,
    state_vector=np.zeros(7, dtype=np.float32),
    image=np.zeros((3, 224, 224), dtype=np.uint8),
    prompt="describe the scene",
)

Schedulers

Four built-in strategies, selectable via config or at runtime:

Strategy          Description
deadline_aware    Weighted scoring: cadence, urgency, priority, age (default)
batch_optimized   Groups requests per model, flushes on size or time
priority_tiered   Strict priority tiers, FIFO within each tier
round_robin       Fair rotation across clients

from inferential.scheduler.base import create_scheduler
from inferential.config.schema import SchedulingConfig

# Standalone usage — no server needed
scheduler = create_scheduler(SchedulingConfig(strategy="batch_optimized"))
scheduler.submit(request)  # `request` is an inference request built elsewhere
batch = scheduler.next_batch()

The server wraps this and lets you swap strategies at runtime:

server.use_scheduler("deadline_aware", cadence_weight=50.0, urgency_weight=30.0)
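
The cadence_weight and urgency_weight arguments above suggest that the deadline-aware score is a weighted sum of the four factors listed in the strategy table. A rough sketch of one such formula follows. It is an illustration, not Inferential's actual scoring code: PendingRequest, its fields, priority_weight, age_weight, and the normalization are all assumptions.

```python
from dataclasses import dataclass


@dataclass
class PendingRequest:
    """Hypothetical stand-in for a queued request (not the library's type)."""
    urgency: float            # client hint in [0.0, 1.0]
    priority: int             # configured client priority
    age_ms: float             # time already spent in the queue
    ms_until_expected: float  # predicted time until the client's next request
    latency_budget_ms: float


def score(req, cadence_weight=50.0, urgency_weight=30.0,
          priority_weight=10.0, age_weight=10.0):
    """Assumed weighted sum; higher scores are dispatched first."""
    # Cadence term: approaches 1 as the client's next request becomes imminent.
    ratio = req.ms_until_expected / req.latency_budget_ms
    cadence = 1.0 - min(max(ratio, 0.0), 1.0)
    # Age term: approaches 1 as the request uses up its latency budget.
    age = min(req.age_ms / req.latency_budget_ms, 1.0)
    return (cadence_weight * cadence
            + urgency_weight * req.urgency
            + priority_weight * req.priority
            + age_weight * age)
```

Under these assumptions, raising cadence_weight favors clients whose next request is due soon, while raising urgency_weight favors whatever the client itself flagged as time-critical.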

Custom Scoring Policy

Override how the deadline-aware scheduler scores requests:

from inferential import register_policy, InferenceRequest

@register_policy("safety_first")
def score(req: InferenceRequest) -> float:
    if req.urgency > 0.9:
        return 1000.0
    return req.priority * 10.0

server.use_scheduler("deadline_aware", policy="safety_first")

Custom Scheduler

Implement the Scheduler interface and register it:

from inferential.scheduler.base import Scheduler, register_scheduler

@register_scheduler("my_scheduler")
class MyScheduler(Scheduler):
    def __init__(self, config):
        self._queue = []

    def submit(self, request):
        self._queue.append(request)

    def next_batch(self):
        batch, self._queue = self._queue[:8], self._queue[8:]
        return batch

    def tick(self):
        pass

    def status(self):
        return {"queue_len": len(self._queue)}

    def queue_len(self):
        return len(self._queue)

server.use_scheduler("my_scheduler")
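
The same five methods are enough to express other strategies. As a sketch only, here is a per-model batcher that flushes when a batch is full or its oldest request has waited too long, similar in spirit to the batch_optimized description. It is written as a plain class so it runs without Inferential installed; in practice it would subclass Scheduler and be registered as shown above. Req, model_id, max_batch, max_wait_s, and clock are hypothetical names, and the request attribute the real library exposes may differ.

```python
import time
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Req:
    """Hypothetical request type; assumes requests expose a model_id."""
    model_id: str
    payload: object = None


class PerModelBatcher:
    """Groups requests by model and flushes on batch size or request age."""

    def __init__(self, max_batch=4, max_wait_s=0.01, clock=time.monotonic):
        self.max_batch = max_batch
        self.max_wait_s = max_wait_s
        self.clock = clock
        self._pending = defaultdict(list)  # model_id -> [(enqueue_time, request)]

    def submit(self, request):
        self._pending[request.model_id].append((self.clock(), request))

    def next_batch(self):
        """Return the first ready batch, or an empty list if none is ready."""
        now = self.clock()
        for model_id, items in self._pending.items():
            if not items:
                continue
            full = len(items) >= self.max_batch
            stale = now - items[0][0] >= self.max_wait_s
            if full or stale:
                batch = items[: self.max_batch]
                self._pending[model_id] = items[self.max_batch:]
                return [req for _, req in batch]
        return []

    def tick(self):
        pass

    def status(self):
        return {"queue_len": self.queue_len()}

    def queue_len(self):
        return sum(len(items) for items in self._pending.values())
```

Passing the clock in as a parameter keeps the flush-on-time path testable without sleeping.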

Metrics

# Query stats over a time window
stats = server.metrics.get_stats("inference_latency_ms", window_seconds=60)
print(f"p95 latency: {stats.p95:.1f}ms")

# Latest value
depth = server.metrics.get_latest("queue_depth")

# Full snapshot of all metrics
snapshot = server.metrics.snapshot()

Tracked metrics: inference_latency_ms, observation_staleness_ms, payload_size_bytes, queue_depth, queue_full_drops, dispatch_errors, dispatch_retries, requests_expired, active_clients, observation_errors.
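
To make the ring-buffer idea concrete, the sketch below shows one way a fixed-size store with windowed percentiles could work. It is not the library's metrics code: MetricRing, record, and percentile are hypothetical names, and the nearest-rank percentile method is an assumption.

```python
import math
from collections import deque


class MetricRing:
    """Hypothetical fixed-size store: once full, new samples evict the oldest."""

    def __init__(self, size=10000):
        self._samples = deque(maxlen=size)  # (timestamp_s, value)

    def record(self, value, timestamp_s):
        self._samples.append((timestamp_s, value))

    def percentile(self, q, now_s, window_seconds):
        """Nearest-rank percentile over samples inside the time window."""
        values = sorted(
            v for t, v in self._samples if now_s - t <= window_seconds
        )
        if not values:
            return None
        rank = max(1, math.ceil(q / 100.0 * len(values)))
        return values[rank - 1]
```

A bounded deque keeps memory constant regardless of request rate, at the cost of losing history once the buffer wraps.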

Queue Management

The scheduler queue supports TTL, overflow policies, and dispatch retry:

Parameter         Default         Description
request_ttl_ms    5000            Drop queued requests older than this (avoids wasting GPU on stale data)
overflow_policy   "drop_oldest"   What to do when the queue is full: "drop_oldest" evicts the stalest request, "reject_newest" drops the incoming one
max_retries       0               Re-queue failed dispatches up to N times before dropping

server.use_scheduler(
    "deadline_aware",
    request_ttl_ms=2000,           # 2s TTL
    overflow_policy="drop_oldest", # keep fresh data, discard stale
    max_retries=2,                 # retry transient failures twice
)
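
The interaction between TTL and the two overflow policies can be sketched with a small bounded queue. This is a simplified model of the behavior described in the table, not the library's queue: BoundedQueue, its counters, and the explicit now_ms argument are hypothetical.

```python
from collections import deque


class BoundedQueue:
    """Hypothetical queue illustrating TTL expiry and both overflow policies."""

    def __init__(self, max_size, request_ttl_ms=5000,
                 overflow_policy="drop_oldest"):
        if overflow_policy not in ("drop_oldest", "reject_newest"):
            raise ValueError(f"unknown overflow policy: {overflow_policy}")
        self.max_size = max_size
        self.request_ttl_ms = request_ttl_ms
        self.overflow_policy = overflow_policy
        self._items = deque()  # (enqueue_ms, request), oldest first
        self.dropped = 0
        self.expired = 0

    def submit(self, request, now_ms):
        """Return True if the request was queued, False if it was rejected."""
        if len(self._items) >= self.max_size:
            self.dropped += 1
            if self.overflow_policy == "reject_newest":
                return False
            self._items.popleft()  # drop_oldest: evict the stalest entry
        self._items.append((now_ms, request))
        return True

    def pop(self, now_ms):
        """Return the oldest request still within its TTL, or None."""
        while self._items:
            enqueued_ms, request = self._items.popleft()
            if now_ms - enqueued_ms <= self.request_ttl_ms:
                return request
            self.expired += 1
        return None
```

With "drop_oldest" the queue always holds the freshest observations, which usually suits control loops; "reject_newest" preserves arrival order when every queued request must eventually be served.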

Configuration

from inferential.config import InferentialConfig

config = InferentialConfig(
    transport={"bind": "tcp://*:5555", "recv_hwm": 2000},
    scheduling={
        "strategy": "deadline_aware",
        "max_queue_size": 500,
        "request_ttl_ms": 3000,
        "overflow_policy": "drop_oldest",
        "max_retries": 1,
    },
    clients={
        "defaults": {"latency_budget_ms": 50.0, "priority": 1},
        "known": [
            {"id": "agent-01", "model": "policy-v2", "latency_budget_ms": 30.0, "priority": 2},
        ],
        "accept_unknown": True,
    },
    response_tracking={"cadence_alpha": 0.3, "disconnect_timeout_s": 10.0},
    metrics={"ring_buffer_size": 10000},
)

server = Server(config=config)
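
The cadence_alpha and disconnect_timeout_s settings relate to the EMA-based cadence tracking listed under Features. A minimal sketch of how such tracking might work follows, assuming cadence_alpha is the smoothing factor of an exponential moving average over inter-request intervals. CadenceTracker and its methods are hypothetical, not part of the documented API.

```python
class CadenceTracker:
    """Hypothetical EMA of the interval between one client's requests."""

    def __init__(self, alpha=0.3):
        self.alpha = alpha
        self.last_seen_s = None
        self.interval_s = None  # smoothed inter-request interval

    def observe(self, now_s):
        """Record a request arrival and update the smoothed interval."""
        if self.last_seen_s is not None:
            gap = now_s - self.last_seen_s
            if self.interval_s is None:
                self.interval_s = gap  # first gap seeds the average
            else:
                self.interval_s = (self.alpha * gap
                                   + (1.0 - self.alpha) * self.interval_s)
        self.last_seen_s = now_s

    def expected_next_s(self):
        """Predicted arrival time of the next request, or None if unknown."""
        if self.interval_s is None:
            return None
        return self.last_seen_s + self.interval_s

    def is_disconnected(self, now_s, disconnect_timeout_s=10.0):
        return (self.last_seen_s is not None
                and now_s - self.last_seen_s > disconnect_timeout_s)
```

A larger alpha reacts faster to a client changing its request rate; a smaller alpha smooths out jitter.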

Wire Protocol

Messages use protobuf serialization over ZMQ multipart frames:

[identity | "" | envelope (protobuf) | payload (binary tensors)]

  • Observation (client -> server): client info, model ID, urgency, tensor descriptors, metadata
  • ModelResponse (server -> client): response ID, inference latency, output tensors, metadata

Supported tensor dtypes: float16, float32, float64, bfloat16, uint8, int32, int64, bool. Supported encodings: raw, jpeg, png.
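
The four-frame layout can be illustrated without the generated protobuf classes. In the sketch below, JSON stands in for the protobuf envelope and struct packs a single raw float32 tensor. The field names (model_id, tensors, and so on) and both helper functions are assumptions for illustration, not the actual schema.

```python
import json
import struct


def pack_observation(identity, model_id, urgency, name, values):
    """Build the four frames for one float32 tensor (JSON replaces protobuf)."""
    payload = struct.pack(f"<{len(values)}f", *values)
    envelope = json.dumps({
        "model_id": model_id,
        "urgency": urgency,
        "tensors": [{"name": name, "dtype": "float32",
                     "shape": [len(values)], "encoding": "raw"}],
    }).encode("utf-8")
    return [identity, b"", envelope, payload]


def unpack_observation(frames):
    """Split the frames and decode the tensor using the envelope's shape."""
    identity, delimiter, envelope, payload = frames
    if delimiter != b"":
        raise ValueError("expected an empty delimiter frame")
    meta = json.loads(envelope)
    (count,) = meta["tensors"][0]["shape"]
    values = list(struct.unpack(f"<{count}f", payload))
    return identity, meta, values
```

Keeping the tensor bytes in their own frame means the envelope can be parsed without touching the payload, which is what makes zero-copy handling of large tensors possible.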

Development

# Generate protobuf code
make proto

# Run tests
make test

# Lint
make lint

License

Apache-2.0
