Celery integration for chronoq-ranker — LearnedScheduler with shadow/active/fifo modes
Project description
chronoq-celery
Learning-to-rank scheduling for Celery. Replace FIFO with an online LambdaRank. 2-line drop-in, three safety modes.
Pre-broker gate for Celery that re-orders pending tasks by predicted duration, shortest first. Works alongside chronoq-ranker — the ranker scores tasks based on job telemetry; this package wires that scoring into Celery's task lifecycle via signals.
Install
pip install chronoq-celery
30-second try
from chronoq_celery import LearnedScheduler, attach_signals
from celery import Celery
app = Celery("myapp", broker="redis://localhost:6379/0")
scheduler = LearnedScheduler(mode="active") # or "shadow" / "fifo"
attach_signals(app, scheduler)
That is the entire integration. attach_signals wires task_prerun, task_success, task_failure, and task_revoked so the ranker records telemetry and triggers dispatch automatically.
Three modes, one flag
| Mode | What it does |
|---|---|
fifo |
No-op. Tasks dispatch in arrival order. Ranker is never instantiated. Use as a kill-switch or A/B baseline. |
shadow |
Ranker scores every task and logs the score; dispatch order is unchanged vs FIFO. Zero behavioral difference in production — safe to deploy immediately and measure the potential win before switching. |
active |
Tasks are held in an in-process min-heap scored by predicted duration and dispatched in score order via dispatch_next() on task completion. |
Shadow → active rollout pattern
Start with shadow mode to validate the ranker's scores against your workload before committing to reordering:
# Week 1: deploy with shadow mode — zero dispatch change, score logging only
scheduler = LearnedScheduler(mode="shadow")
attach_signals(app, scheduler)
# From your logs, compare predicted score distribution against actual durations.
# When the ranker's rank correlation looks solid, flip the flag:
# Week 2+: flip to active — same code, one string change
scheduler = LearnedScheduler(mode="active")
attach_signals(app, scheduler)
The ranker accumulates telemetry in both modes, so by the time you flip to active it has already trained on real traffic.
How it works
Celery's broker uses Redis LISTS (LPUSH/BLPOP) with no broker-level "select next task" hook. LearnedScheduler acts as a pre-broker gate: in active mode, callers pass an apply_fn (the closure that calls task.apply_async(...)) alongside the task metadata. Tasks are held in an in-process heapq and dispatched in score order when a worker slot opens, signaled by task_success.
The scoring path computes a QueueContext with live per-type rolling statistics (TypeStatsTracker) and passes it to the FeatureExtractor. The dominant feature — recent_mean_ms_this_type — carries approximately 80% of the ranking signal on typical workloads (from ablation experiments on the synthetic Pareto and BurstGPT traces).
API surface
LearnedScheduler(
mode="fifo" | "shadow" | "active",
ranker=None, # pre-initialised TaskRanker; created internally if None
ranker_config=None, # RankerConfig passed to TaskRanker() when ranker is None
stats_tracker=None, # TypeStatsTracker; created internally if None
window=100, # ring-buffer size for internal TypeStatsTracker
)
attach_signals(app, scheduler)
# wires: task_prerun → record_start
# task_success → record_completion + dispatch_next
# task_failure → cleanup_registry
# task_revoked → cleanup_registry
TypeStatsTracker(window=100)
# .record(task_type, actual_ms)
# .snapshot(task_type) → (mean, p95, count)
# .seed(means_dict) # cold-start pre-warm from historical data
Seeding TypeStatsTracker from historical data
On first deploy, the ranker has no telemetry. Seed TypeStatsTracker with historical per-type means to avoid cold-start degradation:
from chronoq_celery import LearnedScheduler, TypeStatsTracker, attach_signals
stats = TypeStatsTracker()
stats.seed({
"resize": 312.0, # historical mean ms per type
"transcode": 1780.0,
"analyze": 95.0,
})
scheduler = LearnedScheduler(mode="active", stats_tracker=stats)
attach_signals(app, scheduler)
Examples in the repo
integrations/celery/examples/toggle_demo.py— eager-mode FIFO vs active comparison usingtask_always_eager=True; runs the real Celery API with no Docker or Redis requiredintegrations/celery/examples/celery-docker/— Docker Compose A/B stack with real Redis, worker, producer, and wall-clock JCT measurement
Evidence
On the synthetic Pareto benchmark: +55% mean JCT improvement in active mode vs fifo on a 200-task workload (full demo.py run, n_train=800). The ranker pre-trains on synthetic oracle records before the eval batch, matching the conditions of the benchmark harness. Full methodology in the monorepo's BENCHMARKS.md.
Honest limitations
- p99 starvation at ρ ≥ 0.8: SJF-family tradeoff — short-first bias indefinitely delays long jobs at the tail. Pair with Celery's existing rate-limiting or priority-decay knobs at the task level. An aging-aware scheduler option is planned for v0.3.0.
mode="active"requires Celery 5.4+.- Pre-1.0 API: breaking changes are allowed in minor-version bumps with a deprecation shim and a CHANGELOG "Breaking" entry.
- In-process heap: the
activemode heap lives in a single Celery worker process. In multi-process Celery deployments each worker has its own heap; cross-worker coordination is out of scope for this package.
Links
- Monorepo: https://github.com/Ahnaf19/chronoq
- Ranker library: https://pypi.org/project/chronoq-ranker/
- Full benchmarks: https://github.com/Ahnaf19/chronoq/blob/main/docs/v2/BENCHMARKS.md
- MIT license
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file chronoq_celery-0.2.0.post1.tar.gz.
File metadata
- Download URL: chronoq_celery-0.2.0.post1.tar.gz
- Upload date:
- Size: 16.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b14e7417daf7c440969960ce83ee9833595eca4849a6f258142953d89cee300d
|
|
| MD5 |
d8f5ebf94d99b904766c955de2409400
|
|
| BLAKE2b-256 |
3fe2f0091a36bfb607f5cbe6ac5b814d852121a83599690682fe95f8cb5831b6
|
File details
Details for the file chronoq_celery-0.2.0.post1-py3-none-any.whl.
File metadata
- Download URL: chronoq_celery-0.2.0.post1-py3-none-any.whl
- Upload date:
- Size: 8.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2f78e3311200d075a1deb02bea5dc24c718057961561ae1482e4ab07a68e6a3c
|
|
| MD5 |
3fbd4c9deb4aad699ee86f6dbf3f5113
|
|
| BLAKE2b-256 |
515990b070cd464cd8afd61bf1aa05a05f30069bca59069c32fc85c5070a7a8d
|