toro 🐂

An async-first, Redis-backed job queue for Python. Every state transition is an atomic Lua script; producing and processing are asyncio end to end.

pip install toro-queue      # the import name is `toro`

The package is published on PyPI as toro-queue (the name toro was taken), but the import name is toro. See DESIGN.md for the architecture and the at-least-once reliability model.

Why toro

  • Async-native. Enqueue and process with async/await — no thread pools, no sync bridge. A natural fit for FastAPI, aiohttp, or any asyncio app.
  • Atomic by construction. Claims, retries, promotions and finishes are Lua scripts, so a job can't be lost or double-committed between two round trips.
  • At-least-once delivery. Per-job locks plus a background mark-and-sweep recover jobs from crashed workers, without the visibility-timeout double-delivery trap of some other queues.
  • Typed. Ships py.typed; the public API is fully annotated.
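
At-least-once delivery means a handler can occasionally see the same job twice, for example when a worker stalls and the job is recovered. A minimal sketch of an idempotent handler follows. It is plain asyncio and does not use toro's API: `send_welcome`, the `_processed` set, and the `sent` list are hypothetical names for illustration, and a real handler would keep its "already done" record in a durable store shared by all workers.

```python
import asyncio

# Hypothetical in-memory record of completed work; a real handler would use
# a durable store (a database row, a Redis key) shared by every worker.
_processed: set[str] = set()
sent: list[str] = []


async def send_welcome(job_id: str, data: dict) -> dict:
    """Handle a job safely even if the same job is delivered twice."""
    if job_id in _processed:
        # Redelivery after a crash or stall: skip the side effect.
        return {"ok": True, "duplicate": True}
    sent.append(data["to"])  # the side effect we must not repeat
    _processed.add(job_id)
    return {"ok": True, "duplicate": False}


async def demo() -> list[dict]:
    # Simulate the queue delivering the same job twice.
    first = await send_welcome("job-1", {"to": "ada@example.com"})
    second = await send_welcome("job-1", {"to": "ada@example.com"})
    return [first, second]


results = asyncio.run(demo())
```

The second delivery returns early, so the side effect runs once. Keying the check on a stable job id pairs naturally with the custom job ids listed under Features.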

Features

  • Enqueue. Delayed jobs, global priorities (FIFO within a band).
  • Retries. Fixed or exponential backoff, capped attempts.
  • Schedules. Repeatable cron and fixed-interval (every) jobs.
  • Rate limiting. Queue-wide token bucket shared across all workers.
  • Dedup. Custom (idempotent) job ids plus a throttle window ({id, ttl}).
  • Auto-removal. Keep the last N completed/failed jobs and/or those finished within a given age.
  • Reliability. Per-job locks, lock renewal, stalled-job recovery.
  • Observability. Progress, per-job logs, lifecycle events, await result().
  • Lifecycle. Pause / resume, graceful shutdown that drains in-flight jobs.
  • Dashboard. matador, a live web UI.
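
To make the rate-limiting entry concrete, here is a minimal in-process token bucket. It is only a sketch of the general technique: toro's limiter is described as queue-wide and shared across workers, so its state presumably lives in Redis, whereas this version keeps state in one Python object. The `TokenBucket` class and its parameter names are assumptions for illustration, not part of toro's API.

```python
import time


class TokenBucket:
    """In-process token bucket: at most `max_tokens` jobs per `duration_ms`."""

    def __init__(self, max_tokens: int, duration_ms: int, now=time.monotonic):
        self.capacity = max_tokens
        self.refill_per_sec = max_tokens / (duration_ms / 1000)
        self.tokens = float(max_tokens)  # start with a full bucket
        self._now = now  # injectable clock, handy for tests
        self._last = now()

    def try_acquire(self) -> bool:
        """Take one token if available; return False when rate-limited."""
        current = self._now()
        elapsed = current - self._last
        self._last = current
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A worker loop would call `try_acquire()` before claiming a job and wait briefly when it returns False. The bucket allows short bursts up to `max_tokens` while holding the long-run rate at `max_tokens` per `duration_ms`.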

Quick start

import asyncio
from toro import Queue, Worker

async def main():
    queue = Queue("emails")
    await queue.add("welcome", {"to": "ada@example.com"})

    async def process(job):
        print("sending", job.data)
        return {"ok": True}

    worker = Worker("emails", process, concurrency=8)
    worker.on("completed", lambda job, result: print("done", job.id))
    await worker.run()

asyncio.run(main())

A taste of the options

# Priorities, delay, and retry-with-backoff
await queue.add("report", data, priority=10, delay=5000,
                attempts=5, backoff={"type": "exponential", "delay": 1000})

# Idempotent custom id (a second add with the same id is ignored)
await queue.add("charge", data, job_id="order-1234")

# A repeatable schedule (cron or every-N-ms); "run now" with trigger_scheduler
await queue.add_scheduler("nightly-rollup", cron="0 0 * * *")

# Queue-wide rate limit: at most 100 jobs / second across every worker
worker = Worker("emails", process, rate_limit={"max": 100, "duration": 1000})

# Wait for a result from the producer side
job = await queue.add("resize", {"src": "a.png"})
print(await job.result(timeout=30))
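
For a sense of what the retry options above imply, the sketch below computes a retry schedule from a backoff setting. The doubling formula (`delay * 2 ** (attempt - 1)`) is a common convention and an assumption here; this README does not state toro's exact formula. The helper `backoff_delay` is hypothetical, as is the reading of `attempts=5` as one initial run plus up to four retries.

```python
def backoff_delay(attempt: int, kind: str, delay_ms: int) -> int:
    """Delay before retry number `attempt` (1-based), in milliseconds."""
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    if kind == "fixed":
        return delay_ms
    if kind == "exponential":
        # Assumed convention: double the base delay on each retry.
        return delay_ms * 2 ** (attempt - 1)
    raise ValueError(f"unknown backoff type: {kind}")


# Assumption: attempts=5 means one initial run plus up to four retries.
schedule = [backoff_delay(n, "exponential", 1000) for n in range(1, 5)]
print(schedule)  # [1000, 2000, 4000, 8000]
```

Under these assumptions a job that keeps failing is retried after 1 s, 2 s, 4 s, and 8 s before it is marked failed.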

Develop

The project is managed with uv and uses the Astral toolchain throughout (ruff for linting and formatting, ty for type checking).

uv sync                          # venv + deps + dev group
uv run ruff check .              # lint  (strict: select = ALL)
uv run ruff format .             # format
uv run ty check                  # type check
uv run pytest -m "unit or integration"   # tests (integration needs Redis on :6379)
uv run python examples/basic.py

The suite is a test pyramid selected by pytest markers: -m unit (fast, no Redis), -m integration (needs Redis), and -m load (the open-loop benchmark harness in tests/load/).

License

MIT

Download files

Source Distribution

toro_queue-0.1.0.tar.gz (74.4 kB), source

Built Distribution

toro_queue-0.1.0-py3-none-any.whl (28.6 kB), Python 3
