RabbitMQ multi-stage pipeline runtime with append-only event sinks
dr-queues
dr-queues is a domain-free library for running jobs through chained stage queues, scaling worker pools per stage, and recording pipeline lifecycle events to durable storage. It is the execution substrate for experiment applications such as dr-bottleneck.
Install
pip install dr-queues
Or with uv:
uv add dr-queues
RabbitMQ and MongoDB are not bundled with the package. Run them locally
(Docker Compose) or point AMQP_URL / MONGODB_URL at your infrastructure.
After install, try the demo CLI:
dr-queues-demo --repeats 2 --lanes 1
What dr-queues is
- AMQP staged pipeline: per-stage pending/completed queue pairs chained together
- Slim JobEnvelope for job state on the wire
- WorkerPool and TerminalTap with pluggable step handlers
- Append-only EventSink implementations (MongoDB happy path, AMQP optional)
- Run manifest for multi-process worker coordination
- Minimal workflow engine: ordered steps + HandlerRegistry
What dr-queues is not
- LLM calls, prompts, or model profiles
- Dataset loading, HumanEval, or experiment metrics
- JSONL report assembly
- General domain EventBus / webhook dispatch (deferred to a future layer)
Workers append pipeline events before acking and forwarding jobs to the next stage. That append-before-forward invariant matches an event-sourced write-ahead log: durable telemetry precedes propagation.
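The append-before-forward ordering can be illustrated with a minimal in-memory sketch. Nothing here is the dr-queues API: InMemorySink, process_job, and the stage_completed event type are hypothetical names, and a plain list stands in for the next stage's queue.

```python
from dataclasses import dataclass, field


@dataclass
class InMemorySink:
    """Hypothetical append-only log: events are added, never edited or removed."""

    _events: list = field(default_factory=list)

    def append(self, event: dict) -> None:
        self._events.append(dict(event))

    def events(self) -> tuple:
        return tuple(self._events)


def process_job(job: dict, stage: str, handler, sink, next_queue: list) -> None:
    """Run one stage, record the event, and only then forward the job."""
    result = handler(job)
    # 1. Telemetry first: if the append raises, the job is never forwarded.
    sink.append({"job_id": job["job_id"], "stage": stage, "type": "stage_completed"})
    # 2. Propagation second: stands in for ack + publish to the next stage.
    next_queue.append(result)


sink = InMemorySink()
transform_queue: list = []
process_job({"job_id": "j1", "text": "hi"}, "slow", lambda j: j, sink, transform_queue)
print(sink.events(), transform_queue)
```

If the sink write fails, the exception propagates before the forward step runs, so a job never reaches the next stage without its event having been recorded.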
Requirements
- Python 3.13+ with uv
- Docker Compose for local RabbitMQ and MongoDB
Environment
| Variable | Default | Purpose |
|---|---|---|
| AMQP_URL | amqp://guest:guest@localhost:5672/ | RabbitMQ connection |
| MONGODB_URL | mongodb://localhost:27017/dr_queues | MongoDB event store |
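For application code that needs the same connection settings, a small sketch of resolving the two variables with the documented defaults might look like this. The load_urls helper is hypothetical; how dr-queues itself reads these variables is not shown here.

```python
import os

# Defaults copied from the table above.
DEFAULT_AMQP_URL = "amqp://guest:guest@localhost:5672/"
DEFAULT_MONGODB_URL = "mongodb://localhost:27017/dr_queues"


def load_urls(env=None) -> dict:
    """Return connection URLs, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return {
        "amqp_url": env.get("AMQP_URL", DEFAULT_AMQP_URL),
        "mongodb_url": env.get("MONGODB_URL", DEFAULT_MONGODB_URL),
    }


print(load_urls())
```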
Local services
docker compose up -d
- RabbitMQ management UI: http://localhost:15672 (guest/guest)
- MongoDB: mongodb://localhost:27017/dr_queues
Quick start
From a git checkout:
uv sync
docker compose up -d
uv run dr-queues-demo \
--repeats 5 \
--workers slow=4,transform=4,finalize=2
Or use the repo script wrapper (same CLI):
uv run python scripts/run_pipeline_demo.py \
--repeats 5 \
--workers slow=4,transform=4,finalize=2
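The --workers value in the commands above maps stage names to worker counts. As a rough sketch of how such a stage=count spec could be parsed (parse_workers is a hypothetical helper, not necessarily how the CLI does it):

```python
def parse_workers(spec: str) -> dict[str, int]:
    """Parse 'slow=4,transform=4,finalize=2' into {'slow': 4, ...}."""
    counts: dict[str, int] = {}
    for part in spec.split(","):
        stage, sep, count = part.partition("=")
        stage = stage.strip()
        if not sep or not stage:
            raise ValueError(f"expected stage=count, got {part!r}")
        workers = int(count)  # raises ValueError on non-numeric counts
        if workers < 1:
            raise ValueError(f"worker count must be positive, got {part!r}")
        counts[stage] = workers
    return counts


print(parse_workers("slow=4,transform=4,finalize=2"))
```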
The demo runs a 3-stage dummy pipeline (sleep_ms → add_prefix →
record_artifact) with MongoDB as the default event sink.
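To make the "ordered steps plus handler lookup" idea concrete, here is an in-process sketch that uses the demo's three step names. The handler bodies, the HANDLERS dict, and run_steps are all assumptions for illustration; the real library runs each stage behind its own queue pair and uses its own HandlerRegistry.

```python
import time

# Hypothetical registry: maps a step name to the function that runs it.
HANDLERS = {}


def handler(name):
    """Decorator that registers a function under a step name."""
    def register(fn):
        HANDLERS[name] = fn
        return fn
    return register


@handler("sleep_ms")
def sleep_ms(payload):
    time.sleep(payload.get("sleep_ms", 0) / 1000)
    return payload


@handler("add_prefix")
def add_prefix(payload):
    return {**payload, "text": "demo:" + payload["text"]}


@handler("record_artifact")
def record_artifact(payload):
    return {**payload, "artifact": {"length": len(payload["text"])}}


STEPS = ["sleep_ms", "add_prefix", "record_artifact"]


def run_steps(payload, steps=STEPS):
    """Apply each registered handler in order, threading the payload through."""
    for name in steps:
        payload = HANDLERS[name](payload)
    return payload


result = run_steps({"text": "hello", "sleep_ms": 1})
print(result)
```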
Options:
dr-queues-demo --sink amqp --repeats 2
dr-queues-demo --sink both --lanes 1 --repeats 1
Each run writes a manifest to .runs/{run_id}/manifest.json. The demo prints
run_id=... at the start. Use that value when querying MongoDB (do not use the
literal placeholder demo-...).
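A small sketch for locating and loading a run's manifest from Python follows. The manifest schema is not documented in this section, so the loader returns whatever JSON the file contains; manifest_path and load_manifest are hypothetical helper names.

```python
import json
from pathlib import Path


def manifest_path(run_id: str, root: Path = Path(".runs")) -> Path:
    """Build the documented location .runs/{run_id}/manifest.json."""
    return root / run_id / "manifest.json"


def load_manifest(run_id: str, root: Path = Path(".runs")):
    """Load the manifest as plain JSON; no assumptions about its fields."""
    with manifest_path(run_id, root).open(encoding="utf-8") as fh:
        return json.load(fh)


print(manifest_path("demo-56bd0ce5").as_posix())
```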
On success you should see output like events=70 terminals=10 for
--repeats 5 with the default 2 lanes (10 jobs × 7 events per job).
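The arithmetic behind those numbers can be written out as a quick check. The 7 events per job and the default of 2 lanes come from the text above; treating the job count as repeats × lanes is inferred from the 5 × 2 = 10 example rather than stated outright.

```python
EVENTS_PER_JOB = 7  # stated above for the 3-stage demo pipeline


def expected_counts(repeats: int, lanes: int = 2, events_per_job: int = EVENTS_PER_JOB) -> dict:
    """Expected demo totals, assuming jobs = repeats * lanes."""
    jobs = repeats * lanes
    return {"events": jobs * events_per_job, "terminals": jobs}


print(expected_counts(5))
```

Under the same assumption, dr-queues-demo --repeats 2 --lanes 1 would produce 2 jobs and 14 events.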
Inspect events in MongoDB
Events are stored in the pipeline_events collection. Replace
YOUR_RUN_ID with the run_id printed by the demo (e.g. demo-56bd0ce5).
Count events for one run:
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.countDocuments({run_id: "YOUR_RUN_ID"})'
List all run IDs that have events:
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.distinct("run_id")'
Count all events in the collection (across every run):
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.countDocuments({})'
Preview a few events from a run:
mongosh mongodb://localhost:27017/dr_queues \
--eval 'db.pipeline_events.find({run_id: "YOUR_RUN_ID"}).limit(3).pretty()'
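The same count and preview queries can be run from Python. The sketch below takes any collection object exposing pymongo's count_documents and find methods, so the function itself does not need a live database; summarize_run is a hypothetical helper, and the commented usage assumes pymongo is installed and MongoDB is running locally.

```python
def summarize_run(collection, run_id: str, preview: int = 3) -> dict:
    """Count a run's events and fetch the first few documents."""
    query = {"run_id": run_id}
    return {
        "run_id": run_id,
        "count": collection.count_documents(query),
        "preview": list(collection.find(query).limit(preview)),
    }


# Usage against the local MongoDB (requires `pip install pymongo`):
#   from pymongo import MongoClient
#   db = MongoClient("mongodb://localhost:27017/dr_queues").get_default_database()
#   print(summarize_run(db["pipeline_events"], "YOUR_RUN_ID"))
```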
If counts are zero, check:
- You used the actual run_id from demo output, not the placeholder text.
- The demo used the default Mongo sink (--sink mongo). AMQP-only runs (--sink amqp) do not write to MongoDB.
- MongoDB is running (docker compose up -d) and reachable at MONGODB_URL.
- You re-ran the demo after starting Mongo if the first attempt failed to connect.
Package layout
| Module | Role |
|---|---|
| amqp/ | Connection helpers, stage queue pairs |
| pipeline/ | JobEnvelope, WorkerPool, TerminalTap, runner |
| events/ | PipelineEvent, EventSink, Mongo/AMQP/memory sinks, event filters |
| manifest/ | Run manifest read/write, worker CLI helpers |
| workflow/ | PipelineDefinition, HandlerRegistry, Pipeline |
Public API
Import from dr_queues:
- Setup / run: setup_run_queues, run_in_process, seed_jobs, seed_manifest_jobs
- Runtime: WorkerPool, TerminalTap, JobEnvelope
- Workflow: PipelineDefinition, HandlerRegistry, Pipeline
- Events: PipelineEvent, EventSink, MongoEventSink, AmqpEventSink, MemoryEventSink, filter_run_events
Detached stage workers
Resize or run a single stage in a separate process:
dr-queues-stage-worker \
--run-id demo-abc123 \
--stage transform \
--workers 5 \
--replace
Handlers must be registered in the worker process via --handlers-module
(default: dr_queues.demo_handlers).
Future layers
A general EventBus, domain EventAdapter, and webhook/hook dispatch will likely live in a separate package or in dr-bottleneck, built on top of dr-queues. Pipeline lifecycle events may eventually map to versioned domain event types; dr-queues stays focused on queue-based execution and pipeline telemetry.
Development
uv sync
docker compose up -d
scripts/pre-check.sh # ruff, ty, pytest (14 tests)
uv run pytest -m integration # integration tests only; needs docker compose
Full manual smoke test:
dr-queues-demo \
--repeats 5 \
--workers slow=4,transform=4,finalize=2
# optional: exercise AMQP event sink instead of Mongo
dr-queues-demo --sink amqp --repeats 2
Build the sdist and wheel locally before publishing:
uv build
tar -tzf dist/dr_queues-*.tar.gz | head
unzip -l dist/dr_queues-*.whl