dr-queues
RabbitMQ multi-stage pipeline runtime with MongoDB-backed run state.
dr-queues is a domain-free library for running jobs through chained RabbitMQ stage queues, scaling worker pools per stage, and recording run state in MongoDB. It is the execution substrate for experiment applications such as dr-bottleneck.
Install
pip install dr-queues
Or with uv:
uv add dr-queues
RabbitMQ and MongoDB are not bundled with the package. Run them locally
(for example with Docker Compose) or point AMQP_URL / MONGODB_URL at your own infrastructure.
After install, try the demo CLI:
dr-queues-demo --repeats 2 --lanes 1
What dr-queues is
- AMQP staged pipeline: per-stage pending/completed queue pairs chained together
- Slim JobEnvelope for job state on the wire
- WorkerPool and TerminalTap with pluggable step handlers
- MongoDB-backed manifests, events, seed batches, worker records, job states, failure attempts, and target holds
- RabbitMQ durable job transport for queued and in-flight work
- Shared stage eligibility flow for initial seed work and manual replay
- Minimal workflow engine: ordered steps + HandlerRegistry
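As a rough illustration of the "ordered steps + HandlerRegistry" idea, the sketch below maps step names to plain functions and applies them in order. SketchRegistry, its methods, and the step bodies are hypothetical; this is not the dr_queues HandlerRegistry API, only a minimal model of the pattern.

```python
from collections.abc import Callable

Payload = dict[str, object]
Handler = Callable[[Payload], Payload]


class SketchRegistry:
    """Hypothetical name-to-handler map; not the dr_queues HandlerRegistry."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register(self, name: str, handler: Handler) -> None:
        if name in self._handlers:
            raise ValueError(f"handler already registered: {name}")
        self._handlers[name] = handler

    def run(self, steps: list[str], payload: Payload) -> Payload:
        # Apply each named step in order, feeding each output to the next step.
        for name in steps:
            payload = self._handlers[name](payload)
        return payload


registry = SketchRegistry()
registry.register("add_prefix", lambda p: {**p, "text": f"demo-{p['text']}"})
registry.register("record_artifact", lambda p: {**p, "recorded": True})

result = registry.run(["add_prefix", "record_artifact"], {"text": "job-1"})
```

Keeping handlers behind string names is what lets a separate worker process look up the same step by name, provided the handlers are registered in that process too.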
Runtime model
dr-queues uses RabbitMQ and MongoDB for different roles:
- RabbitMQ is the durable queue transport. It owns pending messages, completed-stage messages, delivery acknowledgements, redelivery, and queue depth. Jobs with target tags can be routed through partition-specific queues so workers can consume only matching target subsets.
- MongoDB is the persistence and query layer. It owns run manifests, seed-batch records, pipeline events, worker records, latest per-job runtime state, failure attempt history, and target holds.
There is no filesystem-backed runtime store. New runs should not create
.runs/<run_id> state.
What dr-queues is not
- LLM calls, prompts, or model profiles
- Dataset loading, HumanEval, or experiment metrics
- JSONL report assembly
- General domain EventBus / webhook dispatch (deferred to a future layer)
Workers append pipeline events before acking and forwarding jobs to the next stage. That append-before-forward invariant matches an event-sourced write-ahead log: durable telemetry precedes propagation.
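The append-before-forward ordering can be modelled in a few lines. This is an in-memory sketch, not the library's worker code: the deques stand in for RabbitMQ stage queues, the list stands in for the MongoDB event collection, and the event name stage_completed is made up. The relative order of forwarding and acking is also an assumption; the source only states that the event append comes first.

```python
from collections import deque

event_log: list[dict] = []  # stands in for the MongoDB event collection
transform_pending: deque[dict] = deque([{"job_id": "job-1"}])
finalize_pending: deque[dict] = deque()  # input queue of the next stage


def process_one(stage: str, inbox: deque[dict], outbox: deque[dict]) -> None:
    job = inbox[0]  # peek: the message stays queued until it is "acked"
    # 1. Append the event first. If this raises, nothing is forwarded or acked,
    #    so the job remains available for redelivery.
    event_log.append(
        {"job_id": job["job_id"], "stage": stage, "event": "stage_completed"}
    )
    # 2. Only after the append succeeds, forward to the next stage and ack.
    outbox.append(job)
    inbox.popleft()


process_one("transform", transform_pending, finalize_pending)
```

With this ordering a crash can at worst leave an event without a forwarded job, never a forwarded job without its event.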
Requirements
- Python 3.13+ with uv
- Docker Compose for local RabbitMQ and MongoDB
Environment
| Variable | Default | Purpose |
|---|---|---|
| AMQP_URL | amqp://guest:guest@localhost:5672/ | RabbitMQ job transport |
| MONGODB_URL | mongodb://localhost:27017/dr_queues | MongoDB runtime store |
Local services
docker compose up -d
- RabbitMQ management UI: http://localhost:15672 (guest/guest)
- MongoDB: mongodb://localhost:27017/dr_queues
Quick start
From a git checkout:
uv sync
docker compose up -d
uv run dr-queues-demo \
--repeats 5 \
--workers slow=4,transform=4,finalize=2
Or use the repo script wrapper (same CLI):
uv run python scripts/run_pipeline_demo.py \
--repeats 5 \
--workers slow=4,transform=4,finalize=2
The demo runs a 3-stage dummy pipeline (sleep_ms → add_prefix →
record_artifact) with MongoDB as the runtime store.
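The --workers value in the commands above has a stage=count,stage=count shape. The parser below is a sketch of how such a spec could be read; it is not the library's own worker spec parser, and the validation rules (positive counts, no duplicate stages) are assumptions.

```python
def parse_worker_spec(spec: str) -> dict[str, int]:
    """Parse 'slow=4,transform=4,finalize=2' into {'slow': 4, 'transform': 4, ...}."""
    counts: dict[str, int] = {}
    for part in spec.split(","):
        stage, sep, raw = part.strip().partition("=")
        # Assumed rule: every entry is stage=count with a positive integer count.
        if not sep or not stage or not raw.isdecimal() or int(raw) < 1:
            raise ValueError(f"expected stage=count with count >= 1, got {part!r}")
        # Assumed rule: a stage may appear only once.
        if stage in counts:
            raise ValueError(f"duplicate stage: {stage}")
        counts[stage] = int(raw)
    return counts
```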
Each run stores its manifest and events in MongoDB. The demo prints
run_id=... at the start; use that value when querying MongoDB or running
dr-queues-run status.
On success you should see output like events=70 terminals=10 for
--repeats 5 with the default 2 lanes (10 jobs × 7 events per job).
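The expected totals follow from simple arithmetic. The sketch below assumes that the job count is repeats × lanes and that every job in the 3-stage demo emits 7 events, as the example above implies; other pipelines may emit a different number of events per job.

```python
EVENTS_PER_JOB = 7  # from the "10 jobs × 7 events per job" note above


def expected_counts(repeats: int, lanes: int = 2) -> tuple[int, int]:
    """Return (events, terminals), assuming jobs = repeats * lanes."""
    jobs = repeats * lanes
    return jobs * EVENTS_PER_JOB, jobs


events, terminals = expected_counts(repeats=5)
print(f"events={events} terminals={terminals}")  # events=70 terminals=10
```

Under the same assumptions, the install-time demo (--repeats 2 --lanes 1) would give 2 jobs and 14 events.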
Inspect MongoDB runtime state
Run manifests live in run_manifests, events in pipeline_events, seed
batches in seed_batches, worker records in workers, latest job
state in job_states, failure attempt history in job_attempts, and target
holds in target_holds. Replace YOUR_RUN_ID with the run_id printed by the
demo.
Count events for one run:
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.countDocuments({run_id: "YOUR_RUN_ID"})'
List all run IDs that have events:
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.distinct("run_id")'
Count all events in the collection (across every run):
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.countDocuments({})'
Preview a few events from a run:
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.find({run_id: "YOUR_RUN_ID"}).limit(3).pretty()'
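The same four queries can be issued from Python. The helper below is a sketch that expects a pymongo Collection for pipeline_events; the function name and the returned dictionary keys are made up for illustration. pymongo is not imported at module level, so the connection code appears only as a usage comment.

```python
from typing import Any


def summarize_run_events(events: Any, run_id: str, preview: int = 3) -> dict[str, Any]:
    """Mirror the mongosh queries above against a pymongo Collection."""
    return {
        # db.pipeline_events.countDocuments({run_id: ...})
        "run_event_count": events.count_documents({"run_id": run_id}),
        # db.pipeline_events.distinct("run_id")
        "run_ids": events.distinct("run_id"),
        # db.pipeline_events.countDocuments({})
        "total_event_count": events.count_documents({}),
        # db.pipeline_events.find({run_id: ...}).limit(3)
        "preview": list(events.find({"run_id": run_id}).limit(preview)),
    }


# Usage, assuming pymongo is installed and MongoDB is running locally:
#   from pymongo import MongoClient
#   db = MongoClient("mongodb://localhost:27017/dr_queues").get_default_database()
#   print(summarize_run_events(db.pipeline_events, "YOUR_RUN_ID"))
```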
Use the operational CLI for run status:
dr-queues-run status --run-id YOUR_RUN_ID
dr-queues-run wait --run-id YOUR_RUN_ID --target terminal --timeout 120
status combines Mongo progress records with RabbitMQ queue snapshots. Expected
job totals are derived from active seed batches, so adding more seed work to a
run updates progress automatically. Seeded work is first recorded as pending in
Mongo, then published to the first-stage partition queue. Stage lines report
active worker records separately from active concurrency:
stage=transform completed=10/10 input_depth=0 output_depth=0 worker_records=1/3 worker_concurrency=5
If counts are zero, check that MongoDB is running and that you used the actual
run_id from demo output, not the placeholder text.
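For scripting around the status output, a stage line like the one shown above can be split into fields. This sketch infers the format from that single example line (space-separated key=value tokens, with a/b pairs for progress), so treat the parsing rules as assumptions rather than a documented output contract.

```python
def parse_stage_line(line: str) -> dict[str, object]:
    """Split a 'key=value key=value' stage line into a dict.

    Values shaped like 'a/b' become (a, b) integer tuples, plain digits become
    integers, and anything else stays a string.
    """
    parsed: dict[str, object] = {}
    for token in line.split():
        key, _, value = token.partition("=")
        left, slash, right = value.partition("/")
        if slash and left.isdecimal() and right.isdecimal():
            parsed[key] = (int(left), int(right))
        elif value.isdecimal():
            parsed[key] = int(value)
        else:
            parsed[key] = value
    return parsed


line = (
    "stage=transform completed=10/10 input_depth=0 output_depth=0 "
    "worker_records=1/3 worker_concurrency=5"
)
stage = parse_stage_line(line)
```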
Local observability viewer
Install the optional viewer dependencies and run the local read-only web UI:
uv add "dr-queues[viewer]"
dr-queues-viewer --run-id YOUR_RUN_ID
The viewer binds to 127.0.0.1:8765 by default. It shows run summaries,
stage queue depths, worker records, target holds, blocked jobs, recent failure
attempts, and recent pipeline events without exposing worker controls or replay
actions. The dashboard includes a local auto-refresh selector with off, 1s, 2s,
5s, and 10s intervals.
See docs/verification/dashboard_demos.md
for dashboard demo scenarios covering in-process progress, detached workers,
holds, and failure attempts.
Notebooks
Two marimo notebooks under notebooks/ provide a
learning surface and an ad-hoc analysis lens. Only their MongoDB reads and
RabbitMQ queue snapshots execute; the define/run sections are illustrative.
uv run marimo edit notebooks/quickstart.py # living intro: architecture + standard queries
uv run marimo edit notebooks/run_exploration.py # set a run_id, explore one run end to end
quickstart.py explains what dr-queues does and which collection answers which
question. run_exploration.py is meant to be copied and tweaked: set a run_id
and get state breakdowns, failure/retry analysis, a single-job timeline,
per-stage timing, and live queue depths.
Package layout
| Module | Role |
|---|---|
| amqp/ | RabbitMQ connection/session lifecycle, topology, publishing |
| pipeline/ | JobEnvelope, WorkerPool, TerminalTap, runner |
| events/ | PipelineEvent, local test sink, event filters |
| manifest/ | Run manifest models and worker spec parsing |
| runtime/ | Mongo run store, status, wait, worker lifecycle |
| targeting.py | Target selectors and partition-key derivation |
| workflow/ | PipelineDefinition, HandlerRegistry, Pipeline |
Public API
Import from dr_queues:
- Setup / run: setup_run_queues, seed_run, run_in_process
- Runtime: MongoRunStore, get_run_status, wait_for_run, WorkerPool, TerminalTap, JobEnvelope, WorkerRecord, WorkerRuntime
- Failure controls: JobState, JobStateStatus, JobAttempt, JobAttemptAction, TargetHold, TargetSelector
- Workflow: PipelineDefinition, HandlerRegistry, Pipeline
- Events: PipelineEvent, filter_run_events
Detached stage workers
In-process and detached workers both create Mongo worker records. Detached workers are also controlled through OS process signals on the local host. Start a single stage in a separate process:
dr-queues-stage-worker \
--run-id demo-abc123 \
--stage transform \
--workers 5
Handlers must be registered in the worker process via --handlers-module
(default: dr_queues.demo_handlers).
The operational CLI can start, replace, stop, list, and wait on detached workers:
dr-queues-run start \
--run-id demo-abc123 \
--stage transform \
--workers 5
dr-queues-run workers --run-id demo-abc123
dr-queues-run stop \
--run-id demo-abc123 \
--stage transform
Use replace to stop the currently running workers for a stage and start a new
worker process:
dr-queues-run replace \
--run-id demo-abc123 \
--stage transform \
--workers 5
Workers can also include or exclude target subsets. This is useful when one provider, model, or quota pool is paused but unrelated work can keep moving:
dr-queues-run start \
--run-id demo-abc123 \
--stage transform \
--workers 5 \
--include provider=openai
If a selector matches no known partitions, start and replace exit nonzero
instead of reporting a started worker.
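Selector matching against partitions can be pictured as a filter over target tags. The sketch below is a model only: the tag dictionaries, the function names, and the rule that multiple include selectors must all match are assumptions, not the behaviour of dr_queues targeting. It does mirror the documented rule that an empty match is an error rather than a silent no-op.

```python
Tags = dict[str, str]


def parse_selector(text: str) -> tuple[str, str]:
    key, sep, value = text.partition("=")
    if not sep or not key or not value:
        raise ValueError(f"expected key=value, got {text!r}")
    return key, value


def matches(tags: Tags, selector: str) -> bool:
    key, value = parse_selector(selector)
    return tags.get(key) == value


def select_partitions(
    partitions: list[Tags], include: list[str], exclude: list[str]
) -> list[Tags]:
    selected = [
        tags
        for tags in partitions
        # Assumption: every include selector must match, and no exclude may match.
        if all(matches(tags, s) for s in include)
        and not any(matches(tags, s) for s in exclude)
    ]
    if not selected:
        # A CLI would turn this into a nonzero exit instead of starting a worker.
        raise LookupError("selector matched no known partitions")
    return selected


known = [{"provider": "openai"}, {"provider": "gemini"}]
openai_only = select_partitions(known, include=["provider=openai"], exclude=[])
```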
wait --target terminal also consumes final-stage completed messages through a
terminal tap, so detached runs can reach terminal completion without an
in-process runner.
Failure controls and replay
Handler failures are persisted in MongoDB. The worker records an append-only
attempt in job_attempts, updates the latest job_states record, and then
acknowledges the RabbitMQ delivery after persistence succeeds. Retryable
failures move to retry_waiting; jobs that exhaust their attempt budget move
to dead_lettered.
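The persist-then-ack sequence described above can be sketched with in-memory stand-ins. Everything here is illustrative: the list and dict replace the job_attempts and job_states collections, the function names are hypothetical, and the "failed" status for non-retryable errors is an assumption, since the text only spells out the retry_waiting and dead_lettered transitions.

```python
from collections.abc import Callable


def status_after_failure(attempt_count: int, max_attempts: int, retryable: bool) -> str:
    if not retryable:
        return "failed"  # assumption: the text does not spell out this case
    if attempt_count >= max_attempts:
        return "dead_lettered"  # attempt budget exhausted
    return "retry_waiting"


def record_failure(
    job_id: str,
    error: str,
    *,
    retryable: bool,
    max_attempts: int,
    job_attempts: list[dict],
    job_states: dict[str, str],
    ack: Callable[[], None],
) -> str:
    # 1. Append-only attempt history (stands in for the job_attempts collection).
    job_attempts.append({"job_id": job_id, "error": error})
    attempt_count = sum(1 for a in job_attempts if a["job_id"] == job_id)
    # 2. Overwrite the latest state (stands in for job_states).
    status = status_after_failure(attempt_count, max_attempts, retryable)
    job_states[job_id] = status
    # 3. Ack last, so a crash before this point leaves the delivery unacked.
    ack()
    return status
```

Because the ack comes last, a worker that dies between steps 1 and 3 causes a redelivery rather than a lost failure record.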
Inspect failed, held, retry-waiting, and dead-lettered jobs:
dr-queues-run failures --run-id demo-abc123
dr-queues-run attempts --run-id demo-abc123
Set a temporary target hold, for example when a provider quota pool is rate-limited:
dr-queues-run holds set \
--run-id demo-abc123 \
--selector quota_pool=gemini-flash \
--until +30m \
--reason rate-limit
Workers persist matching jobs as held and remove them from the hot queue path.
After clearing a hold or fixing a failed handler, replay selected work back to
the correct stage partition queue. Replay uses the latest persisted job state,
marks the selected jobs pending again, and republishes them to that state's
stage input queue:
dr-queues-run holds clear \
--run-id demo-abc123 \
--selector quota_pool=gemini-flash
dr-queues-run replay \
--run-id demo-abc123 \
--selector quota_pool=gemini-flash \
--status held \
--force
Replay is manual in this version. There is no background scheduler for
retry_waiting jobs, automatic replay after hold expiry, or token-bucket
provider throttling yet.
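The replay flow described above (select by selector and status, mark pending, republish to the stage recorded in the job state) can be modelled as follows. The state layout and the publish callback are hypothetical stand-ins for the MongoDB job_states records and the RabbitMQ publisher, and the --force flag is not modelled.

```python
from collections.abc import Callable


def replay(
    job_states: dict[str, dict],
    selector: str,
    status: str,
    publish: Callable[[str, str], None],
) -> list[str]:
    """Re-queue jobs whose latest state matches the selector and status."""
    key, _, value = selector.partition("=")
    replayed: list[str] = []
    for job_id, state in job_states.items():
        if state["status"] == status and state["tags"].get(key) == value:
            state["status"] = "pending"  # record as pending first
            publish(state["stage"], job_id)  # then republish to that stage's input
            replayed.append(job_id)
    return replayed


states = {
    "job-1": {"status": "held", "stage": "transform", "tags": {"quota_pool": "gemini-flash"}},
    "job-2": {"status": "held", "stage": "transform", "tags": {"quota_pool": "other"}},
    "job-3": {"status": "pending", "stage": "slow", "tags": {"quota_pool": "gemini-flash"}},
}
published: list[tuple[str, str]] = []
replayed = replay(
    states, "quota_pool=gemini-flash", "held", lambda stage, job: published.append((stage, job))
)
```

Recording the pending state before publishing follows the same order the text describes for seeded work.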
See docs/manual_runtime_testing.md for the
manual operational test log covering detached startup, scale up/down,
kill/restart recovery, duplicate job protection, filesystem persistence
checks, target-scoped workers, holds, retries, dead letters, replay, and stage
eligibility retests.
See docs/design/failure_persistence.md
for the current failure persistence design.
Future layers
A general EventBus, domain EventAdapter, and webhook/hook dispatch will likely live in a separate package or in dr-bottleneck, built on top of dr-queues. Pipeline lifecycle events may eventually map to versioned domain event types; dr-queues stays focused on queue-based execution and pipeline telemetry.
Development
uv sync
docker compose up -d
scripts/pre-check.sh # ruff, ty, pytest
uv run pytest -m integration # integration tests only; needs docker compose
Full manual smoke test:
dr-queues-demo \
--repeats 5 \
--workers slow=4,transform=4,finalize=2
For failure-control scenarios, follow the tested flows in
docs/manual_runtime_testing.md.
Build the source distribution and wheel locally, then inspect their contents before publishing:
uv build
tar -tzf dist/dr_queues-*.tar.gz | head
unzip -l dist/dr_queues-*.whl
File details
Details for the file dr_queues-0.1.4.tar.gz.
File metadata
- Download URL: dr_queues-0.1.4.tar.gz
- Upload date:
- Size: 194.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | da7efeef78af649880aff560e629825921a5bf52548c0cfa213d25b745f8f289 |
| MD5 | 3a67384be8ad2a1515a0aa36351bcaa7 |
| BLAKE2b-256 | ee4524bf77594854f243f4594a095201f8226c0b0ed130ce93410347ec25c1d0 |
File details
Details for the file dr_queues-0.1.4-py3-none-any.whl.
File metadata
- Download URL: dr_queues-0.1.4-py3-none-any.whl
- Upload date:
- Size: 57.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7701068609452dc14a3ca2a843fb0ce8401b8dc09a70e2ddfb46c3ecd56313af |
| MD5 | 8dbe409e92ab1a291df14a5b6cf95a6f |
| BLAKE2b-256 | 27d3185caaa12f498278e8c511e5a1de4952608ba429457ac025307c8b0dbc31 |