
RabbitMQ multi-stage pipeline runtime with append-only event sinks


dr-queues


dr-queues is a domain-free library for running jobs through chained stage queues, scaling worker pools per stage, and recording pipeline lifecycle events to durable storage. It is the execution substrate for experiment applications such as dr-bottleneck.

Install

pip install dr-queues

Or with uv:

uv add dr-queues

RabbitMQ and MongoDB are not bundled with the package. Run them locally (Docker Compose) or point AMQP_URL / MONGODB_URL at your own infrastructure.

After install, try the demo CLI:

dr-queues-demo --repeats 2 --lanes 1

What dr-queues is

  • AMQP staged pipeline: per-stage pending/completed queue pairs chained together
  • Slim JobEnvelope for job state on the wire
  • WorkerPool and TerminalTap with pluggable step handlers
  • Append-only EventSink implementations (MongoDB happy path, AMQP optional)
  • Run manifest for multi-process worker coordination
  • Minimal workflow engine: ordered steps + HandlerRegistry
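To make the "chained queue pairs" idea concrete, here is a stdlib sketch of the topology for the three demo stages. The naming scheme ({run_id}.{stage}.pending / .completed), the `StageQueues` type, and the rule that a stage's completed queue feeds the next stage's pending queue are all assumptions for illustration; the real wiring lives in `amqp/` and is not described here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StageQueues:
    """Hypothetical pending/completed queue pair for one stage."""
    stage: str
    pending: str
    completed: str


def stage_queue_pairs(run_id: str, stages: list[str]) -> list[StageQueues]:
    # One pending/completed pair per stage; the naming scheme is an assumption.
    return [
        StageQueues(stage, f"{run_id}.{stage}.pending", f"{run_id}.{stage}.completed")
        for stage in stages
    ]


def chain_links(pairs: list[StageQueues]) -> list[tuple[str, str]]:
    # Assumed routing: stage i's completed queue feeds stage i+1's pending queue.
    return [(a.completed, b.pending) for a, b in zip(pairs, pairs[1:])]


pairs = stage_queue_pairs("demo-abc123", ["slow", "transform", "finalize"])
for src, dst in chain_links(pairs):
    print(f"{src} -> {dst}")
```

The last stage has no outgoing link in this sketch; its completed queue is where a terminal consumer (TerminalTap, in dr-queues' terms) would presumably read from.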

What dr-queues is not

  • LLM calls, prompts, or model profiles
  • Dataset loading, HumanEval, or experiment metrics
  • JSONL report assembly
  • General domain EventBus / webhook dispatch (deferred to a future layer)

Workers append pipeline events before acking and forwarding jobs to the next stage. That append-before-forward invariant matches an event-sourced write-ahead log: durable telemetry precedes propagation.
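The append-before-forward ordering can be sketched with in-memory stand-ins: a `deque` plays each stage queue and a plain list plays the event sink. `run_stage` and the `stage_completed` event name are hypothetical, and acking after forwarding (at-least-once delivery) is an assumed order; the README only states that the append comes before both.

```python
from collections import deque
from typing import Callable

Job = dict


def run_stage(
    pending: deque[Job],
    next_pending: deque[Job],
    sink: list[dict],
    handler: Callable[[Job], Job],
    trace: list[tuple[str, str]],
) -> dict[str, Job]:
    """Drain `pending`, appending an event before forwarding and acking each job."""
    unacked: dict[str, Job] = {}
    while pending:
        job = pending.popleft()
        unacked[job["job_id"]] = job  # delivered but not yet acknowledged
        result = handler(job)
        # 1. Append telemetry first, so the event exists before the job moves on.
        sink.append({"job_id": job["job_id"], "event": "stage_completed"})
        trace.append(("append", job["job_id"]))
        # 2. Forward the result to the next stage's pending queue.
        next_pending.append(result)
        trace.append(("forward", job["job_id"]))
        # 3. Ack last (assumed order); a real broker requeues unacked deliveries
        #    if the worker dies before this point.
        del unacked[job["job_id"]]
        trace.append(("ack", job["job_id"]))
    return unacked


pending = deque([{"job_id": "j1", "text": "hi"}])
next_pending: deque[Job] = deque()
sink: list[dict] = []
trace: list[tuple[str, str]] = []
run_stage(pending, next_pending, sink, lambda j: {**j, "text": "x-" + j["text"]}, trace)
print(trace)
```

Under this ordering, a crash between the append and the ack leads to redelivery, so anything reading the event log should tolerate duplicate events for the same job.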

Requirements

  • Python 3.13+ with uv
  • Docker Compose for local RabbitMQ and MongoDB

Environment

Variable      Default                                Purpose
AMQP_URL      amqp://guest:guest@localhost:5672/     RabbitMQ connection
MONGODB_URL   mongodb://localhost:27017/dr_queues    MongoDB event store

Local services

docker compose up -d
  • RabbitMQ management UI: http://localhost:15672 (guest/guest)
  • MongoDB: mongodb://localhost:27017/dr_queues

Quick start

From a git checkout:

uv sync
docker compose up -d
uv run dr-queues-demo \
  --repeats 5 \
  --workers slow=4,transform=4,finalize=2

Or use the repo script wrapper (same CLI):

uv run python scripts/run_pipeline_demo.py \
  --repeats 5 \
  --workers slow=4,transform=4,finalize=2

The demo runs a 3-stage dummy pipeline (sleep_ms → add_prefix → record_artifact) with MongoDB as the default event sink.
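The --workers value maps each stage name to a worker-pool size. A hypothetical parser for that `stage=count,...` format might look like this; the real CLI's parsing and validation rules may differ (rejecting counts below 1 is an assumption).

```python
def parse_workers(spec: str) -> dict[str, int]:
    """Parse a 'stage=count,stage=count' string into {stage: count}."""
    pools: dict[str, int] = {}
    for item in spec.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate trailing commas
        stage, sep, count = item.partition("=")
        if not sep or not stage.strip():
            raise ValueError(f"expected stage=count, got {item!r}")
        n = int(count)
        if n < 1:
            raise ValueError(f"worker count must be >= 1, got {n} for {stage!r}")
        pools[stage.strip()] = n
    return pools


print(parse_workers("slow=4,transform=4,finalize=2"))
# {'slow': 4, 'transform': 4, 'finalize': 2}
```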

Options:

dr-queues-demo --sink amqp --repeats 2
dr-queues-demo --sink both --lanes 1 --repeats 1

Each run writes a manifest to .runs/{run_id}/manifest.json. The demo prints run_id=... at startup; use that value when querying MongoDB, not the literal placeholder demo-....

On success you should see output like events=70 terminals=10 for --repeats 5 with the default 2 lanes (10 jobs × 7 events per job).
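The expected totals follow from jobs = repeats × lanes and a fixed number of events per job. A small check, assuming the 7-events-per-job figure holds for other demo configurations with the Mongo sink (the README states it only for the default run):

```python
def expected_counts(repeats: int, lanes: int = 2, events_per_job: int = 7) -> tuple[int, int]:
    """Return (events, terminals) the demo should report; one job per repeat per lane."""
    jobs = repeats * lanes
    return jobs * events_per_job, jobs


print(expected_counts(5))            # (70, 10)
print(expected_counts(2, lanes=1))   # (14, 2)
```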

Inspect events in MongoDB

Events are stored in the pipeline_events collection. Replace YOUR_RUN_ID with the run_id printed by the demo (e.g. demo-56bd0ce5).

Count events for one run:

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.countDocuments({run_id: "YOUR_RUN_ID"})'

List all run IDs that have events:

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.distinct("run_id")'

Count all events in the collection (across every run):

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.countDocuments({})'

Preview a few events from a run:

mongosh mongodb://localhost:27017/dr_queues \
  --eval 'db.pipeline_events.find({run_id: "YOUR_RUN_ID"}).limit(3).pretty()'

If counts are zero, check:

  • You used the actual run_id from demo output, not the placeholder text.
  • The demo used the default Mongo sink (--sink mongo). AMQP-only runs (--sink amqp) do not write to MongoDB.
  • MongoDB is running (docker compose up -d) and reachable at MONGODB_URL.
  • You re-ran the demo after starting Mongo if the first attempt failed to connect.

Package layout

Module      Role
amqp/       Connection helpers, stage queue pairs
pipeline/   JobEnvelope, WorkerPool, TerminalTap, runner
events/     PipelineEvent, EventSink, Mongo/AMQP/memory sinks, event filters
manifest/   Run manifest read/write, worker CLI helpers
workflow/   PipelineDefinition, HandlerRegistry, Pipeline

Public API

Import from dr_queues:

  • Setup / run: setup_run_queues, run_in_process, seed_jobs, seed_manifest_jobs
  • Runtime: WorkerPool, TerminalTap, JobEnvelope
  • Workflow: PipelineDefinition, HandlerRegistry, Pipeline
  • Events: PipelineEvent, EventSink, MongoEventSink, AmqpEventSink, MemoryEventSink, filter_run_events
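The "ordered steps + HandlerRegistry" model can be illustrated without dr-queues at all. The sketch below uses a hypothetical `Registry` class and `run_steps` function, not the package's `HandlerRegistry` / `Pipeline` API (whose signatures are not documented here); the three step names mirror the demo, but their bodies are placeholders.

```python
import time
from typing import Any, Callable

Payload = dict[str, Any]
Handler = Callable[[Payload], Payload]


class Registry:
    """Hypothetical name -> handler map (not dr_queues.HandlerRegistry)."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register(self, name: str) -> Callable[[Handler], Handler]:
        def decorator(fn: Handler) -> Handler:
            if name in self._handlers:
                raise KeyError(f"handler {name!r} already registered")
            self._handlers[name] = fn
            return fn
        return decorator

    def get(self, name: str) -> Handler:
        return self._handlers[name]


def run_steps(steps: list[str], registry: Registry, payload: Payload) -> Payload:
    # Ordered steps: each handler receives the previous handler's output.
    for name in steps:
        payload = registry.get(name)(payload)
    return payload


registry = Registry()


@registry.register("sleep_ms")
def sleep_ms(p: Payload) -> Payload:
    time.sleep(p.get("ms", 0) / 1000)
    return p


@registry.register("add_prefix")
def add_prefix(p: Payload) -> Payload:
    return {**p, "text": "prefix-" + p["text"]}


@registry.register("record_artifact")
def record_artifact(p: Payload) -> Payload:
    return {**p, "artifact": len(p["text"])}


print(run_steps(["sleep_ms", "add_prefix", "record_artifact"], registry, {"text": "job", "ms": 1}))
# {'text': 'prefix-job', 'ms': 1, 'artifact': 10}
```

Keeping the step order in a plain list and the behaviour in the registry means the same definition can be split across processes, with each worker looking up only the handlers for its own stage.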

Detached stage workers

Resize or run a single stage in a separate process:

dr-queues-stage-worker \
  --run-id demo-abc123 \
  --stage transform \
  --workers 5 \
  --replace

Handlers must be registered in the worker process via --handlers-module (default: dr_queues.demo_handlers).
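Each worker process presumably has its own in-memory registry, which is why the handlers module must be imported in that process. The loader below is a hypothetical sketch: it imports a module by dotted name and calls an assumed `register(registry)` hook. The real --handlers-module contract may instead rely on import-time side effects such as decorators.

```python
import importlib
from typing import Callable

Registry = dict[str, Callable[[dict], dict]]


def load_handlers(module_name: str, registry: Registry) -> Registry:
    """Import a handlers module by dotted name and let it populate `registry`."""
    module = importlib.import_module(module_name)
    register = getattr(module, "register", None)  # hypothetical hook name
    if not callable(register):
        raise AttributeError(f"{module_name!r} defines no register(registry) hook")
    register(registry)
    return registry
```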

Future layers

A general EventBus, domain EventAdapter, and webhook/hook dispatch will likely live in a separate package or in dr-bottleneck, built on top of dr-queues. Pipeline lifecycle events may eventually map to versioned domain event types; dr-queues stays focused on queue-based execution and pipeline telemetry.

Development

uv sync
docker compose up -d
scripts/pre-check.sh              # ruff, ty, pytest (14 tests)
uv run pytest -m integration      # integration tests only; needs docker compose

Full manual smoke test:

dr-queues-demo \
  --repeats 5 \
  --workers slow=4,transform=4,finalize=2

# optional: exercise AMQP event sink instead of Mongo
dr-queues-demo --sink amqp --repeats 2

Build the sdist and wheel locally and list their contents before publishing:

uv build
tar -tzf dist/dr_queues-*.tar.gz | head
unzip -l dist/dr_queues-*.whl



Download files


Source Distribution

dr_queues-0.1.1.tar.gz (103.0 kB)

Uploaded Source

Built Distribution


dr_queues-0.1.1-py3-none-any.whl (25.3 kB)

Uploaded Python 3

File details

Details for the file dr_queues-0.1.1.tar.gz.

File metadata

  • Download URL: dr_queues-0.1.1.tar.gz
  • Upload date:
  • Size: 103.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.0

File hashes

Hashes for dr_queues-0.1.1.tar.gz
Algorithm Hash digest
SHA256 074d8e8a87bfa301bf04c830842764e57e429a65389a1f5fb20c3cab78daea27
MD5 7745beadfb13d90deb09f3d558e53be6
BLAKE2b-256 4cd03fc00d0352bdf50dfc531f4a72d7b1f5ef0053b5aea225d86a414197d719


File details

Details for the file dr_queues-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: dr_queues-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 25.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.0

File hashes

Hashes for dr_queues-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 da7609ae89b763a5991a7189ce45b1bbb3069b9f4afc3d4ee9343bb62edc60bb
MD5 32368ed9838b200f7956fb6378c4f9a6
BLAKE2b-256 a088c756a32655fecd194249408f606548b39d4f4e64b81cf333c971966f9c82

