High-performance Rust prediction server for Cog ML models
coglet-python
PyO3 bindings that bridge the Rust coglet library to Python. This crate implements
the PredictHandler trait by wrapping Python predictor classes.
Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ coglet-python                                                               │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │  lib.rs                                                               │  │
│  │  Python module: serve(), active(), _run_worker(), _is_cancelable()    │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                      │                                      │
│             ┌────────────────────────┼────────────────────────┐             │
│             ▼                        ▼                        ▼             │
│  ┌─────────────────────┐  ┌─────────────────────┐  ┌─────────────────────┐  │
│  │ worker_bridge.rs    │  │ predictor.rs        │  │ log_writer.rs       │  │
│  │ PredictHandler      │  │ PythonPredictor     │  │ SlotLogWriter       │  │
│  │ impl for Python     │  │ load/setup/predict  │  │ ContextVar          │  │
│  └─────────────────────┘  └─────────────────────┘  └─────────────────────┘  │
│             │                        │                        │             │
│             │                        │                        │             │
│             ▼                        ▼                        ▼             │
│  ┌─────────────────────┐  ┌─────────────────────┐  ┌─────────────────────┐  │
│  │ input.rs            │  │ output.rs           │  │ audit.rs            │  │
│  │ Pydantic/ADT        │  │ JSON serialization  │  │ TeeWriter           │  │
│  │ input processing    │  │ make_encodeable     │  │ stream protect      │  │
│  └─────────────────────┘  └─────────────────────┘  └─────────────────────┘  │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │  cancel.rs                                                            │  │
│  │  SIGUSR1 handling, CancelableGuard, KeyboardInterrupt injection       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
Directory Structure
coglet-python/
├── Cargo.toml
├── coglet.pyi              # Type stubs for Python IDE support
└── src/
    ├── lib.rs              # Python module definition, serve/active/_run_worker
    ├── predictor.rs        # PythonPredictor: wraps Python Predictor class
    ├── worker_bridge.rs    # PythonPredictHandler: implements PredictHandler
    ├── input.rs            # Input processing (Pydantic validation, ADT)
    ├── output.rs           # Output processing (make_encodeable, upload_files)
    ├── log_writer.rs       # SlotLogWriter, ContextVar routing, SetupLogSender
    ├── audit.rs            # Audit hook, TeeWriter for stream protection
    └── cancel.rs           # Cancellation: SIGUSR1, CancelableGuard
Critical Concepts
The active() Flag
import coglet

if coglet.active():
    # Running inside the worker subprocess:
    # stdout/stderr are captured, and print() output goes through slot routing
    pass
else:
    # Running in the parent process or standalone:
    # normal stdout/stderr behavior
    pass
The flag is set to True at the start of _run_worker(). User code and cog internals use it to detect the worker context.
Single Async Event Loop
Async predictors run on a single Python asyncio event loop, created at worker startup. All slots share this loop.
Worker Subprocess
┌─────────────────────────────────────────────────────────────┐
│  Tokio Runtime (Rust)                                       │
│  └─ run_worker event loop                                   │
│     └─ For each SlotRequest::Predict:                       │
│        └─ tokio::spawn prediction task                      │
│           └─ Python::attach (acquire GIL)                   │
│              └─ asyncio.run_coroutine_threadsafe()          │
│                 └─ Predictor.predict() coroutine            │
└─────────────────────────────────────────────────────────────┘
asyncio event loop (Python)
┌─────────────────────────────────────────────────────────────┐
│  Single event loop, started once at worker init             │
│  - concurrent.futures.Future per async prediction           │
│  - ContextVar propagates prediction_id to spawned tasks     │
│  - Cancellation via future.cancel()                         │
└─────────────────────────────────────────────────────────────┘
Why single loop?
- Python asyncio runs at most one event loop per thread
- We use run_coroutine_threadsafe to submit coroutines to that loop from Rust/Tokio threads
- Multiple slots can have concurrent predictions (up to max_concurrency)
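The submission pattern can be sketched in plain Python. This is a minimal illustration, not coglet's code. It assumes the loop runs forever on a dedicated background thread. `start_background_loop` and `fake_predict` are hypothetical names. The main thread stands in for the Tokio tasks and submits several coroutines to the one loop with `asyncio.run_coroutine_threadsafe`. Because each coroutine awaits, they overlap on the single loop.

```python
import asyncio
import threading

# Hypothetical helpers for illustration; these are not coglet APIs.

def start_background_loop() -> asyncio.AbstractEventLoop:
    """Create one event loop and run it forever on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop

active = 0  # predictions currently in flight on the loop
peak = 0    # highest number seen in flight at the same time

async def fake_predict(n: int) -> int:
    global active, peak
    # Only the loop thread touches these counters, so no lock is needed.
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.1)  # yield so the other predictions can run
    active -= 1
    return n * 2

loop = start_background_loop()
# Each call stands in for one Tokio task submitting a prediction.
futures = [asyncio.run_coroutine_threadsafe(fake_predict(i), loop) for i in range(4)]
results = [f.result(timeout=5) for f in futures]  # blocks only this thread
loop.call_soon_threadsafe(loop.stop)
print(results, peak)
```

All four predictions share one loop. The `peak` counter shows they were in flight at the same time rather than queued one after another.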
Prediction Execution
Sync Predictors:
SlotRequest::Predict arrives
│
├─▶ Python::attach (acquire GIL)
├─▶ set_sync_prediction_id(id) # For log routing
├─▶ predictor.predict(input) # Blocking call
├─▶ set_sync_prediction_id(None)
└─▶ Return PredictResult
Async Predictors:
SlotRequest::Predict arrives
│
├─▶ Python::attach (acquire GIL)
├─▶ Create wrapped coroutine:
│   async def _ctx_wrapper(coro, prediction_id, contextvar):
│       contextvar.set(prediction_id)  # Set in this task's context
│       return await coro
│
├─▶ asyncio.run_coroutine_threadsafe(wrapper, loop)
├─▶ py.detach() (release GIL)
├─▶ future.result() (block Rust task, Python runs)
└─▶ Return PredictResult
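The `_ctx_wrapper` idea can be exercised end to end with the standard library. The sketch assumes a background-thread loop like the worker's. `prediction_id_var` and the toy `predict()` are hypothetical. Two predictions run concurrently on the same loop. A task spawned inside each one still sees its own id, because asyncio tasks copy the current context when they are created.

```python
import asyncio
import contextvars
import threading

# Hypothetical ContextVar carrying the current prediction id.
prediction_id_var: contextvars.ContextVar = contextvars.ContextVar("prediction_id", default=None)

async def _ctx_wrapper(coro, prediction_id, contextvar):
    # Runs as the Task created by run_coroutine_threadsafe. The set() is
    # visible to this task and to any tasks it creates afterwards.
    contextvar.set(prediction_id)
    return await coro

async def predict():
    async def child():
        await asyncio.sleep(0.01)
        return prediction_id_var.get()
    # create_task copies the current context, so the child inherits the id.
    return await asyncio.create_task(child())

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

futures = {
    pid: asyncio.run_coroutine_threadsafe(_ctx_wrapper(predict(), pid, prediction_id_var), loop)
    for pid in ("pred-a", "pred-b")
}
results = {pid: f.result(timeout=5) for pid, f in futures.items()}
loop.call_soon_threadsafe(loop.stop)
print(results)  # {'pred-a': 'pred-a', 'pred-b': 'pred-b'}
```

The submitting thread's own context is untouched. Each `set()` happens inside a task's private copy of the context.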
STDOUT/STDERR Routing
All output from user code must be captured and routed through the slot socket.
Architecture:
sys.stdout = SlotLogWriter(stdout)
sys.stderr = SlotLogWriter(stderr)
SlotLogWriter.write(data)
│
├─▶ Get current prediction_id from:
│   1. SYNC_PREDICTION_ID static (for sync predictors)
│   2. ContextVar (for async predictors/spawned tasks)
│
├─▶ Look up SlotSender in PREDICTION_REGISTRY
│
└─▶ Route:
    Found sender → slot_sender.send_log(source, data)
    No sender    → Check setup sender (during setup)
    Neither      → Log as orphan to stderr
Line Buffering:
SlotLogWriter buffers writes until it sees a newline. Python's print() writes the content and the trailing \n separately, so buffering coalesces them into a single log line.
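A sketch of that line buffering follows. It assumes output is forwarded through a caller-supplied `emit` callback. `LineBufferedWriter` is a hypothetical name, not the SlotLogWriter API. Emitting a partial line on `flush()` is also an assumption about how trailing text without a newline is handled.

```python
import io
from typing import Callable

class LineBufferedWriter(io.TextIOBase):
    """Hypothetical sketch of line buffering: forward only complete lines."""

    def __init__(self, emit: Callable[[str], None]) -> None:
        super().__init__()
        self._emit = emit
        self._buf = ""

    def writable(self) -> bool:
        return True

    def write(self, data: str) -> int:
        self._buf += data
        # Emit each complete line; keep a trailing partial line buffered.
        while "\n" in self._buf:
            line, self._buf = self._buf.split("\n", 1)
            self._emit(line + "\n")
        return len(data)

    def flush(self) -> None:
        # Assumption: an explicit flush pushes out a partial line as-is.
        if self._buf:
            self._emit(self._buf)
            self._buf = ""

lines = []
out = LineBufferedWriter(lines.append)
print("Loading", "weights", file=out)  # print() makes four separate writes here
print(lines)  # ['Loading weights\n']
```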
Audit Hook Protection
User code might replace sys.stdout:
sys.stdout = open("mylog.txt", "w")
We can't prevent this, but we can intercept it with a Python audit hook.
Strategy: TeeWriter
User replaces sys.stdout
│
├─▶ Audit hook fires on object.__setattr__(sys, "stdout", value)
│
├─▶ Check: is value already SlotLogWriter? → Allow (it's us)
│
├─▶ Check: is value already TeeWriter? → Allow (already wrapped)
│
├─▶ Create TeeWriter(inner=SlotLogWriter, user_stream=value)
│
└─▶ Schedule: sys.stdout = tee (via Timer to avoid recursion)
TeeWriter.write(data)
│
├─▶ inner.write(data) # Our SlotLogWriter (routing works)
└─▶ user_stream.write(data) # User's stream (their code works)
Result: Both our log routing AND the user's stream receive the data.
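The tee itself is simple to sketch; the audit-hook trigger that installs it is not shown. The sketch assumes both streams are file-like objects with `write` and `flush`. Delegating other attributes (`fileno`, `encoding`, ...) to the user's stream is a design assumption, not documented coglet behavior.

```python
import io

class TeeWriter:
    """Hypothetical sketch: duplicate every write to two streams."""

    def __init__(self, inner, user_stream) -> None:
        self.inner = inner              # our writer, so log routing keeps working
        self.user_stream = user_stream  # the stream the user assigned to sys.stdout

    def write(self, data: str) -> int:
        self.inner.write(data)
        self.user_stream.write(data)
        return len(data)

    def flush(self) -> None:
        self.inner.flush()
        self.user_stream.flush()

    def __getattr__(self, name):
        # Only called for attributes not defined above; defer to the user's stream.
        return getattr(self.user_stream, name)

ours, theirs = io.StringIO(), io.StringIO()
tee = TeeWriter(ours, theirs)
print("hello", file=tee)
print(ours.getvalue() == theirs.getvalue() == "hello\n")  # True
```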
Cancellation
Sync Predictors:
Parent: ControlRequest::Cancel { slot }
│
├─▶ Worker: handler.cancel(slot)
│   └─▶ Set CANCEL_REQUESTED flag for slot
│
├─▶ Worker: send SIGUSR1 to self
│
└─▶ Signal handler: raise KeyboardInterrupt (if in cancelable region)
Prediction code:
with CancelableGuard():    # Sets CANCELABLE=true
    predictor.predict()    # Can be interrupted
# CANCELABLE=false on exit
Async Predictors:
Parent: ControlRequest::Cancel { slot }
│
└─▶ Worker: handler.cancel(slot)
    │
    ├─▶ Get future from slot state
    └─▶ future.cancel()
        │
        └─▶ Python raises asyncio.CancelledError
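For async predictors, cancelling the `concurrent.futures.Future` returned by `asyncio.run_coroutine_threadsafe` also cancels the underlying task on the loop. The sketch below uses a hypothetical `long_predict` and a background-thread loop. It shows the CancelledError reaching the coroutine, which re-raises it after any cleanup.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

started = threading.Event()
saw_cancel = threading.Event()

async def long_predict() -> str:
    started.set()
    try:
        await asyncio.sleep(60)  # stand-in for a long-running async predict()
    except asyncio.CancelledError:
        saw_cancel.set()  # cleanup would go here
        raise             # re-raise so the task finishes as cancelled
    return "finished"

future = asyncio.run_coroutine_threadsafe(long_predict(), loop)
started.wait(timeout=5)
# Cancelling the concurrent.futures.Future schedules task.cancel() on the loop,
# which raises CancelledError at the coroutine's current await.
future.cancel()
saw_cancel.wait(timeout=5)
loop.call_soon_threadsafe(loop.stop)
print(future.cancelled(), saw_cancel.is_set())  # True True
```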
Setup Log Routing
During setup (before any prediction), logs go through the control channel:
worker_bridge.setup()
│
├─▶ register_setup_sender(tx) # Control channel sender
│
├─▶ predictor.load() + predictor.setup()
│   │
│   └─▶ print("Loading model...")
│       │
│       └─▶ SlotLogWriter.write()
│           │
│           ├─▶ No prediction_id (not in prediction)
│           └─▶ get_setup_sender() → ControlResponse::Log
│
└─▶ unregister_setup_sender()
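One way to keep the register/unregister pair balanced is a context manager. This is a hypothetical Python sketch: the functions mirror the names in the diagram but are local stand-ins, and `setup_logging` is not a coglet API. The `finally` block removes the setup sender even if load() or setup() raises, a case the diagram does not state explicitly.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, Optional

Sender = Callable[[str], None]

# Hypothetical module-level slot for the control-channel sender.
_setup_sender: Optional[Sender] = None

def register_setup_sender(tx: Sender) -> None:
    global _setup_sender
    _setup_sender = tx

def unregister_setup_sender() -> None:
    global _setup_sender
    _setup_sender = None

def get_setup_sender() -> Optional[Sender]:
    return _setup_sender

@contextmanager
def setup_logging(tx: Sender) -> Iterator[None]:
    register_setup_sender(tx)
    try:
        yield
    finally:
        # Runs even if load()/setup() raises, so later output isn't sent as setup logs.
        unregister_setup_sender()

control_logs = []
with setup_logging(control_logs.append):
    get_setup_sender()("Loading model...\n")  # what SlotLogWriter would do during setup
print(control_logs, get_setup_sender())  # ['Loading model...\n'] None
```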
Behaviors
Worker Startup:
- set_active() - Mark as worker subprocess
- init_tracing() - Configure logging (stderr, COG_LOG env)
- install_slot_log_writers() - Replace sys.stdout/stderr
- install_audit_hook() - Protect streams
- install_signal_handler() - SIGUSR1 for cancellation
- Read Init message from stdin
- Connect to slot sockets
- handler.setup() - Load and initialize predictor
- Send Ready message
- Enter event loop
Shutdown:
- ControlRequest::Shutdown → Send ShuttingDown, exit
- stdin closes (parent died) → Exit immediately
- All slots poisoned → Exit
Error Handling:
- SetupError::Load - Failed to import/instantiate predictor
- SetupError::Setup - setup() raised exception
- PredictionError - Prediction failed, slot stays healthy
- Slot write error → Slot poisoned (no more predictions on that slot)
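The difference between a failed prediction and a poisoned slot can be modeled as a tiny state machine. `Slot`, `SlotWriteError`, and `should_exit` are hypothetical. `PredictionError` reuses the name from the list above but is defined locally here. In this sketch, only a write failure poisons the slot, and the worker should exit once every slot is poisoned.

```python
from dataclasses import dataclass
from typing import Callable, List

class PredictionError(Exception):
    """The prediction failed, but the slot itself is still usable."""

class SlotWriteError(Exception):
    """Hypothetical: writing the result to the slot socket failed."""

@dataclass
class Slot:
    poisoned: bool = False

    def run(self, predict: Callable[[], str], write: Callable[[str], None]) -> str:
        if self.poisoned:
            raise RuntimeError("slot is poisoned; no more predictions")
        try:
            result = predict()
        except PredictionError as e:
            result = f"failed: {e}"  # reported as a failed prediction; slot stays healthy
        try:
            write(result)
        except SlotWriteError:
            self.poisoned = True  # the socket is unusable: retire this slot
            raise
        return result

def should_exit(slots: List[Slot]) -> bool:
    # "All slots poisoned → Exit"
    return all(slot.poisoned for slot in slots)
```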
Built Distributions
File details
Details for the file coglet-0.17.0a3-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.
File metadata
- Download URL: coglet-0.17.0a3-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
- Size: 3.0 MB
- Tags: CPython 3.10+, manylinux: glibc 2.17+ x86-64
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | da3a52efcd6084e0d55fb50788f420e981668a6aa4abaca5081507822dd33d64 |
| MD5 | f2a3edf0902e6b5976e572f04d1b3404 |
| BLAKE2b-256 | e06d85f8e07fce079c923377dcba8439049550df7b05351430733a08950b4f91 |
Provenance
The following attestation bundles were made for coglet-0.17.0a3-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl:
Publisher: release-publish.yaml on replicate/cog
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: coglet-0.17.0a3-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
- Subject digest: da3a52efcd6084e0d55fb50788f420e981668a6aa4abaca5081507822dd33d64
- Sigstore transparency entry: 934647166
- Permalink: replicate/cog@339ef965216eb49b9696fe7ca50b13140725b830
- Branch / Tag: refs/tags/v0.17.0-alpha3
- Owner: https://github.com/replicate
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release-publish.yaml@339ef965216eb49b9696fe7ca50b13140725b830
- Trigger Event: release
File details
Details for the file coglet-0.17.0a3-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.
File metadata
- Download URL: coglet-0.17.0a3-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
- Size: 2.9 MB
- Tags: CPython 3.10+, manylinux: glibc 2.17+ ARM64
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8c72666b04b97f95e9d5e5ff1296407ad914cdf6766c579ebea5017e0821cd30 |
| MD5 | 4d8a2a16f46c972cd4de3860114f0b1b |
| BLAKE2b-256 | f1888f00ec2380e849026702c1a35887462f42ae7f3f830ac0d8bcab2a8a13ee |
Provenance
The following attestation bundles were made for coglet-0.17.0a3-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:
Publisher: release-publish.yaml on replicate/cog
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: coglet-0.17.0a3-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
- Subject digest: 8c72666b04b97f95e9d5e5ff1296407ad914cdf6766c579ebea5017e0821cd30
- Sigstore transparency entry: 934647057
- Permalink: replicate/cog@339ef965216eb49b9696fe7ca50b13140725b830
- Branch / Tag: refs/tags/v0.17.0-alpha3
- Owner: https://github.com/replicate
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release-publish.yaml@339ef965216eb49b9696fe7ca50b13140725b830
- Trigger Event: release
File details
Details for the file coglet-0.17.0a3-cp310-abi3-macosx_11_0_arm64.whl.
File metadata
- Download URL: coglet-0.17.0a3-cp310-abi3-macosx_11_0_arm64.whl
- Size: 3.0 MB
- Tags: CPython 3.10+, macOS 11.0+ ARM64
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3c2d0ef2f8e3c2aa42c0d1f986f619e68518f06136f58e704d5f60c832e71418 |
| MD5 | 59fa0e999f322d551587462563e0b8ee |
| BLAKE2b-256 | af796fc413c3d1af0bcccbb1b436f9fa05d1140e8385d8b138e3a6b3c7752b8b |
Provenance
The following attestation bundles were made for coglet-0.17.0a3-cp310-abi3-macosx_11_0_arm64.whl:
Publisher: release-publish.yaml on replicate/cog
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: coglet-0.17.0a3-cp310-abi3-macosx_11_0_arm64.whl
- Subject digest: 3c2d0ef2f8e3c2aa42c0d1f986f619e68518f06136f58e704d5f60c832e71418
- Sigstore transparency entry: 934647104
- Permalink: replicate/cog@339ef965216eb49b9696fe7ca50b13140725b830
- Branch / Tag: refs/tags/v0.17.0-alpha3
- Owner: https://github.com/replicate
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release-publish.yaml@339ef965216eb49b9696fe7ca50b13140725b830
- Trigger Event: release