RRQ is a library for creating reliable job queues using Rust and Redis, with language-agnostic producers and workers.

RRQ: Reliable Redis Queue

RRQ is a distributed job queue built for reliability. It combines a Rust-powered orchestrator with language-native workers, so the orchestration core stays in one well-tested binary while you write job handlers in Python, TypeScript, or Rust.

Why RRQ?

Most job queues make you choose: either fight with complex distributed systems concepts, or accept unreliable "good enough" solutions. RRQ takes a different approach:

  • Rust orchestrator, any-language workers - The hard parts (scheduling, retries, locking, timeouts) are handled by a single-binary Rust process. Your job handlers are just normal async functions in your preferred language.

  • Redis as the source of truth - No separate databases to manage. Jobs, queues, locks, and results all live in Redis with atomic operations and predictable semantics.

  • Production-grade features built in - Retry policies, dead letter queues, job timeouts, cron scheduling, distributed tracing, and health checks work out of the box.

  • Fast and lightweight - The Rust orchestrator handles thousands of jobs per second with minimal memory. Workers are isolated processes that can be scaled independently.
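To make the retry and dead-letter behaviour mentioned above concrete, here is a minimal in-memory sketch. It is not RRQ's implementation: the function name `run_with_retries`, the exponential backoff schedule, and the reading of `max_retries` as "retries after the first attempt" are all assumptions made for illustration.

```python
import asyncio


async def run_with_retries(handler, params, max_retries=5, base_delay=0.01):
    """Call handler until it succeeds or the retries are exhausted.

    Returns ("completed", result) or ("dead_letter", last_error_message).
    Hypothetical helper: RRQ's orchestrator does this work for you.
    """
    attempts = 0
    while True:
        try:
            return "completed", await handler(params)
        except Exception as exc:
            attempts += 1
            if attempts > max_retries:
                # Out of retries: hand the job to the dead letter queue.
                return "dead_letter", str(exc)
            # Assumed backoff schedule: base_delay, then 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempts - 1))


calls = {"n": 0}


async def flaky(params):
    """Fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return {"ok": True}


outcome = asyncio.run(run_with_retries(flaky, {}, base_delay=0))
print(outcome)  # ('completed', {'ok': True})
```

The point of the design is that handlers only raise or return; the decision to retry or give up lives outside the handler.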

How It Works

┌──────────────────────────────┐
│     Your Application         │
│  (Python, TypeScript, Rust)  │
│                              │
│ client.enqueue("job", {...}) │
└───────────────┬──────────────┘
                │ enqueue jobs
                ▼
      ┌───────────────────────┐
      │         Redis         │
      │  queues, jobs, locks  │
      └──────────┬────────────┘
                 │ poll/dispatch
                 ▼
      ┌──────────────────────────────┐
      │     RRQ Orchestrator         │
      │   (single Rust binary)       │
      │ • scheduling & retries       │
      │ • timeouts & deadlines       │
      │ • dead letter queue          │
      │ • cron jobs                  │
      └──────────┬───────────────────┘
                 │ socket protocol
                 ▼
   ┌─────────────────────────────────────────┐
   │              Job Runners                 │
   │  ┌─────────────┐    ┌─────────────┐     │
   │  │   Python    │    │ TypeScript  │     │
   │  │   Worker    │    │   Worker    │     │
   │  └─────────────┘    └─────────────┘     │
   └─────────────────────────────────────────┘

This Package

This Python package (rrq) gives you everything you need to work with RRQ from Python:

  • Producer client - Enqueue jobs from your Python application
  • Runner runtime - Execute job handlers written in Python
  • OpenTelemetry integration - Distributed tracing from producer to runner

The Rust orchestrator binary is bundled in the wheel, so there's nothing else to install.

Quick Start

1. Install

pip install rrq
# or
uv pip install rrq

2. Create configuration (rrq.toml)

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_socket = "127.0.0.1:9000"

3. Write a job handler

# myapp/handlers.py
from rrq.runner import ExecutionRequest

async def send_welcome_email(request: ExecutionRequest):
    user_email = request.params.get("email")
    template = request.params.get("template", "welcome")

    # Your email sending logic here (send_email is a placeholder for your own function)
    await send_email(to=user_email, template=template)

    return {"sent": True, "email": user_email}
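Because the handler only reads `request.params`, it can be exercised without Redis or the orchestrator. The sketch below repeats the handler and calls it with a stand-in request. `FakeRequest` and the stubbed `send_email` are hypothetical test doubles, not part of the rrq package.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeRequest:
    """Hypothetical stand-in exposing only the `params` attribute the handler uses."""
    params: dict = field(default_factory=dict)


sent = []  # records what the stubbed mailer was asked to send


async def send_email(to, template):
    """Stub for the application's own mailer."""
    sent.append((to, template))


async def send_welcome_email(request):
    user_email = request.params.get("email")
    template = request.params.get("template", "welcome")
    await send_email(to=user_email, template=template)
    return {"sent": True, "email": user_email}


result = asyncio.run(send_welcome_email(FakeRequest({"email": "user@example.com"})))
print(result)  # {'sent': True, 'email': 'user@example.com'}
```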

4. Register handlers

# myapp/runner.py
from rrq.runner_settings import PythonRunnerSettings
from rrq.registry import Registry
from myapp import handlers

registry = Registry()
registry.register("send_welcome_email", handlers.send_welcome_email)

settings = PythonRunnerSettings(registry=registry)

5. Start the system

# Start the orchestrator (it launches the runners automatically)
rrq worker run --config rrq.toml

# That's it! The orchestrator spawns and manages your Python runners.

6. Enqueue jobs

import asyncio
from rrq.client import RRQClient

async def main():
    client = RRQClient(config_path="rrq.toml")

    job_id = await client.enqueue(
        "send_welcome_email",
        {
            "params": {
                "email": "user@example.com",
                "template": "welcome",
            }
        },
    )

    print(f"Enqueued job: {job_id}")
    await client.close()

asyncio.run(main())

Features

Scheduled Jobs

Delay job execution:

# Run in 5 minutes
await client.enqueue("cleanup", {"defer_by_seconds": 300})

# Run at a specific time
from datetime import datetime, timezone
await client.enqueue(
    "report",
    {"defer_until": datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)},
)
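A fixed date like the one above is rarely what you want in practice. The helper below is a small sketch for computing a `defer_until` value such as "the next 09:00 UTC". The name `next_occurrence` is hypothetical and it uses only the standard library.

```python
from datetime import datetime, time, timedelta, timezone


def next_occurrence(at: time, now: datetime) -> datetime:
    """Return the next UTC datetime strictly after `now` whose clock time is `at`.

    `now` must be timezone-aware; `at` is interpreted as a UTC clock time.
    """
    now = now.astimezone(timezone.utc)
    candidate = datetime.combine(now.date(), at, tzinfo=timezone.utc)
    if candidate <= now:
        # Today's slot has already passed, so use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 1, 8, 30, tzinfo=timezone.utc)
print(next_occurrence(time(9, 0), now))  # 2024-01-01 09:00:00+00:00
```

The returned datetime is timezone-aware, matching the `tzinfo=timezone.utc` value used in the example above.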

Unique Jobs (Idempotency)

Prevent duplicate jobs:

# Only one job with this key will be enqueued
await client.enqueue_with_unique_key(
    "process_order",
    "order-123",
    {"params": {"order_id": "123"}},
)
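The idea behind a unique key is a set-if-absent check: the first caller claims the key and later callers see that it is taken. The in-memory class below sketches that behaviour only. `UniqueKeyGuard` is a hypothetical name, and how RRQ stores, expires, or releases keys in Redis is not covered by this README.

```python
class UniqueKeyGuard:
    """In-memory stand-in for a set-if-absent lock keyed by unique key."""

    def __init__(self):
        self._held = {}  # unique key -> job id that claimed it

    def try_acquire(self, key, job_id):
        """Return True and record job_id if key is free; otherwise return False."""
        if key in self._held:
            return False
        self._held[key] = job_id
        return True

    def release(self, key):
        """Free the key so a later job can claim it (a no-op if it is not held)."""
        self._held.pop(key, None)


guard = UniqueKeyGuard()
print(guard.try_acquire("order-123", "job-1"))  # True
print(guard.try_acquire("order-123", "job-2"))  # False
```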

Rate Limiting

Limit how often a job can run:

job_id = await client.enqueue_with_rate_limit(
    "sync_user",
    {
        "params": {"user_id": "456"},
        "rate_limit_key": "user-456",
        "rate_limit_seconds": 60,
    },
)
if job_id is None:
    print("Rate limited, try again later")
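The semantics shown above (the call returns `None` when the key has been used too recently) can be modelled with a per-key "next allowed" timestamp. This is an illustrative in-memory sketch under that assumption. `RateLimiter` is a hypothetical class and is not how RRQ tracks limits in Redis.

```python
import time


class RateLimiter:
    """Allow at most one call per key within each `seconds` window."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock          # injectable so tests can control time
        self._next_allowed = {}      # key -> earliest time the key may be used again

    def allow(self, key, seconds):
        now = self._clock()
        if now < self._next_allowed.get(key, float("-inf")):
            return False
        self._next_allowed[key] = now + seconds
        return True


limiter = RateLimiter()
print(limiter.allow("user-456", 60))  # True
print(limiter.allow("user-456", 60))  # False
```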

Debouncing

Delay and deduplicate rapid job submissions:

# Only the last enqueue within the window will execute
await client.enqueue_with_debounce(
    "save_document",
    {
        "params": {"doc_id": "789"},
        "debounce_key": "doc-789",
        "debounce_seconds": 5,
    },
)
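Debouncing differs from rate limiting in that the latest submission wins rather than the first. The sketch below models this with one pending entry per key. It assumes that each new submission replaces the pending parameters and restarts the window. `Debouncer` is a hypothetical class and time is passed in explicitly to keep the example deterministic.

```python
class Debouncer:
    """Keep only the most recent submission per key until its window elapses."""

    def __init__(self):
        self._pending = {}  # key -> (run_at, params)

    def submit(self, key, params, now, seconds):
        """Replace any pending submission for key and restart its window."""
        self._pending[key] = (now + seconds, params)

    def due(self, now):
        """Remove and return the params of every submission whose window has elapsed."""
        ready = [k for k, (run_at, _) in self._pending.items() if run_at <= now]
        return [self._pending.pop(k)[1] for k in ready]


d = Debouncer()
d.submit("doc-789", {"doc_id": "789", "rev": 1}, now=0, seconds=5)
d.submit("doc-789", {"doc_id": "789", "rev": 2}, now=3, seconds=5)
print(d.due(now=5))  # []
print(d.due(now=8))  # [{'doc_id': '789', 'rev': 2}]
```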

Cron Jobs

Schedule recurring jobs in rrq.toml:

[[rrq.cron_jobs]]
function_name = "daily_report"
schedule = "0 0 9 * * *"  # 9 AM daily (6-field cron with seconds)
queue_name = "scheduled"
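Six-field schedules are easy to misread if you are used to five-field cron. The helper below labels each field, assuming the conventional order for seconds-first cron (second, minute, hour, day of month, month, day of week), which is consistent with the "9 AM daily" comment above. `split_cron` is a hypothetical helper and does not validate field values.

```python
CRON_FIELDS = ("second", "minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expr):
    """Label the fields of a 6-field cron expression; raise ValueError on a wrong count."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(split_cron("0 0 9 * * *"))
```

Passing a five-field expression such as `0 9 * * *` raises `ValueError`, which catches the most common mistake before it reaches the config file.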

Job Status

Check job progress:

status = await client.get_job_status(job_id)
print(f"Status: {status}")
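A single status check is often followed by a wait for completion. The sketch below polls any async status function until it returns a value you consider final. `wait_for_status` is a hypothetical helper. The README does not list the status values `get_job_status` returns, so the caller supplies the `terminal` collection.

```python
import asyncio


async def wait_for_status(get_status, terminal, interval=0.5, timeout=30.0):
    """Poll get_status() until it returns a value in `terminal` or time runs out.

    get_status: zero-argument async callable returning the current status.
    terminal:   collection of status values that end the wait (caller-defined).
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status()
        if status in terminal:
            return status
        if loop.time() + interval > deadline:
            # Another sleep would overrun the deadline, so give up now.
            raise TimeoutError(f"still {status!r} after {timeout} seconds")
        await asyncio.sleep(interval)
```

With the client from the Quick Start, this would be called as `await wait_for_status(lambda: client.get_job_status(job_id), terminal=...)`, filling in whichever final statuses your RRQ version reports.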

OpenTelemetry

Enable distributed tracing:

from rrq.integrations import otel
otel.enable(service_name="my-service")

Traces propagate from producer → orchestrator → runner automatically.

Configuration Reference

See docs/CONFIG_REFERENCE.md for the full TOML schema.

Key settings:

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"
default_job_timeout_seconds = 300  # 5 minutes
default_max_retries = 5

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_socket = "127.0.0.1:9000"
pool_size = 4           # Number of runner processes
max_in_flight = 10      # Concurrent jobs per runner

Related Packages

Package        Language     Purpose
rrq            Python       Producer client + runner (this package)
rrq-ts         TypeScript   Producer client + runner
rrq            Rust         Orchestrator binary
rrq-producer   Rust         Native producer client
rrq-runner     Rust         Native runner runtime

Requirements

  • Python 3.11+
  • Redis 5.0+

License

Apache-2.0
