Skip to main content

RRQ is a library for creating reliable job queues using Rust, Redis. with language-agnostic producers and workers.

Project description

RRQ: Reliable Redis Queue

PyPI License

RRQ is a distributed job queue that actually works. It combines a Rust-powered orchestrator with language-native workers to give you the reliability of battle-tested infrastructure with the flexibility of writing job handlers in Python, TypeScript, or Rust.

Why RRQ?

Most job queues make you choose: either fight with complex distributed systems concepts, or accept unreliable "good enough" solutions. RRQ takes a different approach:

  • Rust orchestrator, any-language workers - The hard parts (scheduling, retries, locking, timeouts) are handled by a single-binary Rust process. Your job handlers are just normal async functions in your preferred language.

  • Redis as the source of truth - No separate databases to manage. Jobs, queues, locks, and results all live in Redis with atomic operations and predictable semantics.

  • Production-grade features built in - Retry policies, dead letter queues, job timeouts, cron scheduling, distributed tracing, and health checks work out of the box.

  • Fast and lightweight - The Rust orchestrator handles thousands of jobs per second with minimal memory. Workers are isolated processes that can be scaled independently.

How It Works

┌──────────────────────────────┐
│     Your Application         │
│  (Python, TypeScript, Rust)  │
│                              │
│ client.enqueue("job", {...}) │
└───────────────┬──────────────┘
                │ enqueue jobs
                ▼
      ┌───────────────────────┐
      │         Redis         │
      │  queues, jobs, locks  │
      └──────────┬────────────┘
                 │ poll/dispatch
                 ▼
      ┌──────────────────────────────┐
      │     RRQ Orchestrator         │
      │   (single Rust binary)       │
      │ • scheduling & retries       │
      │ • timeouts & deadlines       │
      │ • dead letter queue          │
      │ • cron jobs                  │
      └──────────┬───────────────────┘
                 │ socket protocol
                 ▼
   ┌─────────────────────────────────────────┐
   │              Job Runners                 │
   │  ┌─────────────┐    ┌─────────────┐     │
   │  │   Python    │    │ TypeScript  │     │
   │  │   Worker    │    │   Worker    │     │
   │  └─────────────┘    └─────────────┘     │
   └─────────────────────────────────────────┘

This Package

This Python package (rrq) gives you everything you need to work with RRQ from Python:

  • Producer client - Enqueue jobs from your Python application
  • Runner runtime - Execute job handlers written in Python
  • OpenTelemetry integration - Distributed tracing from producer to runner

The Rust orchestrator binary is bundled in the wheel, so there's nothing else to install.

Quick Start

1. Install

pip install rrq
# or
uv pip install rrq

2. Create configuration (rrq.toml)

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_port = 9000

3. Write a job handler

# myapp/handlers.py
from rrq.runner import ExecutionRequest

async def send_welcome_email(request: ExecutionRequest):
    user_email = request.params.get("email")
    template = request.params.get("template", "welcome")

    # Your email sending logic here
    await send_email(to=user_email, template=template)

    return {"sent": True, "email": user_email}

4. Register handlers

# myapp/runner.py
from rrq.runner_settings import PythonRunnerSettings
from rrq.registry import Registry
from myapp import handlers

registry = Registry()
registry.register("send_welcome_email", handlers.send_welcome_email)

settings = PythonRunnerSettings(registry=registry)

5. Start the system

# Terminal 1: Start the orchestrator (runs runners automatically)
rrq worker run --config rrq.toml

# That's it! The orchestrator spawns and manages your Python runners.

6. Enqueue jobs

import asyncio
from rrq.client import RRQClient

async def main():
    client = RRQClient(config_path="rrq.toml")

    job_id = await client.enqueue(
        "send_welcome_email",
        {
            "params": {
                "email": "user@example.com",
                "template": "welcome",
            }
        },
    )

    print(f"Enqueued job: {job_id}")
    await client.close()

asyncio.run(main())

Features

Scheduled Jobs

Delay job execution:

# Run in 5 minutes
await client.enqueue("cleanup", {"defer_by_seconds": 300})

# Run at a specific time
from datetime import datetime, timezone
await client.enqueue(
    "report",
    {"defer_until": datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)},
)

Unique Jobs (Idempotency)

Prevent duplicate jobs:

# Only one job with this key will be enqueued
await client.enqueue_with_unique_key(
    "process_order",
    "order-123",
    {"params": {"order_id": "123"}},
)

Rate Limiting

Limit how often a job can run:

job_id = await client.enqueue_with_rate_limit(
    "sync_user",
    {
        "params": {"user_id": "456"},
        "rate_limit_key": "user-456",
        "rate_limit_seconds": 60,
    },
)
if job_id is None:
    print("Rate limited, try again later")

