Fast, async, fully-typed distributed task queue via Redis streams
Project description
streaQ
Fast, async, fully-typed distributed task queue via Redis streams
Features
- Up to 5x faster than
arq - Fully typed
- Comprehensive documentation
- Support for delayed/scheduled tasks
- Cron jobs
- Task middleware
- Task dependency graph
- Pipelining
- Priority queues
- Support for synchronous tasks (run in separate threads)
- Redis Sentinel & Cluster support for production
- Built-in web UI for monitoring tasks
- Built with structured concurrency on
anyio, supports bothasyncioandtrio
[!TIP] Sick of
redis-py? Check out coredis, a fast, fully-typed Redis client that supports Trio!
Installation
$ pip install streaq
Getting started
To start, you'll need to create a Worker object:
from streaq import Worker
worker = Worker(redis_url="redis://localhost:6379", anyio_backend="trio")
You can then register async tasks with the worker like this:
import trio
@worker.task
async def sleeper(time: int) -> int:
await trio.sleep(time)
return time
@worker.cron("* * * * mon-fri") # every minute on weekdays
async def cronjob() -> None:
print("Nobody respects the spammish repetition!")
Finally, let's use the worker's async context manager to queue up some tasks:
async with worker:
await sleeper.enqueue(3)
# enqueue returns a task object that can be used to get results/info
task = await sleeper.enqueue(1).start(delay=3)
print(await task.info())
print(await task.result(timeout=5))
Putting this all together gives us example.py. Let's spin up a worker:
$ streaq run example:worker
and queue up some tasks like so:
$ python example.py
Let's see what the output looks like:
[INFO] 2025-09-23 02:14:30: starting worker 3265311d for 2 functions
[INFO] 2025-09-23 02:14:35: task sleeper □ cf0c55387a214320bd23e8987283a562 → worker 3265311d
[INFO] 2025-09-23 02:14:38: task sleeper ■ cf0c55387a214320bd23e8987283a562 ← 3
[INFO] 2025-09-23 02:14:40: task sleeper □ 1de3f192ee4a40d4884ebf303874681c → worker 3265311d
[INFO] 2025-09-23 02:14:41: task sleeper ■ 1de3f192ee4a40d4884ebf303874681c ← 1
[INFO] 2025-09-23 02:15:00: task cronjob □ 2a4b864e5ecd4fc99979a92f5db3a6e0 → worker 3265311d
Nobody respects the spammish repetition!
[INFO] 2025-09-23 02:15:00: task cronjob ■ 2a4b864e5ecd4fc99979a92f5db3a6e0 ← None
TaskInfo(fn_name='sleeper', enqueue_time=1751508876961, tries=0, scheduled=datetime.datetime(2025, 7, 3, 2, 14, 39, 961000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())
TaskResult(fn_name='sleeper', enqueue_time=1751508876961, success=True, start_time=1751508880500, finish_time=1751508881503, tries=1, worker_id='ca5bd9eb', _result=1)
For more examples, check out the documentation.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file streaq-6.3.4.tar.gz.
File metadata
- Download URL: streaq-6.3.4.tar.gz
- Upload date:
- Size: 36.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
806ff64e3f4039990151aa821f7967b2ab9edf2c7f086be146985bd024ef647e
|
|
| MD5 |
3b2ac4cc8f0604ce2c6455c9b7660d81
|
|
| BLAKE2b-256 |
f63eb1057014377d4278f96462ddd2f5d0fa87fc6a9fbc2754382ba22badfc2d
|
File details
Details for the file streaq-6.3.4-py3-none-any.whl.
File metadata
- Download URL: streaq-6.3.4-py3-none-any.whl
- Upload date:
- Size: 43.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
79dadc40aeed1e43ee7b5fbeb0e645b467f6d403f0397497e3cca555264c4e14
|
|
| MD5 |
d036f179fa82eff5cefbbe343b7f4403
|
|
| BLAKE2b-256 |
e567e31d1cd59b31b7e2fdc14960a5b71c62c6d20da741b144c06cef57d14ebc
|