
RRQ is a library for creating reliable job queues using Rust and Redis, with language-agnostic producers and workers.


RRQ: Reliable Redis Queue


RRQ is a distributed job queue that actually works. It combines a Rust-powered orchestrator with language-native workers to give you the reliability of battle-tested infrastructure with the flexibility of writing job handlers in Python, TypeScript, or Rust.

Why RRQ?

Most job queues make you choose: either fight with complex distributed systems concepts, or accept unreliable "good enough" solutions. RRQ takes a different approach:

  • Rust orchestrator, any-language workers - The hard parts (scheduling, retries, locking, timeouts) are handled by a single-binary Rust process. Your job handlers are just normal async functions in your preferred language.

  • Redis as the source of truth - No separate databases to manage. Jobs, queues, locks, and results all live in Redis with atomic operations and predictable semantics.

  • Production-grade features built in - Retry policies, dead letter queues, job timeouts, cron scheduling, distributed tracing, and health checks work out of the box.

  • Fast and lightweight - The Rust orchestrator handles thousands of jobs per second with minimal memory. Workers are isolated processes that can be scaled independently.

How It Works

┌──────────────────────────────┐
│     Your Application         │
│  (Python, TypeScript, Rust)  │
│                              │
│ client.enqueue("job", {...}) │
└───────────────┬──────────────┘
                │ enqueue jobs
                ▼
      ┌───────────────────────┐
      │         Redis         │
      │  queues, jobs, locks  │
      └──────────┬────────────┘
                 │ poll/dispatch
                 ▼
      ┌──────────────────────────────┐
      │     RRQ Orchestrator         │
      │   (single Rust binary)       │
      │ • scheduling & retries       │
      │ • timeouts & deadlines       │
      │ • dead letter queue          │
      │ • cron jobs                  │
      └──────────┬───────────────────┘
                 │ socket protocol
                 ▼
   ┌─────────────────────────────────────────┐
   │              Job Runners                 │
   │  ┌─────────────┐    ┌─────────────┐     │
   │  │   Python    │    │ TypeScript  │     │
   │  │   Worker    │    │   Worker    │     │
   │  └─────────────┘    └─────────────┘     │
   └─────────────────────────────────────────┘
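
To make the flow in the diagram concrete, here is a toy, in-process sketch of the same three roles: a producer that records a job, a shared store, and a dispatcher that hands each job to a handler. Every name in it (`queue`, `handlers`, `enqueue`, `dispatch_all`) is hypothetical and none of it uses RRQ, Redis, or sockets; it only illustrates the direction in which jobs move.

```python
import asyncio
from collections import deque

# Toy stand-ins for the boxes in the diagram (hypothetical, not RRQ code).
queue: deque[dict] = deque()   # plays the role of Redis
handlers = {}                  # plays the role of a runner's handler table

def enqueue(function_name: str, params: dict) -> None:
    """Producer side: record a job description in the shared store."""
    queue.append({"function_name": function_name, "params": params})

async def dispatch_all() -> list:
    """Orchestrator side: take jobs in FIFO order and call the matching handler."""
    results = []
    while queue:
        job = queue.popleft()
        handler = handlers[job["function_name"]]
        results.append(await handler(job["params"]))
    return results

async def greet(params: dict) -> str:
    """Runner side: an ordinary async function."""
    return f"hello {params['name']}"

handlers["greet"] = greet
enqueue("greet", {"name": "ada"})
results = asyncio.run(dispatch_all())
```

In the real system the producer and the runner never share memory; the store and the socket protocol sit between them, which is what lets each side be written in a different language.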

This Package

This Python package (rrq) gives you everything you need to work with RRQ from Python:

  • Producer client - Enqueue jobs from your Python application
  • Runner runtime - Execute job handlers written in Python
  • OpenTelemetry integration - Distributed tracing from producer to runner

The Rust orchestrator binary is bundled in the wheel, so there's nothing else to install.

Quick Start

1. Install

pip install rrq
# or
uv pip install rrq

2. Create configuration (rrq.toml)

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_socket = "127.0.0.1:9000"

3. Write a job handler

# myapp/handlers.py
from rrq.runner import ExecutionRequest

async def send_welcome_email(request: ExecutionRequest):
    user_email = request.params.get("email")
    template = request.params.get("template", "welcome")

    # Your email sending logic here. send_email is a placeholder for your
    # application's own async helper; it is not provided by rrq.
    await send_email(to=user_email, template=template)

    return {"sent": True, "email": user_email}
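
Because a handler is a plain async function, it can be exercised without Redis or the orchestrator. The sketch below repeats the handler body and calls it with a stand-in request. It assumes the handler only reads `request.params`, so a `SimpleNamespace` is enough; `send_email` here is a fake defined for the test, and the real `ExecutionRequest` may carry fields that are not modelled.

```python
import asyncio
from types import SimpleNamespace

sent = []  # records what the fake mailer was asked to send

async def send_email(to: str, template: str) -> None:
    """Hypothetical stand-in for the application's real mailer."""
    sent.append((to, template))

async def send_welcome_email(request):
    # Same body as the handler above, without the rrq type annotation.
    user_email = request.params.get("email")
    template = request.params.get("template", "welcome")
    await send_email(to=user_email, template=template)
    return {"sent": True, "email": user_email}

# Stand-in request: only the .params attribute the handler reads is modelled.
fake_request = SimpleNamespace(params={"email": "user@example.com"})
result = asyncio.run(send_welcome_email(fake_request))
```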

4. Register handlers

# myapp/runner.py
from rrq.runner_settings import PythonRunnerSettings
from rrq.registry import Registry
from myapp import handlers

registry = Registry()
registry.register("send_welcome_email", handlers.send_welcome_email)

settings = PythonRunnerSettings(registry=registry)

5. Start the system

# Terminal 1: Start the orchestrator (runs runners automatically)
rrq worker run --config rrq.toml

# That's it! The orchestrator spawns and manages your Python runners.

6. Enqueue jobs

import asyncio
from rrq.client import RRQClient

async def main():
    client = RRQClient(config_path="rrq.toml")

    job_id = await client.enqueue(
        "send_welcome_email",
        {
            "params": {
                "email": "user@example.com",
                "template": "welcome",
            }
        },
    )

    print(f"Enqueued job: {job_id}")
    await client.close()

asyncio.run(main())

Features

Scheduled Jobs

Delay job execution:

from datetime import datetime, timezone

# Run in 5 minutes
await client.enqueue("cleanup", {"defer_by_seconds": 300})

# Run at a specific time
await client.enqueue(
    "report",
    {"defer_until": datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)},
)

Unique Jobs (Idempotency)

Prevent duplicate jobs:

# Only one job with this key will be enqueued
await client.enqueue_with_unique_key(
    "process_order",
    "order-123",
    {"params": {"order_id": "123"}},
)

Rate Limiting

Limit how often a job can run:

job_id = await client.enqueue_with_rate_limit(
    "sync_user",
    {
        "params": {"user_id": "456"},
        "rate_limit_key": "user-456",
        "rate_limit_seconds": 60,
    },
)
if job_id is None:
    print("Rate limited, try again later")
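
One way to reason about the window is with a toy limiter that accepts at most one enqueue per key per `window_seconds`. `ToyRateLimiter` is a hypothetical in-memory model driven by an explicit clock; it is not how RRQ implements rate limiting, and the exact boundary behaviour (whether a request at exactly 60 seconds is accepted) is an assumption.

```python
class ToyRateLimiter:
    """In-memory model of 'at most one enqueue per key per window'."""

    def __init__(self) -> None:
        self._last_accepted: dict[str, float] = {}

    def try_acquire(self, key: str, window_seconds: float, now: float) -> bool:
        last = self._last_accepted.get(key)
        if last is not None and now - last < window_seconds:
            return False  # still inside the window; the caller would see None
        self._last_accepted[key] = now
        return True

limiter = ToyRateLimiter()
# Attempts at t = 0, 10, 59, 60 and 61 seconds with a 60-second window.
decisions = [limiter.try_acquire("user-456", 60, now=t) for t in (0, 10, 59, 60, 61)]
```

In this model the attempts at 0 and 60 seconds are accepted and the others are rejected, because each accepted attempt starts a new window.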

Debouncing

Delay and deduplicate rapid job submissions:

# Only the last enqueue within the window will execute
await client.enqueue_with_debounce(
    "save_document",
    {
        "params": {"doc_id": "789"},
        "debounce_key": "doc-789",
        "debounce_seconds": 5,
    },
)
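
The "last enqueue wins" behaviour can be sketched with a toy model in which each submission for a key replaces the pending job and restarts its timer. The names `pending`, `enqueue_debounced`, and `due_jobs` are hypothetical, and restarting the timer on every submission is an assumption about the semantics rather than something this README specifies.

```python
# debounce_key -> (run_at, params); stands in for state held in Redis.
pending: dict[str, tuple[float, dict]] = {}

def enqueue_debounced(key: str, params: dict, debounce_seconds: float, now: float) -> None:
    """Replace any pending job for this key and restart its timer."""
    pending[key] = (now + debounce_seconds, params)

def due_jobs(now: float) -> list[dict]:
    """Remove and return the params of every job whose timer has expired."""
    ready = [key for key, (run_at, _) in pending.items() if run_at <= now]
    return [pending.pop(key)[1] for key in ready]

enqueue_debounced("doc-789", {"doc_id": "789", "rev": 1}, 5, now=0.0)
enqueue_debounced("doc-789", {"doc_id": "789", "rev": 2}, 5, now=3.0)
early = due_jobs(now=5.0)  # empty: the second submission moved run_at to 8.0
late = due_jobs(now=8.0)   # only the second submission survives
```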

Cron Jobs

Schedule recurring jobs in rrq.toml:

[[rrq.cron_jobs]]
function_name = "daily_report"
schedule = "0 0 9 * * *"  # 9 AM daily (6-field cron with seconds)
queue_name = "scheduled"
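
Six-field expressions are easy to confuse with the classic five-field form, where the first field is the minute. The sketch below labels each field, assuming the common layout with seconds first, which matches the "9 AM daily" comment above. `split_cron` is a hypothetical helper for reading schedules; it does not validate ranges or evaluate the schedule.

```python
FIELD_NAMES = ("second", "minute", "hour", "day_of_month", "month", "day_of_week")

def split_cron(schedule: str) -> dict[str, str]:
    """Label the fields of a 6-field cron expression (seconds first)."""
    fields = schedule.split()
    if len(fields) != len(FIELD_NAMES):
        raise ValueError(
            f"expected {len(FIELD_NAMES)} fields, got {len(fields)}: {schedule!r}"
        )
    return dict(zip(FIELD_NAMES, fields))

parts = split_cron("0 0 9 * * *")
```

A five-field expression such as `0 9 * * *` is rejected by this helper, which is a cheap way to catch a schedule copied from a classic crontab.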

Job Status

Check job progress:

status = await client.get_job_status(job_id)
print(f"Status: {status}")
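
If you need to wait for a job rather than check it once, a small polling loop is enough. The sketch below is generic: `wait_for_status` is a hypothetical helper that accepts any async status function, so `client.get_job_status` could be passed in. The status strings in the demo (`"PENDING"`, `"ACTIVE"`, `"COMPLETED"`) are placeholders; check what `get_job_status` returns in your version before relying on specific values.

```python
import asyncio
from typing import Awaitable, Callable

async def wait_for_status(
    get_status: Callable[[str], Awaitable[object]],
    job_id: str,
    is_done: Callable[[object], bool],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> object:
    """Poll get_status(job_id) until is_done(status) is true or timeout elapses."""

    async def poll() -> object:
        while True:
            status = await get_status(job_id)
            if is_done(status):
                return status
            await asyncio.sleep(interval)

    # asyncio.wait_for cancels the poll loop and raises TimeoutError on expiry.
    return await asyncio.wait_for(poll(), timeout)

# Demo with a fake status source; the status strings are placeholders.
_fake_statuses = iter(["PENDING", "ACTIVE", "COMPLETED"])

async def fake_get_status(job_id: str) -> str:
    return next(_fake_statuses)

final = asyncio.run(
    wait_for_status(
        fake_get_status, "job-1", lambda s: s == "COMPLETED",
        interval=0.01, timeout=1.0,
    )
)
```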

OpenTelemetry

Enable distributed tracing:

from rrq.integrations import otel
otel.enable(service_name="my-service")

Traces propagate from producer → orchestrator → runner automatically.

Configuration Reference

See docs/CONFIG_REFERENCE.md for the full TOML schema.

Key settings:

[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "python"
default_job_timeout_seconds = 300  # 5 minutes
default_max_retries = 5

[rrq.runners.python]
type = "socket"
cmd = ["rrq-runner", "--settings", "myapp.runner:settings"]
tcp_socket = "127.0.0.1:9000"
pool_size = 4           # Number of runner processes
max_in_flight = 10      # Concurrent jobs per runner

Related Packages

Package        Language     Purpose
rrq            Python       Producer client + runner (this package)
rrq-ts         TypeScript   Producer client + runner
rrq            Rust         Orchestrator binary
rrq-producer   Rust         Native producer client
rrq-runner     Rust         Native runner runtime

Requirements

  • Python 3.11+
  • Redis 5.0+

License

Apache-2.0

