A library used to build custom functions in Cozy Creator's serverless function platform.

gen-worker

A Python SDK for building serverless functions for AI inference. Write your function, declare required model refs, publish an endpoint release, and invoke it via Cozy's control plane.

Tenant Worker Build Contract (Dockerfile-First)

When publishing a tenant worker, Cozy expects a Dockerfile-first project layout.

Build inputs MUST include:

  • endpoint.toml (Cozy manifest; used at build/publish time)
  • Dockerfile (builds the worker image)
  • tenant code (pyproject.toml, uv.lock, src/, etc.)

The built image MUST:

  1. Install gen-worker (so discovery + runtime can run).
  2. Bake function discovery output (manifest) at build time:
RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock
  3. Use the Cozy worker runtime as the ENTRYPOINT:
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]

Notes:

  • endpoint.toml is not required to be present in the final image; it is a build-time input.
  • The platform reads /app/.tensorhub/endpoint.lock from the built image and stores it in Cozy Hub DB for routing/invocation.
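Putting the contract together, a minimal Dockerfile might look like the sketch below. The base image, uv usage, and copy layout are assumptions about a typical project, not a prescribed layout; only the discovery `RUN` line and the `ENTRYPOINT` come from the contract above.

```dockerfile
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
WORKDIR /app

# Tenant code (must declare gen-worker as a dependency)
COPY pyproject.toml uv.lock ./
COPY src/ ./src/
RUN uv sync --frozen
ENV PATH="/app/.venv/bin:$PATH"

# Bake function discovery output (manifest) at build time
RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock

# Cozy worker runtime
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
```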

Installation

Start a Python project, then run:

uv add gen-worker

With PyTorch support:

uv add 'gen-worker[torch]'

Quick Start

import msgspec
from gen_worker import RequestContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: RequestContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")

Features

  • Function discovery - Automatic detection of @worker_function decorated functions
  • Schema generation - Input/output schemas extracted from msgspec types
  • Model injection - Dependency injection for ML models with caching
  • Streaming output - Support for incremental/streaming responses
  • Progress reporting - Built-in progress events via RequestContext
  • Perf metrics - Best-effort per-run metrics emitted to gen-orchestrator (metrics.* worker events)
  • Trainer runtime mode - SDK-native trainer loop via WORKER_MODE=trainer
  • File handling - Upload/download assets via Cozy hub file API
  • Model caching - LRU cache with VRAM/disk management and cache-aware routing

Authoring Endpoints

Three endpoint types are supported — inference, conversion, and training. See docs/endpoint-authoring.md for the full manual covering RequestContext, model injection (fixed and payload-selected), streaming output, file persistence, conversion reserved-name payloads (source/destination/outputs), and the trainer class contract (setup/configure/prepare_batch/train_step/state_dict/load_state_dict).
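The trainer class contract can be sketched as a plain Python class. Only the method names below come from the docs; the signatures and bodies are illustrative assumptions, not the SDK's actual interface:

```python
from typing import Any

class MyTrainer:
    """Illustrative skeleton of the trainer class contract (names from docs)."""

    def setup(self) -> None:
        # One-time initialization: load base model, optimizer, etc.
        self.step_count = 0
        self.weights: dict[str, float] = {"w": 0.0}

    def configure(self, config: dict[str, Any]) -> None:
        # Apply the job's hyperparameters.
        self.lr = float(config.get("lr", 1e-4))

    def prepare_batch(self, raw_batch: Any) -> Any:
        # Transform a raw dataset batch into model-ready inputs.
        return raw_batch

    def train_step(self, batch: Any) -> dict[str, float]:
        # Run one optimization step and return scalar metrics.
        self.step_count += 1
        return {"loss": 1.0 / self.step_count}

    def state_dict(self) -> dict[str, Any]:
        # Serializable checkpoint state.
        return {"weights": dict(self.weights), "step": self.step_count}

    def load_state_dict(self, state: dict[str, Any]) -> None:
        # Restore from a checkpoint produced by state_dict().
        self.weights = dict(state["weights"])
        self.step_count = int(state["step"])
```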

Training runs use trainer mode:

WORKER_MODE=trainer \
TRAINER_JOB_SPEC_PATH=/app/.cozy/trainer_job.json \
python -m gen_worker.entrypoint

Dev HTTP Runner (Local Inference Without gen-orchestrator)

For local testing of a built worker image (without standing up gen-orchestrator), run the dev HTTP runner and write outputs to a mounted local directory.

Container example:

docker run --rm --gpus all -p 8081:8081 \
  -v "$(pwd)/out:/outputs" \
  -e TENSORHUB_URL='http://host.docker.internal:7777' \
  <your-worker-image> \
  python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs

Prefetch a public model (example: SD1.5):

curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
  -H 'content-type: application/json' \
  -d '{"models":[{"ref":"runwayml/stable-diffusion-v1-5","dtypes":["bf16","fp16"]}]}'

Invoke a function:

curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'

Outputs are written under /outputs/jobs/<request_id>/outputs/... (matching Cozy ref semantics).

Configuration

endpoint.toml

schema_version = 1
name = "my-worker"
main = "my_pkg.main"

[functions.generate]
batch_dimension = "items"  # optional

[models]
sdxl = { ref = "stabilityai/stable-diffusion-xl-base-1.0", attributes = { dtype = ["fp16", "bf16"] } }

[models.generate]
dreamshaper = { ref = "lykon/dreamshaper-xl-v2-turbo", attributes = { dtype = ["fp16", "bf16"] } }

[resources]
max_inflight_requests = 1

Environment Variables

Orchestrator-injected (production contract):

| Variable | Default | Description |
| --- | --- | --- |
| WORKER_MODE | inference | Runtime mode selector (inference or trainer) |
| SCHEDULER_PUBLIC_ADDR | - | Scheduler address workers should dial |
| SCHEDULER_ADDRS | - | Optional comma-separated LB seed addresses |
| WORKER_JWT | - | Worker-connect JWT (required; claims are authoritative) |

Local dev / advanced (not injected by orchestrator):

| Variable | Default | Description |
| --- | --- | --- |
| SCHEDULER_JWKS_URL | - | Optional: verify WORKER_JWT locally against scheduler JWKS |
| SCHEDULER_JWT_ISSUER | - | Optional: expected iss when verifying WORKER_JWT locally |
| SCHEDULER_JWT_AUDIENCE | - | Optional: expected aud when verifying WORKER_JWT locally |
| USE_TLS | false | Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream |
| LB_ONLY_RETRIES | true | Retry via configured LB endpoint(s) only; ignore direct owner redirect hints |
| RECONNECT_DELAY | 0.1 | Base reconnect backoff in seconds (exponential) |
| RECONNECT_MAX_DELAY | 1.0 | Reconnect backoff cap in seconds |
| RECONNECT_JITTER_SECONDS | 0.1 | Added jitter upper bound in seconds, capped by RECONNECT_MAX_DELAY |
| MAX_RECONNECT_ATTEMPTS | 0 | Max reconnect attempts (0 = infinite retries) |
| WORKER_MAX_CONCURRENCY | - | Max concurrent request executions |
| WORKER_MAX_INPUT_BYTES | - | Max input payload size |
| WORKER_MAX_OUTPUT_BYTES | - | Max output payload size |
| WORKER_MAX_UPLOAD_BYTES | - | Max file upload size |
| WORKER_MAX_VRAM_GB | Auto | Maximum VRAM for models |
| WORKER_VRAM_SAFETY_MARGIN_GB | 3.5 | Reserved VRAM for working memory |
| TENSORHUB_CACHE_DIR | ~/.cache/tensorhub | TensorHub cache root; worker CAS defaults derive from this (${TENSORHUB_CACHE_DIR}/cas/...) |
| WORKER_LOCAL_MODEL_CACHE_DIR | /tmp/tensorhub/local-model-cache | Optional local (non-NFS) cache for snapshot localization |
| WORKER_REGISTER_TIMEOUT_S | 90 | Startup watchdog: fail fast if worker never registers with scheduler |
| WORKER_WARN_MODEL_RESOLVE_S | 30 | Emit request.model_resolve.stuck warning after this duration |
| WORKER_WARN_MODEL_LOAD_S | 60 | Emit request.model_load.stuck warning after this duration |
| WORKER_WARN_INFERENCE_S | 60 | Emit request.inference.stuck warning after this duration |
| WORKER_MAX_CONCURRENT_DOWNLOADS | 2 | Max parallel model downloads |
| TENSORHUB_URL | - | Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve) |
| WORKER_ALLOW_TENSORHUB_API_RESOLVE | false | Local dev only: allow the worker to call Cozy Hub resolve APIs |
| TENSORHUB_TOKEN | - | Cozy Hub bearer token (optional; enables ingest-if-missing for public models, if Cozy Hub requires auth) |
| TRAINER_JOB_SPEC_PATH | /app/.cozy/trainer_job.json | Trainer-mode JSON job manifest path |
| TRAINER_PLUGIN | - | Trainer plugin import (module:symbol); optional if provided in job JSON |
| TRAINER_CHECKPOINTS_DIR | /tmp/training/checkpoints | Local checkpoint output directory in trainer mode |
| TRAINER_SAMPLES_DIR | /tmp/training/samples | Local sample output directory in trainer mode |
| TRAINER_EVENTS_PATH | - | Optional line-delimited JSON lifecycle event log for trainer mode |
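One plausible reading of how the reconnect variables combine is sketched below: the base delay grows exponentially per attempt, jitter is added, and the result is capped at the maximum. The SDK's exact formula may differ.

```python
import random

def reconnect_delay(attempt: int,
                    base: float = 0.1,        # RECONNECT_DELAY
                    max_delay: float = 1.0,   # RECONNECT_MAX_DELAY
                    jitter: float = 0.1       # RECONNECT_JITTER_SECONDS
                    ) -> float:
    """Exponential backoff with additive jitter, capped at max_delay."""
    delay = base * (2 ** attempt) + random.uniform(0.0, jitter)
    return min(delay, max_delay)
```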

Metrics

The worker can emit best-effort performance/debug metrics to gen-orchestrator via WorkerEvent messages.

See the Observability section in docs/endpoint-authoring.md for the event catalog (request lifecycle, startup phases, per-run metrics.*, and cache inventory).

Model Download Behavior

Model refs are plain lower-case strings:

  • owner/repo
  • owner/repo:tag
  • owner/repo@blake3:<digest>

Tags are mutable pointers that resolve to published versions.
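The three ref forms above can be parsed with a small regex; this is an illustrative parser, not the SDK's, and the exact character set it accepts is an assumption beyond "plain lower-case strings":

```python
import re

# owner/repo, owner/repo:tag, or owner/repo@blake3:<digest>
REF_RE = re.compile(
    r"^(?P<owner>[a-z0-9][a-z0-9._-]*)/(?P<repo>[a-z0-9][a-z0-9._-]*)"
    r"(?::(?P<tag>[a-z0-9][a-z0-9._-]*)|@blake3:(?P<digest>[0-9a-f]+))?$"
)

def parse_ref(ref: str) -> dict:
    """Split a model ref into owner, repo, and optional tag or digest."""
    m = REF_RE.match(ref)
    if m is None:
        raise ValueError(f"invalid model ref: {ref!r}")
    return m.groupdict()
```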

Cozy snapshot/object file downloads are written to *.part and then atomically renamed on success. If a *.part file exists from a previous interrupted download, the worker attempts to resume it using HTTP Range requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.

Docker Deployment

Project Structure

my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── main.py

Local Dev Build (Using Root Dockerfile)

For production, use the cozyctl CLI to build and deploy worker images to our network. For local testing, you can build images using the provided root Dockerfile:

# Build an example using the same root Dockerfile
docker build -t sd15-worker -f Dockerfile examples/sd15

# Run
docker run \
  -e SCHEDULER_PUBLIC_ADDR=orchestrator:8080 \
  -e WORKER_JWT='<worker-connect-jwt>' \
  sd15-worker

Canonical local dev build args (GPU, CUDA 12.6, torch 2.10.x, Python 3.12):

cd ~/cozy/python-gen-worker

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu126 \
  --build-arg TORCH_SPEC='~=2.10.0' \
  -f Dockerfile \
  -t my-worker:dev \
  examples/sd15

Optional build args:

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu128 \
  --build-arg TORCH_SPEC=">=2.9,<3" \
  -t my-worker -f Dockerfile examples/sd15

Build Base

Worker images build directly from a Python+uv base image:

  • ghcr.io/astral-sh/uv:python3.12-bookworm-slim

PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.

Publish/Promote Lifecycle

Control-plane behavior (tensorhub + orchestrator):

  • Every publish creates a new immutable internal release_id.
  • End users invoke functions by owner/endpoint/function (default prod) or owner/endpoint/function:tag.
  • endpoint is derived from endpoint.toml name and normalized to a URL-safe slug.
  • function names are derived from Python @worker_function names and normalized to URL-safe slugs (for example, medasr_transcribe -> medasr-transcribe).
  • Publishing does not move traffic by default.
  • Promoting a function tag moves traffic to that release.
  • Rollback is just retargeting the tag to an older release.
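The slug normalization implied by the example above (medasr_transcribe -> medasr-transcribe) can be approximated as follows; the platform's exact rules are not specified here, so treat this as a plausible sketch:

```python
import re

def slugify(name: str) -> str:
    """Normalize a name to a URL-safe slug: lowercase, hyphen-separated."""
    slug = name.strip().lower()
    slug = re.sub(r"[^a-z0-9]+", "-", slug)  # runs of non-alphanumerics -> one hyphen
    return slug.strip("-")
```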

Model Cache

Workers report model availability for intelligent job routing:

| State | Location | Latency |
| --- | --- | --- |
| Hot | VRAM | Instant |
| Warm | Disk | Seconds |
| Cold | None | Minutes (download required) |
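A toy illustration of the cache-aware routing the table implies: prefer workers that already hold the model hot (VRAM), then warm (disk), then cold. The real scheduler's policy is more involved than this.

```python
# Lower number = preferred cache state.
PRIORITY = {"hot": 0, "warm": 1, "cold": 2}

def pick_worker(workers: dict) -> str:
    """workers maps worker id -> cache state for the requested model."""
    return min(workers, key=lambda w: PRIORITY[workers[w]])
```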

Dev Testing (Mock Orchestrator)

For local end-to-end tests without standing up gen-orchestrator, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one JobExecutionRequest, prints the result, and exits.

Start your worker container first:

docker run --rm \
  --add-host=host.docker.internal:host-gateway \
  -e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
  -e WORKER_JWT='dev-worker-jwt' \
  <your-worker-image>

In another terminal, send one request:

python -m gen_worker.testing.mock_orchestrator \
  --listen 0.0.0.0:8080 \
  --run hello \
  --payload-json '{"name":"world"}'

Run the command again with a different payload whenever you want to send another request.

The model cache can also be exercised directly in code:

from gen_worker.model_cache import ModelCache

cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)
cache.is_in_vram("model-a")  # True
cache.get_vram_models()      # ["model-a"]

Error Handling

from gen_worker.errors import RetryableError, ValidationError, FatalError

@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry

    try:
        result = call_external_api()
    except TimeoutError:
        raise RetryableError("API timeout")  # Will be retried

    return Output(result=result)

Development

# Install dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Type checking
uv run mypy src/gen_worker

# Build
uv build

Regenerating Protobuf Stubs

Requires gen-orchestrator as a sibling repo:

uv sync --extra dev
python -m grpc_tools.protoc -I../gen-orchestrator/proto --python_out=src/gen_worker/pb --grpc_python_out=src/gen_worker/pb ../gen-orchestrator/proto/*.proto

Worker Wire Protocol

The worker advertises a protocol MAJOR.MINOR in WorkerRegistration (protocol_major, protocol_minor).

  • Current runtime constants live in src/gen_worker/wire_protocol.py.
  • Orchestrator compatibility policy/ranges are documented in ../gen-orchestrator/docs/worker_wire_protocol.md.

License

MIT
