A library used to build custom functions in Cozy Creator's serverless function platform.
gen-worker
A Python SDK for building serverless functions for AI inference. Write your function, declare required model refs, publish an endpoint release, and invoke it via Cozy's control plane.
Tenant Worker Build Contract (Dockerfile-First)
When publishing a tenant worker, Cozy expects a Dockerfile-first project layout.
Build inputs MUST include:
- endpoint.toml (Cozy manifest; used at build/publish time)
- Dockerfile (builds the worker image)
- tenant code (pyproject.toml, uv.lock, src/, etc.)
The built image MUST:
- Install gen-worker (so discovery + runtime can run).
- Bake function discovery output (manifest) at build time:
  RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock
- Use the Cozy worker runtime as the ENTRYPOINT:
  ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
Notes:
- endpoint.toml is not required to be present in the final image; it is a build-time input.
- The platform reads /app/.tensorhub/endpoint.lock from the built image and stores it in Cozy Hub DB for routing/invocation.
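For orientation, a minimal Dockerfile satisfying this contract might look like the sketch below. It is illustrative only: the uv-based install steps and COPY layout are assumptions about a typical tenant project, and the repository's root Dockerfile remains the canonical reference.

FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
WORKDIR /app
# Install the tenant project (which depends on gen-worker) into the image's Python environment.
COPY pyproject.toml uv.lock ./
COPY src/ ./src/
RUN uv pip install --system .
# endpoint.toml is a build-time input; it does not need to ship in the final image.
COPY endpoint.toml ./
# Bake the function discovery manifest into the image.
RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock
# Cozy worker runtime entrypoint, per the contract above.
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]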
Installation
Start a Python project, then run:
uv add gen-worker
With PyTorch support:
uv add "gen-worker[torch]"
Quick Start
import msgspec
from gen_worker import RequestContext, worker_function
class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: RequestContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")
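To sanity-check that discovery picks up your function before building an image, you can run the same discovery module the build contract bakes into the image (this assumes your package is installed/importable and that endpoint.toml is in the working directory):

python -m gen_worker.discover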
Features
- Function discovery - Automatic detection of @worker_function decorated functions
- Schema generation - Input/output schemas extracted from msgspec types
- Model injection - Dependency injection for ML models with caching
- Streaming output - Support for incremental/streaming responses
- Progress reporting - Built-in progress events via RequestContext
- Perf metrics - Best-effort per-run metrics emitted to gen-orchestrator (metrics.* worker events)
- Trainer runtime mode - SDK-native trainer loop via WORKER_MODE=trainer
- File handling - Upload/download assets via the Cozy Hub file API
- Model caching - LRU cache with VRAM/disk management and cache-aware routing
Usage
Basic Function
import msgspec
from gen_worker import RequestContext, worker_function
class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    result: str

@worker_function()
def my_function(ctx: RequestContext, payload: Input) -> Output:
    return Output(result=f"Processed: {payload.prompt}")
Streaming Output
from typing import Iterator
class Delta(msgspec.Struct):
    chunk: str

@worker_function()
def stream(ctx: RequestContext, payload: Input) -> Iterator[Delta]:
    for word in payload.prompt.split():
        if ctx.is_canceled():
            raise InterruptedError("canceled")
        yield Delta(chunk=word)
Model Injection
Declare fixed model keys in code, with refs/dtypes in endpoint.toml [models]:
[models]
sd15 = { ref = "stable-diffusion-v1-5/stable-diffusion-v1-5", dtypes = ["fp16", "bf16"] }
from typing import Annotated
from diffusers import DiffusionPipeline
from gen_worker.injection import ModelRef, ModelRefSource as Src
@worker_function()
def generate(
    ctx: RequestContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.FIXED, "sd15")],
    payload: Input,
) -> Output:
    # Use the injected pipeline (loaded/cached by the worker's model manager).
    return Output(result="done")
Payload-Selected Model (Short Key)
If you want the client payload to choose which repo to run, declare selector
keyspaces in endpoint.toml and use ModelRef(PAYLOAD, ...):
[models.generate]
sd15 = { ref = "stable-diffusion-v1-5/stable-diffusion-v1-5", dtypes = ["fp16", "bf16"] }
flux = { ref = "black-forest-labs/flux.2-klein-4b", dtypes = ["bf16"] }
from typing import Annotated
import msgspec
from diffusers import DiffusionPipeline
from gen_worker import RequestContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src
class Input(msgspec.Struct):
    prompt: str
    model: str  # must be one of: "sd15" | "flux"

@worker_function()
def generate(
    ctx: RequestContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.PAYLOAD, "model")],
    payload: Input,
):
    ...
Note: by default the worker requires payload model selection to use a known
short-key from the function keyspace in endpoint.toml. It will not accept
arbitrary repo refs in the payload.
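As an illustration, an invocation through the dev HTTP runner described below could select the sd15 key via the payload (the request shape follows the runner's invoke example; the prompt value is a placeholder):

curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a watercolor fox", "model": "sd15"}}'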
Saving Files
@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    # Save bytes and get asset reference
    asset = ctx.save_bytes(f"runs/{ctx.request_id}/outputs/output.png", image_bytes)
    return Output(result=asset.ref)
For conversion/training weight artifacts, use Tensors:
from gen_worker import Tensors
@worker_function()
def convert(ctx: RequestContext, payload: Input) -> Output:
    local_weights = "/tmp/converted.safetensors"
    tensors = ctx.save_checkpoint(
        f"runs/{ctx.request_id}/outputs/weights.safetensors",
        local_weights,
    )
    return Output(weights=tensors)
For large artifacts, stream bytes incrementally and finalize once:
with ctx.open_checkpoint_stream(
    f"runs/{ctx.request_id}/outputs/weights.safetensors",
    format="safetensors",
) as out:
    for chunk in generate_chunks():
        out.write(chunk)
tensors = out.finalize()
Trainer Mode (Class-Only)
from gen_worker import StepContext, StepResult
class MyTrainer:
    def setup(self, ctx: StepContext) -> None:
        pass

    def configure(self, ctx: StepContext) -> dict[str, object]:
        return {"step": 0}

    def prepare_batch(self, raw_batch: object, state: dict[str, object], ctx: StepContext) -> object:
        return raw_batch

    def train_step(self, batch: object, state: dict[str, object], ctx: StepContext) -> StepResult:
        return StepResult(metrics={"train/loss": 0.123})

    def state_dict(self, state: dict[str, object]) -> dict[str, object]:
        return dict(state)

    def load_state_dict(self, state: dict[str, object], payload: dict[str, object], ctx: StepContext) -> None:
        state.update(payload)
Run trainer mode:
WORKER_MODE=trainer \
TRAINER_JOB_SPEC_PATH=/app/.cozy/trainer_job.json \
python -m gen_worker.entrypoint
Full authoring guide: docs/custom-trainer-authoring.md.
Orchestrated runtime contract: docs/issue-081-orchestrated-trainer-runtime.md.
Dev HTTP Runner (Local Inference Without gen-orchestrator)
For local testing of a built worker image (without standing up gen-orchestrator), run the dev HTTP runner and write outputs to a mounted local directory.
Container example:
docker run --rm --gpus all -p 8081:8081 \
-v "$(pwd)/out:/outputs" \
-e TENSORHUB_URL='http://host.docker.internal:7777' \
<your-worker-image> \
python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs
Prefetch a public model (example: SD1.5):
curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
-H 'content-type: application/json' \
-d '{"models":[{"ref":"runwayml/stable-diffusion-v1-5","dtypes":["bf16","fp16"]}]}'
Invoke a function:
curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
-H 'content-type: application/json' \
-d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'
Outputs are written under /outputs/runs/<request_id>/outputs/... (matching Cozy ref semantics).
Configuration
endpoint.toml
schema_version = 1
name = "my-worker"
main = "my_pkg.main"
[functions.generate]
batch_dimension = "items" # optional
[models]
sdxl = { ref = "stabilityai/stable-diffusion-xl-base-1.0", dtypes = ["fp16", "bf16"] }
[models.generate]
dreamshaper = { ref = "lykon/dreamshaper-xl-v2-turbo", dtypes = ["fp16", "bf16"] }
[resources]
max_inflight_requests = 1
Environment Variables
Orchestrator-injected (production contract):
| Variable | Default | Description |
|---|---|---|
| WORKER_MODE | inference | Runtime mode selector (inference or trainer) |
| SCHEDULER_PUBLIC_ADDR | - | Scheduler address workers should dial |
| SCHEDULER_ADDRS | - | Optional comma-separated LB seed addresses |
| WORKER_JWT | - | Worker-connect JWT (required; claims are authoritative) |
Local dev / advanced (not injected by orchestrator):
| Variable | Default | Description |
|---|---|---|
| SCHEDULER_JWKS_URL | - | Optional: verify WORKER_JWT locally against scheduler JWKS |
| SCHEDULER_JWT_ISSUER | - | Optional: expected iss when verifying WORKER_JWT locally |
| SCHEDULER_JWT_AUDIENCE | - | Optional: expected aud when verifying WORKER_JWT locally |
| USE_TLS | false | Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream |
| LB_ONLY_RETRIES | true | Retry via configured LB endpoint(s) only; ignore direct owner redirect hints |
| RECONNECT_DELAY | 0.1 | Base reconnect backoff in seconds (exponential) |
| RECONNECT_MAX_DELAY | 1.0 | Reconnect backoff cap in seconds |
| RECONNECT_JITTER_SECONDS | 0.1 | Added jitter upper bound in seconds, capped by RECONNECT_MAX_DELAY |
| MAX_RECONNECT_ATTEMPTS | 0 | Max reconnect attempts (0 = infinite retries) |
| WORKER_MAX_CONCURRENCY | - | Max concurrent task executions |
| WORKER_MAX_INPUT_BYTES | - | Max input payload size |
| WORKER_MAX_OUTPUT_BYTES | - | Max output payload size |
| WORKER_MAX_UPLOAD_BYTES | - | Max file upload size |
| WORKER_MAX_VRAM_GB | Auto | Maximum VRAM for models |
| WORKER_VRAM_SAFETY_MARGIN_GB | 3.5 | Reserved VRAM for working memory |
| TENSORHUB_CACHE_DIR | ~/.cache/tensorhub | TensorHub cache root; worker CAS defaults derive from this (${TENSORHUB_CACHE_DIR}/cas/...) |
| WORKER_LOCAL_MODEL_CACHE_DIR | /tmp/tensorhub/local-model-cache | Optional local (non-NFS) cache for snapshot localization |
| WORKER_REGISTER_TIMEOUT_S | 90 | Startup watchdog: fail fast if worker never registers with scheduler |
| WORKER_WARN_MODEL_RESOLVE_S | 30 | Emit task.model_resolve.stuck warning after this duration |
| WORKER_WARN_MODEL_LOAD_S | 60 | Emit task.model_load.stuck warning after this duration |
| WORKER_WARN_INFERENCE_S | 60 | Emit task.inference.stuck warning after this duration |
| WORKER_MAX_CONCURRENT_DOWNLOADS | 2 | Max parallel model downloads |
| TENSORHUB_URL | - | Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve) |
| WORKER_ALLOW_TENSORHUB_API_RESOLVE | false | Local dev only: allow the worker to call Cozy Hub resolve APIs |
| TENSORHUB_TOKEN | - | Cozy Hub bearer token (optional; enables ingest-if-missing for public models, if Cozy Hub requires auth) |
| TRAINER_JOB_SPEC_PATH | /app/.cozy/trainer_job.json | Trainer-mode JSON job manifest path |
| TRAINER_PLUGIN | - | Trainer plugin import (module:symbol); optional if provided in job JSON |
| TRAINER_CHECKPOINTS_DIR | /tmp/training/checkpoints | Local checkpoint output directory in trainer mode |
| TRAINER_SAMPLES_DIR | /tmp/training/samples | Local sample output directory in trainer mode |
| TRAINER_EVENTS_PATH | - | Optional line-delimited JSON lifecycle event log for trainer mode |
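As a purely illustrative example, a local-dev container run might combine the production-contract variables with a few of the knobs above; the values and mount path here are placeholders, not recommendations:

docker run --rm --gpus all \
  -e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
  -e WORKER_JWT='dev-worker-jwt' \
  -e WORKER_MAX_CONCURRENT_DOWNLOADS=2 \
  -e WORKER_WARN_INFERENCE_S=120 \
  -e TENSORHUB_CACHE_DIR=/cache/tensorhub \
  -v "$HOME/.cache/tensorhub:/cache/tensorhub" \
  <your-worker-image>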
Metrics
The worker can emit best-effort performance/debug metrics to gen-orchestrator via WorkerEvent messages.
See docs/metrics.md.
See docs/worker-stuck-visibility.md for startup/task watchdog events used to diagnose stuck workers.
Model Download Behavior
Model refs are plain lower-case strings:
- owner/repo
- owner/repo:tag
- owner/repo@blake3:<digest>
Tags are mutable pointers that resolve to published versions.
Cozy snapshot/object file downloads are written to *.part and then atomically renamed on success. If a *.part file exists from a previous interrupted download, the worker attempts to resume it using HTTP Range requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.
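For intuition, the resume behavior is roughly equivalent to the simplified sketch below. It is not the SDK's actual implementation; it assumes the requests library and a presigned URL that may or may not honor Range requests.

import os
import requests

def download_with_resume(url: str, dest: str, chunk_size: int = 1 << 20) -> None:
    part = dest + ".part"
    # Resume from an existing partial file if one is present.
    offset = os.path.getsize(part) if os.path.exists(part) else 0
    headers = {"Range": f"bytes={offset}-"} if offset else {}
    with requests.get(url, headers=headers, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        if offset and resp.status_code != 206:
            # Range not honored: fall back to a full re-download.
            offset = 0
        with open(part, "ab" if offset else "wb") as f:
            for chunk in resp.iter_content(chunk_size):
                f.write(chunk)
    # Atomically rename *.part into place on success.
    os.replace(part, dest)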
Docker Deployment
Project Structure
my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── main.py
Local Dev Build (Using Root Dockerfile)
For production, use the cozyctl CLI to build and deploy worker images to our network. For local testing, you can build images using the provided root Dockerfile:
# Build an example using the same root Dockerfile
docker build -t sd15-worker -f Dockerfile examples/sd15
# Run
docker run \
-e SCHEDULER_PUBLIC_ADDR=orchestrator:8080 \
-e WORKER_JWT='<worker-connect-jwt>' \
sd15-worker
Canonical local dev build args (GPU, CUDA 12.6, torch 2.10.x, Python 3.12):
cd ~/cozy/python-gen-worker
docker build \
--build-arg PYTHON_VERSION=3.12 \
--build-arg UV_TORCH_BACKEND=cu126 \
--build-arg TORCH_SPEC='~=2.10.0' \
-f Dockerfile \
-t my-worker:dev \
examples/sd15
Optional build args:
docker build \
--build-arg PYTHON_VERSION=3.12 \
--build-arg UV_TORCH_BACKEND=cu128 \
--build-arg TORCH_SPEC=">=2.9,<3" \
-t my-worker -f Dockerfile examples/sd15
Build Base
Worker images build directly from a Python+uv base image:
ghcr.io/astral-sh/uv:python3.12-bookworm-slim
PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.
Publish/Promote Lifecycle
Control-plane behavior (tensorhub + orchestrator):
- Every publish creates a new immutable internal release_id.
- End users invoke functions by owner/endpoint/function (default tag prod) or owner/endpoint/function:tag.
- endpoint is derived from the endpoint.toml name and normalized to a URL-safe slug.
- function names are derived from Python @worker_function names and normalized to URL-safe slugs (for example, medasr_transcribe -> medasr-transcribe); see the sketch below.
- Publishing does not move traffic by default.
- Promoting a function tag moves traffic to that release.
- Rollback is just retargeting the tag to an older release.
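The exact normalization is performed by the control plane; as a rough illustration only, a mapping along these lines would turn medasr_transcribe into medasr-transcribe:

import re

def to_slug(name: str) -> str:
    # Lowercase, then collapse runs of non-alphanumeric characters into single hyphens.
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")

assert to_slug("medasr_transcribe") == "medasr-transcribe"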
Model Cache
Workers report model availability for intelligent job routing:
| State | Location | Latency |
|---|---|---|
| Hot | VRAM | Instant |
| Warm | Disk | Seconds |
| Cold | None | Minutes (download required) |
Dev Testing (Mock Orchestrator)
For local end-to-end tests without standing up gen-orchestrator, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one TaskExecutionRequest, prints the result, and exits.
Start your worker container first:
docker run --rm \
--add-host=host.docker.internal:host-gateway \
-e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
-e WORKER_JWT='dev-worker-jwt' \
<your-worker-image>
In another terminal, send one request:
python -m gen_worker.testing.mock_orchestrator \
--listen 0.0.0.0:8080 \
--run hello \
--payload-json '{"name":"world"}'
Run the command again with a different payload whenever you want to send another request.
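The model cache (see the Model Cache section above) can also be driven directly from Python, for example in tests: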
from gen_worker.model_cache import ModelCache
cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)
cache.is_in_vram("model-a") # True
cache.get_vram_models() # ["model-a"]
Error Handling
from gen_worker.errors import RetryableError, ValidationError, FatalError
@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry
    try:
        result = call_external_api()
    except TimeoutError:
        raise RetryableError("API timeout")  # Will be retried
    return Output(result=result)
Development
# Install dev dependencies
uv sync --extra dev
# Run tests
uv run pytest
# Type checking
uv run mypy src/gen_worker
# Build
uv build
Regenerating Protobuf Stubs
Requires gen-orchestrator as a sibling repo:
uv sync --extra dev
python -m grpc_tools.protoc -I../gen-orchestrator/proto --python_out=src/gen_worker/pb --grpc_python_out=src/gen_worker/pb ../gen-orchestrator/proto/*.proto
Worker Wire Protocol
The worker advertises a protocol MAJOR.MINOR in WorkerRegistration (protocol_major, protocol_minor).
- Current runtime constants live in src/gen_worker/wire_protocol.py.
- Orchestrator compatibility policy/ranges are documented in ../gen-orchestrator/docs/worker_wire_protocol.md.
License
MIT