Skip to main content

FastAPI + ARQ: async worker queue integration without boilerplate

Project description

🇬🇧 fastapi-arq – FastAPI + ARQ without boilerplate

**🇬🇧 English** · [🇷🇺 Русский](README.ru.md) · [🇨🇳 简体中文](README.zh-CN.md) · [🇹🇼 繁體中文](README.zh-TW.md) · [🇪🇸 Español](README.es.md) · [🇫🇷 Français](README.fr.md) · [🇩🇪 Deutsch](README.de.md) · [🇯🇵 日本語](README.ja.md) · [🇰🇷 한국어](README.ko.md) · [🇸🇦 العربية](README.ar.md) · [🇵🇹 Português](README.pt.md) · [🇮🇹 Italiano](README.it.md) · [🇳🇱 Nederlands](README.nl.md) · [🇵🇱 Polski](README.pl.md) · [🇹🇷 Türkçe](README.tr.md) · [🇻🇳 Tiếng Việt](README.vi.md) · [🇹🇭 ไทย](README.th.md) · [🇮🇩 Bahasa Indonesia](README.id.md) · [🇮🇳 हिन्दी](README.hi.md) · [🇺🇦 Українська](README.uk.md)


python fastapi arq license tests


📖 About

fastapi-arq is a lightweight decorator-style wrapper that integrates arq (an in-process async Redis queue) into FastAPI applications. It eliminates repetitive boilerplate: starting/stopping the worker via FastAPI lifespan, injecting DI dependencies (Depends(...)) into task functions, typed job enqueuing, and a built-in health-check endpoint.

🔍 The Problem

Every FastAPI project that uses arq ends up writing the same glue code: manually starting/stopping arq.Worker in the lifespan, creating Redis connection pools, injecting FastAPI dependencies (DB session, S3 client) into worker tasks, writing health-check endpoints, and handling unique job scheduling, retries, and timeouts. This code is copied from project to project, leading to fragmentation and maintenance burden.

✅ The Solution

fastapi-arq is the glue between FastAPI and arq — not a replacement. One-liner setup: FastArq(app, 'redis://...'). Declarative tasks: @arq.task(max_tries=3). DI injection: Depends(get_db) in worker context. Typed enqueue: arq_redis.enqueue(my_task, user_id=42). Built-in health: /arq/health, /live, /ready.

🧩 Components

Component Purpose
FastArq Main class. Registers workers, manages lifespan, exposes health router.
@arq.task Decorator. Registers functions + settings (max_tries, timeout, keep_result, unique).
@arq.cron Decorator. Registers periodic jobs (month, day, hour, minute — same params as arq).
ArqRedis Extended arq.connections.ArqRedis with typed enqueue() and enqueue_unique().
ContextBuilder Resolves Depends(...) into a dict passed to every task as ctx.
WorkerManager Starts/stops arq.Worker as an async task. Cancellation-safe.
ArqWorker / WorkerSettings Run the worker in a separate process for production isolation.
HealthRouter FastAPI APIRouter with /health (ping + latency), /live, /ready endpoints.

📦 Requirements

  • Python 3.11+
  • FastAPI ≥ 0.111.0
  • arq ≥ 0.28.0
  • Redis server (any version ≥ 6)

🛠️ Installation

pip install fastapi-arq
# or with dev extras
pip install "fastapi-arq[dev]"

🚀 Quick Start

from fastapi import FastAPI, Depends
from fastapi_arq import FastArq

app = FastAPI()

arq = FastArq(app, redis_settings="redis://localhost:6379")

@arq.task(max_tries=3, timeout=120)
async def send_welcome_email(ctx: dict, user_id: int):
    db = ctx["db"]
    user = await db.get(User, user_id)
    ...

@app.post("/users")
async def create_user(arq_redis=Depends(arq.get_redis)):
    ...
    await arq_redis.enqueue(send_welcome_email, user_id=user.id)
    return {"ok": True}

app.include_router(arq.health_router)

🆕 v0.2.0 Features

Cron jobs

@arq.cron(hour=6, minute=0)
async def daily_report(ctx: dict) -> None:
    ...

@arq.cron(hour={9, 18}, minute=0, run_at_startup=True)
async def sync_data(ctx: dict) -> None:
    ...

External worker (production)

# app.py — enqueue-only process
arq = FastArq(app, "redis://...", run_worker=False)

# worker.py — standalone worker process
import asyncio
from myapp import arq
asyncio.run(arq.worker_runner.run())

📋 FAQ

Q: Does fastapi-arq replace arq?

No. fastapi-arq is a thin layer on top of arq. You still need arq and Redis. Think of it as 'FastAPI sugar for arq' — it handles the wiring so you don't have to.

Q: Can I use my own existing lifespan?

Yes. FastArq preserves and wraps any existing lifespan context manager. Your startup/shutdown logic runs alongside the worker lifecycle.

Q: How do I pass a DB session to tasks?

Pass context_dependencies=[Depends(get_db)] to FastArq. The builder resolves the dependency at worker startup and puts it in the ctx dict. Access as ctx['get_db'] in your task. Supports sync, async, and async-generator dependencies.

Q: Does fastapi-arq run workers in separate processes?

Yes. Pass run_worker=False to FastArq(app, ..., run_worker=False) and run arq.worker_runner.run() in a separate process. See External Worker.

💡 Common Patterns & Tips

  • Unique jobs: Use enqueue_unique(my_task, arg=val) to prevent duplicate scheduling. Uses SETNX with MD5-hashed arguments and configurable TTL.
  • Graceful shutdown: The worker._shutdown event is triggered on cancel; ongoing jobs are awaited via asyncio.gather with a 10-second timeout.
  • Context naming: The dependency key comes from name of the callable: Depends(get_db) → ctx['get_db'].
  • Cron jobs: Use @arq.cron(hour=6, minute=0) for periodic tasks. Also registered as regular tasks for manual enqueue.
  • External worker: Production deployment with run_worker=False + arq.worker_runner.run() in a separate process.
  • Testing: Use fakeredis for offline testing. The library includes pytest fixtures and async test helpers.

📚 Guides

Detailed documentation for every component and feature:

📄 License

MIT — use freely in personal and commercial projects.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastapi_arq-0.2.1.tar.gz (20.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastapi_arq-0.2.1-py3-none-any.whl (12.2 kB view details)

Uploaded Python 3

File details

Details for the file fastapi_arq-0.2.1.tar.gz.

File metadata

  • Download URL: fastapi_arq-0.2.1.tar.gz
  • Upload date:
  • Size: 20.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for fastapi_arq-0.2.1.tar.gz
Algorithm Hash digest
SHA256 4032986efaf5ef122790d240271a3d5cba5dedd99ef348277c57b127801d0441
MD5 c4fc6c8d97229b25f066cbbe955498e4
BLAKE2b-256 face5879db5209e3b81d45987f744fc2dfc9f3b5dbe648f751b4cfe46ab37da2

See more details on using hashes here.

File details

Details for the file fastapi_arq-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: fastapi_arq-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 12.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for fastapi_arq-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 66f0c7cf5ee49eadd2a0de0ae34315a1611b2164071f707d739f600e22d3f145
MD5 53a3ad82cff57fe959e991af439eac62
BLAKE2b-256 ecf61ecc40b55833a854591653e387672998816bfcf0915c5d47bb0401a1620a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page