dr-queues

RabbitMQ multi-stage pipeline runtime with MongoDB-backed run state.

dr-queues is a domain-free library for running jobs through chained RabbitMQ stage queues, scaling worker pools per stage, and recording run state in MongoDB. It is the execution substrate for experiment applications such as dr-bottleneck.

Install

pip install dr-queues

Or with uv:

uv add dr-queues

RabbitMQ and MongoDB are not bundled with the package. Run them locally (Docker Compose) or point AMQP_URL / MONGODB_URL at your own infrastructure.

After install, try the demo CLI:

dr-queues-demo --repeats 2 --lanes 1

What dr-queues is

  • AMQP staged pipeline: per-stage pending/completed queue pairs chained together
  • Slim JobEnvelope for job state on the wire
  • WorkerPool and TerminalTap with pluggable step handlers
  • MongoDB-backed manifests, events, seed batches, worker records, job states, failure attempts, and target holds
  • RabbitMQ durable job transport for queued and in-flight work
  • Shared stage eligibility flow for initial seed work and manual replay
  • Minimal workflow engine: ordered steps + HandlerRegistry
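To make the "ordered steps + HandlerRegistry" idea concrete, here is a minimal sketch of a name-to-callable registry that runs steps in order. The class `SketchRegistry` and the handler bodies are hypothetical illustrations, not the dr_queues.HandlerRegistry API; only the step names add_prefix and record_artifact are borrowed from the demo pipeline.

```python
from typing import Callable

# Hypothetical stand-in for a handler registry; NOT the dr_queues API.
Handler = Callable[[dict], dict]


class SketchRegistry:
    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register(self, name: str) -> Callable[[Handler], Handler]:
        # Decorator that stores the handler under a step name.
        def decorator(fn: Handler) -> Handler:
            self._handlers[name] = fn
            return fn

        return decorator

    def run(self, steps: list[str], payload: dict) -> dict:
        # Apply each named step in order, threading the payload through.
        for name in steps:
            payload = self._handlers[name](payload)
        return payload


registry = SketchRegistry()


@registry.register("add_prefix")
def add_prefix(payload: dict) -> dict:
    # Illustrative behaviour only; the real demo handler may differ.
    return {**payload, "text": "demo:" + payload["text"]}


@registry.register("record_artifact")
def record_artifact(payload: dict) -> dict:
    # Illustrative behaviour only; the real demo handler may differ.
    return {**payload, "artifact": payload["text"].upper()}


result = registry.run(["add_prefix", "record_artifact"], {"text": "job-1"})
print(result)
```

Keeping handlers behind string names is what lets a separate worker process resolve the same steps from a handlers module.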

Runtime model

dr-queues gives RabbitMQ and MongoDB separate responsibilities:

  • RabbitMQ is the durable queue transport. It owns pending messages, completed-stage messages, delivery acknowledgements, redelivery, and queue depth. Jobs with target tags can be routed through partition-specific queues so workers can consume only matching target subsets.
  • MongoDB is the persistence and query layer. It owns run manifests, seed-batch records, pipeline events, worker records, latest per-job runtime state, failure attempt history, and target holds.

There is no filesystem-backed runtime store. New runs should not create .runs/<run_id> state.

What dr-queues is not

  • LLM calls, prompts, or model profiles
  • Dataset loading, HumanEval, or experiment metrics
  • JSONL report assembly
  • General domain EventBus / webhook dispatch (deferred to a future layer)

Workers append pipeline events before acking and forwarding jobs to the next stage. That append-before-forward invariant matches an event-sourced write-ahead log: durable telemetry precedes propagation.
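The append-before-forward ordering can be sketched with in-memory stand-ins. Everything here is an assumption made for illustration: the list plays the role of the pipeline_events collection, the deque plays the role of the next stage's pending queue, and the event field names are hypothetical. The relative order of forwarding and acking is also an assumption; the text only states that the event append comes first.

```python
from collections import deque
from typing import Callable

event_log: list[dict] = []              # stand-in for the pipeline_events collection
next_stage_queue: deque[dict] = deque()  # stand-in for the next stage's pending queue
acked_job_ids: list[str] = []            # stand-in for delivery acknowledgements


def handle_completed_job(job: dict, stage: str, append_event: Callable[[dict], None]) -> None:
    # Hypothetical event shape; the real PipelineEvent fields are not shown in this README.
    event = {"job_id": job["job_id"], "stage": stage, "kind": "stage_completed"}
    # 1. Persist the event first. If this raises, nothing below runs, so the
    #    job is neither forwarded nor acknowledged.
    append_event(event)
    # 2. Only then propagate the job and acknowledge the original delivery.
    next_stage_queue.append(job)
    acked_job_ids.append(job["job_id"])


handle_completed_job({"job_id": "j1"}, "transform", event_log.append)
```

If the append fails, the delivery stays unacknowledged, which is the condition under which a broker such as RabbitMQ can redeliver it.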

Requirements

  • Python 3.13+ with uv
  • Docker Compose for local RabbitMQ and MongoDB

Environment

| Variable | Default | Purpose |
| --- | --- | --- |
| AMQP_URL | amqp://guest:guest@localhost:5672/ | RabbitMQ job transport |
| MONGODB_URL | mongodb://localhost:27017/dr_queues | MongoDB runtime store |
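For application code that wants the same defaults, a small helper can resolve both URLs from the environment. This is a sketch: the function name `load_service_urls` is hypothetical, and how dr-queues itself reads these variables is not shown here.

```python
import os
from typing import Mapping

# Defaults copied from the Environment section above.
DEFAULT_AMQP_URL = "amqp://guest:guest@localhost:5672/"
DEFAULT_MONGODB_URL = "mongodb://localhost:27017/dr_queues"


def load_service_urls(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    # Fall back to the documented defaults when a variable is unset.
    return {
        "amqp_url": env.get("AMQP_URL", DEFAULT_AMQP_URL),
        "mongodb_url": env.get("MONGODB_URL", DEFAULT_MONGODB_URL),
    }


print(load_service_urls())
```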

Local services

docker compose up -d

  • RabbitMQ management UI: http://localhost:15672 (guest/guest)
  • MongoDB: mongodb://localhost:27017/dr_queues

Quick start

From a git checkout:

uv sync
docker compose up -d
uv run dr-queues-demo \
  --repeats 5 \
  --workers slow=4,transform=4,finalize=2

Or use the repo script wrapper (same CLI):

uv run python scripts/run_pipeline_demo.py \
  --repeats 5 \
  --workers slow=4,transform=4,finalize=2

The demo runs a 3-stage dummy pipeline (sleep_ms → add_prefix → record_artifact) with MongoDB as the runtime store.

Each run stores its manifest and events in MongoDB. The demo prints run_id=... at the start; use that value when querying MongoDB or running dr-queues-run status.

On success you should see output like events=70 terminals=10 for --repeats 5 with the default 2 lanes (10 jobs × 7 events per job).
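The arithmetic behind that output line can be checked with a few lines of Python. The function name is hypothetical, and the formula (jobs = repeats × lanes, 7 events per job) is inferred from the single example above rather than from a documented guarantee.

```python
def expected_demo_counts(repeats: int, lanes: int = 2, events_per_job: int = 7) -> tuple[int, int]:
    # Each repeat seeds one job per lane; every job reaches the terminal stage once.
    jobs = repeats * lanes
    return jobs * events_per_job, jobs


events, terminals = expected_demo_counts(repeats=5)
print(f"events={events} terminals={terminals}")  # prints: events=70 terminals=10
```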

Inspect MongoDB runtime state

Run manifests live in run_manifests, events in pipeline_events, seed batches in seed_batches, worker records in workers, latest job state in job_states, failure attempt history in job_attempts, and target holds in target_holds. Replace YOUR_RUN_ID with the run_id printed by the demo.

Count events for one run:

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.countDocuments({run_id: "YOUR_RUN_ID"})'

List all run IDs that have events:

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.distinct("run_id")'

Count all events in the collection (across every run):

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.countDocuments({})'

Preview a few events from a run:

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.find({run_id: "YOUR_RUN_ID"}).limit(3).pretty()'
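The same counts can be gathered from Python. This sketch assumes pymongo is installed and relies only on the collection and field names used in the mongosh commands above; the helper names are hypothetical. `count_documents`, `distinct`, and `get_default_database` are standard pymongo calls.

```python
def summarize_events(events, run_id: str) -> dict:
    """Summarize a pipeline_events collection.

    `events` is a pymongo Collection, or any object exposing the same
    count_documents / distinct methods.
    """
    return {
        "run_events": events.count_documents({"run_id": run_id}),
        "all_events": events.count_documents({}),
        "run_ids": sorted(events.distinct("run_id")),
    }


def open_events_collection(url: str = "mongodb://localhost:27017/dr_queues"):
    # Imported lazily so summarize_events has no hard dependency on pymongo.
    from pymongo import MongoClient

    # get_default_database() uses the database named in the URL (dr_queues here).
    return MongoClient(url).get_default_database()["pipeline_events"]


# Usage, with MongoDB running and a real run_id:
#   print(summarize_events(open_events_collection(), "YOUR_RUN_ID"))
```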

Use the operational CLI for run status:

dr-queues-run status --run-id YOUR_RUN_ID
dr-queues-run wait --run-id YOUR_RUN_ID --target terminal --timeout 120

status combines Mongo progress records with RabbitMQ queue snapshots. Expected job totals are derived from active seed batches, so adding more seed work to a run updates progress automatically. Seeded work is first recorded as pending in Mongo, then published to the first-stage partition queue. Stage lines report active worker records separately from active concurrency:

stage=transform completed=10/10 input_depth=0 output_depth=0 worker_records=1/3 worker_concurrency=5

If counts are zero, check that MongoDB is running and that you used the actual run_id from demo output, not the placeholder text.
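For scripting around the status output, a stage line like the one shown above can be split into fields. This is a sketch based on that single example; the README does not promise a stable machine-readable format, and `parse_stage_line` is a hypothetical helper.

```python
def parse_stage_line(line: str) -> dict[str, object]:
    fields: dict[str, object] = {}
    for token in line.split():
        key, _, value = token.partition("=")
        if "/" in value:
            # Ratios such as completed=10/10 become (numerator, denominator).
            left, right = value.split("/")
            fields[key] = (int(left), int(right))
        elif value.isdigit():
            fields[key] = int(value)
        else:
            fields[key] = value
    return fields


line = (
    "stage=transform completed=10/10 input_depth=0 output_depth=0 "
    "worker_records=1/3 worker_concurrency=5"
)
parsed = parse_stage_line(line)
print(parsed["stage"], parsed["completed"], parsed["worker_concurrency"])
```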

Local observability viewer

Install the optional viewer dependencies and run the local read-only web UI:

uv add "dr-queues[viewer]"
dr-queues-viewer --run-id YOUR_RUN_ID

The viewer binds to 127.0.0.1:8765 by default. It shows run summaries, stage queue depths, worker records, target holds, blocked jobs, recent failure attempts, and recent pipeline events without exposing worker controls or replay actions. The dashboard includes a local auto-refresh selector with off, 1s, 2s, 5s, and 10s intervals.

See docs/verification/dashboard_demos.md for dashboard demo scenarios covering in-process progress, detached workers, holds, and failure attempts.

Notebooks

Two marimo notebooks under notebooks/ provide a learning surface and an ad-hoc analysis lens. Only their MongoDB reads and RabbitMQ queue snapshots execute; the define/run sections are illustrative.

uv run marimo edit notebooks/quickstart.py        # living intro: architecture + standard queries
uv run marimo edit notebooks/run_exploration.py   # set a run_id, explore one run end to end

quickstart.py explains what dr-queues does and which collection answers which question. run_exploration.py is meant to be copied and tweaked: set a run_id and get state breakdowns, failure/retry analysis, a single-job timeline, per-stage timing, and live queue depths.

Package layout

| Module | Role |
| --- | --- |
| amqp/ | RabbitMQ connection/session lifecycle, topology, publishing |
| pipeline/ | JobEnvelope, WorkerPool, TerminalTap, runner |
| events/ | PipelineEvent, local test sink, event filters |
| manifest/ | Run manifest models and worker spec parsing |
| runtime/ | Mongo run store, status, wait, worker lifecycle |
| targeting.py | Target selectors and partition-key derivation |
| workflow/ | PipelineDefinition, HandlerRegistry, Pipeline |

Public API

Import from dr_queues:

  • Setup / run: setup_run_queues, seed_run, run_in_process
  • Runtime: MongoRunStore, get_run_status, wait_for_run, WorkerPool, TerminalTap, JobEnvelope, WorkerRecord, WorkerRuntime
  • Failure controls: JobState, JobStateStatus, JobAttempt, JobAttemptAction, TargetHold, TargetSelector
  • Workflow: PipelineDefinition, HandlerRegistry, Pipeline
  • Events: PipelineEvent, filter_run_events

Detached stage workers

In-process and detached workers both create Mongo worker records. Detached workers are also controlled through OS process signals on the local host. Start a single stage in a separate process:

dr-queues-stage-worker \
  --run-id demo-abc123 \
  --stage transform \
  --workers 5

Handlers must be registered in the worker process via --handlers-module (default: dr_queues.demo_handlers).

The operational CLI can start, replace, stop, list, and wait on detached workers:

dr-queues-run start \
  --run-id demo-abc123 \
  --stage transform \
  --workers 5

dr-queues-run workers --run-id demo-abc123

dr-queues-run stop \
  --run-id demo-abc123 \
  --stage transform

Use replace to stop the workers currently running for a stage and start a new worker process:

dr-queues-run replace \
  --run-id demo-abc123 \
  --stage transform \
  --workers 5

Workers can also include or exclude target subsets. This is useful when one provider, model, or quota pool is paused but unrelated work can keep moving:

dr-queues-run start \
  --run-id demo-abc123 \
  --stage transform \
  --workers 5 \
  --include provider=openai

If a selector matches no known partitions, start and replace exit nonzero instead of reporting a started worker.
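The fail-fast behaviour for unmatched selectors can be sketched as follows. The functions and the dict-of-tags partition representation are hypothetical; they illustrate the key=value matching idea and are not the dr_queues TargetSelector implementation.

```python
def parse_selector(text: str) -> tuple[str, str]:
    # Accepts the key=value form used by --include, e.g. provider=openai.
    key, sep, value = text.partition("=")
    if not sep or not key or not value:
        raise ValueError(f"selector must look like key=value, got {text!r}")
    return key, value


def select_partitions(partitions: list[dict[str, str]], include: str) -> list[dict[str, str]]:
    key, value = parse_selector(include)
    matched = [tags for tags in partitions if tags.get(key) == value]
    if not matched:
        # Treat "no match" as an error instead of silently starting an idle worker.
        raise LookupError(f"selector {include!r} matches no known partitions")
    return matched


known = [{"provider": "openai"}, {"provider": "gemini"}]
print(select_partitions(known, "provider=openai"))
```

Raising instead of returning an empty list mirrors the nonzero exit: a worker bound to zero partitions would look healthy while doing no work.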

wait --target terminal also consumes final-stage completed messages through a terminal tap, so detached runs can reach terminal completion without an in-process runner.

Failure controls and replay

Handler failures are persisted in MongoDB. The worker records an append-only attempt in job_attempts, updates the latest job_states record, and then acknowledges the RabbitMQ delivery after persistence succeeds. Retryable failures move to retry_waiting; jobs that exhaust their attempt budget move to dead_lettered.
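A minimal sketch of that failure path, using an in-memory record in place of the job_attempts and job_states collections. The dataclass, the function, and the "failed" status for non-retryable errors are assumptions made for illustration; only retry_waiting and dead_lettered come from the description above.

```python
from dataclasses import dataclass, field


@dataclass
class JobRecord:  # hypothetical stand-in for job_states plus job_attempts
    job_id: str
    status: str = "pending"
    attempts: list[dict] = field(default_factory=list)


def record_failure(job: JobRecord, error: str, retryable: bool, max_attempts: int) -> str:
    # 1. Append-only attempt history.
    job.attempts.append({"attempt": len(job.attempts) + 1, "error": error})
    # 2. Update the latest state.
    if len(job.attempts) >= max_attempts:
        job.status = "dead_lettered"   # attempt budget exhausted
    elif retryable:
        job.status = "retry_waiting"
    else:
        job.status = "failed"          # assumption: status name not confirmed by the README
    # 3. The caller acknowledges the delivery only after both writes succeed.
    return job.status


job = JobRecord("j1")
print(record_failure(job, "timeout", retryable=True, max_attempts=2))
print(record_failure(job, "timeout", retryable=True, max_attempts=2))
```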

Inspect failed, held, retry-waiting, and dead-lettered jobs:

dr-queues-run failures --run-id demo-abc123
dr-queues-run attempts --run-id demo-abc123

Set a temporary target hold, for example when a provider quota pool is rate-limited:

dr-queues-run holds set \
  --run-id demo-abc123 \
  --selector quota_pool=gemini-flash \
  --until +30m \
  --reason rate-limit

Workers persist matching jobs as held and remove them from the hot queue path. After clearing a hold or fixing a failed handler, replay selected work back to the correct stage partition queue. Replay uses the latest persisted job state, marks the selected jobs pending again, and republishes them to that state's stage input queue:

dr-queues-run holds clear \
  --run-id demo-abc123 \
  --selector quota_pool=gemini-flash

dr-queues-run replay \
  --run-id demo-abc123 \
  --selector quota_pool=gemini-flash \
  --status held \
  --force

Replay is manual in this version. There is no background scheduler for retry_waiting jobs, automatic replay after hold expiry, or token-bucket provider throttling yet.
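The replay step described above (select by selector and status, mark pending, republish to the state's stage input queue) can be sketched with plain dictionaries. The state shape, the `publish` callback, and the function name are hypothetical; the real command works against MongoDB and RabbitMQ.

```python
from typing import Callable


def replay(
    job_states: list[dict],
    selector: tuple[str, str],
    status: str,
    publish: Callable[[str, str], None],
) -> list[str]:
    key, value = selector
    replayed: list[str] = []
    for state in job_states:
        if state["status"] == status and state["tags"].get(key) == value:
            state["status"] = "pending"               # mark the job pending again
            publish(state["stage"], state["job_id"])  # republish to that stage's input queue
            replayed.append(state["job_id"])
    return replayed


states = [
    {"job_id": "j1", "stage": "transform", "status": "held", "tags": {"quota_pool": "gemini-flash"}},
    {"job_id": "j2", "stage": "transform", "status": "held", "tags": {"quota_pool": "other"}},
    {"job_id": "j3", "stage": "finalize", "status": "pending", "tags": {"quota_pool": "gemini-flash"}},
]
published: list[tuple[str, str]] = []
replayed_ids = replay(states, ("quota_pool", "gemini-flash"), "held", lambda s, j: published.append((s, j)))
print(replayed_ids, published)
```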

See docs/manual_runtime_testing.md for the manual operational test log covering detached startup, scale up/down, kill/restart recovery, duplicate job protection, filesystem persistence checks, target-scoped workers, holds, retries, dead letters, replay, and stage eligibility retests. See docs/design/failure_persistence.md for the current failure persistence design.

Future layers

A general EventBus, domain EventAdapter, and webhook/hook dispatch will likely live in a separate package or in dr-bottleneck, built on top of dr-queues. Pipeline lifecycle events may eventually map to versioned domain event types; dr-queues stays focused on queue-based execution and pipeline telemetry.

Development

uv sync
docker compose up -d
scripts/pre-check.sh              # ruff, ty, pytest
uv run pytest -m integration      # integration tests only; needs docker compose

Full manual smoke test:

dr-queues-demo \
  --repeats 5 \
  --workers slow=4,transform=4,finalize=2

For failure-control scenarios, follow the tested flows in docs/manual_runtime_testing.md.

Build the source distribution and wheel locally and inspect their contents before publishing:

uv build
tar -tzf dist/dr_queues-*.tar.gz | head
unzip -l dist/dr_queues-*.whl
