Skip to main content

Distributed job queue with DLQ, delayed jobs, in-flight tracking, Redis backend, and in-memory fallback

Project description

nodus-queue

Distributed job queue with Dead Letter Queue, delayed jobs, in-flight tracking, and visibility-timeout recovery. Redis backend for multi-instance production; in-memory fallback for dev and tests. Zero hard dependencies beyond tenacity.

Install

pip install nodus-queue              # core + in-memory backend
pip install "nodus-queue[redis]"     # + Redis backend

Quickstart

from nodus_queue import QueueJobPayload, get_queue, reset_queue

# In dev/test — in-memory backend (automatic when REDIS_URL is unset)
q = get_queue()

job = QueueJobPayload(job_id="run-123", task_name="agent.run")
q.enqueue(job)

# Worker side
job = q.dequeue(timeout=5)   # blocks up to 5 seconds
if job:
    try:
        # ... process job ...
        q.ack(job.job_id)    # remove from in-flight
    except Exception as e:
        q.fail(job.job_id, str(e))   # move to DLQ

Redis backend

REDIS_URL=redis://localhost:6379/0
from nodus_queue import get_queue

q = get_queue()   # picks up REDIS_URL automatically

Delayed jobs

# Schedule a job to run after 30 seconds
q.enqueue_delayed(job, delay_seconds=30)

# Promote ready jobs (call periodically in Redis mode)
count = q.process_delayed_jobs()

Crash recovery

# On worker startup — re-enqueue jobs stuck in-flight for > 5 minutes
q.requeue_stale_jobs(timeout_seconds=300)

Dead Letter Queue

depth = q.get_dlq_depth()
q.drain_dead_letters()               # clear all
q.remove_dead_letter("job-id-123")  # remove one

Optional Prometheus metrics

from prometheus_client import CollectorRegistry, Counter, Gauge
from nodus_queue import QueueMetrics, get_queue

REGISTRY = CollectorRegistry()
enq = Counter("queue_enqueue_total", "...", ["backend", "outcome"], registry=REGISTRY)

class MyMetrics(QueueMetrics):
    def on_enqueue(self, backend, outcome):
        enq.labels(backend=backend, outcome=outcome).inc()
    # override other hooks as needed

q = get_queue(metrics=MyMetrics())

Backend change callback

def on_change(event: str, payload: dict) -> None:
    print(f"Queue backend changed: {event} {payload}")

q = get_queue(on_backend_change=on_change)

Environment variables

Variable Default Purpose
REDIS_URL Redis connection URL
NODUS_QUEUE_NAME nodus:jobs Key prefix for all queue Redis keys
NODUS_QUEUE_MAXSIZE 100 Hard capacity limit
NODUS_REQUIRE_REDIS false Fail on startup if Redis is unavailable
EXECUTION_MODE thread distributed requires REDIS_URL
ENV production/prod requires REDIS_URL
TESTING / TEST_MODE Auto-select in-memory backend

Extracted from

AINDY/core/distributed_queue.py in the A.I.N.D.Y. runtime.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nodus_queue-0.1.0.tar.gz (18.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

nodus_queue-0.1.0-py3-none-any.whl (15.6 kB view details)

Uploaded Python 3

File details

Details for the file nodus_queue-0.1.0.tar.gz.

File metadata

  • Download URL: nodus_queue-0.1.0.tar.gz
  • Upload date:
  • Size: 18.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for nodus_queue-0.1.0.tar.gz
Algorithm Hash digest
SHA256 584306995aac7dd353e060ded496c76ca9e093d2e66b8a1020c64f3814726e32
MD5 bfe8569b67d0e3279b3037c568a61972
BLAKE2b-256 a06d28e6cd5e51caf3e9f1f06dd4b383ed3c57a93132b718d71188c9c392b0e4

See more details on using hashes here.

File details

Details for the file nodus_queue-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: nodus_queue-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 15.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for nodus_queue-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 79e3c0d4964efbdb1e3da213f9402e48af13afd7535a8d6179667f32a573d62a
MD5 9413f6f52775bd4abf68c3ab79ca8113
BLAKE2b-256 912a0b4482b742f23d88e9d26a7df19d8a4540aae3cad375f1f87a9d307db1af

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page