Skip to main content

Runtime engine for Crunch coordinator nodes

Project description

coordinator-node

PyPI

Runtime engine for Crunch coordinator nodes. Powers the full competition pipeline — from data ingestion through scoring to on-chain emission checkpoints.

pip install coordinator-node

Two ways to use this repo

1. Scaffold a new competition (recommended)

Use the Crunch CLI to create a self-contained workspace that pulls coordinator-node from PyPI:

crunch-cli init-workspace my-challenge
cd my-challenge
make deploy

This creates:

my-challenge/
├── node/          ← docker-compose, config, scripts (uses coordinator-node from PyPI)
├── challenge/     ← participant-facing package (tracker, scoring, examples)
└── Makefile

2. Develop the engine itself

Clone this repo to work on the coordinator_node package directly:

git clone https://github.com/crunchdao/coordinator-node-starter.git
cd coordinator-node-starter
uv sync
make deploy    # uses local coordinator_node/ via COPY in Dockerfile

Changes to coordinator_node/ are picked up immediately on rebuild.


Architecture

Pipeline

Feed → Input → Prediction → Score → Snapshot → Checkpoint → On-chain

Workers

Worker Purpose
feed-data-worker Ingests feed data (Pyth, Binance, etc.) via polling + backfill
predict-worker Gets latest data → ticks models → collects predictions
score-worker Resolves actuals → scores predictions → writes snapshots → rebuilds leaderboard
checkpoint-worker Aggregates snapshots → builds EmissionCheckpoint for on-chain submission
report-worker FastAPI server: leaderboard, predictions, feeds, snapshots, checkpoints

Feed Dimensions

Dimension Example Env var
source pyth, binance FEED_SOURCE
subject BTC, ETH FEED_SUBJECTS
kind tick, candle FEED_KIND
granularity 1s, 1m FEED_GRANULARITY

Status Lifecycles

Input:       RECEIVED → RESOLVED
Prediction:  PENDING → SCORED / FAILED / ABSENT
Checkpoint:  PENDING → SUBMITTED → CLAIMABLE → PAID

Configuration

All configuration is via environment variables. Copy the example env file to get started:

cp .local.env.example .local.env

Key variables:

Variable Description Default
CRUNCH_ID Competition identifier starter-challenge
FEED_SOURCE Data source pyth
FEED_SUBJECTS Assets to track BTC
SCORING_FUNCTION Dotted path to scoring callable coordinator_node.extensions.default_callables:default_score_prediction
CHECKPOINT_INTERVAL_SECONDS Seconds between checkpoints 604800
MODEL_BASE_CLASSNAME Participant model base class tracker.TrackerBase
MODEL_RUNNER_NODE_HOST Model orchestrator host model-orchestrator

Extension Points

Customize competition behavior by setting callable paths in your env:

Env var Purpose
SCORING_FUNCTION Score a prediction against ground truth
INFERENCE_INPUT_BUILDER Transform raw feed data into model input
INFERENCE_OUTPUT_VALIDATOR Validate model output shape/values
MODEL_SCORE_AGGREGATOR Aggregate per-model scores across predictions
LEADERBOARD_RANKER Custom leaderboard ranking strategy

Contract

All type shapes and behavior are defined in a single CrunchContract:

from coordinator_node.contracts import CrunchContract

class CrunchContract(BaseModel):
    raw_input_type: type[BaseModel]
    output_type: type[BaseModel]
    score_type: type[BaseModel]
    scope: PredictionScope
    aggregation: Aggregation

    # Callables
    resolve_ground_truth: Callable
    aggregate_snapshot: Callable
    build_emission: Callable

Report API

Endpoint Description
GET /reports/leaderboard Current leaderboard
GET /reports/models Registered models
GET /reports/predictions Prediction history
GET /reports/feeds Active feed subscriptions
GET /reports/snapshots Per-model period summaries
GET /reports/checkpoints Checkpoint history
GET /reports/checkpoints/{id}/emission Raw emission (frac64)
GET /reports/checkpoints/{id}/emission/cli-format CLI JSON format
GET /reports/emissions/latest Latest emission
POST /reports/checkpoints/{id}/confirm Record tx_hash
PATCH /reports/checkpoints/{id}/status Advance status

