
gen-worker

A Python SDK for building serverless functions for AI inference. Write your function, declare required model refs, publish an endpoint release, and invoke it via Cozy's control plane.

Tenant Worker Build Contract (Dockerfile-First)

When publishing a tenant worker, Cozy expects a Dockerfile-first project layout.

Build inputs MUST include:

  • endpoint.toml (Cozy manifest; used at build/publish time)
  • Dockerfile (builds the worker image)
  • tenant code (pyproject.toml, uv.lock, src/, etc.)

The built image MUST:

  1. Install gen-worker (so discovery + runtime can run).
  2. Bake the function discovery output (manifest) at build time:

     RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock

  3. Use the Cozy worker runtime as the ENTRYPOINT:

     ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]

Notes:

  • endpoint.toml is not required to be present in the final image; it is a build-time input.
  • The platform reads /app/.tensorhub/endpoint.lock from the built image and stores it in Cozy Hub DB for routing/invocation.
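A minimal Dockerfile satisfying this contract might look like the sketch below. The base image matches the Build Base section; the dependency steps and project layout are assumptions based on the Project Structure example, so adapt them to your own project:

```dockerfile
# Illustrative contract-conforming worker Dockerfile (not the only valid layout).
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim

WORKDIR /app

# Install tenant dependencies (including gen-worker) from the lockfile.
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev

# Make the project virtualenv's python the default interpreter.
ENV PATH="/app/.venv/bin:$PATH"

COPY src/ ./src/

# Required: bake the function-discovery manifest at build time.
RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock

# Required: use the Cozy worker runtime as the entrypoint.
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
```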

Installation

Start a Python project, then run:

uv add gen-worker

With PyTorch support:

uv add "gen-worker[torch]"

Quick Start

import msgspec
from gen_worker import RequestContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: RequestContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")

Features

  • Function discovery - Automatic detection of @worker_function decorated functions
  • Schema generation - Input/output schemas extracted from msgspec types
  • Model injection - Dependency injection for ML models with caching
  • Streaming output - Support for incremental/streaming responses
  • Progress reporting - Built-in progress events via RequestContext
  • Perf metrics - Best-effort per-run metrics emitted to gen-orchestrator (metrics.* worker events)
  • Trainer runtime mode - SDK-native trainer loop via WORKER_MODE=trainer
  • File handling - Upload/download assets via Cozy hub file API
  • Model caching - LRU cache with VRAM/disk management and cache-aware routing

Usage

Basic Function

import msgspec
from gen_worker import RequestContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    result: str

@worker_function()
def my_function(ctx: RequestContext, payload: Input) -> Output:
    return Output(result=f"Processed: {payload.prompt}")

Streaming Output

from typing import Iterator

class Delta(msgspec.Struct):
    chunk: str

@worker_function()
def stream(ctx: RequestContext, payload: Input) -> Iterator[Delta]:
    for word in payload.prompt.split():
        if ctx.is_canceled():
            raise InterruptedError("canceled")
        yield Delta(chunk=word)

Model Injection

Declare fixed model keys in code, with refs/dtypes in endpoint.toml [models]:

[models]
sd15 = { ref = "stable-diffusion-v1-5/stable-diffusion-v1-5", dtypes = ["fp16", "bf16"] }

Then, in Python:

from typing import Annotated
from diffusers import DiffusionPipeline
from gen_worker.injection import ModelRef, ModelRefSource as Src

@worker_function()
def generate(
    ctx: RequestContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.FIXED, "sd15")],
    payload: Input,
) -> Output:
    # Use the injected pipeline (loaded/cached by the worker's model manager).
    return Output(result="done")

Payload-Selected Model (Short Key)

If you want the client payload to choose which repo to run, declare selector keyspaces in endpoint.toml and use ModelRef(PAYLOAD, ...):

[models.generate]
sd15 = { ref = "stable-diffusion-v1-5/stable-diffusion-v1-5", dtypes = ["fp16", "bf16"] }
flux = { ref = "black-forest-labs/flux.2-klein-4b", dtypes = ["bf16"] }

Then, in Python:

from typing import Annotated
import msgspec
from diffusers import DiffusionPipeline
from gen_worker import RequestContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src

class Input(msgspec.Struct):
    prompt: str
    model: str  # must be one of: "sd15" | "flux"

@worker_function()
def generate(
    ctx: RequestContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.PAYLOAD, "model")],
    payload: Input,
):
    ...

Note: by default, payload model selection must use a known short key from the function's keyspace in endpoint.toml. Arbitrary repo refs in the payload are rejected.
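With the keyspace above, a request body would select the repo by short key. This payload shape is illustrative (the prompt text is made up); only the declared keys "sd15" and "flux" would be accepted for the model field:

```json
{
  "payload": {
    "prompt": "a watercolor lighthouse at dusk",
    "model": "sd15"
  }
}
```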

Saving Files

@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    # Save bytes and get asset reference
    asset = ctx.save_bytes(f"runs/{ctx.request_id}/outputs/output.png", image_bytes)
    return Output(result=asset.ref)

Trainer Mode (Class-Only)

from gen_worker import StepContext, StepResult

class MyTrainer:
    def setup(self, ctx: StepContext) -> None:
        pass

    def configure(self, ctx: StepContext) -> dict[str, object]:
        return {"step": 0}

    def prepare_batch(self, raw_batch: object, state: dict[str, object], ctx: StepContext) -> object:
        return raw_batch

    def train_step(self, batch: object, state: dict[str, object], ctx: StepContext) -> StepResult:
        return StepResult(metrics={"train/loss": 0.123})

    def state_dict(self, state: dict[str, object]) -> dict[str, object]:
        return dict(state)

    def load_state_dict(self, state: dict[str, object], payload: dict[str, object], ctx: StepContext) -> None:
        state.update(payload)

Run trainer mode:

WORKER_MODE=trainer \
TRAINER_JOB_SPEC_PATH=/app/.cozy/trainer_job.json \
python -m gen_worker.entrypoint

Full authoring guide: docs/custom-trainer-authoring.md. Orchestrated runtime contract: docs/issue-081-orchestrated-trainer-runtime.md.

Dev HTTP Runner (Local Inference Without gen-orchestrator)

For local testing of a built worker image (without standing up gen-orchestrator), run the dev HTTP runner and write outputs to a mounted local directory.

Container example:

docker run --rm --gpus all -p 8081:8081 \
  -v "$(pwd)/out:/outputs" \
  -e TENSORHUB_URL='http://host.docker.internal:7777' \
  <your-worker-image> \
  python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs

Prefetch a public model (example: SD1.5):

curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
  -H 'content-type: application/json' \
  -d '{"models":[{"ref":"runwayml/stable-diffusion-v1-5","dtypes":["bf16","fp16"]}]}'

Invoke a function:

curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'

Outputs are written under /outputs/runs/<request_id>/outputs/... (matching Cozy ref semantics).

Configuration

endpoint.toml

schema_version = 1
name = "my-worker"
main = "my_pkg.main"

[functions.generate]
batch_dimension = "items"  # optional

[models]
sdxl = { ref = "stabilityai/stable-diffusion-xl-base-1.0", dtypes = ["fp16", "bf16"] }

[models.generate]
dreamshaper = { ref = "lykon/dreamshaper-xl-v2-turbo", dtypes = ["fp16", "bf16"] }

[resources]
max_inflight_requests = 1

Environment Variables

Orchestrator-injected (production contract):

  • WORKER_MODE (default: inference): Runtime mode selector (inference or trainer)
  • SCHEDULER_PUBLIC_ADDR: Scheduler address workers should dial
  • SCHEDULER_ADDRS: Optional comma-separated LB seed addresses
  • WORKER_JWT: Worker-connect JWT (required; claims are authoritative)

Local dev / advanced (not injected by orchestrator):

  • SCHEDULER_JWKS_URL: Optional; verify WORKER_JWT locally against the scheduler JWKS
  • SCHEDULER_JWT_ISSUER: Optional; expected iss when verifying WORKER_JWT locally
  • SCHEDULER_JWT_AUDIENCE: Optional; expected aud when verifying WORKER_JWT locally
  • USE_TLS (default: false): Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream
  • LB_ONLY_RETRIES (default: true): Retry via configured LB endpoint(s) only; ignore direct owner redirect hints
  • RECONNECT_DELAY (default: 0.1): Base reconnect backoff in seconds (exponential)
  • RECONNECT_MAX_DELAY (default: 1.0): Reconnect backoff cap in seconds
  • RECONNECT_JITTER_SECONDS (default: 0.1): Added jitter upper bound in seconds, capped by RECONNECT_MAX_DELAY
  • MAX_RECONNECT_ATTEMPTS (default: 0): Max reconnect attempts (0 = infinite retries)
  • WORKER_MAX_CONCURRENCY: Max concurrent task executions
  • WORKER_MAX_INPUT_BYTES: Max input payload size
  • WORKER_MAX_OUTPUT_BYTES: Max output payload size
  • WORKER_MAX_UPLOAD_BYTES: Max file upload size
  • WORKER_MAX_VRAM_GB (default: auto-detected): Maximum VRAM for models
  • WORKER_VRAM_SAFETY_MARGIN_GB (default: 3.5): Reserved VRAM for working memory
  • TENSORHUB_CACHE_DIR (default: ~/.cache/tensorhub): TensorHub cache root; worker CAS defaults derive from this (${TENSORHUB_CACHE_DIR}/cas/...)
  • WORKER_LOCAL_MODEL_CACHE_DIR (default: /tmp/tensorhub/local-model-cache): Optional local (non-NFS) cache for snapshot localization
  • WORKER_REGISTER_TIMEOUT_S (default: 90): Startup watchdog; fail fast if the worker never registers with the scheduler
  • WORKER_WARN_MODEL_RESOLVE_S (default: 30): Emit a task.model_resolve.stuck warning after this duration
  • WORKER_WARN_MODEL_LOAD_S (default: 60): Emit a task.model_load.stuck warning after this duration
  • WORKER_WARN_INFERENCE_S (default: 60): Emit a task.inference.stuck warning after this duration
  • WORKER_MAX_CONCURRENT_DOWNLOADS (default: 2): Max parallel model downloads
  • TENSORHUB_URL: Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve)
  • WORKER_ALLOW_TENSORHUB_API_RESOLVE (default: false): Local dev only; allow the worker to call Cozy Hub resolve APIs
  • TENSORHUB_TOKEN: Cozy Hub bearer token (optional; enables ingest-if-missing for public models if Cozy Hub requires auth)
  • TRAINER_JOB_SPEC_PATH (default: /app/.cozy/trainer_job.json): Trainer-mode JSON job manifest path
  • TRAINER_PLUGIN: Trainer plugin import (module:symbol); optional if provided in the job JSON
  • TRAINER_CHECKPOINTS_DIR (default: /tmp/training/checkpoints): Local checkpoint output directory in trainer mode
  • TRAINER_SAMPLES_DIR (default: /tmp/training/samples): Local sample output directory in trainer mode
  • TRAINER_EVENTS_PATH: Optional line-delimited JSON lifecycle event log for trainer mode
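The reconnect knobs above combine roughly as exponential backoff with additive jitter, capped at RECONNECT_MAX_DELAY. This is an illustrative sketch, not the SDK's exact implementation:

```python
import random

def reconnect_delay(attempt: int,
                    base: float = 0.1,    # RECONNECT_DELAY
                    cap: float = 1.0,     # RECONNECT_MAX_DELAY
                    jitter: float = 0.1   # RECONNECT_JITTER_SECONDS
                    ) -> float:
    """Exponential backoff with additive jitter, capped at `cap`."""
    delay = min(base * (2 ** attempt), cap)
    return min(delay + random.uniform(0.0, jitter), cap)
```

With the defaults, delays grow 0.1, 0.2, 0.4, 0.8 and then hold at the 1.0-second cap, with up to 0.1 s of jitter added (still capped at 1.0).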

Metrics

The worker can emit best-effort performance/debug metrics to gen-orchestrator via WorkerEvent messages.

See docs/metrics.md. See docs/worker-stuck-visibility.md for startup/task watchdog events used to diagnose stuck workers.

Model Download Behavior

Model refs are plain lower-case strings:

  • owner/repo
  • owner/repo:tag
  • owner/repo@blake3:<digest>

Tags are mutable pointers that resolve to published versions.

Cozy snapshot/object file downloads are written to *.part and then atomically renamed on success. If a *.part file exists from a previous interrupted download, the worker attempts to resume it using HTTP Range requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.
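The .part-then-rename pattern with Range-based resume can be sketched as follows. This is a simplified illustration of the behavior described above, not the worker's actual download code (helper names are made up):

```python
import os
import urllib.request

def plan_resume(part_size: int, range_honored: bool) -> tuple[str, int]:
    """Decide file mode and starting offset for a (re)started download."""
    if part_size > 0 and range_honored:
        return "ab", part_size   # append from where the last attempt stopped
    return "wb", 0               # full re-download

def download_with_resume(url: str, dest: str) -> None:
    part = dest + ".part"
    offset = os.path.getsize(part) if os.path.exists(part) else 0
    req = urllib.request.Request(url)
    if offset:
        req.add_header("Range", f"bytes={offset}-")
    with urllib.request.urlopen(req) as resp:
        # HTTP 206 means the server honored the Range; anything else restarts.
        mode, _ = plan_resume(offset, getattr(resp, "status", None) == 206)
        with open(part, mode) as f:
            while chunk := resp.read(1 << 20):
                f.write(chunk)
    os.replace(part, dest)  # atomic rename only after a complete download
```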

Docker Deployment

Project Structure

my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── main.py

Local Dev Build (Using Root Dockerfile)

For production, use the cozyctl CLI to build and deploy worker images to our network. For local testing, you can build images with the provided root Dockerfile:

# Build an example using the same root Dockerfile
docker build -t sd15-worker -f Dockerfile examples/sd15

# Run
docker run \
  -e SCHEDULER_PUBLIC_ADDR=orchestrator:8080 \
  -e WORKER_JWT='<worker-connect-jwt>' \
  sd15-worker

Canonical local dev build args (GPU, CUDA 12.6, torch 2.10.x, Python 3.12):

cd ~/cozy/python-gen-worker

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu126 \
  --build-arg TORCH_SPEC='~=2.10.0' \
  -f Dockerfile \
  -t my-worker:dev \
  examples/sd15

Optional build args:

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu128 \
  --build-arg TORCH_SPEC=">=2.9,<3" \
  -t my-worker -f Dockerfile examples/sd15

Build Base

Worker images build directly from a Python+uv base image:

  • ghcr.io/astral-sh/uv:python3.12-bookworm-slim

PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.

Publish/Promote Lifecycle

Control-plane behavior (tensorhub + orchestrator):

  • Every publish creates a new immutable internal release_id.
  • End users invoke functions by owner/endpoint/function (default prod) or owner/endpoint/function:tag.
  • endpoint is derived from endpoint.toml name and normalized to a URL-safe slug.
  • function names are derived from Python @worker_function names and normalized to URL-safe slugs (for example, medasr_transcribe -> medasr-transcribe).
  • Publishing does not move traffic by default.
  • Promoting a function tag moves traffic to that release.
  • Rollback is just retargeting the tag to an older release.
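The slug normalization presumably behaves like the sketch below, consistent with the medasr_transcribe -> medasr-transcribe example; the platform's exact rule may differ:

```python
import re

def slugify(name: str) -> str:
    """Normalize a name to a URL-safe slug: lower-case, with runs of
    non-alphanumeric characters collapsed to single hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
```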

Model Cache

Workers report model availability for intelligent job routing:

  • Hot: loaded in VRAM; instant start
  • Warm: on disk; seconds to load
  • Cold: not cached; minutes (download required)

Dev Testing (Mock Orchestrator)

For local end-to-end tests without standing up gen-orchestrator, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one TaskExecutionRequest, prints the result, and exits.

Start your worker container first:

docker run --rm \
  --add-host=host.docker.internal:host-gateway \
  -e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
  -e WORKER_JWT='dev-worker-jwt' \
  <your-worker-image>

In another terminal, send one request:

python -m gen_worker.testing.mock_orchestrator \
  --listen 0.0.0.0:8080 \
  --run hello \
  --payload-json '{"name":"world"}'

Run the command again with a different payload whenever you want to send another request.

The model cache API can also be exercised directly, for example in unit tests:

from gen_worker.model_cache import ModelCache

cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)
cache.is_in_vram("model-a")  # True
cache.get_vram_models()      # ["model-a"]

Error Handling

from gen_worker.errors import RetryableError, ValidationError, FatalError

@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry

    try:
        result = call_external_api()
    except TimeoutError:
        raise RetryableError("API timeout")  # Will be retried

    return Output(result=result)

Development

# Install dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Type checking
uv run mypy src/gen_worker

# Build
uv build

Regenerating Protobuf Stubs

Requires gen-orchestrator as a sibling repo:

uv sync --extra dev
python -m grpc_tools.protoc -I../gen-orchestrator/proto --python_out=src/gen_worker/pb --grpc_python_out=src/gen_worker/pb ../gen-orchestrator/proto/*.proto

Worker Wire Protocol

The worker advertises a protocol MAJOR.MINOR in WorkerRegistration (protocol_major, protocol_minor).

  • Current runtime constants live in src/gen_worker/wire_protocol.py.
  • Orchestrator compatibility policy/ranges are documented in ../gen-orchestrator/docs/worker_wire_protocol.md.
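A typical MAJOR.MINOR compatibility rule looks like the sketch below. This is purely illustrative; the authoritative policy and supported ranges are in the gen-orchestrator doc referenced above:

```python
def compatible(worker: tuple[int, int],
               supported_major: int,
               min_minor: int) -> bool:
    """Illustrative check: majors must match exactly; minors are
    backward-compatible down to a supported minimum."""
    major, minor = worker
    return major == supported_major and minor >= min_minor
```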

License

MIT
