Fast, async, fully-typed distributed task queue via Redis streams

streaQ

Features

  • Up to 5x faster than arq
  • Fully typed
  • Comprehensive documentation
  • Support for delayed/scheduled tasks
  • Cron jobs
  • Task middleware
  • Task dependency graph
  • Pipelining
  • Priority queues
  • Support for synchronous tasks (run in separate threads)
  • Redis Sentinel & Cluster support for production
  • Built-in web UI for monitoring tasks
  • Built with structured concurrency on anyio, supports both asyncio and trio

Tip: Sick of redis-py? Check out coredis, a fast, fully-typed Redis client that supports Trio!

Installation

$ pip install streaq

Getting started

To start, you'll need to create a Worker object:

from streaq import Worker

worker = Worker(redis_url="redis://localhost:6379", anyio_backend="trio")

You can then register async tasks with the worker like this:

import trio

@worker.task()
async def sleeper(time: int) -> int:
    await trio.sleep(time)
    return time

@worker.cron("* * * * mon-fri")  # every minute on weekdays
async def cronjob() -> None:
    print("Nobody respects the spammish repetition!")
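
To make the schedule in the cron example concrete, here is a minimal sketch of how the weekday field `mon-fri` restricts when the job fires. It is an illustration only, not how streaQ parses cron expressions: the helper names `weekday_field_matches` and `runs_at` are hypothetical, and the sketch only understands `*` in the first four fields and a non-wrapping day range in the last one.

```python
from datetime import datetime

# Cron day names in the order used by datetime.weekday() (Monday == 0).
DAY_NAMES = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]


def weekday_field_matches(field: str, when: datetime) -> bool:
    """Return True if `when` falls on a day allowed by a cron weekday field.

    Handles only "*", a single day name, or an inclusive "start-end" range
    that does not wrap around the week (hypothetical helper).
    """
    field = field.lower()
    if field == "*":
        return True
    start, _, end = field.partition("-")
    first = DAY_NAMES.index(start)
    last = DAY_NAMES.index(end) if end else first
    return first <= when.weekday() <= last


def runs_at(expression: str, when: datetime) -> bool:
    """Check a five-field cron expression against `when` (hypothetical helper)."""
    minute, hour, day, month, weekday = expression.split()
    # This sketch deliberately supports only "*" in the first four fields.
    if any(f != "*" for f in (minute, hour, day, month)):
        raise ValueError("only '*' is supported for the first four fields")
    return weekday_field_matches(weekday, when)


# 2025-09-23 is a Tuesday; 2025-09-27 is a Saturday.
print(runs_at("* * * * mon-fri", datetime(2025, 9, 23, 2, 15)))
print(runs_at("* * * * mon-fri", datetime(2025, 9, 27, 2, 15)))
```

With this expression, the job is eligible every minute from Monday through Friday and never on Saturday or Sunday.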

Finally, let's use the worker's async context manager to queue up some tasks:

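async def main() -> None:
    async with worker:
        await sleeper.enqueue(3)
        # enqueue returns a task object that can be used to get results/info
        task = await sleeper.enqueue(1).start(delay=3)
        print(await task.info())
        print(await task.result(timeout=5))

if __name__ == "__main__":
    # only enqueue when run as a script, not when the worker imports this module
    trio.run(main)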

Putting this all together gives us example.py. Let's spin up a worker:

$ streaq run example:worker

and queue up some tasks like so:

$ python example.py

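Let's see what the combined output looks like. The [INFO] lines and the cron job's message come from the worker, and the last two lines are printed by example.py: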

[INFO] 2025-09-23 02:14:30: starting worker 3265311d for 2 functions
[INFO] 2025-09-23 02:14:35: task sleeper □ cf0c55387a214320bd23e8987283a562 → worker 3265311d
[INFO] 2025-09-23 02:14:38: task sleeper ■ cf0c55387a214320bd23e8987283a562 ← 3
[INFO] 2025-09-23 02:14:40: task sleeper □ 1de3f192ee4a40d4884ebf303874681c → worker 3265311d
[INFO] 2025-09-23 02:14:41: task sleeper ■ 1de3f192ee4a40d4884ebf303874681c ← 1
[INFO] 2025-09-23 02:15:00: task cronjob □ 2a4b864e5ecd4fc99979a92f5db3a6e0 → worker 3265311d
Nobody respects the spammish repetition!
[INFO] 2025-09-23 02:15:00: task cronjob ■ 2a4b864e5ecd4fc99979a92f5db3a6e0 ← None
TaskInfo(fn_name='sleeper', enqueue_time=1751508876961, tries=0, scheduled=datetime.datetime(2025, 7, 3, 2, 14, 39, 961000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())
TaskResult(fn_name='sleeper', enqueue_time=1751508876961, success=True, start_time=1751508880500, finish_time=1751508881503, tries=1, worker_id='ca5bd9eb', _result=1)
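
The `enqueue_time`, `start_time`, and `finish_time` fields in the sample output appear to be Unix timestamps in milliseconds; that is an inference from the values shown (the `scheduled` datetime is exactly three seconds after `enqueue_time`), not something the text above states. Under that assumption, a short sketch for turning them into readable values could look like this. The helper `from_millis` is hypothetical and not part of streaQ.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_millis(ms: int) -> datetime:
    """Convert milliseconds since the Unix epoch to an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


# Values copied from the sample TaskResult output above.
enqueue_time = 1751508876961
start_time = 1751508880500
finish_time = 1751508881503

enqueued_at = from_millis(enqueue_time)
run_ms = finish_time - start_time      # time spent executing the task
wait_ms = start_time - enqueue_time    # includes the 3 second delay

print(enqueued_at.isoformat())
print(f"ran for {run_ms} ms after waiting {wait_ms} ms")
```

The run time of roughly one second matches the `sleeper(1)` call, and the wait is dominated by the `delay=3` passed to `start`.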

For more examples, check out the documentation.