Debouncing

Delay and deduplicate rapid job submissions:

# Only the last enqueue within the window will execute
await client.enqueue_with_debounce(
    "save_document",
    {
        "params": {"doc_id": "789"},
        "debounce_key": "doc-789",
        "debounce_seconds": 5,
    },
)

Cron Jobs

Schedule recurring jobs in rrq.toml:

[[rrq.cron_jobs]]
function_name = "daily_report"
schedule = "0 0 9 * * *"  # 9 AM daily (6-field cron with seconds)
queue_name = "scheduled"

Job Status

Check job progress:

status = await client.get_job_status(job_id)
print(f"Status: {status}")

OpenTelemetry

Enable distributed tracing:

from rrq.integrations import otel
otel.enable(service_name="my-service")

Traces propagate from producer → orchestrator → runner automatically.

Configuration Reference

See docs/CONFIG_REFERENCE.md for the full TOML schema.

Key settings:

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"
default_job_timeout_seconds = 300  # 5 minutes
default_max_retries = 5

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_port = 9000
pool_size = 4           # Number of runner processes
max_in_flight = 10      # Concurrent jobs per runner

Related Packages

Package Language Purpose
rrq Python Producer client + runner (this package)
rrq-ts TypeScript Producer client + runner
rrq Rust Orchestrator binary
rrq-producer Rust Native producer client
rrq-runner Rust Native runner runtime

Requirements

  • Python 3.11+
  • Redis 5.0+

License

Apache-2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

rrq-0.11.1-py3-none-manylinux_2_35_x86_64.whl (8.0 MB view details)

Uploaded Python 3manylinux: glibc 2.35+ x86-64

rrq-0.11.1-py3-none-manylinux_2_35_aarch64.whl (7.8 MB view details)

Uploaded Python 3manylinux: glibc 2.35+ ARM64

rrq-0.11.1-py3-none-macosx_11_0_arm64.whl (7.8 MB view details)

Uploaded Python 3macOS 11.0+ ARM64

File details

Details for the file rrq-0.11.1-py3-none-manylinux_2_35_x86_64.whl.

File metadata

  • Download URL: rrq-0.11.1-py3-none-manylinux_2_35_x86_64.whl
  • Upload date:
  • Size: 8.0 MB
  • Tags: Python 3, manylinux: glibc 2.35+ x86-64
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for rrq-0.11.1-py3-none-manylinux_2_35_x86_64.whl
Algorithm Hash digest
SHA256 357468a643cc968a1cecc7a0c8b0160a9f9ce2c016f3edfd36cc6f27c0acc03e
MD5 4a3eca576e23b84e2a083e58b9eab9db
BLAKE2b-256 4e78331c77f7bc2b2c7e0955916dc2ddd8a108574b34701b1cec5f3d1c6009d0

See more details on using hashes here.

Provenance

The following attestation bundles were made for rrq-0.11.1-py3-none-manylinux_2_35_x86_64.whl:

Publisher: release.yml on GetResQ/rrq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file rrq-0.11.1-py3-none-manylinux_2_35_aarch64.whl.

File metadata

File hashes

Hashes for rrq-0.11.1-py3-none-manylinux_2_35_aarch64.whl
Algorithm Hash digest
SHA256 170727bf92d1589621948cf2156e58413fc955cfe8ef456b0b07e5b091760944
MD5 d6bef3983e09a698743ff7d2c399e779
BLAKE2b-256 b5f4d63e36071fab087d02a9d6511f11abf650bd2f283dc79db9e31c7f60f92b

See more details on using hashes here.

Provenance

The following attestation bundles were made for rrq-0.11.1-py3-none-manylinux_2_35_aarch64.whl:

Publisher: release.yml on GetResQ/rrq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file rrq-0.11.1-py3-none-macosx_11_0_arm64.whl.

File metadata

  • Download URL: rrq-0.11.1-py3-none-macosx_11_0_arm64.whl
  • Upload date:
  • Size: 7.8 MB
  • Tags: Python 3, macOS 11.0+ ARM64
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for rrq-0.11.1-py3-none-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 eb5ae85fef35bed40ad36c81bb7ccf24ce283d4b1370d14e2ad8c931fafbbdf9
MD5 f2114b12580d344d2296733bab92f6f5
BLAKE2b-256 c75242b14ceda1d4b123b6f5d7e70210720cfdddc678a95f861e15a3065c572e

See more details on using hashes here.

Provenance

The following attestation bundles were made for rrq-0.11.1-py3-none-macosx_11_0_arm64.whl:

Publisher: release.yml on GetResQ/rrq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page