
FastAPI + ARQ: async worker queue integration without boilerplate


🇬🇧 fastapi-arq – FastAPI + ARQ without boilerplate

**🇬🇧 English** · [🇷🇺 Русский](README.ru.md) · [🇨🇳 简体中文](README.zh-CN.md) · [🇹🇼 繁體中文](README.zh-TW.md) · [🇪🇸 Español](README.es.md) · [🇫🇷 Français](README.fr.md) · [🇩🇪 Deutsch](README.de.md) · [🇯🇵 日本語](README.ja.md) · [🇰🇷 한국어](README.ko.md) · [🇸🇦 العربية](README.ar.md) · [🇵🇹 Português](README.pt.md) · [🇮🇹 Italiano](README.it.md) · [🇳🇱 Nederlands](README.nl.md) · [🇵🇱 Polski](README.pl.md) · [🇹🇷 Türkçe](README.tr.md) · [🇻🇳 Tiếng Việt](README.vi.md) · [🇹🇭 ไทย](README.th.md) · [🇮🇩 Bahasa Indonesia](README.id.md) · [🇮🇳 हिन्दी](README.hi.md) · [🇺🇦 Українська](README.uk.md)




📖 About

fastapi-arq is a lightweight decorator-style wrapper that integrates arq (an asyncio job queue backed by Redis) into FastAPI applications. It eliminates repetitive boilerplate: starting and stopping the worker via the FastAPI lifespan, injecting FastAPI dependencies (Depends(...)) into task functions, typed job enqueuing, and a built-in health-check endpoint.

🔍 The Problem

Every FastAPI project that uses arq ends up writing the same glue code: manually starting/stopping arq.Worker in the lifespan, creating Redis connection pools, injecting FastAPI dependencies (DB session, S3 client) into worker tasks, writing health-check endpoints, and handling unique job scheduling, retries, and timeouts. This code is copied from project to project, leading to fragmentation and maintenance burden.

✅ The Solution

fastapi-arq is the glue between FastAPI and arq, not a replacement:

  • One-liner setup: FastArq(app, 'redis://...')
  • Declarative tasks: @arq.task(max_tries=3)
  • DI injection: Depends(get_db) in worker context
  • Typed enqueue: arq_redis.enqueue(my_task, user_id=42)
  • Built-in health: /arq/health, /live, /ready

🧩 Components

| Component | Purpose |
| --- | --- |
| FastArq | Main class. Registers workers, manages lifespan, exposes health router. |
| @arq.task | Decorator. Registers functions and their settings (max_tries, timeout, keep_result, unique). |
| @arq.cron | Decorator. Registers periodic jobs (month, day, hour, minute; same params as arq). |
| ArqRedis | Extended arq.connections.ArqRedis with typed enqueue() and enqueue_unique(). |
| ContextBuilder | Resolves Depends(...) into a dict passed to every task as ctx. |
| WorkerManager | Starts/stops arq.Worker as an async task. Cancellation-safe. |
| ArqWorker / WorkerSettings | Run the worker in a separate process for production isolation. |
| HealthRouter | FastAPI APIRouter with /health (ping + latency), /live, /ready endpoints. |

📦 Requirements

  • Python 3.11+
  • FastAPI ≥ 0.111.0
  • arq ≥ 0.28.0
  • Redis server (any version ≥ 6)

🛠️ Installation

pip install fastapi-arq
# or with dev extras
pip install "fastapi-arq[dev]"

🚀 Quick Start

from fastapi import FastAPI, Depends
from fastapi_arq import FastArq

app = FastAPI()

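# get_db is your own dependency; its result is exposed to tasks as ctx["get_db"]
arq = FastArq(
    app,
    redis_settings="redis://localhost:6379",
    context_dependencies=[Depends(get_db)],
)

@arq.task(max_tries=3, timeout=120)
async def send_welcome_email(ctx: dict, user_id: int):
    db = ctx["get_db"]  # keyed by the dependency callable's name
    user = await db.get(User, user_id)  # User is your own ORM model
    ...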

@app.post("/users")
async def create_user(arq_redis=Depends(arq.get_redis)):
    ...
    await arq_redis.enqueue(send_welcome_email, user_id=user.id)
    return {"ok": True}

app.include_router(arq.health_router)

🆕 v0.2.0 Features

Cron jobs

@arq.cron(hour=6, minute=0)
async def daily_report(ctx: dict) -> None:
    ...

@arq.cron(hour={9, 18}, minute=0, run_at_startup=True)
async def sync_data(ctx: dict) -> None:
    ...
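
To make the schedule semantics concrete: as in arq, each time field takes either a single integer or a set of integers, and a job is due when every specified field matches the current time. The helper below is a stdlib-only illustration of that matching rule. The name `matches` is hypothetical and not part of arq or fastapi-arq, and the sketch ignores seconds, weekdays, and months.

```python
from datetime import datetime


def matches(now: datetime, *, hour=None, minute=None) -> bool:
    """Return True when every specified field matches `now`.

    Each field may be None (any value), an int, or a set of ints.
    Hypothetical helper for illustration only.
    """

    def field_ok(value: int, spec) -> bool:
        if spec is None:
            return True
        if isinstance(spec, int):
            return value == spec
        return value in spec

    return field_ok(now.hour, hour) and field_ok(now.minute, minute)


# hour={9, 18}, minute=0 is due at 09:00 and 18:00, and at no other time.
print(matches(datetime(2024, 1, 1, 9, 0), hour={9, 18}, minute=0))
print(matches(datetime(2024, 1, 1, 12, 0), hour={9, 18}, minute=0))
```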

External worker (production)

# app.py — enqueue-only process
arq = FastArq(app, "redis://...", run_worker=False)

# worker.py — standalone worker process
import asyncio
from myapp import arq
asyncio.run(arq.worker_runner.run())

📋 FAQ

Q: Does fastapi-arq replace arq?

No. fastapi-arq is a thin layer on top of arq. You still need arq and Redis. Think of it as 'FastAPI sugar for arq' — it handles the wiring so you don't have to.

Q: Can I use my own existing lifespan?

Yes. FastArq preserves and wraps any existing lifespan context manager. Your startup/shutdown logic runs alongside the worker lifecycle.
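
Wrapping a lifespan generally means entering the existing context manager and running the extra startup/shutdown steps around the same `yield`. The following is a minimal stdlib sketch of that pattern, not fastapi-arq's actual code: `wrap_lifespan` is a hypothetical name, the "worker" is reduced to two log entries, and the ordering (worker nested inside the user lifespan) is an assumption.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def user_lifespan(app):
    # The application's own startup/shutdown logic.
    events.append("user startup")
    yield
    events.append("user shutdown")


def wrap_lifespan(existing):
    """Return a lifespan that runs `existing` around a stand-in worker lifecycle."""

    @asynccontextmanager
    async def combined(app):
        async with existing(app):
            events.append("worker start")  # stand-in for starting the worker
            try:
                yield
            finally:
                events.append("worker stop")  # stand-in for stopping the worker

    return combined


async def main():
    lifespan = wrap_lifespan(user_lifespan)
    async with lifespan(None):  # a real server would pass the FastAPI app here
        events.append("serving")


asyncio.run(main())
print(events)
```

Because the worker steps sit inside the user's context manager in this sketch, resources opened during user startup are still available while the worker shuts down.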

Q: How do I pass a DB session to tasks?

Pass context_dependencies=[Depends(get_db)] to FastArq. The builder resolves the dependency at worker startup and stores the result in the ctx dict under the callable's name, so a task reads it as ctx['get_db']. Sync, async, and async-generator dependencies are supported.
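
One way to picture the three supported dependency kinds is the stdlib sketch below. It is an illustration under assumptions, not the library's ContextBuilder: `build_ctx` and the three example dependencies are hypothetical, and it takes bare callables rather than Depends(...) objects so that it runs without FastAPI installed.

```python
import asyncio
import inspect
from contextlib import AsyncExitStack, asynccontextmanager


def get_settings():  # sync dependency
    return {"env": "dev"}


async def get_client():  # async dependency
    return "client"


async def get_db():  # async-generator dependency (code after yield is cleanup)
    yield "session"


async def build_ctx(deps, stack: AsyncExitStack) -> dict:
    """Resolve each dependency once and key the result by the callable's name."""
    ctx = {}
    for dep in deps:
        if inspect.isasyncgenfunction(dep):
            # Keep the generator open until the stack closes, then run its cleanup.
            value = await stack.enter_async_context(asynccontextmanager(dep)())
        elif inspect.iscoroutinefunction(dep):
            value = await dep()
        else:
            value = dep()
        ctx[dep.__name__] = value
    return ctx


async def main():
    async with AsyncExitStack() as stack:
        return await build_ctx([get_settings, get_client, get_db], stack)


ctx = asyncio.run(main())
print(ctx)
```

Keying by `__name__` is what makes Depends(get_db) show up as ctx['get_db'] in this model.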

Q: Does fastapi-arq run workers in separate processes?

It can. Pass run_worker=False to FastArq so that the web process only enqueues jobs, then call arq.worker_runner.run() in a separate process. See "External worker (production)" above.

💡 Common Patterns & Tips

  • Unique jobs: Use enqueue_unique(my_task, arg=val) to prevent duplicate scheduling. Uses SETNX with MD5-hashed arguments and configurable TTL.
  • Graceful shutdown: The worker._shutdown event is triggered on cancel; ongoing jobs are awaited via asyncio.gather with a 10-second timeout.
  • Context naming: The dependency key comes from the name of the callable: Depends(get_db) → ctx['get_db'].
  • Cron jobs: Use @arq.cron(hour=6, minute=0) for periodic tasks. Also registered as regular tasks for manual enqueue.
  • External worker: For production, combine run_worker=False with arq.worker_runner.run() in a separate process.
  • Testing: Use fakeredis for offline testing. The library includes pytest fixtures and async test helpers.

📚 Guides

Detailed documentation for every component and feature:

📄 License

MIT — use freely in personal and commercial projects.
