
FastAPI + ARQ: async worker queue integration without boilerplate


🇬🇧 fastapi-arq – FastAPI + ARQ without boilerplate

**🇬🇧 English** · [🇷🇺 Русский](README.ru.md) · [🇨🇳 简体中文](README.zh-CN.md) · [🇹🇼 繁體中文](README.zh-TW.md) · [🇪🇸 Español](README.es.md) · [🇫🇷 Français](README.fr.md) · [🇩🇪 Deutsch](README.de.md) · [🇯🇵 日本語](README.ja.md) · [🇰🇷 한국어](README.ko.md) · [🇸🇦 العربية](README.ar.md) · [🇵🇹 Português](README.pt.md) · [🇮🇹 Italiano](README.it.md) · [🇳🇱 Nederlands](README.nl.md) · [🇵🇱 Polski](README.pl.md) · [🇹🇷 Türkçe](README.tr.md) · [🇻🇳 Tiếng Việt](README.vi.md) · [🇹🇭 ไทย](README.th.md) · [🇮🇩 Bahasa Indonesia](README.id.md) · [🇮🇳 हिन्दी](README.hi.md) · [🇺🇦 Українська](README.uk.md)




📖 About

fastapi-arq is a lightweight, decorator-style wrapper that integrates arq (an asyncio job queue backed by Redis) into FastAPI applications. It removes the repetitive boilerplate: starting and stopping the worker via the FastAPI lifespan, injecting FastAPI dependencies (Depends(...)) into task functions, enqueuing jobs with typed arguments, and exposing a built-in health-check endpoint.

🔍 The Problem

Every FastAPI project that uses arq ends up writing the same glue code: manually starting/stopping arq.Worker in the lifespan, creating Redis connection pools, injecting FastAPI dependencies (DB session, S3 client) into worker tasks, writing health-check endpoints, and handling unique job scheduling, retries, and timeouts. This code is copied from project to project, leading to fragmentation and maintenance burden.

✅ The Solution

fastapi-arq is the glue between FastAPI and arq, not a replacement.

  • One-liner setup: FastArq(app, 'redis://...')
  • Declarative tasks: @arq.task(max_tries=3)
  • DI injection: Depends(get_db) in worker context
  • Typed enqueue: arq_redis.enqueue(my_task, user_id=42)
  • Built-in health: /arq/health, /live, /ready

🧩 Components

| Component | Purpose |
|---|---|
| FastArq | Main class. Registers workers, manages lifespan, exposes health router. |
| @arq.task | Decorator. Registers functions + settings (max_tries, timeout, keep_result, unique). |
| ArqRedis | Extended arq.connections.ArqRedis with typed enqueue() and enqueue_unique(). |
| ContextBuilder | Resolves Depends(...) into a dict passed to every task as ctx. |
| WorkerManager | Starts/stops arq.Worker as an async task. Cancellation-safe. |
| HealthRouter | FastAPI APIRouter with /health (ping + latency), /live, /ready endpoints. |

📦 Requirements

  • Python 3.11+
  • FastAPI ≥ 0.111.0
  • arq ≥ 0.28.0
  • Redis server (any version ≥ 6)

🛠️ Installation

pip install fastapi-arq
# or with dev extras
pip install "fastapi-arq[dev]"

🚀 Quick Start

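from fastapi import FastAPI, Depends
from fastapi_arq import FastArq

# get_db and User are assumed to come from your own application code.

app = FastAPI()

# Hooks the worker into the app lifespan and resolves get_db into the task ctx.
arq = FastArq(
    app,
    redis_settings="redis://localhost:6379",
    context_dependencies=[Depends(get_db)],
)

@arq.task(max_tries=3, timeout=120)
async def send_welcome_email(ctx: dict, user_id: int):
    db = ctx["get_db"]  # the key is the name of the dependency callable
    user = await db.get(User, user_id)
    ...

@app.post("/users")
async def create_user(arq_redis=Depends(arq.get_redis)):
    ...
    await arq_redis.enqueue(send_welcome_email, user_id=user.id)
    return {"ok": True}

# Mounts the built-in health-check endpoints.
app.include_router(arq.health_router)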

📋 FAQ

Q: Does fastapi-arq replace arq?

No. fastapi-arq is a thin layer on top of arq. You still need arq and Redis. Think of it as 'FastAPI sugar for arq' — it handles the wiring so you don't have to.

Q: Can I use my own existing lifespan?

Yes. FastArq preserves and wraps any existing lifespan context manager. Your startup/shutdown logic runs alongside the worker lifecycle.
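To illustrate what wrapping a lifespan can look like, here is a minimal standard-library sketch. It is not the library's actual code: wrap_lifespan, start_worker, and stop_worker are hypothetical names, and the ordering (worker starts before the user's startup code and stops after its shutdown code) is an assumption.

```python
import asyncio
from contextlib import asynccontextmanager


def wrap_lifespan(original, on_start, on_stop):
    """Return a lifespan that runs `original` between worker start and stop."""

    @asynccontextmanager
    async def wrapped(app):
        await on_start()
        try:
            if original is None:
                yield
            else:
                # Enter the user's lifespan and pass its state through unchanged.
                async with original(app) as state:
                    yield state
        finally:
            # Runs even if the application body raised.
            await on_stop()

    return wrapped


events = []


@asynccontextmanager
async def user_lifespan(app):
    events.append("user startup")
    yield
    events.append("user shutdown")


async def start_worker():  # hypothetical stand-in for starting arq.Worker
    events.append("worker start")


async def stop_worker():  # hypothetical stand-in for stopping arq.Worker
    events.append("worker stop")


async def main():
    lifespan = wrap_lifespan(user_lifespan, start_worker, stop_worker)
    async with lifespan(None):
        events.append("serving")


asyncio.run(main())
```

The try/finally ensures the worker stop hook still runs if the user's lifespan or the application body raises.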

Q: How do I pass a DB session to tasks?

Pass context_dependencies=[Depends(get_db)] to FastArq. ContextBuilder resolves each dependency once at worker startup and stores the result in the ctx dict under the name of the dependency callable, so the task reads it as ctx['get_db']. Sync, async, and async-generator dependencies are supported.
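The resolution step described above can be sketched with the standard library alone. This is an illustration under assumptions, not the library's implementation: build_context is a hypothetical helper, and it takes plain callables rather than Depends(...) objects to stay dependency-free.

```python
import asyncio
import inspect
from contextlib import AsyncExitStack, asynccontextmanager


async def build_context(dependencies, stack):
    """Resolve each dependency once and key the result by the callable's name."""
    ctx = {}
    for dep in dependencies:
        if inspect.isasyncgenfunction(dep):
            # Async generator: the yielded value is the resource, and the code
            # after `yield` runs when the exit stack closes.
            value = await stack.enter_async_context(asynccontextmanager(dep)())
        elif inspect.iscoroutinefunction(dep):
            value = await dep()
        else:
            value = dep()
        ctx[dep.__name__] = value
    return ctx


closed = []


def get_settings():
    return {"debug": True}


async def get_client():
    return "client"


async def get_db():
    try:
        yield "db-session"
    finally:
        closed.append("db")  # cleanup at worker shutdown


async def main():
    async with AsyncExitStack() as stack:
        ctx = await build_context([get_settings, get_client, get_db], stack)
        return dict(ctx)


ctx = asyncio.run(main())
```

Keeping the exit stack open for the worker's lifetime is what lets a generator-style dependency such as get_db release its resource only at shutdown.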

Q: Does fastapi-arq run workers in separate processes?

In the current version, the worker runs in-process (same asyncio event loop as the FastAPI app). Multi-process support is planned for future releases.
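As a rough picture of what "in-process" means here, the sketch below runs a stand-in worker loop as an asyncio task on the same event loop as the application code, then stops it with a bounded wait. worker_loop is hypothetical and does not use arq; the 10-second limit mirrors the shutdown timeout mentioned under Common Patterns & Tips.

```python
import asyncio


async def worker_loop(stop: asyncio.Event, processed: list):
    """Hypothetical stand-in for a worker polling the queue."""
    n = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)  # pretend to poll Redis and run one job
        n += 1
        processed.append(n)


async def main():
    stop = asyncio.Event()
    processed = []
    # The worker shares the event loop with the application code below.
    task = asyncio.create_task(worker_loop(stop, processed))

    await asyncio.sleep(0.05)  # the application would serve requests here

    stop.set()  # ask the worker to finish its current iteration
    try:
        await asyncio.wait_for(task, timeout=10)
    except asyncio.TimeoutError:
        pass  # wait_for has already cancelled the task
    return task.done(), len(processed)


done, count = asyncio.run(main())
```

Because worker and application share one loop, a CPU-bound or blocking task stalls request handling as well, which is the usual trade-off of this design.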

💡 Common Patterns & Tips

  • Unique jobs: Use enqueue_unique(my_task, arg=val) to prevent duplicate scheduling. It uses SETNX on a key derived from the MD5 hash of the arguments, with a configurable TTL.
  • Graceful shutdown: On cancellation, the worker._shutdown event is set and in-flight jobs are awaited via asyncio.gather with a 10-second timeout.
  • Context naming: The dependency key comes from the name of the callable: Depends(get_db) → ctx['get_db'].
  • Testing: Use fakeredis for offline testing. The library includes pytest fixtures and async test helpers.
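The unique-job tip can be made concrete with a small sketch. Everything here is an assumption for illustration: the key prefix, the JSON serialization of arguments, and the in-memory FakeStore (which imitates Redis SET with NX and EX) are not taken from the library.

```python
import hashlib
import json


def unique_key(task_name, kwargs, prefix="arq:unique:"):
    """Derive a stable key from the task name and its arguments."""
    payload = json.dumps(kwargs, sort_keys=True, default=str)
    digest = hashlib.md5(
        f"{task_name}:{payload}".encode(), usedforsecurity=False
    ).hexdigest()
    return f"{prefix}{digest}"


class FakeStore:
    """In-memory imitation of Redis `SET key value NX EX ttl`."""

    def __init__(self):
        self._expiry = {}

    def set_nx(self, key, ttl, now):
        expires = self._expiry.get(key)
        if expires is not None and expires > now:
            return False  # key still alive: treat the job as a duplicate
        self._expiry[key] = now + ttl
        return True


def enqueue_unique(store, task_name, kwargs, ttl=60, now=0.0):
    """Return True if the job would be scheduled, False if it is a duplicate."""
    return store.set_nx(unique_key(task_name, kwargs), ttl, now)


store = FakeStore()
first = enqueue_unique(store, "send_welcome_email", {"user_id": 42}, now=0.0)
duplicate = enqueue_unique(store, "send_welcome_email", {"user_id": 42}, now=1.0)
other_user = enqueue_unique(store, "send_welcome_email", {"user_id": 43}, now=1.0)
after_ttl = enqueue_unique(store, "send_welcome_email", {"user_id": 42}, now=61.0)
```

Sorting the keys before hashing makes the key independent of argument order, and the TTL bounds how long a stale lock can block rescheduling.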

📚 Guides

Detailed documentation for every component and feature:

📄 License

MIT — use freely in personal and commercial projects.
