toro 🐂

An async-first, Redis-backed job queue for Python. Every state transition is an atomic Lua script; producing and processing are asyncio end to end.

pip install toro-queue      # the import name is `toro`

The package is published on PyPI as toro-queue (the name toro was taken), but the import name is toro. See the docs for the architecture, the reliability model, and the detailed guides.

Pairs with matador, a live web dashboard for your queues.

Why toro

  • Async-native. Enqueue and process with async/await - no thread pools, no sync bridge. A natural fit for FastAPI, aiohttp, or any asyncio app.
  • Atomic by construction. Claims, retries, promotions and finishes are Lua scripts, so a job can't be lost or double-committed between two round trips.
  • At-least-once delivery. Per-job locks + a background mark-and-sweep recover jobs from workers that crashed - without the visibility-timeout double-delivery trap of some other queues.
  • Typed. Ships py.typed; the public API is fully annotated.
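
To make the at-least-once bullet concrete, here is a minimal in-memory sketch of per-job locks with a two-pass mark-and-sweep. It is not toro's implementation: the class InMemoryQueue, its method names, and the "unlocked on two consecutive sweeps" rule are assumptions chosen for illustration, and plain Python containers stand in for Redis and the Lua scripts.

```python
from __future__ import annotations


class InMemoryQueue:
    """Toy stand-in for the Redis structures (hypothetical, not toro's code)."""

    def __init__(self, lock_ttl: float) -> None:
        self.lock_ttl = lock_ttl
        self.waiting: list[str] = []
        self.active: set[str] = set()
        self.locks: dict[str, float] = {}  # job id -> lock expiry time
        self.marked: set[str] = set()      # unlocked on the previous sweep

    def add(self, job_id: str) -> None:
        self.waiting.append(job_id)

    def claim(self, now: float) -> str | None:
        """Move the oldest waiting job to active and take its lock."""
        if not self.waiting:
            return None
        job_id = self.waiting.pop(0)
        self.active.add(job_id)
        self.locks[job_id] = now + self.lock_ttl
        return job_id

    def renew(self, job_id: str, now: float) -> None:
        """A healthy worker extends its lock before it expires."""
        if job_id in self.active:
            self.locks[job_id] = now + self.lock_ttl

    def finish(self, job_id: str) -> None:
        self.active.discard(job_id)
        self.locks.pop(job_id, None)
        self.marked.discard(job_id)

    def sweep(self, now: float) -> list[str]:
        """Requeue active jobs whose lock was expired on two sweeps in a row."""
        unlocked = {j for j in self.active if self.locks.get(j, 0.0) <= now}
        recovered = sorted(unlocked & self.marked)
        for job_id in recovered:
            self.active.discard(job_id)
            self.locks.pop(job_id, None)
            self.waiting.append(job_id)
        # Jobs seen unlocked for the first time are only marked, not moved.
        self.marked = unlocked - set(recovered)
        return recovered


queue = InMemoryQueue(lock_ttl=10.0)
queue.add("a")
queue.add("b")
queue.claim(now=0.0)                   # worker 1 takes "a"
queue.claim(now=0.0)                   # worker 2 takes "b", then crashes
queue.renew("a", now=8.0)              # only worker 1 keeps renewing
first_sweep = queue.sweep(now=12.0)    # "b" is marked, nothing moves yet
queue.renew("a", now=16.0)
second_sweep = queue.sweep(now=24.0)   # "b" is still unlocked, so it is requeued
redelivered = queue.claim(now=24.0)    # "b" is delivered a second time
```

The job held by the crashed worker is handed out again rather than lost, which is the at-least-once behaviour; handlers therefore need to tolerate a repeat run.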

Features

  • Enqueue: delayed jobs, global priorities (FIFO within a band)
  • Retries: fixed or exponential backoff, capped attempts
  • Schedules: repeatable cron and fixed-interval (every) jobs
  • Rate limiting: queue-wide token bucket shared across all workers
  • Dedup: custom (idempotent) job ids + a throttle window ({id, ttl})
  • Auto-removal: keep the last N and/or finished-within-age completed/failed
  • Reliability: per-job locks, lock renewal, stalled-job recovery
  • Observability: progress, per-job logs, lifecycle events, await result()
  • Lifecycle: pause / resume, graceful shutdown that drains in-flight jobs
  • Dashboard: matador - a live web UI
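
The rate-limiting entry mentions a token bucket. As a rough illustration of that technique only, the sketch below keeps the bucket in a single process; toro's bucket is described as queue-wide and shared across workers, so the real state would live in Redis. The class TokenBucket and the names max_tokens, duration_ms and start_ms are hypothetical.

```python
class TokenBucket:
    """Single-process token bucket (illustrative; not toro's shared bucket)."""

    def __init__(self, max_tokens: int, duration_ms: int, start_ms: int = 0) -> None:
        self.max_tokens = max_tokens
        # Refill rate so that a full bucket is regained every duration_ms.
        self.refill_per_ms = max_tokens / duration_ms
        self.tokens = float(max_tokens)
        self.last_ms = start_ms

    def try_take(self, now_ms: int) -> bool:
        """Refill for the elapsed time, then spend one token if available."""
        elapsed = now_ms - self.last_ms
        self.tokens = min(self.max_tokens, self.tokens + elapsed * self.refill_per_ms)
        self.last_ms = now_ms
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Two jobs per second: the third request at t=0 is refused,
# and one more is allowed once enough time has passed.
bucket = TokenBucket(max_tokens=2, duration_ms=1000)
results = [
    bucket.try_take(0),
    bucket.try_take(0),
    bucket.try_take(0),
    bucket.try_take(600),
]
```

A refused request would typically mean the worker waits before claiming the next job instead of dropping it.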

Quick start

import asyncio
from toro import Queue, Worker

async def main():
    queue = Queue("emails")
    await queue.add("welcome", {"to": "ada@example.com"})

    async def process(job):
        print("sending", job.data)
        return {"ok": True}

    worker = Worker("emails", process, concurrency=8)
    worker.on("completed", lambda job, result: print("done", job.id))
    await worker.run()

asyncio.run(main())

A taste of the options

# Priorities, delay, and retry-with-backoff
await queue.add("report", data, priority=10, delay=5000,
                attempts=5, backoff={"type": "exponential", "delay": 1000})

# Idempotent custom id (a second add with the same id is ignored)
await queue.add("charge", data, job_id="order-1234")

# A repeatable schedule (cron or every-N-ms); "run now" with trigger_scheduler
await queue.add_scheduler("nightly-rollup", cron="0 0 * * *")

# Queue-wide rate limit: at most 100 jobs / second across every worker
worker = Worker("emails", process, rate_limit={"max": 100, "duration": 1000})

# Wait for a result from the producer side
job = await queue.add("resize", {"src": "a.png"})
print(await job.result(timeout=30))
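
The backoff option above takes a type and a base delay. The README does not state the exact formula, so the following is a sketch under the common assumption that "exponential" doubles the base delay on each retry and "fixed" always waits the base delay. The helper backoff_delay_ms is hypothetical and is not part of toro's API.

```python
def backoff_delay_ms(backoff: dict, retry: int) -> int:
    """Delay before the given retry (1-based), under the assumed doubling rule."""
    base = backoff["delay"]
    if backoff["type"] == "fixed":
        return base
    if backoff["type"] == "exponential":
        return base * 2 ** (retry - 1)
    raise ValueError(f"unknown backoff type: {backoff['type']!r}")


# attempts=5 leaves room for four retries after the first failed attempt.
exponential = [
    backoff_delay_ms({"type": "exponential", "delay": 1000}, n) for n in range(1, 5)
]
fixed = [backoff_delay_ms({"type": "fixed", "delay": 1000}, n) for n in range(1, 5)]
```

Under these assumptions the exponential schedule is 1000, 2000, 4000 and 8000, in the same unit as the delay value, while the fixed schedule stays at 1000.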

Develop

The project is managed with uv and uses the Astral toolchain (uv, ruff, ty) throughout.

uv sync                          # venv + deps + dev group
uv run ruff check .              # lint  (strict: select = ALL)
uv run ruff format .             # format
uv run ty check                  # type check
uv run pytest -m "unit or integration"   # tests (integration needs Redis on :6379)
uv run python examples/basic.py

The suite is a test pyramid with three pytest markers: -m unit (fast, no Redis), -m integration (needs Redis), and -m load (the open-loop benchmark harness in tests/load/).
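
For readers new to pytest markers, one way the three marker names could be registered is in a conftest.py hook, sketched below. This is an assumption about layout, not a copy of the project's configuration, which may declare its markers in pyproject.toml instead; the MARKERS dict and its descriptions are illustrative.

```python
# conftest.py (hypothetical layout)
MARKERS = {
    "unit": "fast tests that do not need Redis",
    "integration": "tests that need a Redis server",
    "load": "open-loop benchmark harness",
}


def pytest_configure(config):
    """Register the markers so that `pytest -m unit` and friends are recognised."""
    for name, description in MARKERS.items():
        config.addinivalue_line("markers", f"{name}: {description}")
```

A test then opts in with a decorator such as @pytest.mark.unit, and -m "unit or integration" selects the union of the two groups.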

License

MIT
