WebSocket relay for real-time token streaming from batch HPC executors
streamrelay
Real-time token streaming from batch HPC executors via WebSocket relay.
New here? Start with the full tutorial — deploy the relay, write a producer on your HPC node, consume tokens in your app, add encryption. All in one place.
The problem
HPC batch systems execute jobs to completion and return a single result. When that job is an LLM inference request, the user stares at a blank screen for the full generation time — often 15–20 seconds — before seeing any output.
streamrelay solves this with a dual-channel architecture:
- Control plane (unchanged): your existing execution framework — a SLURM or PBS job script, an SSH command, a Globus Compute function call — handles job submission and authentication exactly as before.
- Data plane (streamrelay): a lightweight WebSocket relay through which the compute node streams tokens back in real time as the GPU generates them.
Both the compute node (producer) and your application (consumer) connect outbound to the relay. Neither side accepts an inbound connection — no firewall exceptions, no VPN, no tunnels required.
Your application         Relay server             HPC compute node
────────────────         ────────────             ────────────────
1. Submit job via        2. Both connect          3. Job starts
   SLURM / PBS /            outbound here         4. Tokens stream
   Globus Compute /         (this is streamrelay)    to relay →
   SSH / anything  ◄─────────────────────────────────────────────
5. Tokens arrive,
   first one in < 1s
Measured in the STREAM system at UIC: 0.85 s median time-to-first-token from HPC with streaming, vs. 15.68 s in batch mode.
Installation
pip install streamrelay
Optional — Globus Compute integration (adds StreamingExecutor):
pip install "streamrelay[globus]"  # quotes keep shells such as zsh from expanding the brackets
Requires Python ≥ 3.11.
Quick start
1. Start the relay server
Run this on any machine with a public IP — a small cloud VM, a campus server, or your laptop with a Cloudflare tunnel for development:
streamrelay --host 0.0.0.0 --port 8765 --secret my-shared-secret
Development shortcut (no public server needed):
streamrelay --port 8765 &
cloudflared tunnel --url http://localhost:8765 # gives you a public wss:// URL
2. On the HPC compute node — producer
Inside your job script or remote function, send tokens as your model generates them:
from streamrelay import RelayProducer
# relay_url and channel_id are passed in as job arguments or env vars
with RelayProducer(relay_url, channel_id, relay_secret="my-shared-secret") as relay:
    for token in your_model.stream(prompt):
        relay.send_token(token)
# done signal is sent automatically on exit
This works in any execution context: a SLURM batch script, a PBS job, a plain SSH command, a Globus Compute function, or any subprocess.
3. On your application — consumer
import uuid
from streamrelay import RelayConsumer
channel_id = str(uuid.uuid4()) # generate before submitting the job
# submit the job here — SLURM, PBS, Globus Compute, SSH — your choice
# pass relay_url and channel_id to the job as arguments or env vars
for token in RelayConsumer(relay_url, channel_id, relay_secret="my-shared-secret").stream():
    print(token, end="", flush=True)
Async version for FastAPI or other asyncio applications:
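# relay_secret must match the --secret the relay was started with in step 1
async for token in RelayConsumer(relay_url, channel_id, relay_secret="my-shared-secret"):
    yield f"data: {token}\n\n"  # forward as Server-Sent Events to a browser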
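The one-line f-string above is enough while tokens never contain line breaks. If a token can carry a newline, the Server-Sent Events format expects each line of the payload in its own `data:` field, otherwise the blank line ends the event early. The sketch below shows one way to handle that; `format_sse` is a hypothetical helper and not part of streamrelay.

```python
def format_sse(token: str) -> str:
    """Encode one token as a Server-Sent Events message.

    Hypothetical helper for illustration; not part of streamrelay.
    """
    # Normalize line endings, then emit one "data:" field per line.
    # split("\n") keeps empty segments, so "a\n" becomes ["a", ""] and the
    # browser reassembles the payload as "a\n".
    lines = token.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


print(repr(format_sse("Hello")))
print(repr(format_sse("line one\nline two")))
```

A token with a leading space, such as `" world"`, survives as well: the client strips only the single space after the colon.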
Example: SLURM job
Submitter (your laptop or login node):
# submit.py
import subprocess, uuid
from streamrelay import RelayConsumer
relay_url = "wss://your-relay.example.com"
channel_id = str(uuid.uuid4())
subprocess.run(
    [
        "sbatch",
        f"--export=ALL,RELAY_URL={relay_url},CHANNEL_ID={channel_id}",
        "inference_job.sh",
    ],
    check=True,  # fail fast if sbatch rejects the job instead of waiting on an empty channel
)
for token in RelayConsumer(relay_url, channel_id).stream():
    print(token, end="", flush=True)
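When the relay also needs a secret, the `--export` string grows and becomes easy to get wrong. The sketch below builds the `sbatch` argument list in one place. `build_sbatch_command` is a hypothetical helper, and rejecting commas is a conservative choice of this sketch, made because `--export` is a comma-separated list.

```python
import uuid


def build_sbatch_command(script, relay_url, channel_id, relay_secret=None):
    """Return the argv list for sbatch with the relay settings exported.

    Hypothetical helper for illustration; not part of streamrelay.
    """
    exports = {"RELAY_URL": relay_url, "CHANNEL_ID": channel_id}
    if relay_secret:
        exports["RELAY_SECRET"] = relay_secret
    for name, value in exports.items():
        # --export takes a comma-separated list, so a comma inside a value
        # could be misread as the start of the next variable.
        if "," in value:
            raise ValueError(f"{name} must not contain a comma")
    export_arg = "--export=ALL," + ",".join(f"{k}={v}" for k, v in exports.items())
    return ["sbatch", export_arg, script]


cmd = build_sbatch_command(
    "inference_job.sh",
    "wss://your-relay.example.com",
    str(uuid.uuid4()),
)
print(cmd)
```

The returned list can be passed straight to `subprocess.run(cmd, check=True)`.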
Job script (SLURM compute node):
#!/bin/bash
#SBATCH --partition=gpu --gres=gpu:1
python - <<'EOF'
import os
from streamrelay import RelayProducer
with RelayProducer(os.environ["RELAY_URL"], os.environ["CHANNEL_ID"]) as relay:
    for token in your_model.stream(prompt):
        relay.send_token(token)
EOF
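On the compute node a missing variable otherwise surfaces as a bare `KeyError` deep inside the job log. The sketch below validates the environment up front. `RelaySettings` and `load_relay_settings` are hypothetical names, and `RELAY_SECRET` is assumed to be optional, as in the examples in this README.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class RelaySettings:
    """Relay coordinates read from the job environment (hypothetical type)."""

    relay_url: str
    channel_id: str
    relay_secret: str = ""


def load_relay_settings(env=None):
    """Read RELAY_URL, CHANNEL_ID and optional RELAY_SECRET, failing early."""
    env = os.environ if env is None else env
    missing = [name for name in ("RELAY_URL", "CHANNEL_ID") if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    relay_url = env["RELAY_URL"]
    if not relay_url.startswith(("ws://", "wss://")):
        raise RuntimeError(f"RELAY_URL must start with ws:// or wss://, got {relay_url!r}")
    return RelaySettings(relay_url, env["CHANNEL_ID"], env.get("RELAY_SECRET", ""))


# A plain dict stands in for os.environ so the example runs anywhere.
settings = load_relay_settings(
    {"RELAY_URL": "wss://your-relay.example.com", "CHANNEL_ID": "demo-channel"}
)
print(settings)
```

Inside the job, the fields can then be passed to `RelayProducer(settings.relay_url, settings.channel_id, relay_secret=settings.relay_secret)`.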
Globus Compute integration
Globus Compute is a federated function execution service that dispatches Python functions to remote HPC endpoints (which themselves run on SLURM or PBS clusters). Because Globus Compute returns a single result when the function completes, it has no native mechanism for streaming incremental output. streamrelay adds that capability.
pip install "streamrelay[globus]"
StreamingExecutor wraps any Globus Compute function and streams its output:
from streamrelay import StreamingExecutor
async with StreamingExecutor(
    endpoint_id="your-globus-endpoint-uuid",
    relay_url="wss://your-relay.example.com",
    relay_secret="my-shared-secret",
) as executor:
    async for token in executor.stream(my_inference_fn, prompt="Explain quantum entanglement"):
        print(token, end="", flush=True)
Your remote function receives relay_url and channel_id as keyword arguments
automatically:
def my_inference_fn(prompt, relay_url, channel_id, relay_secret=""):
    # all imports must be inline: Globus Compute serializes this function
    from streamrelay import RelayProducer
    with RelayProducer(relay_url, channel_id, relay_secret=relay_secret) as relay:
        for token in call_vllm_streaming(prompt):
            relay.send_token(token)
End-to-end encryption
By default, the relay server can see the token payloads it forwards. For sensitive workloads (medical, financial, or personal data), enable AES-256-GCM end-to-end encryption. The relay then forwards opaque ciphertext and cannot read the content.
Generate a key once and store it in your .env:
python -c "from streamrelay import generate_key; print(generate_key())"
Pass the same key to both producer and consumer:
# Producer (HPC node)
with RelayProducer(relay_url, channel_id, encryption_key=KEY) as relay:
    relay.send_token(token)
# Consumer (your application)
for token in RelayConsumer(relay_url, channel_id, encryption_key=KEY).stream():
    print(token, end="", flush=True)
Each message uses a fresh random 12-byte nonce. The GCM authentication tag detects any tampering in transit.
Security model
streamrelay enforces three independent security layers:
Layer 1 — Transport encryption (TLS)
Deploy the relay behind a TLS-terminating reverse proxy (Caddy, nginx) so all
connections use wss:// (WebSocket over TLS). This encrypts traffic between each
client and the relay server. See docs/deployment.md for a
Caddy setup with auto-provisioned Let's Encrypt certificates.
Layer 2 — Access control (shared secret)
Start the relay with --secret MY_SECRET. Every producer and consumer must supply
the same value as a query parameter (?secret=MY_SECRET). Connections without the
correct secret are rejected at the WebSocket handshake before any channel state is
created.
# Server
streamrelay --port 8765 --secret MY_SECRET
# Producer (HPC node) — same secret
with RelayProducer(relay_url, channel_id, relay_secret="MY_SECRET") as relay: ...
# Consumer (your app) — same secret
RelayConsumer(relay_url, channel_id, relay_secret="MY_SECRET").stream()
How to share the secret with the HPC node: pass it as a job argument, an environment variable in your SLURM/PBS script, or as a keyword argument to your Globus Compute function. It does not need to be embedded in code:
# SLURM — pass via --export
sbatch --export=ALL,RELAY_URL=wss://...,RELAY_SECRET=MY_SECRET job.sh
# Globus Compute — inject as a kwarg
executor.submit(my_fn, relay_url=relay_url, relay_secret=MY_SECRET, ...)
In addition to the shared secret, each request uses a unique UUID channel ID (122 bits of entropy). Even if an attacker knows the relay address, guessing a valid channel ID is computationally infeasible. The relay holds no persistent state — all channel state is discarded once both sides disconnect. No OAuth2 credentials or user identity information traverse the relay at any point.
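The 122-bit figure follows from the UUID version 4 layout: 128 bits in total, of which 4 are fixed for the version and 2 for the variant. The short calculation below works that out and estimates how long guessing one specific channel ID would take. The attacker rate of one billion guesses per second is an arbitrary assumption for illustration.

```python
import uuid

UUID_BITS = 128
VERSION_BITS = 4  # fixed to 0100 for version 4
VARIANT_BITS = 2  # fixed to 10 for the RFC 4122 variant
RANDOM_BITS = UUID_BITS - VERSION_BITS - VARIANT_BITS

channel_id = uuid.uuid4()
assert channel_id.version == 4

keyspace = 2**RANDOM_BITS
guesses_per_second = 1_000_000_000  # hypothetical attacker rate
seconds_per_year = 365 * 24 * 3600
# On average an attacker has to search half the keyspace to hit one ID.
expected_years = keyspace / 2 / guesses_per_second / seconds_per_year

print(f"{RANDOM_BITS} random bits, about {keyspace:.2e} possible IDs")
print(f"about {expected_years:.1e} years at 1e9 guesses per second")
```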
Layer 3 — End-to-end payload encryption (AES-256-GCM)
TLS protects the link to the relay, but the relay operator can still see plaintext token payloads. For sensitive workloads (medical, financial, or personal data), enable AES-256-GCM end-to-end encryption. The relay then forwards opaque ciphertext and cannot read the content.
Generate a key once and store it securely (e.g., in your .env):
python -c "from streamrelay import generate_key; print(generate_key())"
# Outputs a base64-encoded 32-byte key, e.g.: xK3mP9vQ2rL...
Pass the same key to both producer and consumer:
import os
# Indexing os.environ raises KeyError if the key is unset, so encryption is never skipped silently
KEY = os.environ["RELAY_ENCRYPTION_KEY"]
# Producer (HPC node)
with RelayProducer(relay_url, channel_id, encryption_key=KEY) as relay:
    relay.send_token(token)
# Consumer (your app)
for token in RelayConsumer(relay_url, channel_id, encryption_key=KEY).stream():
    print(token, end="", flush=True)
Each message is encrypted with a fresh random 12-byte nonce (per NIST SP
800-38D). The GCM authentication tag detects any tampering in transit — if the relay
or any intermediary modifies a message, decryption raises an InvalidTag exception
rather than silently returning corrupted data. Encryption is opt-in and
backward-compatible: an unencrypted consumer connecting to an encrypted producer
will receive ciphertext it cannot parse, but no silent data corruption occurs.
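Because the same key is reused across many messages, it helps to know how likely two random 12-byte nonces are to collide. The sketch below applies the standard birthday-bound approximation; it is a back-of-the-envelope estimate, not a measurement of streamrelay. NIST SP 800-38D's guidance for randomly generated nonces is to stay at or below 2^32 encryptions per key, which this estimate puts at a collision probability of roughly 2^-33.

```python
import math

NONCE_BITS = 96  # 12-byte nonce, as stated above


def collision_probability(num_messages: int) -> float:
    """Birthday-bound estimate that any two nonces repeat under one key."""
    pairs = num_messages * (num_messages - 1) / 2
    # 1 - exp(-x), computed with expm1 so tiny probabilities keep precision.
    return -math.expm1(-pairs / 2**NONCE_BITS)


for n in (10**6, 10**9, 2**32):
    print(f"{n:>12} messages: p ~ {collision_probability(n):.3e}")
```

At a few hundred tokens per response, a single key covers millions of responses before it approaches that limit. Rotating the key periodically resets the count.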
Summary
| Layer | Mechanism | Protects against | How to enable |
|---|---|---|---|
| TLS (wss://) | Reverse proxy (Caddy) | Network eavesdropping | Deploy behind Caddy/nginx |
| Shared secret | WebSocket handshake | Unauthorized connections | --secret flag on server |
| AES-256-GCM | Per-message encryption | Relay operator reading payloads | encryption_key= on producer + consumer |
| UUID channel isolation | 122-bit random ID | Channel collision / guessing | Always on |
See docs/deployment.md for a production deployment guide (cloud VM + Caddy + systemd).
Relay protocol
All messages are JSON strings. The relay forwards them without interpretation:
{"type": "token", "content": "Hello"} ← one text chunk
{"type": "done", "usage": {...}} ← generation complete
{"type": "error", "message": "..."} ← something went wrong
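For readers writing their own consumer against the raw WebSocket, the three message types above map onto a small dispatch loop. The sketch below works on an iterable of JSON strings, so it runs without a relay. `iter_tokens` and `RelayStreamError` are hypothetical names, and skipping unknown message types is a choice of this sketch rather than documented behavior.

```python
import json


class RelayStreamError(RuntimeError):
    """Raised when the producer reports an error (hypothetical name)."""


def iter_tokens(raw_messages):
    """Yield token text from JSON strings until a 'done' message arrives."""
    for raw in raw_messages:
        msg = json.loads(raw)
        kind = msg.get("type")
        if kind == "token":
            yield msg["content"]
        elif kind == "done":
            return
        elif kind == "error":
            raise RelayStreamError(msg.get("message", "unknown error"))
        # Any other type is ignored here; that is an assumption of this sketch.


frames = [
    '{"type": "token", "content": "Hello"}',
    '{"type": "token", "content": " world"}',
    '{"type": "done", "usage": {"total_tokens": 2}}',
]
text = "".join(iter_tokens(frames))
print(text)
```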
When encryption is enabled, each message is wrapped before transmission:
{"type": "enc", "d": "<base64(nonce + ciphertext + GCM tag)>"}
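The envelope layout can be unpacked with the standard library alone, which is handy when debugging what the relay actually forwards. The sketch below splits the `d` field into its three parts. The 12-byte nonce length comes from this README; the 16-byte tag length is an assumption (the common GCM default), and `split_envelope` is a hypothetical helper. It performs no decryption.

```python
import base64
import json

NONCE_LEN = 12  # stated in this README
TAG_LEN = 16  # assumption: the default 128-bit GCM tag


def split_envelope(raw: str):
    """Split an 'enc' message into (nonce, ciphertext, tag) without decrypting."""
    msg = json.loads(raw)
    if msg.get("type") != "enc":
        raise ValueError("not an encrypted envelope")
    blob = base64.b64decode(msg["d"])
    if len(blob) < NONCE_LEN + TAG_LEN:
        raise ValueError("envelope too short")
    nonce = blob[:NONCE_LEN]
    ciphertext = blob[NONCE_LEN:-TAG_LEN]
    tag = blob[-TAG_LEN:]
    return nonce, ciphertext, tag


# Fabricated bytes purely to exercise the parser; this is not real ciphertext.
blob = bytes(range(12)) + b"ciphertext-bytes" + bytes(16)
raw = json.dumps({"type": "enc", "d": base64.b64encode(blob).decode("ascii")})
nonce, ciphertext, tag = split_envelope(raw)
print(len(nonce), len(ciphertext), len(tag))
```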
API reference
RelayProducer
Runs on the HPC compute node. Connects outbound to the relay and sends tokens.
from streamrelay import RelayProducer
# Synchronous — use inside SLURM jobs, PBS scripts, Globus Compute functions
with RelayProducer(
    relay_url,          # str: "wss://relay.example.com" or "ws://localhost:8765"
    channel_id,         # str: str(uuid.uuid4()) generated before submitting the job
    relay_secret="",    # str: must match --secret on the relay server
    encryption_key="",  # str: base64 AES-256 key from generate_key(); "" = no encryption
) as relay:
    relay.send_token("Hello")  # send one text chunk
    relay.send_token(" world")
# send_done() called automatically when the with block exits normally
# send_error() called automatically if an exception is raised inside the block
# Asynchronous: use when your code already runs in an asyncio event loop
async with RelayProducer(relay_url, channel_id) as relay:
    await relay._async_send_raw({"type": "token", "content": "Hello"})
Explicit methods (when not using the context manager):
p = RelayProducer(relay_url, channel_id)
p.connect() # open the synchronous WebSocket
p.send_token("chunk") # send a token
p.send_done(usage={"total_tokens": 50}) # signal completion with optional usage stats
p.send_error("something broke") # report an error (also sends done)
p.close() # close the connection
RelayConsumer
Runs on your application side. Connects outbound to the relay and yields tokens.
from streamrelay import RelayConsumer
consumer = RelayConsumer(
    relay_url,          # str: same relay URL as the producer
    channel_id,         # str: same channel_id passed to RelayProducer
    relay_secret="",    # str: same secret as the producer
    encryption_key="",  # str: same encryption key as the producer
)
# --- Synchronous iteration (CLI scripts, Jupyter notebooks) ---
for token in consumer.stream():
    print(token, end="", flush=True)
# --- Asynchronous iteration (FastAPI, aiohttp, any asyncio application) ---
async for token in consumer:  # uses __aiter__ → astream()
    yield f"data: {token}\n\n"  # forward as Server-Sent Events
# --- Collect the full response as a single string ---
text = consumer.collect() # blocking
text = await consumer.acollect() # async
Connect the consumer before (or at the same time as) submitting the HPC job. Any tokens that arrive before you connect are buffered by the relay (default 1,000 messages) and flushed when you connect — you will not miss the beginning of the response.
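The buffering described above can be pictured with a toy in-memory channel. The sketch below is not streamrelay's implementation: `BufferedChannel` is a hypothetical class, and dropping the oldest message when the buffer is full is an assumption, since this README does not say what the relay does on overflow.

```python
from collections import deque


class BufferedChannel:
    """Toy in-memory model of one relay channel (not streamrelay's code)."""

    def __init__(self, max_buffer=1000):
        # Assumption: once full, the oldest buffered message is dropped.
        self._buffer = deque(maxlen=max_buffer)
        self._consumer = None

    def publish(self, message):
        """Producer side: deliver directly, or buffer until a consumer attaches."""
        if self._consumer is None:
            self._buffer.append(message)
        else:
            self._consumer(message)

    def attach(self, deliver):
        """Consumer side: register a callback and flush the backlog in FIFO order."""
        self._consumer = deliver
        while self._buffer:
            deliver(self._buffer.popleft())


received = []
channel = BufferedChannel(max_buffer=3)
for tok in ["a", "b"]:
    channel.publish(tok)  # producer is early; both tokens are buffered
channel.attach(received.append)  # backlog is flushed on connect
channel.publish("c")  # delivered directly from here on
print(received)
```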
start_relay / streamrelay CLI
Start the relay server — run this once on any machine with a public IP.
# CLI
streamrelay --host 0.0.0.0 --port 8765 --secret MY_SECRET
# All options:
streamrelay --help
# --host HOST bind address (default: 0.0.0.0)
# --port PORT port to listen on (default: 8765)
# --secret SECRET shared auth secret; also reads RELAY_SECRET env var
# --max-buffer N max buffered messages per channel (default: 1000)
# --channel-timeout N seconds before abandoned channels are reaped (default: 300)
# --log-level LEVEL DEBUG / INFO / WARNING / ERROR (default: INFO)
# Python API — embed the relay inside an existing asyncio application
import asyncio
from streamrelay import start_relay
asyncio.run(start_relay(
    host="0.0.0.0",
    port=8765,
    secret="MY_SECRET",
    max_buffer=1000,
    channel_timeout=300,
))
Health check — the relay exposes /health (no auth required):
import asyncio, websockets, json
async def check(relay_url):
    async with websockets.connect(f"{relay_url}/health") as ws:
        status = json.loads(await ws.recv())
        print(status)  # {"status": "healthy", "active_channels": 0, "timestamp": "..."}
asyncio.run(check("wss://relay.example.com"))
generate_key
from streamrelay import generate_key
key = generate_key() # base64-encoded 32-byte AES-256 key
print(key) # e.g. "xK3mP9vQ2rL8nJ6w..."
# Store in .env as RELAY_ENCRYPTION_KEY=<key>
# Pass the same key to both RelayProducer and RelayConsumer
Or from the shell:
python -c "from streamrelay import generate_key; print(generate_key())"
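A mistyped or truncated key otherwise only shows up later as an `InvalidTag` error, so it can be worth checking the key when your application starts. The sketch below decodes the string and confirms it holds 32 bytes. `validate_key` is a hypothetical helper, and the standard base64 alphabet is an assumption; if `generate_key()` emits URL-safe base64, swap in `base64.urlsafe_b64decode`.

```python
import base64
import binascii
import secrets


def validate_key(key_b64: str) -> bytes:
    """Decode a base64 key and check that it is 32 bytes long (AES-256)."""
    try:
        raw = base64.b64decode(key_b64, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError("encryption key is not valid base64") from exc
    if len(raw) != 32:
        raise ValueError(f"expected 32 key bytes, got {len(raw)}")
    return raw


# Stand-in key built with the standard library for this demo only;
# use generate_key() for real keys.
demo_key = base64.b64encode(secrets.token_bytes(32)).decode("ascii")
print(len(validate_key(demo_key)))
```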
StreamingExecutor (Globus Compute)
High-level wrapper for Globus Compute users. Handles channel ID generation, function submission with relay coordinates injected, and relay consumption.
from streamrelay import StreamingExecutor
async with StreamingExecutor(
    endpoint_id="your-globus-endpoint-uuid",
    relay_url="wss://relay.example.com",
    relay_secret="MY_SECRET",
    encryption_key="",       # optional AES-256 key
    consumer_timeout=300.0,  # seconds to wait for first token
) as executor:
    async for token in executor.stream(my_inference_fn, prompt="Hello"):
        print(token, end="", flush=True)
Your remote function automatically receives relay_url, channel_id, and
optionally relay_secret / encryption_key as extra kwargs:
def my_inference_fn(prompt, relay_url, channel_id, relay_secret="", encryption_key=""):
    # All imports must be inline: Globus Compute serializes only the function body
    from streamrelay import RelayProducer  # if streamrelay is installed on the endpoint
    with RelayProducer(relay_url, channel_id, relay_secret=relay_secret) as relay:
        for token in call_vllm_streaming(prompt):
            relay.send_token(token)
    return "ok"
If streamrelay is not installed on the endpoint workers, use the inline producer
pattern from docs/tutorial.md (Pattern B / Pattern C) — it requires
only websockets and cryptography, which are available on most HPC environments.
Troubleshooting
Consumer hangs and never receives any tokens
- Check the relay is reachable from both sides: ws://your-relay:8765/health
- Check the channel_id matches exactly between producer and consumer; a mismatch means they connect to different channels and never find each other
- Check the relay_secret matches; a wrong secret is rejected at handshake with WebSocket close code 4003 (catch with "4003" in str(e))
- Check the producer actually ran; if the Globus job failed before connecting, the consumer waits until the channel timeout (default 5 minutes)
ConnectionRefusedError or ConnectionClosedError
- Relay server is not running, or the URL/port is wrong
- For wss:// connections: the TLS certificate must be valid (use Caddy or Let's Encrypt)
- For development: use ws:// (unencrypted) with a local relay + Cloudflare tunnel for the public URL
InvalidTag when decrypting
- The encryption_key does not match between producer and consumer; generate it once and store it in both environments: python -c "from streamrelay import generate_key; print(generate_key())"
Tokens arrive out of order
- The relay forwards messages in arrival order — this should not happen
- If using the buffering path (producer connects first), messages are flushed in FIFO order
streamrelay command not found after pip install
- The streamrelay CLI is installed into your Python environment's bin directory
- Activate your virtual environment first, or use python -m streamrelay.server
ModuleNotFoundError: No module named 'streamrelay' on the HPC node
- The HPC endpoint workers may not have streamrelay installed
- Use the inline producer pattern (no install needed): see Pattern B in docs/tutorial.md
Documentation
| Guide | What it covers |
|---|---|
| docs/tutorial.md | Start here. Zero-to-streaming walkthrough: deploy relay, three producer patterns (pip install / inline / Globus Compute exec), consumer patterns, passing credentials to HPC jobs, E2E encryption, production checklist |
| docs/deployment.md | Relay server deployment: Cloudflare tunnel, VM + Caddy + systemd, Docker Compose, health monitoring |
| CONTRIBUTING.md | Testing at three levels: unit tests, local end-to-end via Cloudflare, live relay test script |
Citation
If you use streamrelay in your research, please cite the JOSS paper:
@article{nassar2026streamrelay,
title = {{streamrelay}: A {WebSocket} Relay for Real-Time Token Streaming
from Batch {HPC} Executors},
author = {Nassar, Anas and Mohr, Steve and Apanasevich, Leonard and Sharma, Himanshu},
journal = {Journal of Open Source Software},
year = {2026},
doi = {10.21105/joss.TODO},
}
streamrelay was developed as part of the STREAM system:
@inproceedings{nassar2026stream,
title = {{STREAM}: Smart Tiered Routing Engine for {AI} Models},
author = {Nassar, Anas and Mohr, Steve and Apanasevich, Leonard and Sharma, Himanshu},
booktitle = {Proceedings of PEARC '26},
year = {2026},
}
License
Apache 2.0 — see LICENSE.