Runtime engine for Crunch nodes

crunch-node

Runtime engine for Crunch nodes. Powers the full competition pipeline — from data ingestion through scoring to on-chain emission checkpoints.

pip install crunch-node

Two ways to use this repo

1. Scaffold a new competition (recommended)

Use the Crunch CLI to create a self-contained workspace that pulls crunch-node from PyPI:

crunch-cli init-workspace my-challenge
cd my-challenge
make deploy

This creates:

my-challenge/
├── node/          ← docker-compose, config, scripts (uses crunch-node from PyPI)
├── challenge/     ← participant-facing package (tracker, scoring, examples)
└── Makefile

2. Develop the engine itself

Clone this repo to work on the crunch_node package directly:

git clone https://github.com/crunchdao/coordinator-node-starter.git
cd coordinator-node-starter
uv sync
make deploy    # uses local crunch_node/ via COPY in Dockerfile

Changes to crunch_node/ are picked up on the next rebuild.


Architecture

Pipeline

Feed → Input → Prediction → Score → Snapshot → Checkpoint → On-chain

Predict latency target (architecture SLO)

The architecture should support a predict roundtrip of roughly 50 ms when optimized.

  • Definition: predict-worker roundtrip from new data wake-up/availability to prediction persistence.
  • Any architecture decision expected to push this materially above ~50ms must be called out explicitly in specs, PR notes, and agent output.
  • If such a deviation is required, include rationale, expected impact, and mitigation options.

Architecture docs

Detailed C4 + refactor documentation:

  • docs/architecture/README.md
  • docs/architecture/predict-service-kernel-architecture.md

Workers

Worker          Purpose
predict-worker  Ingests feed data (Pyth, Binance, etc.), ticks models, collects predictions
score-worker    Resolves actuals → scores predictions → writes snapshots → rebuilds leaderboard → creates checkpoints
report-worker   FastAPI server: leaderboard, predictions, feeds, snapshots, checkpoints

Feed Dimensions

Dimension    Example        Env var
source       pyth, binance  FEED_SOURCE
subject      BTC, ETH       FEED_SUBJECTS
kind         tick, candle   FEED_KIND
granularity  1s, 1m         FEED_GRANULARITY

Status Lifecycles

Input:       RECEIVED → RESOLVED
Prediction:  PENDING → SCORED / FAILED / ABSENT
Checkpoint:  PENDING → SUBMITTED → CLAIMABLE → PAID
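
The lifecycles above can be read as small state machines. The sketch below is illustrative only: ALLOWED and can_transition are hypothetical names, not part of crunch_node, and it assumes checkpoint statuses advance one step at a time.

```python
# Illustrative only: these names are not part of crunch_node.
ALLOWED = {
    "input": {"RECEIVED": {"RESOLVED"}},
    "prediction": {"PENDING": {"SCORED", "FAILED", "ABSENT"}},
    "checkpoint": {
        "PENDING": {"SUBMITTED"},
        "SUBMITTED": {"CLAIMABLE"},
        "CLAIMABLE": {"PAID"},
    },
}


def can_transition(kind: str, current: str, new: str) -> bool:
    """Return True if `current` -> `new` follows the lifecycle for `kind`."""
    # Terminal states (e.g. RESOLVED, PAID) have no outgoing transitions.
    return new in ALLOWED[kind].get(current, set())
```

For example, can_transition("checkpoint", "PENDING", "PAID") is False because SUBMITTED and CLAIMABLE are skipped.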

Configuration

All configuration is via environment variables. Copy the example env file to get started:

cp .local.env.example .local.env

Key variables:

Variable                     Description                      Default
CRUNCH_ID                    Competition identifier           starter-challenge
FEED_SOURCE                  Data source                      pyth
FEED_SUBJECTS                Assets to track                  BTC
SCORING_FUNCTION             Dotted path to scoring callable  crunch_node.extensions.default_callables:default_score_prediction
CHECKPOINT_INTERVAL_SECONDS  Seconds between checkpoints      604800
MODEL_BASE_CLASSNAME         Participant model base class     tracker.TrackerBase
MODEL_RUNNER_NODE_HOST       Model orchestrator host          model-orchestrator

API Security

Endpoints are protected by API key authentication when API_KEY is set. Off by default for backward compatibility.

Quick start

# In .local.env
API_KEY=my-strong-secret

After make deploy, admin endpoints require the key:

# Rejected (401)
curl -s http://localhost:8000/reports/backfill

# Accepted
curl -s -H "X-API-Key: my-strong-secret" http://localhost:8000/reports/backfill

Endpoint tiers

Tier    Default prefixes                                                                                Auth required
Public  /healthz, /reports/leaderboard, /reports/schema, /reports/models, /reports/feeds, /info, /docs  Never
Read    /reports/predictions, /reports/snapshots, /data/backfill/*                                      Only if API_READ_AUTH=true
Admin   /reports/backfill (POST), /reports/checkpoints/, /custom/*                                      Always (when API_KEY set)

Sending the key

Three methods (any one works):

# X-API-Key header (recommended)
curl -H "X-API-Key: <key>" ...

# Authorization: Bearer header
curl -H "Authorization: Bearer <key>" ...

# Query parameter (for quick testing only)
curl "...?api_key=<key>"
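
The same header can be sent from Python. This is a minimal standard-library sketch; build_request is a hypothetical helper, and the URL and key are the placeholder values from the quick start above.

```python
import urllib.request


def build_request(url: str, api_key: str) -> urllib.request.Request:
    """Build a request that carries the key in the X-API-Key header."""
    return urllib.request.Request(url, headers={"X-API-Key": api_key})


req = build_request("http://localhost:8000/reports/backfill", "my-strong-secret")
# Against a running node, send it with: urllib.request.urlopen(req)
```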

Configuration

Env var              Default    Description
API_KEY              (empty)    Shared secret. When unset, all endpoints are open.
API_READ_AUTH        false      When true, read endpoints also require the API key.
API_PUBLIC_PREFIXES  See above  Comma-separated prefixes that never require auth.
API_ADMIN_PREFIXES   See above  Comma-separated prefixes that always require auth.
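
One way to picture how the tiers and settings combine is the sketch below. It is not the engine's middleware: requires_key is a hypothetical function, it ignores the HTTP method, it assumes public prefixes are checked before admin prefixes, and it treats every other path as read-tier.

```python
# Default prefixes copied from the endpoint tiers table.
PUBLIC = ("/healthz", "/reports/leaderboard", "/reports/schema",
          "/reports/models", "/reports/feeds", "/info", "/docs")
ADMIN = ("/reports/backfill", "/reports/checkpoints/", "/custom/")


def requires_key(path: str, api_key: str = "", read_auth: bool = False) -> bool:
    """Return True if a request to `path` must present the API key."""
    if not api_key:
        return False          # API_KEY unset: all endpoints are open
    if path.startswith(PUBLIC):
        return False          # public tier never requires auth
    if path.startswith(ADMIN):
        return True           # admin tier always requires auth
    return read_auth          # read tier follows API_READ_AUTH
```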

Custom api/ endpoints

Custom endpoints under /custom/ are admin-tier by default — they require the API key when set. To make a custom endpoint public, add its prefix to API_PUBLIC_PREFIXES.


Custom API Endpoints

Add endpoints to the report worker by dropping Python files in node/api/:

# node/api/my_endpoints.py
from fastapi import APIRouter

router = APIRouter(prefix="/custom", tags=["custom"])

@router.get("/hello")
def hello():
    return {"message": "Hello from custom endpoint"}

After make deploy, available at http://localhost:8000/custom/hello.

Any .py file in api/ with a router attribute (a FastAPI APIRouter) is auto-mounted at startup. Files starting with _ are skipped.
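
The discovery rule can be sketched with the standard library. discover_routers is a hypothetical name, and this version only checks that a router attribute exists; the real loader presumably also verifies that it is an APIRouter and mounts it on the app.

```python
import importlib.util
from pathlib import Path


def discover_routers(routes_dir: str) -> dict:
    """Map module name -> `router` object for each eligible file."""
    found = {}
    for path in sorted(Path(routes_dir).glob("*.py")):
        if path.name.startswith("_"):
            continue  # files starting with "_" are skipped
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the file's top-level code
        router = getattr(module, "router", None)
        if router is not None:
            found[path.stem] = router
    return found
```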

Full database access is available via the same dependency injection pattern:

from typing import Annotated
from fastapi import APIRouter, Depends
from sqlmodel import Session
from crunch_node.db import create_session, DBModelRepository

router = APIRouter(prefix="/custom")

def get_db_session():
    with create_session() as session:
        yield session

@router.get("/models/count")
def model_count(session: Annotated[Session, Depends(get_db_session)]):
    return {"count": len(DBModelRepository(session).fetch_all())}

Env var         Default  Description
API_ROUTES_DIR  api/     Directory to scan for router files
API_ROUTES      (empty)  Comma-separated module:attr paths for explicit imports

Extension Points

Customize competition behavior by setting callable paths in your env:

Env var                     Purpose
SCORING_FUNCTION            Score a prediction against ground truth
INFERENCE_INPUT_BUILDER     Transform raw feed data into model input
INFERENCE_OUTPUT_VALIDATOR  Validate model output shape/values
MODEL_SCORE_AGGREGATOR      Aggregate per-model scores across predictions
LEADERBOARD_RANKER          Custom leaderboard ranking strategy

CrunchConfig

All type shapes and behavior are defined in a single CrunchConfig. Workers auto-discover the operator's config at startup:

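from crunch_node.crunch_config import CrunchConfig, EnsembleConfig
# RawInput, InferenceOutput, ScoreResult, PredictionScope, Aggregation, and the
# default_* callables below must be imported or defined by the operator.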

contract = CrunchConfig(
    # Type shapes
    raw_input_type=RawInput,
    output_type=InferenceOutput,
    score_type=ScoreResult,
    scope=PredictionScope(),
    aggregation=Aggregation(),

    # Multi-metric scoring (default: 5 active metrics)
    metrics=["ic", "ic_sharpe", "hit_rate", "max_drawdown", "model_correlation"],

    # Ensemble (default: off)
    ensembles=[],

    # Callables
    resolve_ground_truth=default_resolve_ground_truth,
    aggregate_snapshot=default_aggregate_snapshot,
    build_emission=default_build_emission,
)

Config resolution order

All workers use load_config() which tries, in order:

  1. CRUNCH_CONFIG_MODULE env var (e.g. my_package.crunch_config:MyCrunchConfig)
  2. config.crunch_config:CrunchConfig — the standard operator override
  3. crunch_node.crunch_config:CrunchConfig — engine default

The operator's config is imported automatically — no env var needed if config/crunch_config.py exists on PYTHONPATH (it does in the Docker setup).
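
The resolution order can be sketched as a loop over module:attr candidates. resolve_config_class is a hypothetical stand-in for load_config(), and falling through to the next candidate when an import fails is an assumption about its behavior.

```python
import importlib
import os

DEFAULT_CANDIDATES = (
    "config.crunch_config:CrunchConfig",      # standard operator override
    "crunch_node.crunch_config:CrunchConfig",  # engine default
)


def resolve_config_class(candidates=DEFAULT_CANDIDATES, env=None):
    """Return the first importable `module:attr` target, env override first."""
    env = os.environ if env is None else env
    paths = list(candidates)
    override = env.get("CRUNCH_CONFIG_MODULE")
    if override:
        paths.insert(0, override)
    for path in paths:
        module_name, _, attr = path.partition(":")
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            continue  # assumption: a missing module means "try the next one"
        if hasattr(module, attr):
            return getattr(module, attr)
    raise LookupError(f"no config class found in {paths}")
```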


Multi-Metric Scoring

Every score cycle computes portfolio-level metrics alongside the per-prediction scoring function. Metrics are stored in snapshot result_summary JSONB and surfaced on the leaderboard.

Active metrics

Set in the contract — only listed metrics are computed:

# Use all defaults (ic, ic_sharpe, hit_rate, max_drawdown, model_correlation)
contract = CrunchConfig()

# Opt out entirely — per-prediction scoring only
contract = CrunchConfig(metrics=[])

# Pick specific metrics
contract = CrunchConfig(metrics=["ic", "sortino_ratio", "turnover"])

Built-in metrics

Tier  Name                  Description
T1    ic                    Information Coefficient: Spearman rank correlation vs. actual returns
T1    ic_sharpe             mean(IC) / std(IC); rewards consistency
T1    mean_return           Mean return of a long-short portfolio from signals
T1    hit_rate              % of predictions with correct directional sign
T1    model_correlation     Mean pairwise correlation against other models
T2    max_drawdown          Worst peak-to-trough on cumulative score
T2    sortino_ratio         Like Sharpe, but penalizes only downside volatility
T2    turnover              Signal change rate between consecutive predictions
T3    fnc                   Feature-Neutral Correlation (ensemble-aware)
T3    contribution          Leave-one-out ensemble contribution
T3    ensemble_correlation  Correlation to ensemble output

T3 metrics require ensembling to be enabled.

Custom metrics

Register your own metric function:

from crunch_node.metrics import get_default_registry

def my_custom_metric(predictions, scores, context):
    """Return a single float."""
    return some_computation(predictions, scores)

get_default_registry().register("my_custom", my_custom_metric)

# Then add it to the contract
contract = CrunchConfig(metrics=["ic", "my_custom"])

Ranking by any metric

The leaderboard can rank by any active metric. Set ranking_key to a metric name:

contract = CrunchConfig(
    metrics=["ic", "ic_sharpe", "hit_rate"],
    aggregation=Aggregation(ranking_key="ic_sharpe"),
)

Ensemble Framework

Combine multiple model predictions into virtual meta-models. Off by default — opt in via the contract.

Quick start

from crunch_node.crunch_config import CrunchConfig, EnsembleConfig
from crunch_node.services.ensemble import inverse_variance, equal_weight, top_n

contract = CrunchConfig(
    ensembles=[
        EnsembleConfig(name="main", strategy=inverse_variance),
        EnsembleConfig(name="top5", strategy=inverse_variance, model_filter=top_n(5)),
        EnsembleConfig(name="equal", strategy=equal_weight),
    ],
)

How it works

  1. After scoring, the score worker computes ensembles for each enabled config
  2. Models are filtered (optional model_filter)
  3. Weights computed via strategy(model_metrics, predictions) → {model_id: weight}
  4. Weighted-average predictions stored as PredictionRecord with model_id="__ensemble_{name}__"
  5. Virtual models are scored, metrics computed, and appear in leaderboard data

Built-in strategies

Strategy          Description
inverse_variance  Weight = 1/var(predictions), normalized. Default.
equal_weight      1/N for all included models.
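
As a rough illustration of inverse-variance weighting, the sketch below uses hypothetical helpers and a simplified input: a dict mapping model_id to a list of past predictions. The built-in strategy takes (model_metrics, predictions) and may differ in detail. A model with zero variance would raise ZeroDivisionError here.

```python
from statistics import pvariance


def inverse_variance_weights(predictions):
    """Weight each model by 1/var of its predictions, normalized to sum to 1."""
    raw = {m: 1.0 / pvariance(v) for m, v in predictions.items()}
    total = sum(raw.values())
    return {m: w / total for m, w in raw.items()}


def weighted_average(latest, weights):
    """Combine each model's latest prediction using the given weights."""
    return sum(latest[m] * w for m, w in weights.items())
```

With history {"a": [1.0, 3.0], "b": [0.0, 4.0]} the variances are 1 and 4, so model a receives weight 0.8 and model b receives 0.2.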

Model filters

from crunch_node.services.ensemble import top_n, min_metric

# Keep only top 5 by score
EnsembleConfig(name="top5", model_filter=top_n(5))

# Keep models with IC > 0.03
EnsembleConfig(name="quality", model_filter=min_metric("ic", 0.03))
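
Conceptually, a model filter is a function that narrows the set of models before weighting. The closures below are a sketch under assumptions: the _sketch names are hypothetical, the input is taken to be a dict of model_id to metric values, and ranking by a "score" key is a guess at what "top 5 by score" means.

```python
def top_n_sketch(n, key="score"):
    """Keep the n models with the highest value for `key`."""
    def _filter(model_metrics):
        ranked = sorted(model_metrics, key=lambda m: model_metrics[m][key], reverse=True)
        return ranked[:n]
    return _filter


def min_metric_sketch(name, threshold):
    """Keep models whose metric `name` is strictly above `threshold`."""
    def _filter(model_metrics):
        return [m for m, vals in model_metrics.items()
                if vals.get(name, float("-inf")) > threshold]
    return _filter
```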

Leaderboard filtering

Ensemble virtual models are hidden from the leaderboard by default. Toggle with:

GET /reports/leaderboard?include_ensembles=true
GET /reports/models/global?include_ensembles=true
GET /reports/models/params?include_ensembles=true

Contribution-aware rewards

The default build_emission uses tier-based ranking. For competitions that want to incentivize diversity, switch to contribution_weighted_emission:

from crunch_node.extensions.emission_strategies import contribution_weighted_emission

config = CrunchConfig(
    build_emission=contribution_weighted_emission,
    metrics=["ic", "ic_sharpe", "hit_rate", "model_correlation", "contribution"],
    ensembles=[EnsembleConfig(name="main")],
)

This blends three factors into reward allocation:

  • Rank (50%): inverse rank — higher-ranked models get more
  • Contribution (30%): ensemble contribution — models that improve the ensemble get more
  • Diversity (20%): 1 - model_correlation — unique signals get more

Weights are configurable: contribution_weighted_emission(..., rank_weight=0.3, contribution_weight=0.5, diversity_weight=0.2).
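
The blend can be illustrated as follows. This is not the code of contribution_weighted_emission: blended_shares is a hypothetical function, and the normalization of each factor (inverse rank, contribution clipped at zero, 1 - model_correlation, each scaled to sum to 1) is an assumption made for the sketch.

```python
def _normalize(values):
    """Scale values to sum to 1; fall back to equal shares if all are zero."""
    total = sum(values.values())
    if total <= 0:
        return {m: 1.0 / len(values) for m in values}
    return {m: v / total for m, v in values.items()}


def blended_shares(models, rank_weight=0.5, contribution_weight=0.3,
                   diversity_weight=0.2):
    """Blend rank, contribution, and diversity into one share per model."""
    rank = _normalize({m: 1.0 / d["rank"] for m, d in models.items()})
    contrib = _normalize({m: max(d["contribution"], 0.0) for m, d in models.items()})
    diversity = _normalize(
        {m: max(1.0 - d["model_correlation"], 0.0) for m, d in models.items()}
    )
    return {
        m: rank_weight * rank[m]
        + contribution_weight * contrib[m]
        + diversity_weight * diversity[m]
        for m in models
    }
```

Because each factor is normalized and the three weights sum to 1, the resulting shares also sum to 1.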

Ensemble signal endpoint

Activate the built-in ensemble signal API by renaming:

mv node/api/ensemble_signals.py.disabled node/api/ensemble_signals.py
make deploy

Endpoints:

GET /signals/ensemble              → list available ensembles
GET /signals/ensemble/{name}       → latest ensemble prediction (the product)
GET /signals/ensemble/{name}/history → recent prediction history

Diversity feedback for competitors

Competitors can see how their model relates to the collective:

GET /reports/models/{model_id}/diversity

Returns:

{
  "model_id": "my_model",
  "rank": 3,
  "diversity_score": 0.75,
  "metrics": {
    "ic": 0.035,
    "model_correlation": 0.25,
    "ensemble_correlation": 0.60,
    "contribution": 0.02,
    "fnc": 0.03
  },
  "guidance": [
    "Low correlation + positive IC — your model provides unique alpha."
  ]
}

The backtest harness also surfaces diversity metrics:

result = BacktestRunner(model=MyTracker()).run(...)
result.summary()  # includes diversity section when model_id is set
result.diversity  # fetches live diversity feedback from coordinator

Report API

Endpoint                                           Params                                     Description
GET /reports/leaderboard                           include_ensembles (bool, default false)    Current leaderboard
GET /reports/models                                -                                          Registered models
GET /reports/models/global                         projectIds, start, end, include_ensembles  Global model scores
GET /reports/models/params                         projectIds, start, end, include_ensembles  Per-scope model scores
GET /reports/predictions                           projectIds, start, end                     Prediction history
GET /reports/feeds                                 -                                          Active feed subscriptions
GET /reports/models/{id}/diversity                 -                                          Diversity feedback: correlation, contribution, guidance
GET /reports/diversity                             limit                                      All models' diversity scores for dashboard chart
GET /reports/ensemble/history                      ensemble_name, since, until, limit         Ensemble metrics over time
GET /reports/checkpoints/rewards                   model_id, limit                            Reward distribution per checkpoint
GET /reports/snapshots                             model_id, since, until, limit              Per-model period summaries (enriched with metrics)
GET /reports/checkpoints                           status, limit                              Checkpoint history
GET /reports/checkpoints/{id}/emission             -                                          Raw emission (frac64)
GET /reports/checkpoints/{id}/emission/cli-format  -                                          CLI JSON format
GET /reports/emissions/latest                      -                                          Latest emission
POST /reports/checkpoints/{id}/confirm             tx_hash                                    Record tx_hash
PATCH /reports/checkpoints/{id}/status             status                                     Advance status

Backfill & Data

Endpoint                                                           Description
GET /reports/backfill/feeds                                        Configured feeds eligible for backfill
POST /reports/backfill                                             Start a backfill job (409 if one running)
GET /reports/backfill/jobs                                         List all backfill jobs
GET /reports/backfill/jobs/{id}                                    Job detail with progress percentage
GET /data/backfill/index                                           Manifest of available parquet files
GET /data/backfill/{source}/{subject}/{kind}/{granularity}/{file}  Download parquet file

Backfill & Backtest

Coordinator-side backfill

Backfill historical data from the UI or API:

  1. Admin triggers backfill via POST /reports/backfill (or the UI)
  2. Data is fetched from the configured feed provider (Binance, Pyth, etc.)
  3. Written to Hive-partitioned parquet files: data/backfill/{source}/{subject}/{kind}/{granularity}/YYYY-MM-DD.parquet
  4. Progress tracked in backfill_jobs table (resumable on restart)
  5. Parquet files served via /data/backfill/ endpoints for model consumption
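
The file layout in step 3 maps directly onto a path builder. backfill_path is a hypothetical helper that follows the template shown above; the root default is taken from that template.

```python
from datetime import date
from pathlib import PurePosixPath


def backfill_path(source, subject, kind, granularity, day: date,
                  root="data/backfill"):
    """Build {root}/{source}/{subject}/{kind}/{granularity}/YYYY-MM-DD.parquet."""
    return PurePosixPath(root, source, subject, kind, granularity,
                         f"{day.isoformat()}.parquet")
```

For example, backfill_path("binance", "BTC", "candle", "1m", date(2026, 1, 15)) gives data/backfill/binance/BTC/candle/1m/2026-01-15.parquet.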

Competitor-side backtest

The challenge package includes a backtest harness. Competitors run backtests locally — model code is identical to production:

from starter_challenge.backtest import BacktestRunner
from my_model import MyTracker

result = BacktestRunner(model=MyTracker()).run(
    start="2026-01-01", end="2026-02-01"
)
result.predictions_df   # DataFrame in notebook
result.metrics           # rolling windows + multi-metric enrichment
result.summary()         # formatted output

# result.metrics example:
# {
#   'score_recent': 0.42, 'score_steady': 0.38, 'score_anchor': 0.35,
#   'ic': 0.035, 'ic_sharpe': 1.2, 'hit_rate': 0.58,
#   'mean_return': 0.012, 'max_drawdown': -0.08, 'sortino_ratio': 1.5,
#   'turnover': 0.23,
# }

  • Data auto-fetched from coordinator and cached locally on first run
  • Coordinator URL and feed dimensions baked into challenge package
  • Same tick() → predict() loop as production
  • Same scoring function, rolling window metrics, and multi-metric evaluation as leaderboard

Emission Checkpoints

Checkpoints produce EmissionCheckpoint matching the on-chain protocol:

{
    "crunch": "<pubkey>",
    "cruncher_rewards": [{"cruncher_index": 0, "reward_pct": 350000000}],
    "compute_provider_rewards": [],
    "data_provider_rewards": []
}

reward_pct uses frac64 (1,000,000,000 = 100%).
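
The scaling can be shown with a small conversion sketch. to_frac64 is a hypothetical helper, and the requirement that the integers sum to exactly 1,000,000,000 is an assumption, not something this document states about the on-chain protocol.

```python
FRAC64_ONE = 1_000_000_000  # 100% in the scale described above


def to_frac64(shares):
    """Convert fractional shares (summing to 1.0) into frac64 integers."""
    scaled = [round(s * FRAC64_ONE) for s in shares]
    # Assumption: totals must add up exactly, so any rounding leftover
    # is assigned to the largest share.
    remainder = FRAC64_ONE - sum(scaled)
    if scaled:
        scaled[scaled.index(max(scaled))] += remainder
    return scaled
```

A 35% share becomes 350000000, matching the reward_pct value in the example checkpoint.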


Database Tables

Feed layer

Table                 Purpose
feed_records          Raw data points from external sources. Keyed by (source, subject, kind, granularity, ts_event). Values and metadata stored as JSONB.
feed_ingestion_state  Tracks the last ingested timestamp per feed scope to enable incremental polling and backfill.

Backfill layer

Table          Purpose
backfill_jobs  Tracks backfill runs. Status: pending → running → completed / failed. Stores cursor for resume, records written, pages fetched.

Historical backfill data is stored as Hive-partitioned parquet files at data/backfill/{source}/{subject}/{kind}/{granularity}/YYYY-MM-DD.parquet (not in Postgres).

Pipeline layer

Table                         Purpose
inputs                        Incoming data events. Status: RECEIVED → RESOLVED. Holds raw data, actuals (once known), and scope metadata.
predictions                   One row per model per input. Links to a scheduled_prediction_config. Stores inference output, execution time, and resolution timestamp. Status: PENDING → SCORED / FAILED / ABSENT.
scores                        One row per scored prediction. Stores the result payload, success flag, and optional failure reason.
snapshots                     Per-model period summaries. Aggregates prediction counts and result metrics over a time window.
checkpoints                   Periodic emission checkpoints. Aggregates snapshots into on-chain reward distributions. Status: PENDING → SUBMITTED → CLAIMABLE → PAID.
scheduled_prediction_configs  Defines when and what to predict: scope template, schedule, and ordering. Seeded at init from CrunchConfig.scheduled_predictions.

Model layer

Table         Purpose
models        Registered participant models. Tracks overall and per-scope scores as JSONB.
leaderboards  Point-in-time leaderboard snapshots with ranked entries as JSONB.

Local Development

# Run tests
uv run pytest tests/ -x -q

# Start all services locally
make deploy

# View logs
make logs

# Tear down
make down

Project Structure

crunch-node-starter/
├── crunch_node/       ← core engine (published to PyPI as crunch-node)
│   ├── workers/            ← feed, predict, score, checkpoint, report workers
│   ├── services/           ← business logic
│   ├── entities/           ← domain models
│   ├── db/                 ← database tables and init
│   ├── feeds/              ← data source adapters (Pyth, Binance, etc.)
│   ├── schemas/            ← API schemas
│   ├── extensions/         ← default callables
│   ├── config/             ← runtime configuration
│   └── crunch_config.py    ← base CrunchConfig class and default types
├── scaffold/               ← template used by crunch-cli init-workspace
│   ├── node/               ← node template (Dockerfile, docker-compose, config)
│   └── challenge/          ← challenge template (tracker, scoring, backtest, examples)
├── tests/                  ← test suite
├── docker-compose.yml      ← local dev compose (uses local crunch_node/)
├── Dockerfile              ← local dev Dockerfile (COPYs crunch_node/)
├── pyproject.toml          ← package definition
└── Makefile                ← deploy / down / logs / test

Publishing

uv build
twine upload dist/*
