A library used to build custom endpoints in Cozy Creator's serverless function platform.
Project description
gen-worker
A Python SDK for building serverless api-endpoints for AI inference. Just write your custom function, create a manifest specifying what model-weights you need from Cozy-Hub, and then deploy it! We take care of the rest!
Tenant Worker Build Contract (Dockerfile-First)
When publishing a tenant worker, Cozy expects a Dockerfile-first project layout.
Build inputs MUST include:
cozy.toml(Cozy manifest; used at build/publish time)Dockerfile(builds the worker image)- tenant code (
pyproject.toml,uv.lock,src/, etc.)
The built image MUST:
- Install
gen-worker(so discovery + runtime can run). - Bake endpoint discovery output at build time:
RUN mkdir -p /app/.cozy && python -m gen_worker.discover > /app/.cozy/manifest.json
- Use the Cozy worker runtime as the ENTRYPOINT:
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
Notes:
cozy.tomlis not required to be present in the final image; it is a build-time input.- The platform reads
/app/.cozy/manifest.jsonfrom the built image and stores it in Cozy Hub DB for routing/invocation.
Installation
Start a python project, and then run:
uv add gen-worker
With PyTorch support:
uv add gen-worker[torch]
Quick Start
import msgspec
from gen_worker import ActionContext, worker_function
class Input(msgspec.Struct):
prompt: str
class Output(msgspec.Struct):
text: str
@worker_function()
def generate(ctx: ActionContext, payload: Input) -> Output:
return Output(text=f"Hello, {payload.prompt}!")
Features
- Function discovery - Automatic detection of
@worker_functiondecorated functions - Schema generation - Input/output schemas extracted from msgspec types
- Model injection - Dependency injection for ML models with caching
- Streaming output - Support for incremental/streaming responses
- Progress reporting - Built-in progress events via
ActionContext - Perf metrics - Best-effort per-run metrics emitted to gen-orchestrator (
metrics.*worker events) - File handling - Upload/download assets via Cozy hub file API
- Model caching - LRU cache with VRAM/disk management and cache-aware routing
Usage
Basic Function
import msgspec
from gen_worker import ActionContext, worker_function
class Input(msgspec.Struct):
prompt: str
class Output(msgspec.Struct):
result: str
@worker_function()
def my_function(ctx: ActionContext, payload: Input) -> Output:
return Output(result=f"Processed: {payload.prompt}")
Streaming Output
from typing import Iterator
class Delta(msgspec.Struct):
chunk: str
@worker_function()
def stream(ctx: ActionContext, payload: Input) -> Iterator[Delta]:
for word in payload.prompt.split():
if ctx.is_canceled():
raise InterruptedError("canceled")
yield Delta(chunk=word)
Model Injection
Declare your model keyspace in cozy.toml:
[models]
sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
from typing import Annotated
from diffusers import DiffusionPipeline
from gen_worker.injection import ModelRef, ModelRefSource as Src
@worker_function()
def generate(
ctx: ActionContext,
pipe: Annotated[DiffusionPipeline, ModelRef(Src.FIXED, "sd15")], # key from cozy.toml [models]
payload: Input,
) -> Output:
# Use the injected pipeline (loaded/cached by the worker's model manager).
return Output(result="done")
Payload-Selected Model (Short Key)
If you want the client payload to choose which repo to run, declare a short-key
mapping in cozy.toml and use ModelRef(PAYLOAD, ...):
[models]
sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
flux = "hf:black-forest-labs/FLUX.2-klein-4B"
from typing import Annotated
import msgspec
from diffusers import DiffusionPipeline
from gen_worker import ActionContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src
class Input(msgspec.Struct):
prompt: str
model: str # must be one of: "sd15" | "flux"
@worker_function()
def generate(
ctx: ActionContext,
pipe: Annotated[DiffusionPipeline, ModelRef(Src.PAYLOAD, "model")],
payload: Input,
):
...
Note: by default the worker requires payload model selection to use a known
short-key from [models] (cozy.toml). It will not accept arbitrary repo refs in
the payload. ModelRef(FIXED, ...) is also restricted to keys declared in the
manifest mapping (no inline hf:/cozy: refs).
Saving Files
@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
# Save bytes and get asset reference
asset = ctx.save_bytes(f"runs/{ctx.run_id}/outputs/output.png", image_bytes)
return Output(result=asset.ref)
Dev HTTP Runner (Local Inference Without gen-orchestrator)
For local testing of a built worker image (without standing up gen-orchestrator), run the dev HTTP runner and write outputs to a mounted local directory.
Container example:
docker run --rm --gpus all -p 8081:8081 \
-v "$(pwd)/out:/outputs" \
-e COZY_HUB_URL='http://host.docker.internal:7777' \
<your-worker-image> \
python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs
Prefetch a public model (example: SD1.5 on Hugging Face, via Cozy Hub mirror):
curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
-H 'content-type: application/json' \
-d '{"models":[{"ref":"hf:runwayml/stable-diffusion-v1-5@main","dtypes":["bf16","fp16"]}]}'
Invoke an endpoint:
curl -sS -X POST 'http://localhost:8081/v1/run/generate' \
-H 'content-type: application/json' \
-d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'
Outputs are written under /outputs/runs/<run_id>/outputs/... (matching Cozy ref semantics).
Configuration
cozy.toml
schema_version = 1
name = "my-worker"
main = "my_pkg.main"
gen_worker = ">=0.2.0,<0.3.0"
[models]
sdxl = { ref = "hf:stabilityai/stable-diffusion-xl-base-1.0", dtypes = ["fp16","bf16"] }
[models] entries support two forms:
- String form (defaults to
dtypes=["fp16","bf16"]):sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
- Table form:
flux_fp8 = { ref = "hf:black-forest-labs/FLUX.2-klein-4B", dtypes = ["fp8"] }
Environment Variables
Orchestrator-injected (production contract):
| Variable | Default | Description |
|---|---|---|
SCHEDULER_PUBLIC_ADDR |
- | Scheduler address workers should dial |
SCHEDULER_ADDRS |
- | Optional comma-separated seed addresses for leader discovery |
WORKER_JWT |
- | Worker-connect JWT (required; claims are authoritative) |
Local dev / advanced (not injected by orchestrator):
| Variable | Default | Description |
|---|---|---|
SCHEDULER_JWKS_URL |
- | Optional: verify WORKER_JWT locally against scheduler JWKS |
SCHEDULER_JWT_ISSUER |
- | Optional: expected iss when verifying WORKER_JWT locally |
SCHEDULER_JWT_AUDIENCE |
- | Optional: expected aud when verifying WORKER_JWT locally |
USE_TLS |
false |
Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream |
WORKER_MAX_CONCURRENCY |
- | Max concurrent task executions |
WORKER_MAX_INPUT_BYTES |
- | Max input payload size |
WORKER_MAX_OUTPUT_BYTES |
- | Max output payload size |
WORKER_MAX_UPLOAD_BYTES |
- | Max file upload size |
WORKER_MAX_VRAM_GB |
Auto | Maximum VRAM for models |
WORKER_VRAM_SAFETY_MARGIN_GB |
3.5 | Reserved VRAM for working memory |
TENSORHUB_CACHE_DIR |
~/.cache/tensorhub |
TensorHub cache root; worker CAS defaults derive from this (${TENSORHUB_CACHE_DIR}/cas/...) |
WORKER_LOCAL_MODEL_CACHE_DIR |
/tmp/tensorhub/local-model-cache |
Optional local (non-NFS) cache for snapshot localization |
WORKER_REGISTER_TIMEOUT_S |
90 |
Startup watchdog: fail fast if worker never registers with scheduler |
WORKER_WARN_MODEL_RESOLVE_S |
30 |
Emit task.model_resolve.stuck warning after this duration |
WORKER_WARN_MODEL_LOAD_S |
60 |
Emit task.model_load.stuck warning after this duration |
WORKER_WARN_INFERENCE_S |
60 |
Emit task.inference.stuck warning after this duration |
WORKER_MAX_CONCURRENT_DOWNLOADS |
2 | Max parallel model downloads |
COZY_HUB_URL |
- | Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve) |
WORKER_ALLOW_COZY_HUB_API_RESOLVE |
false |
Local dev only: allow the worker to call Cozy Hub resolve APIs |
COZY_HUB_TOKEN |
- | Cozy Hub bearer token (optional; enables ingest-if-missing for public HF models, if Cozy Hub requires auth) |
HF_TOKEN |
- | Hugging Face token (for private hf: refs) |
Metrics
The worker can emit best-effort performance/debug metrics to gen-orchestrator via WorkerEvent messages.
See docs/metrics.md.
See docs/worker-stuck-visibility.md for startup/task watchdog events used to diagnose stuck workers.
Hugging Face (hf:) download behavior
By default, hf: model refs do not download the full repo. The worker uses huggingface_hub.snapshot_download(allow_patterns=...) to avoid pulling huge legacy weights.
Defaults:
- Download only what a diffusers pipeline needs (derived from
model_index.json). - Skip
safety_checkerandfeature_extractorby default. - Download only reduced-precision safetensors weights (
fp16/bf16); never download.ckptor.binby default. - For sharded safetensors, also download the
*.safetensors.index.jsonand the referenced shard files.
Overrides:
COZY_HF_COMPONENTS="unet,vae,text_encoder,tokenizer,scheduler": hard override component list.COZY_HF_INCLUDE_OPTIONAL_COMPONENTS=1: include components likesafety_checker/feature_extractorif present.COZY_HF_WEIGHT_PRECISIONS="fp16,bf16": change which weight suffixes are accepted (addfp32only if you really need it).COZY_HF_ALLOW_ROOT_JSON=1: allow additional small root*.jsonfiles (some repos need extra root config).COZY_HF_FULL_REPO_DOWNLOAD=1: disable filtering and download the entire repo (not recommended; can be 10s of GB).
Cozy Hub (cozy:) download behavior
Cozy snapshot/object file downloads are written to *.part and then atomically renamed on success. If a *.part file exists from a previous interrupted download, the worker attempts to resume it using HTTP Range requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.
Docker Deployment
Project Structure
my-worker/
├── pyproject.toml
├── uv.lock
└── src/
└── my_module/
└── main.py
Local Dev Build (Using Root Dockerfile)
For production, use the cozyctl CLI to build and deploy worker-images to our network. But for local testing, you can build images using our provided Dockerfile:
# Build an example using the same root Dockerfile
docker build -t sd15-worker -f Dockerfile examples/sd15
# Run
docker run \
-e SCHEDULER_PUBLIC_ADDR=orchestrator:8080 \
-e WORKER_JWT='<worker-connect-jwt>' \
sd15-worker
Canonical local dev build args (GPU, CUDA 12.6, torch 2.10.x, Python 3.12):
cd ~/cozy/python-gen-worker
docker build \
--build-arg PYTHON_VERSION=3.12 \
--build-arg UV_TORCH_BACKEND=cu126 \
--build-arg TORCH_SPEC='~=2.10.0' \
-f Dockerfile \
-t my-worker:dev \
examples/sd15
Optional build args:
docker build \
--build-arg PYTHON_VERSION=3.12 \
--build-arg UV_TORCH_BACKEND=cu128 \
--build-arg TORCH_SPEC=">=2.9,<3" \
-t my-worker -f Dockerfile examples/sd15
Build Base
Worker images build directly from a Python+uv base image:
ghcr.io/astral-sh/uv:python3.12-bookworm-slim
PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.
Publish/Promote Lifecycle
Control-plane behavior (cozy-hub + orchestrator):
- Every publish creates a new immutable internal
release_id. - End users invoke endpoints by
tenant/project/endpoint(defaultprod) ortenant/project/endpoint@tag. projectis derived frompyproject.toml[project].nameand normalized to a URL-safe slug.- Endpoint names are derived from worker function names and slugified to URL-safe form (for example,
medasr_transcribe->medasr-transcribe). - Publishing does not move traffic by default.
- Promoting an endpoint/tag moves traffic to that release.
- Rollback is just retargeting the tag to an older release.
Model Cache
Workers report model availability for intelligent job routing:
| State | Location | Latency |
|---|---|---|
| Hot | VRAM | Instant |
| Warm | Disk | Seconds |
| Cold | None | Minutes (download required) |
Dev Testing (Mock Orchestrator)
For local end-to-end tests without standing up gen-orchestrator, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one TaskExecutionRequest, prints the result, and exits.
Start your worker container first:
docker run --rm \
--add-host=host.docker.internal:host-gateway \
-e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
-e WORKER_JWT='dev-worker-jwt' \
<your-worker-image>
In another terminal, send one request:
python -m gen_worker.testing.mock_orchestrator \
--listen 0.0.0.0:8080 \
--run hello \
--payload-json '{"name":"world"}'
Run the command again with a different payload whenever you want to send another request.
from gen_worker.model_cache import ModelCache
cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)
cache.is_in_vram("model-a") # True
cache.get_vram_models() # ["model-a"]
Error Handling
from gen_worker.errors import RetryableError, ValidationError, FatalError
@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
if not payload.prompt:
raise ValidationError("prompt is required") # 400, no retry
try:
result = call_external_api()
except TimeoutError:
raise RetryableError("API timeout") # Will be retried
return Output(result=result)
Development
# Install dev dependencies
uv sync --extra dev
# Run tests
uv run pytest
# Type checking
uv run mypy src/gen_worker
# Build
uv build
Regenerating Protobuf Stubs
Requires gen-orchestrator as a sibling repo:
uv sync --extra dev
python -m grpc_tools.protoc -I../gen-orchestrator/proto --python_out=src/gen_worker/pb --grpc_python_out=src/gen_worker/pb ../gen-orchestrator/proto/*.proto
Worker Wire Protocol
The worker advertises a protocol MAJOR.MINOR in WorkerRegistration (protocol_major, protocol_minor).
- Current runtime constants live in
src/gen_worker/wire_protocol.py. - Orchestrator compatibility policy/ranges are documented in
../gen-orchestrator/docs/worker_wire_protocol.md.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file gen_worker-0.3.0.tar.gz.
File metadata
- Download URL: gen_worker-0.3.0.tar.gz
- Upload date:
- Size: 130.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
689be664a9f8b7c3a052103675bea097594934b04ed08d02bf4b9d06c2e1468f
|
|
| MD5 |
fb176511aa20abdc6be90c82e605d407
|
|
| BLAKE2b-256 |
37ac309ca10137c52be07071a058affcf991e91ef1cae6c7bc2c3fc6fc726e55
|
File details
Details for the file gen_worker-0.3.0-py3-none-any.whl.
File metadata
- Download URL: gen_worker-0.3.0-py3-none-any.whl
- Upload date:
- Size: 145.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c4e5011e89bedbcd52e04637282942cbc4270c55d2056176863e16a855d803e3
|
|
| MD5 |
30dc83de314a0cf9f2d1b6cb7d7b4faf
|
|
| BLAKE2b-256 |
25b5658d13ad74ea95661b6e8eaca426bc7500f72887b47404e8d00bc9a2d369
|