FastAPI + ARQ: async worker queue integration without boilerplate
🇬🇧 fastapi-arq – FastAPI + ARQ without boilerplate
**🇬🇧 English** · [🇷🇺 Русский](README.ru.md) · [🇨🇳 简体中文](README.zh-CN.md) · [🇹🇼 繁體中文](README.zh-TW.md) · [🇪🇸 Español](README.es.md) · [🇫🇷 Français](README.fr.md) · [🇩🇪 Deutsch](README.de.md) · [🇯🇵 日本語](README.ja.md) · [🇰🇷 한국어](README.ko.md) · [🇸🇦 العربية](README.ar.md) · [🇵🇹 Português](README.pt.md) · [🇮🇹 Italiano](README.it.md) · [🇳🇱 Nederlands](README.nl.md) · [🇵🇱 Polski](README.pl.md) · [🇹🇷 Türkçe](README.tr.md) · [🇻🇳 Tiếng Việt](README.vi.md) · [🇹🇭 ไทย](README.th.md) · [🇮🇩 Bahasa Indonesia](README.id.md) · [🇮🇳 हिन्दी](README.hi.md) · [🇺🇦 Українська](README.uk.md)
📖 About
fastapi-arq is a lightweight decorator-style wrapper that integrates arq (an asyncio-based job queue backed by Redis) into FastAPI applications. It eliminates repetitive boilerplate: starting and stopping the worker via the FastAPI lifespan, injecting dependencies (Depends(...)) into task functions, typed job enqueuing, and a built-in health-check endpoint.
🔍 The Problem
Every FastAPI project that uses arq ends up writing the same glue code: manually starting/stopping arq.Worker in the lifespan, creating Redis connection pools, injecting FastAPI dependencies (DB session, S3 client) into worker tasks, writing health-check endpoints, and handling unique job scheduling, retries, and timeouts. This code is copied from project to project, leading to fragmentation and maintenance burden.
✅ The Solution
fastapi-arq is the glue between FastAPI and arq, not a replacement:
- One-liner setup: FastArq(app, 'redis://...')
- Declarative tasks: @arq.task(max_tries=3)
- Dependency injection: Depends(get_db) in worker context
- Typed enqueue: arq_redis.enqueue(my_task, user_id=42)
- Built-in health endpoints: /arq/health, /live, /ready
🧩 Components
| Component | Purpose |
|---|---|
| FastArq | Main class. Registers workers, manages lifespan, exposes health router. |
| @arq.task | Decorator. Registers functions + settings (max_tries, timeout, keep_result, unique). |
| ArqRedis | Extended arq.connections.ArqRedis with typed enqueue() and enqueue_unique(). |
| ContextBuilder | Resolves Depends(...) into a dict passed to every task as ctx. |
| WorkerManager | Starts/stops arq.Worker as an async task. Cancellation-safe. |
| HealthRouter | FastAPI APIRouter with /health (ping + latency), /live, /ready endpoints. |
📦 Requirements
- Python 3.11+
- FastAPI ≥ 0.111.0
- arq ≥ 0.28.0
- Redis server (any version ≥ 6)
🛠️ Installation
pip install fastapi-arq
# or with dev extras
pip install "fastapi-arq[dev]"
🚀 Quick Start
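from fastapi import FastAPI, Depends
from fastapi_arq import FastArq

app = FastAPI()
# get_db is your application's own dependency; its result is exposed to tasks as ctx["get_db"].
arq = FastArq(
    app,
    redis_settings="redis://localhost:6379",
    context_dependencies=[Depends(get_db)],
)

@arq.task(max_tries=3, timeout=120)
async def send_welcome_email(ctx: dict, user_id: int):
    db = ctx["get_db"]  # the key is the name of the dependency callable
    user = await db.get(User, user_id)  # User is your own ORM model
    ...

@app.post("/users")
async def create_user(arq_redis=Depends(arq.get_redis)):
    ...
    await arq_redis.enqueue(send_welcome_email, user_id=user.id)
    return {"ok": True}

# Mount the built-in health endpoints.
app.include_router(arq.health_router)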
📋 FAQ
Q: Does fastapi-arq replace arq?
No. fastapi-arq is a thin layer on top of arq. You still need arq and Redis. Think of it as 'FastAPI sugar for arq' — it handles the wiring so you don't have to.
Q: Can I use my own existing lifespan?
Yes. FastArq preserves and wraps any existing lifespan context manager. Your startup/shutdown logic runs alongside the worker lifecycle.
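To make the wrapping idea concrete, here is a minimal standard-library sketch of how an existing lifespan can be nested around a worker lifecycle. It is not fastapi-arq's actual code: `wrap_lifespan`, `start_worker`, `stop_worker`, and the ordering (worker starts after the app's startup and stops before its shutdown) are assumptions made for illustration.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order of lifecycle steps for illustration


@asynccontextmanager
async def user_lifespan(app):
    # Stand-in for an application's existing lifespan.
    events.append("app startup")
    yield
    events.append("app shutdown")


def wrap_lifespan(original, start_worker, stop_worker):
    """Return a lifespan that runs `original` around the worker lifecycle."""

    @asynccontextmanager
    async def combined(app):
        async with original(app):
            await start_worker()
            try:
                yield
            finally:
                # Runs even if the application body raises.
                await stop_worker()

    return combined


async def start_worker():  # hypothetical hook
    events.append("worker start")


async def stop_worker():  # hypothetical hook
    events.append("worker stop")


async def main():
    lifespan = wrap_lifespan(user_lifespan, start_worker, stop_worker)
    async with lifespan(None):  # a real app object is not needed for the sketch
        events.append("serving")


asyncio.run(main())
print(events)
```

In this sketch the user's shutdown logic still runs after the worker has stopped, so resources the worker relies on are released last.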
Q: How do I pass a DB session to tasks?
Pass context_dependencies=[Depends(get_db)] to FastArq. The builder resolves the dependency at worker startup and stores it in the ctx dict under the name of the dependency callable, so your task reads it as ctx['get_db']. Sync, async, and async-generator dependencies are supported.
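The resolution step can be pictured with a small standard-library sketch. It is an illustration only, not the library's ContextBuilder: `build_ctx` and the three sample dependencies are hypothetical, plain callables stand in for `Depends(...)` wrappers, and async-generator cleanup is assumed to run when the worker shuts down.

```python
import asyncio
import inspect
from contextlib import AsyncExitStack, asynccontextmanager

closed = []  # tracks which generator dependencies were cleaned up


def get_settings():  # sync dependency
    return {"env": "dev"}


async def get_client():  # async dependency
    return "client"


async def get_db():  # async-generator dependency with teardown
    try:
        yield "session"
    finally:
        closed.append("get_db")


async def build_ctx(dependencies, stack):
    """Resolve each callable once and key the result by the callable's name."""
    ctx = {}
    for dep in dependencies:
        if inspect.isasyncgenfunction(dep):
            # Keep the generator open until the exit stack is closed.
            value = await stack.enter_async_context(asynccontextmanager(dep)())
        elif inspect.iscoroutinefunction(dep):
            value = await dep()
        else:
            value = dep()
        ctx[dep.__name__] = value
    return ctx


async def main():
    async with AsyncExitStack() as stack:
        ctx = await build_ctx([get_settings, get_client, get_db], stack)
        return dict(ctx)  # leaving the block runs the generator teardown


ctx = asyncio.run(main())
print(ctx, closed)
```

Because the values are resolved once, every task in this sketch sees the same objects; a per-job session would need a different design.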
Q: Does fastapi-arq run workers in separate processes?
In the current version, the worker runs in-process (same asyncio event loop as the FastAPI app). Multi-process support is planned for future releases.
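Running in-process means the worker is just another task on the application's event loop. The following standard-library sketch shows one way such a task could be started and then stopped with a bounded wait for in-flight jobs. The names (`worker_loop`, `stop_worker`), the explicit shutdown event, and the short timeout are assumptions for illustration, not the library's WorkerManager.

```python
import asyncio


async def job(name, seconds, done):
    await asyncio.sleep(seconds)
    done.append(name)


async def worker_loop(shutdown, in_flight, done):
    # Stand-in worker: starts two jobs, then idles until shutdown is requested.
    in_flight.append(asyncio.create_task(job("fast", 0.01, done)))
    in_flight.append(asyncio.create_task(job("slow", 5.0, done)))
    await shutdown.wait()


async def stop_worker(worker_task, shutdown, in_flight, timeout):
    """Signal shutdown, then wait up to `timeout` seconds for running jobs."""
    shutdown.set()
    await worker_task
    try:
        await asyncio.wait_for(
            asyncio.gather(*in_flight, return_exceptions=True), timeout
        )
        return True  # every job finished in time
    except asyncio.TimeoutError:
        return False  # wait_for cancelled the jobs that were still running


async def main():
    shutdown, in_flight, done = asyncio.Event(), [], []
    worker_task = asyncio.create_task(worker_loop(shutdown, in_flight, done))
    await asyncio.sleep(0.05)  # the application serves requests meanwhile
    drained = await stop_worker(worker_task, shutdown, in_flight, timeout=0.1)
    return drained, done, [t.cancelled() for t in in_flight]


drained, done, cancelled = asyncio.run(main())
print(drained, done, cancelled)
```

A consequence of sharing one loop is that a CPU-bound or blocking task delays request handling too, so long blocking work belongs in a thread or a separate process.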
💡 Common Patterns & Tips
- Unique jobs: Use enqueue_unique(my_task, arg=val) to prevent duplicate scheduling. It relies on Redis SETNX with a key built from an MD5 hash of the arguments and a configurable TTL.
- Graceful shutdown: The worker._shutdown event is triggered on cancel; ongoing jobs are awaited via asyncio.gather with a 10-second timeout.
- Context naming: The dependency key comes from the name of the callable: Depends(get_db) → ctx['get_db'].
- Testing: Use fakeredis for offline testing. The library includes pytest fixtures and async test helpers.
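The unique-jobs tip can be illustrated with a short sketch of the deduplication idea: hash the arguments with MD5, then claim the resulting key only if it is not already held. This is not the library's implementation. The key prefix, the JSON serialization of the arguments, and the in-memory `InMemoryLock` class (a stand-in for a Redis set-if-absent with expiry) are all assumptions.

```python
import hashlib
import json
import time


def unique_key(task_name, kwargs, prefix="arq:unique:"):
    """Build a stable key from the task name and an MD5 hash of its arguments."""
    payload = json.dumps(kwargs, sort_keys=True, default=str)
    digest = hashlib.md5(payload.encode("utf-8")).hexdigest()
    return f"{prefix}{task_name}:{digest}"


class InMemoryLock:
    """Tiny stand-in for a Redis set-if-absent operation with a TTL."""

    def __init__(self):
        self._expiry = {}

    def set_nx_ex(self, key, ttl, now=None):
        now = time.monotonic() if now is None else now
        expires = self._expiry.get(key)
        if expires is not None and expires > now:
            return False  # key is still held: treat the job as a duplicate
        self._expiry[key] = now + ttl
        return True


def enqueue_unique(store, task_name, ttl=60, now=None, **kwargs):
    """Return True if the job may be enqueued, False if it is a duplicate."""
    return store.set_nx_ex(unique_key(task_name, kwargs), ttl, now=now)


store = InMemoryLock()
first = enqueue_unique(store, "send_welcome_email", now=0, user_id=42)
duplicate = enqueue_unique(store, "send_welcome_email", now=1, user_id=42)
other_user = enqueue_unique(store, "send_welcome_email", now=1, user_id=43)
after_ttl = enqueue_unique(store, "send_welcome_email", now=61, user_id=42)
print(first, duplicate, other_user, after_ttl)
```

Sorting the keys before hashing makes the key independent of keyword order, and the TTL bounds how long a crashed or lost job can block a retry.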
📚 Guides
Detailed documentation for every component and feature:
- API Reference — classes, methods, parameters
- Lifespan — lifecycle management, custom lifespan, shutdown order
- Context Dependencies — injecting DB sessions, clients into tasks
- Unique Jobs — preventing duplicate job scheduling
- Graceful Shutdown — cancellation, timeouts, cleanup
- Testing — offline testing with fakeredis, pytest fixtures
- FAQ & Troubleshooting — known issues, workarounds
📄 License
MIT — use freely in personal and commercial projects.