gen-worker

A library used to build custom functions in Cozy Creator's serverless function platform.

A Python SDK for building serverless functions for AI inference. Write your function, declare required model refs, publish an endpoint release, and invoke it via Cozy's control plane.

Tenant Worker Build Contract (Dockerfile-First)

When publishing a tenant worker, Cozy expects a Dockerfile-first project layout.

Build inputs MUST include:

  • tensorhub.toml (Cozy manifest; used at build/publish time)
  • Dockerfile (builds the worker image)
  • tenant code (pyproject.toml, uv.lock, src/, etc.)

The built image MUST:

  1. Install gen-worker (so discovery + runtime can run).
  2. Bake function discovery output (manifest) at build time:
RUN mkdir -p /app/.cozy && python -m gen_worker.discover > /app/.cozy/manifest.json
  3. Use the Cozy worker runtime as the ENTRYPOINT:
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]

Notes:

  • tensorhub.toml is not required to be present in the final image; it is a build-time input.
  • The platform reads /app/.cozy/manifest.json from the built image and stores it in Cozy Hub DB for routing/invocation.

Installation

Start a Python project, then run:

uv add gen-worker

With PyTorch support:

uv add gen-worker[torch]

Quick Start

import msgspec
from gen_worker import ActionContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: ActionContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")

Features

  • Function discovery - Automatic detection of @worker_function decorated functions
  • Schema generation - Input/output schemas extracted from msgspec types
  • Model injection - Dependency injection for ML models with caching
  • Streaming output - Support for incremental/streaming responses
  • Progress reporting - Built-in progress events via ActionContext
  • Perf metrics - Best-effort per-run metrics emitted to gen-orchestrator (metrics.* worker events)
  • Trainer runtime mode - SDK-native trainer loop via WORKER_MODE=trainer
  • File handling - Upload/download assets via the Cozy Hub file API
  • Model caching - LRU cache with VRAM/disk management and cache-aware routing

Usage

Basic Function

import msgspec
from gen_worker import ActionContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    result: str

@worker_function()
def my_function(ctx: ActionContext, payload: Input) -> Output:
    return Output(result=f"Processed: {payload.prompt}")

Streaming Output

from typing import Iterator

class Delta(msgspec.Struct):
    chunk: str

@worker_function()
def stream(ctx: ActionContext, payload: Input) -> Iterator[Delta]:
    for word in payload.prompt.split():
        if ctx.is_canceled():
            raise InterruptedError("canceled")
        yield Delta(chunk=word)

Model Injection

Declare your model keyspace in tensorhub.toml:

[models]
sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"

Then reference the declared key from your function:

from typing import Annotated
from diffusers import DiffusionPipeline
from gen_worker.injection import ModelRef, ModelRefSource as Src

@worker_function()
def generate(
    ctx: ActionContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.FIXED, "sd15")],  # key from tensorhub.toml [models]
    payload: Input,
) -> Output:
    # Use the injected pipeline (loaded/cached by the worker's model manager).
    return Output(result="done")
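
For a fuller picture, here is a sketch of the same idea that actually runs the injected pipeline and persists its output. The generation call and the generate_image/Input/Output names are illustrative, assuming a standard diffusers text-to-image pipeline; only ModelRef, ctx.save_bytes, and asset.ref come from the SDK as shown above:

import io
from typing import Annotated

import msgspec
from diffusers import DiffusionPipeline
from gen_worker import ActionContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    result: str  # asset ref of the generated image

@worker_function()
def generate_image(
    ctx: ActionContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.FIXED, "sd15")],
    payload: Input,
) -> Output:
    # The worker's model manager loads/caches the pipeline before injection.
    image = pipe(prompt=payload.prompt).images[0]

    # Serialize to PNG bytes and store via the Cozy Hub file API.
    buf = io.BytesIO()
    image.save(buf, format="PNG")
    asset = ctx.save_bytes(f"runs/{ctx.request_id}/outputs/image.png", buf.getvalue())
    return Output(result=asset.ref)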

Payload-Selected Model (Short Key)

If you want the client payload to choose which repo to run, declare a short-key mapping in tensorhub.toml and use ModelRef(PAYLOAD, ...):

[models]
sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
flux = "hf:black-forest-labs/FLUX.2-klein-4B"

Then let the payload select the short key:

from typing import Annotated
import msgspec
from diffusers import DiffusionPipeline
from gen_worker import ActionContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src

class Input(msgspec.Struct):
    prompt: str
    model: str  # must be one of: "sd15" | "flux"

@worker_function()
def generate(
    ctx: ActionContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.PAYLOAD, "model")],
    payload: Input,
):
    ...

Note: by default the worker requires payload model selection to use a known short-key from [models] (tensorhub.toml). It will not accept arbitrary repo refs in the payload. ModelRef(FIXED, ...) is also restricted to keys declared in the manifest mapping (no inline hf:/cozy: refs).
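
One optional hardening, shown here as a sketch rather than an SDK requirement: constrain the payload field with typing.Literal so that unknown keys are rejected at schema validation time, before the worker's own short-key check runs.

from typing import Literal
import msgspec

class Input(msgspec.Struct):
    prompt: str
    model: Literal["sd15", "flux"]  # must match keys declared in [models]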

Saving Files

@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
    # Save bytes and get asset reference
    asset = ctx.save_bytes(f"runs/{ctx.request_id}/outputs/output.png", image_bytes)
    return Output(result=asset.ref)
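
The file API composes with streaming output as well. A sketch (render_step is a hypothetical helper returning PNG bytes; paths follow the runs/<request_id>/outputs convention used above):

from typing import Iterator

import msgspec
from gen_worker import ActionContext, worker_function

class Input(msgspec.Struct):
    prompt: str
    steps: int

class FrameRef(msgspec.Struct):
    step: int
    ref: str

@worker_function()
def render_frames(ctx: ActionContext, payload: Input) -> Iterator[FrameRef]:
    for step in range(payload.steps):
        if ctx.is_canceled():
            raise InterruptedError("canceled")
        png_bytes = render_step(payload.prompt, step)  # hypothetical helper
        asset = ctx.save_bytes(f"runs/{ctx.request_id}/outputs/frame_{step:04d}.png", png_bytes)
        yield FrameRef(step=step, ref=asset.ref)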

Trainer Mode (Class-Only)

from gen_worker import StepContext, StepResult

class MyTrainer:
    def setup(self, ctx: StepContext) -> None:
        pass

    def configure(self, ctx: StepContext) -> dict[str, object]:
        return {"step": 0}

    def prepare_batch(self, raw_batch: object, state: dict[str, object], ctx: StepContext) -> object:
        return raw_batch

    def train_step(self, batch: object, state: dict[str, object], ctx: StepContext) -> StepResult:
        return StepResult(metrics={"train/loss": 0.123})

    def state_dict(self, state: dict[str, object]) -> dict[str, object]:
        return dict(state)

    def load_state_dict(self, state: dict[str, object], payload: dict[str, object], ctx: StepContext) -> None:
        state.update(payload)

Run trainer mode:

WORKER_MODE=trainer \
TRAINER_JOB_SPEC_PATH=/app/.cozy/trainer_job.json \
python -m gen_worker.entrypoint
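
For example, if the MyTrainer class above lives at src/my_pkg/trainer.py in your project, you would point TRAINER_PLUGIN at my_pkg.trainer:MyTrainer (or supply the equivalent plugin reference in the trainer job JSON).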

Full authoring guide: docs/custom-trainer-authoring.md. Orchestrated runtime contract: docs/issue-081-orchestrated-trainer-runtime.md.

Dev HTTP Runner (Local Inference Without gen-orchestrator)

For local testing of a built worker image (without standing up gen-orchestrator), run the dev HTTP runner and write outputs to a mounted local directory.

Container example:

docker run --rm --gpus all -p 8081:8081 \
  -v "$(pwd)/out:/outputs" \
  -e TENSORHUB_URL='http://host.docker.internal:7777' \
  <your-worker-image> \
  python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs

Prefetch a public model (example: SD1.5 on Hugging Face, via Cozy Hub mirror):

curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
  -H 'content-type: application/json' \
  -d '{"models":[{"ref":"hf:runwayml/stable-diffusion-v1-5@main","dtypes":["bf16","fp16"]}]}'

Invoke a function:

curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'

Outputs are written under /outputs/runs/<request_id>/outputs/... (matching Cozy ref semantics).
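
If you prefer Python over curl for local poking, here is a minimal client sketch against the same dev-runner endpoints (assumes the third-party requests package; the response body is printed as-is):

import requests

BASE = "http://localhost:8081"

# Prefetch a public model so the first invocation does not pay the download cost.
requests.post(
    f"{BASE}/v1/models/prefetch",
    json={"models": [{"ref": "hf:runwayml/stable-diffusion-v1-5@main", "dtypes": ["bf16", "fp16"]}]},
    timeout=600,
).raise_for_status()

# Invoke the generate function with a JSON payload.
resp = requests.post(
    f"{BASE}/v1/request/generate",
    json={"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}},
    timeout=600,
)
resp.raise_for_status()
print(resp.text)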

Configuration

tensorhub.toml

schema_version = 1
name = "my-worker"
main = "my_pkg.main"
gen_worker = ">=0.2.0,<0.3.0"

[models]
sdxl = { ref = "hf:stabilityai/stable-diffusion-xl-base-1.0", dtypes = ["fp16","bf16"] }

[models] entries support two forms:

  • String form (defaults to dtypes=["fp16","bf16"]):
    • sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
  • Table form:
    • flux_fp8 = { ref = "hf:black-forest-labs/FLUX.2-klein-4B", dtypes = ["fp8"] }

Environment Variables

Orchestrator-injected (production contract):

  • WORKER_MODE (default: inference) - Runtime mode selector (inference or trainer)
  • SCHEDULER_PUBLIC_ADDR - Scheduler address workers should dial
  • SCHEDULER_ADDRS - Optional comma-separated seed addresses for leader discovery
  • WORKER_JWT - Worker-connect JWT (required; claims are authoritative)

Local dev / advanced (not injected by orchestrator):

  • SCHEDULER_JWKS_URL - Optional: verify WORKER_JWT locally against scheduler JWKS
  • SCHEDULER_JWT_ISSUER - Optional: expected iss when verifying WORKER_JWT locally
  • SCHEDULER_JWT_AUDIENCE - Optional: expected aud when verifying WORKER_JWT locally
  • USE_TLS (default: false) - Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream
  • WORKER_MAX_CONCURRENCY - Max concurrent task executions
  • WORKER_MAX_INPUT_BYTES - Max input payload size
  • WORKER_MAX_OUTPUT_BYTES - Max output payload size
  • WORKER_MAX_UPLOAD_BYTES - Max file upload size
  • WORKER_MAX_VRAM_GB (default: auto) - Maximum VRAM for models
  • WORKER_VRAM_SAFETY_MARGIN_GB (default: 3.5) - Reserved VRAM for working memory
  • TENSORHUB_CACHE_DIR (default: ~/.cache/tensorhub) - TensorHub cache root; worker CAS defaults derive from this (${TENSORHUB_CACHE_DIR}/cas/...)
  • WORKER_LOCAL_MODEL_CACHE_DIR (default: /tmp/tensorhub/local-model-cache) - Optional local (non-NFS) cache for snapshot localization
  • WORKER_REGISTER_TIMEOUT_S (default: 90) - Startup watchdog: fail fast if the worker never registers with the scheduler
  • WORKER_WARN_MODEL_RESOLVE_S (default: 30) - Emit a task.model_resolve.stuck warning after this duration
  • WORKER_WARN_MODEL_LOAD_S (default: 60) - Emit a task.model_load.stuck warning after this duration
  • WORKER_WARN_INFERENCE_S (default: 60) - Emit a task.inference.stuck warning after this duration
  • WORKER_MAX_CONCURRENT_DOWNLOADS (default: 2) - Max parallel model downloads
  • TENSORHUB_URL - Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve)
  • WORKER_ALLOW_TENSORHUB_API_RESOLVE (default: false) - Local dev only: allow the worker to call Cozy Hub resolve APIs
  • TENSORHUB_TOKEN - Cozy Hub bearer token (optional; enables ingest-if-missing for public HF models if Cozy Hub requires auth)
  • HF_TOKEN - Hugging Face token (for private hf: refs)
  • TRAINER_JOB_SPEC_PATH (default: /app/.cozy/trainer_job.json) - Trainer-mode JSON job manifest path
  • TRAINER_PLUGIN - Trainer plugin import (module:symbol); optional if provided in the job JSON
  • TRAINER_CHECKPOINTS_DIR (default: /tmp/training/checkpoints) - Local checkpoint output directory in trainer mode
  • TRAINER_SAMPLES_DIR (default: /tmp/training/samples) - Local sample output directory in trainer mode
  • TRAINER_EVENTS_PATH - Optional line-delimited JSON lifecycle event log for trainer mode

Metrics

The worker can emit best-effort performance/debug metrics to gen-orchestrator via WorkerEvent messages.

See docs/metrics.md. See docs/worker-stuck-visibility.md for startup/task watchdog events used to diagnose stuck workers.

Hugging Face (hf:) download behavior

By default, hf: model refs do not download the full repo. The worker uses huggingface_hub.snapshot_download(allow_patterns=...) to avoid pulling huge legacy weights.

Defaults:

  • Download only what a diffusers pipeline needs (derived from model_index.json).
  • Skip safety_checker and feature_extractor by default.
  • Download only reduced-precision safetensors weights (fp16/bf16); never download .ckpt or .bin by default.
  • For sharded safetensors, also download the *.safetensors.index.json and the referenced shard files.

Overrides:

  • COZY_HF_COMPONENTS="unet,vae,text_encoder,tokenizer,scheduler": hard override component list.
  • COZY_HF_INCLUDE_OPTIONAL_COMPONENTS=1: include components like safety_checker / feature_extractor if present.
  • COZY_HF_WEIGHT_PRECISIONS="fp16,bf16": change which weight suffixes are accepted (add fp32 only if you really need it).
  • COZY_HF_ALLOW_ROOT_JSON=1: allow additional small root *.json files (some repos need extra root config).
  • COZY_HF_FULL_REPO_DOWNLOAD=1: disable filtering and download the entire repo (not recommended; can be 10s of GB).

Cozy Hub (cozy:) download behavior

Cozy refs use release selectors:

  • cozy:owner/repo:tag (for example :latest, :prod, :5.3)
  • cozy:owner/repo@blake3:<digest> (immutable pin)

Tags are mutable pointers, but they resolve only to published versions.

Cozy snapshot/object file downloads are written to *.part and then atomically renamed on success. If a *.part file exists from a previous interrupted download, the worker attempts to resume it using HTTP Range requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.
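
An illustrative sketch of that download pattern (not the worker's actual implementation): write to a .part sidecar, resume with an HTTP Range request when the server honors it, and atomically rename on completion.

import os
import requests

def download_with_resume(url: str, dest: str, chunk_size: int = 1 << 20) -> None:
    part = dest + ".part"
    offset = os.path.getsize(part) if os.path.exists(part) else 0

    headers = {"Range": f"bytes={offset}-"} if offset else {}
    with requests.get(url, headers=headers, stream=True, timeout=60) as resp:
        if offset and resp.status_code != 206:
            # Server ignored the Range request: fall back to a full re-download.
            offset = 0
        resp.raise_for_status()
        mode = "ab" if offset else "wb"
        with open(part, mode) as f:
            for chunk in resp.iter_content(chunk_size):
                f.write(chunk)

    os.replace(part, dest)  # atomic rename on success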

Docker Deployment

Project Structure

my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── main.py

Local Dev Build (Using Root Dockerfile)

For production, use the cozyctl CLI to build and deploy worker images to our network. For local testing, you can build images with the provided root Dockerfile:

# Build an example using the same root Dockerfile
docker build -t sd15-worker -f Dockerfile examples/sd15

# Run
docker run \
  -e SCHEDULER_PUBLIC_ADDR=orchestrator:8080 \
  -e WORKER_JWT='<worker-connect-jwt>' \
  sd15-worker

Canonical local dev build args (GPU, CUDA 12.6, torch 2.10.x, Python 3.12):

cd ~/cozy/python-gen-worker

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu126 \
  --build-arg TORCH_SPEC='~=2.10.0' \
  -f Dockerfile \
  -t my-worker:dev \
  examples/sd15

Optional build args:

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu128 \
  --build-arg TORCH_SPEC=">=2.9,<3" \
  -t my-worker -f Dockerfile examples/sd15

Build Base

Worker images build directly from a Python+uv base image:

  • ghcr.io/astral-sh/uv:python3.12-bookworm-slim

PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.

Publish/Promote Lifecycle

Control-plane behavior (tensorhub + orchestrator):

  • Every publish creates a new immutable internal release_id.
  • End users invoke functions by owner/endpoint/function (default prod) or owner/endpoint/function:tag.
  • endpoint is derived from tensorhub.toml name and normalized to a URL-safe slug.
  • function names are derived from Python @worker_function names and normalized to URL-safe slugs (for example, medasr_transcribe -> medasr-transcribe).
  • Publishing does not move traffic by default.
  • Promoting a function tag moves traffic to that release.
  • Rollback is just retargeting the tag to an older release.

Model Cache

Workers report model availability for intelligent job routing:

  • Hot - in VRAM; instant
  • Warm - on disk; seconds
  • Cold - not cached; minutes (download required)

Dev Testing (Mock Orchestrator)

For local end-to-end tests without standing up gen-orchestrator, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one TaskExecutionRequest, prints the result, and exits.

Start your worker container first:

docker run --rm \
  --add-host=host.docker.internal:host-gateway \
  -e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
  -e WORKER_JWT='dev-worker-jwt' \
  <your-worker-image>

In another terminal, send one request:

python -m gen_worker.testing.mock_orchestrator \
  --listen 0.0.0.0:8080 \
  --run hello \
  --payload-json '{"name":"world"}'

Run the command again with a different payload whenever you want to send another request.

The underlying model cache can also be exercised directly (for tests or local experiments):

from gen_worker.model_cache import ModelCache

cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)  # pipeline: any loaded model object
cache.is_in_vram("model-a")  # True
cache.get_vram_models()      # ["model-a"]

Error Handling

from gen_worker.errors import RetryableError, ValidationError, FatalError

@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry

    try:
        result = call_external_api()
    except TimeoutError:
        raise RetryableError("API timeout")  # Will be retried

    return Output(result=result)

Development

# Install dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Type checking
uv run mypy src/gen_worker

# Build
uv build

Regenerating Protobuf Stubs

Requires gen-orchestrator as a sibling repo:

uv sync --extra dev
python -m grpc_tools.protoc -I../gen-orchestrator/proto --python_out=src/gen_worker/pb --grpc_python_out=src/gen_worker/pb ../gen-orchestrator/proto/*.proto

Worker Wire Protocol

The worker advertises a protocol MAJOR.MINOR in WorkerRegistration (protocol_major, protocol_minor).

  • Current runtime constants live in src/gen_worker/wire_protocol.py.
  • Orchestrator compatibility policy/ranges are documented in ../gen-orchestrator/docs/worker_wire_protocol.md.

License

MIT
