An async-first, Redis-backed job queue for Python.
Project description
toro 🐂
An async-first, Redis-backed job queue for Python. Every state transition is
an atomic Lua script; producing and processing are asyncio end to end.
pip install toro-queue # the import name is `toro`
Installed as
toro-queueon PyPI (the nametorowas taken), but youimport toro. See DESIGN.md for the architecture and the at-least-once reliability model.
Why toro
- Async-native. Enqueue and process with
async/await— no thread pools, no sync bridge. A natural fit for FastAPI, aiohttp, or any asyncio app. - Atomic by construction. Claims, retries, promotions and finishes are Lua scripts, so a job can't be lost or double-committed between two round trips.
- At-least-once delivery. Per-job locks + a background mark-and-sweep recover jobs from workers that crashed — without the visibility-timeout double-delivery trap of some other queues.
- Typed. Ships
py.typed; the public API is fully annotated.
Features
| Enqueue | delayed jobs, global priorities (FIFO within a band) |
| Retries | fixed or exponential backoff, capped attempts |
| Schedules | repeatable cron and fixed-interval (every) jobs |
| Rate limiting | queue-wide token bucket shared across all workers |
| Dedup | custom (idempotent) job ids + a throttle window ({id, ttl}) |
| Auto-removal | keep the last N and/or finished-within-age completed/failed |
| Reliability | per-job locks, lock renewal, stalled-job recovery |
| Observability | progress, per-job logs, lifecycle events, await result() |
| Lifecycle | pause / resume, graceful shutdown that drains in-flight jobs |
| Dashboard | matador — a live web UI |
Quick start
import asyncio
from toro import Queue, Worker
async def main():
queue = Queue("emails")
await queue.add("welcome", {"to": "ada@example.com"})
async def process(job):
print("sending", job.data)
return {"ok": True}
worker = Worker("emails", process, concurrency=8)
worker.on("completed", lambda job, result: print("done", job.id))
await worker.run()
asyncio.run(main())
A taste of the options
# Priorities, delay, and retry-with-backoff
await queue.add("report", data, priority=10, delay=5000,
attempts=5, backoff={"type": "exponential", "delay": 1000})
# Idempotent custom id (a second add with the same id is ignored)
await queue.add("charge", data, job_id="order-1234")
# A repeatable schedule (cron or every-N-ms); "run now" with trigger_scheduler
await queue.add_scheduler("nightly-rollup", cron="0 0 * * *")
# Queue-wide rate limit: at most 100 jobs / second across every worker
worker = Worker("emails", process, rate_limit={"max": 100, "duration": 1000})
# Wait for a result from the producer side
job = await queue.add("resize", {"src": "a.png"})
print(await job.result(timeout=30))
Develop
Managed with uv; the Astral toolchain throughout.
uv sync # venv + deps + dev group
uv run ruff check . # lint (strict: select = ALL)
uv run ruff format . # format
uv run ty check # type check
uv run pytest -m "unit or integration" # tests (integration needs Redis on :6379)
uv run python examples/basic.py
The suite is a pyramid — -m unit (fast, no Redis), -m integration (Redis),
and -m load (the open-loop benchmark harness in tests/load/).
License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file toro_queue-0.1.0.tar.gz.
File metadata
- Download URL: toro_queue-0.1.0.tar.gz
- Upload date:
- Size: 74.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d3a31a6dbd72a02e2d94e0632cf7f05633578e90491f5b8b277fede23a5b6d97
|
|
| MD5 |
2c18b2f33cf860690746ad0c9fe13287
|
|
| BLAKE2b-256 |
12f9b7f9ba0ee93d220d54243c7312832443b1f72ea24cde7a84b7b726c87960
|
Provenance
The following attestation bundles were made for toro_queue-0.1.0.tar.gz:
Publisher:
release.yml on ilovepixelart/toro
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
toro_queue-0.1.0.tar.gz -
Subject digest:
d3a31a6dbd72a02e2d94e0632cf7f05633578e90491f5b8b277fede23a5b6d97 - Sigstore transparency entry: 1756705909
- Sigstore integration time:
-
Permalink:
ilovepixelart/toro@604033b085a165601cc042869545acd8814f738d -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/ilovepixelart
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@604033b085a165601cc042869545acd8814f738d -
Trigger Event:
push
-
Statement type:
File details
Details for the file toro_queue-0.1.0-py3-none-any.whl.
File metadata
- Download URL: toro_queue-0.1.0-py3-none-any.whl
- Upload date:
- Size: 28.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1a3cbfa1eac704a919d9b0c3060f6a7054a7bb98c6a1a53b74f25d9190657fdc
|
|
| MD5 |
76ae0ab9f436e04d6dd88263388b85fd
|
|
| BLAKE2b-256 |
bf80db0389ac60b333325c8fc4db33d396ed92785ae0344c735485c438eb985a
|
Provenance
The following attestation bundles were made for toro_queue-0.1.0-py3-none-any.whl:
Publisher:
release.yml on ilovepixelart/toro
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
toro_queue-0.1.0-py3-none-any.whl -
Subject digest:
1a3cbfa1eac704a919d9b0c3060f6a7054a7bb98c6a1a53b74f25d9190657fdc - Sigstore transparency entry: 1756705966
- Sigstore integration time:
-
Permalink:
ilovepixelart/toro@604033b085a165601cc042869545acd8814f738d -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/ilovepixelart
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@604033b085a165601cc042869545acd8814f738d -
Trigger Event:
push
-
Statement type: