
RRQ is a library for creating reliable job queues using Rust and Redis, with language-agnostic producers and workers.


RRQ: Reliable Redis Queue


RRQ is a distributed job queue that actually works. It combines a Rust-powered orchestrator with language-native workers to give you the reliability of battle-tested infrastructure with the flexibility of writing job handlers in Python, TypeScript, or Rust.

Why RRQ?

Most job queues make you choose: either fight with complex distributed systems concepts, or accept unreliable "good enough" solutions. RRQ takes a different approach:

  • Rust orchestrator, any-language workers - The hard parts (scheduling, retries, locking, timeouts) are handled by a single-binary Rust process. Your job handlers are just normal async functions in your preferred language.

  • Redis as the source of truth - No separate databases to manage. Jobs, queues, locks, and results all live in Redis with atomic operations and predictable semantics.

  • Production-grade features built in - Retry policies, dead letter queues, job timeouts, cron scheduling, distributed tracing, and health checks work out of the box.

  • Fast and lightweight - The Rust orchestrator handles thousands of jobs per second with minimal memory. Workers are isolated processes that can be scaled independently.

How It Works

┌──────────────────────────────┐
│     Your Application         │
│  (Python, TypeScript, Rust)  │
│                              │
│ client.enqueue("job", {...}) │
└───────────────┬──────────────┘
                │ enqueue jobs
                ▼
      ┌───────────────────────┐
      │         Redis         │
      │  queues, jobs, locks  │
      └──────────┬────────────┘
                 │ poll/dispatch
                 ▼
      ┌──────────────────────────────┐
      │     RRQ Orchestrator         │
      │   (single Rust binary)       │
      │ • scheduling & retries       │
      │ • timeouts & deadlines       │
      │ • dead letter queue          │
      │ • cron jobs                  │
      └──────────┬───────────────────┘
                 │ socket protocol
                 ▼
   ┌─────────────────────────────────────────┐
   │              Job Runners                 │
   │  ┌─────────────┐    ┌─────────────┐     │
   │  │   Python    │    │ TypeScript  │     │
   │  │   Worker    │    │   Worker    │     │
   │  └─────────────┘    └─────────────┘     │
   └─────────────────────────────────────────┘
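
The flow in the diagram can be sketched as a toy, single-process model. This is not how RRQ is implemented: the deque and dict below stand in for Redis, the `orchestrate_once` function stands in for the Rust orchestrator, and all names (`enqueue`, `HANDLERS`, `greet`) are hypothetical. It only illustrates the division of responsibilities between producer, store, orchestrator, and runner.

```python
import asyncio
from collections import deque

# Stand-in for Redis: a FIFO of pending jobs plus a results map.
queue: deque[dict] = deque()
results: dict[str, object] = {}


def enqueue(job_id: str, function_name: str, params: dict) -> None:
    """Producer side: record the job where the orchestrator can find it."""
    queue.append({"id": job_id, "function": function_name, "params": params})


async def greet(params: dict) -> str:
    """Runner side: a handler is an ordinary async function."""
    return f"hello {params['name']}"


# Stand-in for the runner's handler registry.
HANDLERS = {"greet": greet}


async def orchestrate_once() -> None:
    """Orchestrator side: take one job, dispatch it, store the result."""
    job = queue.popleft()
    handler = HANDLERS[job["function"]]
    results[job["id"]] = await handler(job["params"])


enqueue("job-1", "greet", {"name": "world"})
asyncio.run(orchestrate_once())
```

In the real system each box is a separate process, and retries, timeouts, and locking sit inside the orchestrator step.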

This Package

This Python package (rrq) gives you everything you need to work with RRQ from Python:

  • Producer client - Enqueue jobs from your Python application
  • Runner runtime - Execute job handlers written in Python
  • OpenTelemetry integration - Distributed tracing from producer to runner

The Rust orchestrator binary is bundled in the wheel, so there's nothing else to install.

Quick Start

1. Install

pip install rrq
# or
uv pip install rrq

2. Create configuration (rrq.toml)

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_socket = "127.0.0.1:9000"

3. Write a job handler

# myapp/handlers.py
from rrq.runner import ExecutionRequest

async def send_welcome_email(request: ExecutionRequest):
    user_email = request.params.get("email")
    template = request.params.get("template", "welcome")

    # Your email sending logic here
    await send_email(to=user_email, template=template)

    return {"sent": True, "email": user_email}
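
Because a handler is a plain async function, it can be exercised without Redis or the orchestrator. The sketch below repeats the handler so the block is self-contained and calls it with a stand-in request. `FakeRequest` and the recording `send_email` stub are hypothetical test helpers; the only thing assumed about the real `ExecutionRequest` is the `params` mapping used above.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeRequest:
    """Hypothetical stand-in exposing only the `params` mapping the handler reads."""
    params: dict = field(default_factory=dict)


sent: list[tuple[str, str]] = []


async def send_email(to: str, template: str) -> None:
    """Stub for the application's own email function; records the call."""
    sent.append((to, template))


async def send_welcome_email(request):
    user_email = request.params.get("email")
    template = request.params.get("template", "welcome")
    await send_email(to=user_email, template=template)
    return {"sent": True, "email": user_email}


# No template is passed, so the handler falls back to "welcome".
result = asyncio.run(send_welcome_email(FakeRequest({"email": "user@example.com"})))
```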

4. Register handlers

# myapp/runner.py
from rrq.runner_settings import PythonRunnerSettings
from rrq.registry import Registry
from myapp import handlers

registry = Registry()
registry.register("send_welcome_email", handlers.send_welcome_email)

settings = PythonRunnerSettings(registry=registry)

5. Start the system

# Start the orchestrator; it spawns and manages your Python runners automatically
rrq worker run --config rrq.toml

6. Enqueue jobs

import asyncio
from rrq.client import RRQClient

async def main():
    client = RRQClient(config_path="rrq.toml")
    try:
        job_id = await client.enqueue(
            "send_welcome_email",
            {
                "params": {
                    "email": "user@example.com",
                    "template": "welcome",
                }
            },
        )
        print(f"Enqueued job: {job_id}")
    finally:
        # Close the client even if enqueue raises
        await client.close()

asyncio.run(main())

Features

Scheduled Jobs

Delay job execution:

# Run in 5 minutes
await client.enqueue("cleanup", {"defer_by_seconds": 300})

# Run at a specific time
from datetime import datetime, timezone
await client.enqueue(
    "report",
    {"defer_until": datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)},
)

Unique Jobs (Idempotency)

Prevent duplicate jobs:

# Only one job with this key will be enqueued
await client.enqueue_with_unique_key(
    "process_order",
    "order-123",
    {"params": {"order_id": "123"}},
)
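
A unique key only deduplicates if every producer derives it the same way. One possible approach, sketched here under the assumption that the key is an arbitrary string, is to hash a canonical JSON form of the parameters. `unique_key_for` is a hypothetical helper and not part of the rrq API.

```python
import hashlib
import json


def unique_key_for(function_name: str, params: dict) -> str:
    """Derive a stable key from the function name and its parameters."""
    # sort_keys makes the JSON independent of dict insertion order.
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{function_name}:{digest[:16]}"


key_a = unique_key_for("process_order", {"order_id": "123", "region": "eu"})
key_b = unique_key_for("process_order", {"region": "eu", "order_id": "123"})
```

Both calls yield the same key, so two producers that build the parameters in a different order would still collide on it. For a simple case such as a single order ID, a readable key like `"order-123"` is easier to debug.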

Rate Limiting

Limit how often a job can run:

job_id = await client.enqueue_with_rate_limit(
    "sync_user",
    {
        "params": {"user_id": "456"},
        "rate_limit_key": "user-456",
        "rate_limit_seconds": 60,
    },
)
if job_id is None:
    print("Rate limited, try again later")

Debouncing

Delay and deduplicate rapid job submissions:

# Only the last enqueue within the window will execute
await client.enqueue_with_debounce(
    "save_document",
    {
        "params": {"doc_id": "789"},
        "debounce_key": "doc-789",
        "debounce_seconds": 5,
    },
)
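
A toy model makes the "last enqueue wins" behaviour concrete. It assumes trailing-edge debouncing, where each new submission replaces the pending payload and restarts the window; the README does not say whether RRQ restarts the window, so treat that as an assumption. `DebounceModel` is hypothetical and uses explicit timestamps instead of a real clock.

```python
class DebounceModel:
    """Toy model: each key keeps only its latest payload and a run-at time."""

    def __init__(self) -> None:
        self._pending: dict[str, tuple[float, dict]] = {}

    def enqueue(self, key: str, params: dict, window: float, now: float) -> None:
        # Assumption: a new submission replaces the pending one and restarts the window.
        self._pending[key] = (now + window, params)

    def due(self, now: float) -> list[dict]:
        """Pop and return the payloads whose window has elapsed."""
        ready = [key for key, (run_at, _) in self._pending.items() if run_at <= now]
        return [self._pending.pop(key)[1] for key in ready]


model = DebounceModel()
for t, version in [(0.0, 1), (2.0, 2), (4.0, 3)]:
    model.enqueue("doc-789", {"doc_id": "789", "version": version}, window=5, now=t)

early = model.due(now=8.0)  # last submission was at t=4, so nothing is due before t=9
late = model.due(now=9.0)   # only the final payload runs
```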

Cron Jobs

Schedule recurring jobs in rrq.toml:

[[rrq.cron_jobs]]
function_name = "daily_report"
schedule = "0 0 9 * * *"  # 9 AM daily (6-field cron with seconds)
queue_name = "scheduled"
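
The 6-field format puts seconds first, which is easy to misread if you are used to 5-field crontab syntax. The sketch below labels each field of a schedule string. The field order is inferred from the "9 AM daily" comment above, and `label_cron_fields` is a hypothetical helper that only splits the string; it does not validate ranges or evaluate the schedule.

```python
FIELD_NAMES = ("second", "minute", "hour", "day_of_month", "month", "day_of_week")


def label_cron_fields(schedule: str) -> dict[str, str]:
    """Map each whitespace-separated field of a 6-field cron string to its name."""
    parts = schedule.split()
    if len(parts) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(parts)}")
    return dict(zip(FIELD_NAMES, parts))


fields = label_cron_fields("0 0 9 * * *")
```

Pasting a 5-field expression such as `0 9 * * *` raises here, which is the mistake this check is meant to catch.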

Job Status

Check job progress:

status = await client.get_job_status(job_id)
print(f"Status: {status}")
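
To wait for a job rather than check it once, the call can be wrapped in a polling loop with a timeout. The sketch below is deliberately agnostic about what `get_job_status` returns: the caller supplies an `is_done` predicate. `wait_until` is a hypothetical helper, and the status strings in the demo (`"pending"`, `"active"`, `"completed"`) are made up for illustration, not RRQ's actual values.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def wait_until(
    get_status: Callable[[str], Awaitable[Any]],
    job_id: str,
    is_done: Callable[[Any], bool],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> Any:
    """Poll get_status(job_id) until is_done(status) is true or the timeout passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status(job_id)
        if is_done(status):
            return status
        if loop.time() + interval > deadline:
            raise TimeoutError(f"job {job_id} not finished after {timeout}s")
        await asyncio.sleep(interval)


# Demo with a fake status source standing in for client.get_job_status.
_fake_statuses = iter(["pending", "active", "completed"])
calls: list[str] = []


async def fake_get_status(job_id: str) -> str:
    calls.append(job_id)
    return next(_fake_statuses)


final = asyncio.run(
    wait_until(fake_get_status, "job-1", lambda s: s == "completed", interval=0.01)
)
```

With the real client, `client.get_job_status` would take the place of `fake_get_status`, and `is_done` would test for whatever terminal states the client reports.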

OpenTelemetry

Enable distributed tracing:

from rrq.integrations import otel
otel.enable(service_name="my-service")

Traces propagate from producer → orchestrator → runner automatically.

Configuration Reference

See docs/CONFIG_REFERENCE.md for the full TOML schema.

Key settings:

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"
default_job_timeout_seconds = 300  # 5 minutes
default_max_retries = 3

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_socket = "127.0.0.1:9000"
pool_size = 4           # Number of runner processes
max_in_flight = 10      # Concurrent jobs per runner

Related Packages

Package       Language    Purpose
rrq           Python      Producer client + runner (this package)
rrq-ts        TypeScript  Producer client + runner
rrq           Rust        Orchestrator binary
rrq-producer  Rust        Native producer client
rrq-runner    Rust        Native runner runtime

Requirements

  • Python 3.11+
  • Redis 5.0+

License

Apache-2.0
