Skip to main content

An async-first, Redis-backed job queue for Python.

Project description

toro 🐂

An async-first, Redis-backed job queue for Python. Every state transition is an atomic Lua script; producing and processing are asyncio end to end.

Python
PyPI Downloads License
Coverage Quality Gate Status
Reliability Rating Maintainability Rating Security Rating

pip install toro-queue      # the import name is `toro`

Installed as toro-queue on PyPI (the name toro was taken), but you import toro. See the docs for the architecture, the reliability model, and the detailed guides.

Pairs with matador, a live web dashboard for your queues.

Why toro

  • Async-native. Enqueue and process with async/await — no thread pools, no sync bridge. A natural fit for FastAPI, aiohttp, or any asyncio app.
  • Atomic by construction. Claims, retries, promotions and finishes are Lua scripts, so a job can't be lost or double-committed between two round trips.
  • At-least-once delivery. Per-job locks + a background mark-and-sweep recover jobs from workers that crashed — without the visibility-timeout double-delivery trap of some other queues.
  • Typed. Ships py.typed; the public API is fully annotated.

Features

Enqueue delayed jobs, global priorities (FIFO within a band)
Retries fixed or exponential backoff, capped attempts
Schedules repeatable cron and fixed-interval (every) jobs
Rate limiting queue-wide token bucket shared across all workers
Dedup custom (idempotent) job ids + a throttle window ({id, ttl})
Auto-removal keep the last N and/or finished-within-age completed/failed
Reliability per-job locks, lock renewal, stalled-job recovery
Observability progress, per-job logs, lifecycle events, await result()
Lifecycle pause / resume, graceful shutdown that drains in-flight jobs
Dashboard matador — a live web UI

Quick start

import asyncio
from toro import Queue, Worker

async def main():
    queue = Queue("emails")
    await queue.add("welcome", {"to": "ada@example.com"})

    async def process(job):
        print("sending", job.data)
        return {"ok": True}

    worker = Worker("emails", process, concurrency=8)
    worker.on("completed", lambda job, result: print("done", job.id))
    await worker.run()

asyncio.run(main())

A taste of the options

# Priorities, delay, and retry-with-backoff
await queue.add("report", data, priority=10, delay=5000,
                attempts=5, backoff={"type": "exponential", "delay": 1000})

# Idempotent custom id (a second add with the same id is ignored)
await queue.add("charge", data, job_id="order-1234")

# A repeatable schedule (cron or every-N-ms); "run now" with trigger_scheduler
await queue.add_scheduler("nightly-rollup", cron="0 0 * * *")

# Queue-wide rate limit: at most 100 jobs / second across every worker
worker = Worker("emails", process, rate_limit={"max": 100, "duration": 1000})

# Wait for a result from the producer side
job = await queue.add("resize", {"src": "a.png"})
print(await job.result(timeout=30))

Develop

Managed with uv; the Astral toolchain throughout.

uv sync                          # venv + deps + dev group
uv run ruff check .              # lint  (strict: select = ALL)
uv run ruff format .             # format
uv run ty check                  # type check
uv run pytest -m "unit or integration"   # tests (integration needs Redis on :6379)
uv run python examples/basic.py

The suite is a pyramid — -m unit (fast, no Redis), -m integration (Redis), and -m load (the open-loop benchmark harness in tests/load/).

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

toro_queue-0.2.0.tar.gz (99.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

toro_queue-0.2.0-py3-none-any.whl (32.1 kB view details)

Uploaded Python 3

File details

Details for the file toro_queue-0.2.0.tar.gz.

File metadata

  • Download URL: toro_queue-0.2.0.tar.gz
  • Upload date:
  • Size: 99.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for toro_queue-0.2.0.tar.gz
Algorithm Hash digest
SHA256 f307ac07dc86f68dac4b7fcb6497bdb423aea0f23d7f5f3d32d3a8056aec2a22
MD5 8333b9f373e6accaa22ee343b6b4cf25
BLAKE2b-256 a79b662067547697e9c51d6a79c54524e602d5b4c512cc7b82c98933749b0d5d

See more details on using hashes here.

Provenance

The following attestation bundles were made for toro_queue-0.2.0.tar.gz:

Publisher: release.yml on ilovepixelart/toro

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file toro_queue-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: toro_queue-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 32.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for toro_queue-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f463dadb8e4e629a53df8ac31efc2b50153ff5c2ab3d27f0a2a8049cb3a376f4
MD5 45169ee62dc9d5734786736d065895fb
BLAKE2b-256 814cf69f5c7d22f2f7fe7c1010bda45905f58591989794a097aa250b15735895

See more details on using hashes here.

Provenance

The following attestation bundles were made for toro_queue-0.2.0-py3-none-any.whl:

Publisher: release.yml on ilovepixelart/toro

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page