
chronoq-ranker

Standalone learning-to-rank scheduling library for Python job queues. Ships today with a per-type heuristic that automatically promotes to an sklearn GradientBoosting model once enough telemetry accumulates; LightGBM LambdaRank with pairwise training lands in Chunk 1 W3.

Install

pip install chronoq-ranker   # (published in Chunk 4)

Quick Start — single-job prediction (v1 compatible)

from chronoq_ranker import TaskRanker

ranker = TaskRanker(storage="sqlite:///telemetry.db")

# Predict execution time
estimate = ranker.predict("resize_image", payload_size=2048)
print(f"{estimate.estimated_ms:.0f}ms (confidence: {estimate.confidence:.2f})")

# Record actual execution
ranker.record("resize_image", payload_size=2048, actual_ms=312)

# Retrain (also auto-triggered every retrain_every_n records)
metrics = ranker.retrain()
print(f"MAE: {metrics.mae:.1f}ms, promoted: {metrics.promoted}")

Quick Start — batch ranking (v2)

from chronoq_ranker import TaskRanker, TaskCandidate

ranker = TaskRanker(storage="sqlite:///telemetry.db")

candidates = [
    TaskCandidate(task_id="a", task_type="pdf_extract", features={"payload_size": 2_000_000}),
    TaskCandidate(task_id="b", task_type="thumbnail",   features={"payload_size":    50_000}),
    TaskCandidate(task_id="c", task_type="pdf_extract", features={"payload_size":   500_000}),
]
# Returns ScoredTask list sorted ascending; rank 0 = run next.
for scored in ranker.predict_scores(candidates):
    print(f"rank={scored.rank} {scored.task_id} score={scored.score:.1f}")
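Under the hood, "sorted ascending; rank 0 = run next" just means lower predicted duration runs first. A minimal plain-Python sketch of that sort-and-rank step (illustrative only, not the library's internals; `Scored` is a hypothetical stand-in for `ScoredTask`):

```python
from dataclasses import dataclass

@dataclass
class Scored:
    task_id: str
    score: float   # predicted execution time in ms; lower runs sooner
    rank: int = -1

def rank_ascending(scored):
    """Sort by score ascending and assign rank 0 to the next task to run."""
    ordered = sorted(scored, key=lambda s: s.score)
    for i, s in enumerate(ordered):
        s.rank = i
    return ordered

queue = [Scored("a", 1800.0), Scored("b", 40.0), Scored("c", 450.0)]
print([(s.rank, s.task_id) for s in rank_ascending(queue)])
# → [(0, 'b'), (1, 'c'), (2, 'a')]
```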

Configuration

from chronoq_ranker import RankerConfig, TaskRanker

config = RankerConfig(
    cold_start_threshold=50,              # Records before promoting to GradientBoosting
    retrain_every_n=100,                  # Auto-retrain interval
    drift_threshold_mae_ms=500,           # MAE threshold for drift detection
    incremental_rounds=10,                # LambdaRank warm-start rounds (W3+)
    min_groups=20,                        # Minimum query-groups per fit
    full_refit_every_n_incrementals=20,   # Force full refit cadence
    psi_threshold=0.2,                    # Per-feature drift warn threshold (W3+)
    storage_uri="sqlite:///telemetry.db",
)
ranker = TaskRanker(config=config)
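To make `drift_threshold_mae_ms` concrete: drift detection of this kind compares recent prediction error against a fixed MAE budget. A hedged sketch of how such a check could work, not chronoq-ranker's actual code:

```python
def mean_absolute_error(predicted_ms, actual_ms):
    """Mean absolute error between predicted and observed execution times."""
    return sum(abs(p - a) for p, a in zip(predicted_ms, actual_ms)) / len(predicted_ms)

def drift_detected(predicted_ms, actual_ms, threshold_ms=500):
    """Flag drift when the recent error exceeds the configured MAE threshold."""
    return mean_absolute_error(predicted_ms, actual_ms) > threshold_ms

print(drift_detected([100, 200], [120, 230]))     # → False (MAE = 25)
print(drift_detected([100, 200], [900, 1100]))    # → True  (MAE = 850)
```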

Feature schema (v2, user-declarable)

from chronoq_ranker import DefaultExtractor, FeatureSchema, FeatureExtractor

# Ships by default — 15 features (task_type, payload_size, hour_of_day, day_of_week,
# queue_depth, queue_depth_same_type, recent_mean_ms_this_type, recent_p95_ms_this_type,
# recent_count_this_type, time_since_last_retrain_s, worker_count_busy, worker_count_idle,
# prompt_length, user_tier, retry_count).
ranker = TaskRanker(feature_extractor=DefaultExtractor())

# Or roll your own schema for a domain-specific workload:
class MyExtractor(FeatureExtractor):
    schema = FeatureSchema(version="my-v1", numeric=["payload_size"], categorical=["task_type"])
    def extract(self, candidate, context=None):
        return {"task_type": candidate.task_type, "payload_size": float(candidate.features["payload_size"])}
    def extract_from_record(self, record):
        return {"task_type": record.task_type, "payload_size": float(record.payload_size)}

ranker = TaskRanker(feature_extractor=MyExtractor())

Storage Backends

  • "memory://" — In-memory (testing/ephemeral)
  • "sqlite:///path/to/db" — SQLite (persistent; auto-migrates v1 schemas)
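A scheme-based URI like these is typically dispatched by parsing the scheme and path. An illustrative sketch of that pattern (not chronoq-ranker's backend-selection code):

```python
from urllib.parse import urlparse

def select_backend(storage_uri):
    """Map a storage URI onto a backend name plus its target path."""
    parsed = urlparse(storage_uri)
    if parsed.scheme == "memory":
        return ("memory", None)
    if parsed.scheme == "sqlite":
        # sqlite:///path/to/db → the path follows the triple slash
        return ("sqlite", parsed.path.lstrip("/"))
    raise ValueError(f"unsupported storage scheme: {parsed.scheme!r}")

print(select_backend("memory://"))               # → ('memory', None)
print(select_backend("sqlite:///telemetry.db"))  # → ('sqlite', 'telemetry.db')
```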

Integration Patterns

Works with any task system — Celery, Kafka, FastAPI background tasks, custom workers:

# Before task dispatch (batch)
scored = ranker.predict_scores(candidates)
next_task = scored[0].task_id

# After task execution
ranker.record(task_type, payload_size, actual_ms=elapsed)
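A worker loop built on the predict-dispatch-record pattern above might look like the following. This is a hedged sketch with a stubbed ranker (so it runs without the library); `StubRanker` and `run_once` are hypothetical names, and the real wiring depends on your task system:

```python
import time

class StubRanker:
    """Stand-in for TaskRanker so this loop runs without the library."""
    def predict_scores(self, candidates):
        # Cheap proxy: smaller payloads rank first.
        return sorted(candidates, key=lambda c: c["payload_size"])
    def record(self, task_type, payload_size, actual_ms):
        pass  # the real ranker persists this telemetry for retraining

def run_once(ranker, pending, execute):
    """Pick the best-ranked task, run it, and feed timing back to the ranker."""
    scored = ranker.predict_scores(pending)
    task = scored[0]
    start = time.monotonic()
    execute(task)
    elapsed_ms = (time.monotonic() - start) * 1000
    ranker.record(task["task_type"], task["payload_size"], actual_ms=elapsed_ms)
    return task["task_id"]

pending = [
    {"task_id": "a", "task_type": "pdf_extract", "payload_size": 2_000_000},
    {"task_id": "b", "task_type": "thumbnail",   "payload_size":    50_000},
]
print(run_once(StubRanker(), pending, execute=lambda t: None))  # → b
```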

v1 compatibility

TaskPredictor and PredictorConfig are retained as deprecated aliases for one release cycle. Imports from chronoq_ranker.predictor still resolve via a shim module that emits a DeprecationWarning. Migrate at your leisure; the aliases land on the next major version's removal list.
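The shim pattern described above is a standard few-liner. A generic sketch of the idea (not the package's actual shim; in a real shim module the alias would live in a PEP 562 module-level `__getattr__` so the warning only fires when the old name is imported):

```python
import warnings

class TaskRanker:
    """Stand-in for the current class under its new name."""

def _deprecated_alias(old_name, new_cls):
    """Emit a DeprecationWarning, then hand back the replacement class."""
    warnings.warn(
        f"{old_name} is deprecated; use {new_cls.__name__} instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return new_cls

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    TaskPredictor = _deprecated_alias("TaskPredictor", TaskRanker)

print(TaskPredictor is TaskRanker)               # → True
print(caught[0].category is DeprecationWarning)  # → True
```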

Boundary guarantee

chronoq-ranker has zero imports from chronoq-demo-server, Redis, FastAPI, Celery, or vLLM. Runtime deps: lightgbm (W3+), numpy, pydantic, loguru. Verify any time with /boundary-check.