Emission Checkpoints

Checkpoints produce EmissionCheckpoint matching the on-chain protocol:

{
    "crunch": "<pubkey>",
    "cruncher_rewards": [{"cruncher_index": 0, "reward_pct": 350000000}],
    "compute_provider_rewards": [],
    "data_provider_rewards": []
}

reward_pct uses frac64 (1,000,000,000 = 100%).


Database Tables

Feed layer

Table Purpose
feed_records Raw data points from external sources. Keyed by (source, subject, kind, granularity, ts_event). Values and metadata stored as JSONB.
feed_ingestion_state Tracks the last ingested timestamp per feed scope to enable incremental polling and backfill.

Pipeline layer

Table Purpose
inputs Incoming data events. Status: RECEIVED → RESOLVED. Holds raw data, actuals (once known), and scope metadata.
predictions One row per model per input. Links to a scheduled_prediction_config. Stores inference output, execution time, and resolution timestamp. Status: PENDING → SCORED / FAILED / ABSENT.
scores One row per scored prediction. Stores the result payload, success flag, and optional failure reason.
snapshots Per-model period summaries. Aggregates prediction counts and result metrics over a time window.
checkpoints Periodic emission checkpoints. Aggregates snapshots into on-chain reward distributions. Status: PENDING → SUBMITTED → CLAIMABLE → PAID.
scheduled_prediction_configs Defines when and what to predict — scope template, schedule, and ordering. Seeded at init from scheduled_prediction_configs.json.

Model layer

Table Purpose
models Registered participant models. Tracks overall and per-scope scores as JSONB.
leaderboards Point-in-time leaderboard snapshots with ranked entries as JSONB.

Local Development

# Run tests
uv run pytest tests/ -x -q

# Start all services locally
make deploy

# View logs
make logs

# Tear down
make down

Project Structure

coordinator-node-starter/
├── coordinator_node/       ← core engine (published to PyPI as coordinator-node)
│   ├── workers/            ← feed, predict, score, checkpoint, report workers
│   ├── services/           ← business logic
│   ├── entities/           ← domain models
│   ├── db/                 ← database tables and init
│   ├── feeds/              ← data source adapters (Pyth, Binance, etc.)
│   ├── schemas/            ← API schemas
│   ├── extensions/         ← default callables
│   ├── config/             ← runtime configuration
│   └── contracts.py        ← competition shape: types, scope, and callable hooks
├── base/                   ← template used by crunch-cli init-workspace
│   ├── node/               ← node template (Dockerfile, docker-compose, config)
│   └── challenge/          ← challenge template (tracker, scoring, examples)
├── tests/                  ← test suite
├── docker-compose.yml      ← local dev compose (uses local coordinator_node/)
├── Dockerfile              ← local dev Dockerfile (COPYs coordinator_node/)
├── pyproject.toml          ← package definition
└── Makefile                ← deploy / down / logs / test

Publishing

uv build
twine upload dist/*

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

coordinator_node-0.1.4.tar.gz (184.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

coordinator_node-0.1.4-py3-none-any.whl (58.7 kB view details)

Uploaded Python 3

File details

Details for the file coordinator_node-0.1.4.tar.gz.

File metadata

  • Download URL: coordinator_node-0.1.4.tar.gz
  • Upload date:
  • Size: 184.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.2

File hashes

Hashes for coordinator_node-0.1.4.tar.gz
Algorithm Hash digest
SHA256 494d0168a128794af535f393e149900047e6cddb7194ceb435510004181cbf0e
MD5 429ae13ac5bb8fb43cae3d90440c401e
BLAKE2b-256 d15836c39b2a0653ee218353d0d3ccea0e037ead7e1558d313167c69d816d64b

See more details on using hashes here.

File details

Details for the file coordinator_node-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for coordinator_node-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 6e97e193e57ef1e5a7b4e145f367c0837a09ddea2899f6dcefdb90945aa3cc29
MD5 f725cec9a943c421085447d2b7a34bf5
BLAKE2b-256 308cbe451c0bb88021074637ff5e14fc3b4a508f34baf8129227432416e671ed

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page