AsyncTasQ: A modern, async-first, type-safe task queue Python package inspired by Laravel. Runs on uvloop for maximum performance. Native FastAPI integration. Switch between multiple queue backends (Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS) with one config line. Automatic, smart ORM serialization (SQLAlchemy, Django, Tortoise) using msgpack reduces payloads by 90%+.
Project description
AsyncTasQ
AsyncTasQ: A modern, async-first, type-safe task queue Python package inspired by Laravel. Runs on uvloop for maximum performance. Native FastAPI integration. Switch between multiple queue backends (Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS) with one config line. Automatic, smart ORM serialization (SQLAlchemy, Django, Tortoise) using msgpack reduces payloads by 90%+. Features ACID guarantees, dead-letter queues, crash recovery, and real-time event streaming.
Table of Contents
- Quick Start
- Documentation
- Examples
- Why AsyncTasQ?
- Key Features
- Comparison with Alternatives
- Contributing
- License
- Support
- Roadmap
Quick Start
Get started in 60 seconds:
Install AsyncTasQ (Python 3.12+ required)
uv add asynctasq[redis]
Note: This example requires a Redis server running on localhost:6379. If you don't have Redis installed, you can start one using Docker:
docker run -d --name redis-test -p 127.0.0.1:6379:6379 redis:8.0-alpine
Create a new file named quick_start.py and add the following code:
import asyncio
from asynctasq import AsyncTask, RedisConfig, TaskConfig, init, print, task
# 1. Configure AsyncTasQ
# Option A: Via code
init({"driver": "redis", "redis": RedisConfig(url="redis://localhost:6379")})
# Option B: Via environment variables
# Set environment variables: ASYNCTASQ_DRIVER=redis, ASYNCTASQ_REDIS_URL=redis://localhost:6379
# Then simply call: init()
# Option C: Via .env file (recommended for production)
# Create a .env file with:
# ASYNCTASQ_DRIVER=redis
# ASYNCTASQ_REDIS_URL=redis://localhost:6379
# Then simply call: init()
# 2. Define tasks (function-based or class-based)
# Function-based task
@task
async def send_email(to: str, subject: str, body: str):
# Use asynctasq.print() for beautiful Rich-formatted console output
print(f"[cyan]Sending email to[/cyan] [yellow]{to}[/yellow]: [bold]{subject}[/bold]")
await asyncio.sleep(1) # Simulate email sending
return f"Email sent to {to}"
# Class-based task with TaskConfig
class ProcessPayment(AsyncTask[bool]):
config: TaskConfig = {
"queue": "payments",
"max_attempts": 3,
"retry_delay": 60,
"timeout": 30,
"correlation_id": None,
}
def __init__(self, user_id: int, amount: float, **kwargs):
super().__init__(**kwargs)
self.user_id = user_id
self.amount = amount
async def execute(self) -> bool:
# Use asynctasq.print() for beautiful Rich-formatted console output
print(
f"[green]Processing[/green] [bold]${self.amount}[/bold] for user [cyan]{self.user_id}[/cyan]"
)
await asyncio.sleep(2) # Simulate payment processing
return True
# 3. Dispatch tasks with optional configuration chaining
async def main():
# Function-based task dispatch
for i in range(10):
task_id = await send_email(
to=f"user{i}@example.com", subject=f"Welcome {i}!", body="Welcome to our platform!"
).dispatch()
print(f"Task dispatched: {task_id}")
# Class-based task dispatch with TaskConfig defaults
payment_task_id = await ProcessPayment(user_id=123, amount=99.99).dispatch()
print(f"Payment task dispatched: {payment_task_id}")
# With configuration chaining (overrides TaskConfig)
critical_task_id = (
await send_email(to="admin@example.com", subject="Critical Alert", body="System issue!")
.on_queue("high-priority")
.max_attempts(10)
.timeout(60)
.dispatch()
)
print(f"Critical task dispatched: {critical_task_id}")
# Class-based with method chaining
urgent_payment_id = (
await ProcessPayment(user_id=456, amount=199.99)
.on_queue("urgent")
.max_attempts(5)
.timeout(120)
.dispatch()
)
print(f"Urgent payment dispatched: {urgent_payment_id}")
if __name__ == "__main__":
# AsyncTasQ works with any event loop - choose what works best for you:
# Option 1: Use AsyncTasQ's runner (automatic uvloop support)
from asynctasq import run
run(main())
# Option 2: Use standard asyncio
# import asyncio
# asyncio.run(main())
# Option 3: Use uvloop directly
# import uvloop
# uvloop.run(main())
Run the script to dispatch tasks:
uv run python quick_start.py
In a separate terminal, run the worker to process the tasks:
uv run asynctasq worker --queues urgent,high-priority,payments,default
That's it! Your first AsyncTasQ application is ready. AsyncTasQ works seamlessly with asyncio.run(), uvloop, FastAPI, Jupyter notebooks, or any other async framework - cleanup happens automatically when the event loop closes.
To stop and remove the Redis container when done:
docker stop redis-test && docker rm redis-test
Documentation
Comprehensive guides to get you started:
- Installation – Installation instructions for uv and pip
- Configuration – Complete configuration guide with
asynctasq.init()andConfig.get()- Configuration properties apply to different contexts: dispatch (when enqueuing tasks) vs worker (when processing tasks)
- Driver configs (redis, postgres, mysql, rabbitmq, sqs) and events config apply to both contexts
- Properties like
timeoutandvisibility_timeoutare configured per-task via TaskConfig
- Environment Variables – Complete guide to .env file support and environment variable configuration
- Task Definitions – Function-based and class-based tasks
- Queue Drivers – Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS
- Running Workers – CLI and programmatic workers
- Monitoring – Event streaming and queue statistics
- ORM Integrations – SQLAlchemy, Django, Tortoise ORM
- Framework Integrations – FastAPI integration
- CLI Reference – Complete command reference
- Best Practices – Task design, queue organization, production deployment
Examples
Complete code examples:
- Function-Based Tasks Examples – Decorators, configuration, and best practices
- Class-Based Tasks Examples – AsyncTask, SyncTask, ProcessTask variants
Why AsyncTasQ?
True Async-First Architecture
Unlike Celery and RQ which are built on synchronous foundations, AsyncTasQ is designed from the ground up for asyncio:
- Native async/await everywhere – All core operations use asyncio, no threading or blocking on critical paths
- Multiple execution modes – Choose between async I/O (event loop), sync I/O (thread pool), or CPU-bound (process pool) for each task
- High-performance concurrency – Process hundreds of tasks concurrently with minimal overhead using asyncio's efficient task scheduling
- Smart connection pooling – All drivers use connection pools optimized for async operations
- Non-blocking by design – Worker polling, task execution, and all I/O operations are truly non-blocking
- Universal event loop compatibility – AsyncTasQ works seamlessly with any event loop (asyncio, uvloop, or custom implementations). Cleanup hooks automatically attach to running loops, so you can use
asyncio.run(), uvloop, FastAPI, Jupyter notebooks, or any other async framework. The optionalasynctasq.utils.loop.runhelper provides uvloop support with automatic fallback to asyncio.
Intelligent Serialization & ORM Integration
- msgpack binary encoding – 2-3x faster than JSON with smaller payloads
- Automatic ORM model handling – Pass SQLAlchemy, Django ORM, or Tortoise ORM models directly as task arguments. AsyncTasQ automatically:
- Serializes them as lightweight references (primary key only)
- Reduces payload size by 90%+
- Re-fetches with fresh data when the task executes
- Supports all 3 major Python ORMs out of the box
- Smart type handling – Native support for
datetime,Decimal,UUID,set, and custom types without manual conversion
Enterprise-Grade Reliability
- ACID guarantees – PostgreSQL and MySQL drivers provide transactional dequeue with exactly-once processing semantics
- Built-in dead-letter queues – PostgreSQL/MySQL drivers automatically move permanently failed tasks to DLQ for inspection
- Crash recovery – Visibility timeouts ensure no task is lost even if workers crash mid-execution
- Graceful shutdown – SIGTERM/SIGINT handlers drain in-flight tasks before stopping (configurable timeout)
- Flexible retry strategies – Per-task retry configuration with custom
should_retry()hooks for intelligent retry logic - Task timeout protection – Prevent runaway tasks with configurable per-task timeouts
- Real-time observability – Redis Pub/Sub event streaming broadcasts task lifecycle events for monitoring dashboards
Developer Experience That Just Works
- Elegant, Laravel-inspired API – Clean, intuitive syntax that feels natural
- Full type safety – Complete type hints, mypy/pyright compatible, Generic
Task[T]for return type checking - Simple configuration – Use
asynctasq.init()andConfig.get()for all configuration needs - Two task styles – Choose function-based
@taskdecorators or class-based tasks with lifecycle hooks - Fluent method chaining – Configure tasks expressively:
.delay(60).on_queue("high").max_attempts(10).timeout(300).dispatch() - Beautiful console output – Built-in Rich integration provides colorized logging, syntax highlighting, and formatted output. Use
asynctasq.print()in tasks for gorgeous terminal displays with zero configuration - First-class FastAPI integration – Lifespan management, automatic connection pooling, native async support
Multi-Driver Flexibility Without Vendor Lock-In
- 5 production-ready drivers – Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS – all with the same API
- Switch with one line – Change
driver="redis"todriver="postgres"– no code changes needed - Per-task driver override – Use Redis for high-throughput tasks, PostgreSQL for ACID-critical tasks in the same application
- Same API, different guarantees – Choose the driver that matches your SLA requirements without rewriting code
Key Features
Core Capabilities
-
✅ Async-first design with asyncio throughout the stack
-
✅ Multiple queue drivers: Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS
-
✅ Environment variables & .env file support for easy configuration across environments
-
✅ High-performance msgpack serialization with binary support
-
✅ Automatic ORM model handling for SQLAlchemy, Django, Tortoise
-
✅ Type-safe with full type hints and Generic support
-
✅ Three execution modes: Async (I/O), Thread pool (moderate CPU), Process pool (heavy CPU)
-
✅ Beautiful console output with Rich library integration for colorized logs and formatted task output
-
✅ Configurable retries with custom retry logic hooks
-
✅ Task timeouts to prevent runaway tasks
-
✅ Delayed task execution with precision timing
-
✅ Queue priority with multiple queues per worker
-
✅ Graceful shutdown with signal handlers
Enterprise Features
-
✅ ACID guarantees (PostgreSQL/MySQL drivers)
-
✅ Dead-letter queues for failed task inspection
-
✅ Visibility timeouts for crash recovery
-
✅ Connection pooling for optimal resource usage
-
✅ Transactional dequeue with
SELECT FOR UPDATE SKIP LOCKED -
✅ Task metadata tracking (attempts, timestamps, task IDs)
-
✅ Concurrent processing with configurable worker concurrency
-
✅ Real-time event streaming via Redis Pub/Sub
Integrations
-
✅ FastAPI – Automatic lifecycle management, dependency injection
-
✅ SQLAlchemy – Async and sync model serialization
-
✅ Django ORM – Native async support (Django 3.1+)
-
✅ Tortoise ORM – Full async ORM integration
-
✅ asynctasq-monitor – Real-time monitoring dashboard (optional)
Developer Tools
-
✅ Comprehensive CLI – Worker management and database migrations
-
✅ Function-based tasks with
@taskdecorator -
✅ Class-based tasks with 4 execution modes:
AsyncTask– Async I/O-bound (API calls, async DB queries)SyncTask– Sync I/O-bound via thread pool (requests, sync DB drivers)AsyncProcessTask– Async CPU-intensive via process poolSyncProcessTask– Sync CPU-intensive via process pool (bypasses GIL)
-
✅ Lifecycle hooks –
execute(),failed(),should_retry()for complete control -
✅ Method chaining for fluent task configuration
-
✅ Flexible configuration – Use
asynctasq.init()/Config.get()for all settings
Comparison with Alternatives
AsyncTasQ vs. Celery
| Feature | AsyncTasQ | Celery |
|---|---|---|
| Async Support | ✅ Async-first, built with asyncio | ❌ No asyncio support (promised for years, not delivered) |
| Type Safety | ✅ Full type hints, Generic[T] | ⚠️ Third-party stubs (celery-types) |
| Multi-Driver | ✅ 5 drivers (Redis/PostgreSQL/MySQL/RabbitMQ/SQS) | ⚠️ 3 brokers (Redis/RabbitMQ/SQS) |
| ORM Integration | ✅ Auto-serialization (SQLAlchemy/Django/Tortoise) | ❌ Manual serialization required |
| Serialization | ✅ msgpack (fast, binary, efficient) | ⚠️ JSON default (pickle/YAML/msgpack available) |
| FastAPI Integration | ✅ First-class, lifespan management | ⚠️ Manual setup, workarounds needed |
| Dead-Letter Queue | ✅ Built-in (PostgreSQL/MySQL) | ⚠️ Manual setup (RabbitMQ DLX) |
| ACID Guarantees | ✅ PostgreSQL/MySQL drivers | ❌ Not available |
| Global Rate Limiting | ⚠️ Not yet implemented | ❌ Not available (per-worker only) |
| Setup Complexity | ✅ Simple with asynctasq.init() |
⚠️ Complex configuration |
| Prefetch Multiplier | ✅ Sensible default (1) | ⚠️ Dangerous default (4x), often causes performance issues |
| Learning Curve | ✅ Simple, intuitive API | ⚠️ Steep learning curve |
When to use AsyncTasQ:
- Modern async Python applications (FastAPI, aiohttp, async web frameworks)
- Need true asyncio support for I/O-bound tasks (API calls, database queries)
- Type-safe codebase with full IDE support
- Multiple driver flexibility (dev → production migration)
- Automatic ORM model handling (SQLAlchemy, Django, Tortoise)
- Enterprise ACID requirements (financial transactions, critical workflows)
- Simple, clean API without steep learning curve
When to use Celery:
- Mature ecosystem with many plugins and extensions
- Complex workflows (chains, chords, groups with callbacks)
- Large existing Celery codebase that's not worth migrating
- Synchronous applications where asyncio isn't needed
- Need for battle-tested, widely-adopted solution
AsyncTasQ vs. Dramatiq
| Feature | AsyncTasQ | Dramatiq |
|---|---|---|
| Async Support | ✅ Async-first, native asyncio | ⚠️ Limited (via async-dramatiq extension, not first-class) |
| Type Safety | ✅ Full type hints, Generic[T] | ✅ Type hints (py.typed) |
| Multi-Driver | ✅ 5 drivers (Redis/PostgreSQL/MySQL/RabbitMQ/SQS) | ⚠️ 2 brokers (Redis/RabbitMQ) |
| ORM Integration | ✅ Auto-serialization (SQLAlchemy/Django/Tortoise) | ❌ Manual serialization required |
| Dead-Letter Queue | ✅ Built-in (PostgreSQL/MySQL) | ✅ Built-in (all brokers) |
| FastAPI Integration | ✅ First-class, lifespan management | ⚠️ Manual setup needed |
| Database Drivers | ✅ PostgreSQL/MySQL with ACID | ❌ Not available |
| Simplicity | ✅ Clean, intuitive API | ✅ Simple, well-designed |
When to use AsyncTasQ:
- Async-first applications (FastAPI, aiohttp, modern Python stack)
- True asyncio support for I/O-bound tasks
- Database-backed queues with ACID guarantees
- Automatic ORM model serialization
- Type-safe codebase with IDE support
When to use Dramatiq:
- Synchronous applications
- Mature, battle-tested solution needed
- Complex middleware requirements
- Don't need async support
AsyncTasQ vs. ARQ (Async Redis Queue)
| Feature | AsyncTasQ | ARQ |
|---|---|---|
| Async Support | ✅ Async-first, native asyncio | ✅ Async-first, native asyncio |
| Multi-Driver | ✅ 5 drivers (Redis/PostgreSQL/MySQL/RabbitMQ/SQS) | ❌ Redis only |
| Type Safety | ✅ Full type hints, Generic[T] | ✅ Type hints |
| ORM Integration | ✅ Auto-serialization (SQLAlchemy/Django/Tortoise) | ❌ Manual serialization |
| Serialization | ✅ msgpack (binary, efficient) | ⚠️ pickle (default, security concerns) |
| Dead-Letter Queue | ✅ Built-in (PostgreSQL/MySQL) | ❌ Not available |
| ACID Guarantees | ✅ PostgreSQL/MySQL drivers | ❌ Not available |
| FastAPI Integration | ✅ First-class, lifespan management | ⚠️ Manual setup |
| Task Execution Model | ✅ At-least-once with idempotency support | ⚠️ At-least-once ("pessimistic") |
| Simplicity | ✅ Clean, Laravel-inspired API | ✅ Lightweight, minimal |
| Custom Serializers | ✅ Configurable serializers | ✅ Configurable serializers |
When to use AsyncTasQ:
- Need multiple driver options (not locked into Redis)
- Database-backed queues with ACID guarantees
- Automatic ORM model handling
- Dead-letter queue support for failed task inspection
- FastAPI applications with first-class integration
- Enterprise reliability requirements
When to use ARQ:
- Simple Redis-only infrastructure
- Lightweight solution with minimal dependencies
- Cron job scheduling is a primary requirement
- Mature async task queue needed
- Custom serializers (e.g., msgpack) are acceptable to configure manually
AsyncTasQ vs. RQ (Redis Queue)
| Feature | AsyncTasQ | RQ |
|---|---|---|
| Async Support | ✅ Async-first, native asyncio | ❌ Sync only (no asyncio support) |
| Multi-Driver | ✅ 5 drivers (Redis/PostgreSQL/MySQL/RabbitMQ/SQS) | ❌ Redis only |
| Type Safety | ✅ Full type hints, Generic[T] | ✅ Type hints added |
| Retries | ✅ Configurable with custom should_retry() |
✅ Configurable retries |
| Dead-Letter Queue | ✅ Built-in (PostgreSQL/MySQL) | ❌ Not available |
| Database Drivers | ✅ PostgreSQL/MySQL with ACID | ❌ Not available |
| Simplicity | ✅ Intuitive, clean API | ✅ Very simple |
When to use AsyncTasQ:
- Async applications (FastAPI, aiohttp)
- True asyncio support for efficient I/O
- Multiple driver options
- Enterprise features (DLQ, ACID)
- ORM integration
When to use RQ:
- Simple, synchronous use cases
- Synchronous applications
- Redis-only infrastructure
- Need mature, battle-tested solution
AsyncTasQ vs. Huey
| Feature | AsyncTasQ | Huey |
|---|---|---|
| Async Support | ✅ Async-first, native asyncio | ⚠️ Limited (async result awaiting only via helpers) |
| Multi-Driver | ✅ 5 drivers (Redis/PostgreSQL/MySQL/RabbitMQ/SQS) | ⚠️ Redis/SQLite/Filesystem/Memory |
| Type Safety | ✅ Full type hints, Generic[T] | ⚠️ Limited type hints |
| ORM Integration | ✅ Auto-serialization (SQLAlchemy/Django/Tortoise) | ❌ Manual serialization |
| Enterprise Features | ✅ ACID, DLQ, visibility timeout | ⚠️ Basic features |
| Simplicity | ✅ Clean, modern API | ✅ Simple, lightweight |
| Cron Jobs | ⚠️ Not yet implemented | ✅ Built-in periodic tasks |
When to use AsyncTasQ:
- Async-first applications requiring true asyncio
- Enterprise requirements (ACID, DLQ)
- Type-safe codebase with IDE support
- Automatic ORM integration
- Need for multiple driver options
When to use Huey:
- Lightweight use cases
- Simple periodic/cron tasks
- SQLite-backed queues for embedded apps
- Mature, stable solution needed
Key Differentiators
AsyncTasQ stands out with:
- True async-first architecture – Built with asyncio from the ground up (unlike Celery, RQ, Huey)
- Multiple execution modes – Choose async I/O, sync I/O (thread pool), or CPU-bound (process pool) per task
- Intelligent ORM handling – Automatic model serialization for SQLAlchemy, Django ORM, and Tortoise ORM (90%+ smaller payloads)
- msgpack serialization – Binary format that's 2-3x faster than JSON with smaller payloads
- Multi-driver flexibility – 5 production-ready drivers (Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS) with identical API
- Type safety everywhere – Full type hints with Generic[T] support, mypy/pyright compatible
- Enterprise ACID guarantees – PostgreSQL/MySQL drivers with transactional dequeue for exactly-once processing
- Built-in dead-letter queues – PostgreSQL/MySQL drivers automatically handle permanently failed tasks
- First-class FastAPI integration – Lifespan management, automatic connection pooling, native async support
- Real-time event streaming – Redis Pub/Sub broadcasts task lifecycle events for monitoring
- Optional monitoring UI – Beautiful real-time dashboard via asynctasq-monitor
- Elegant, Laravel-inspired API – Method chaining (
.delay(60).on_queue("high").dispatch()) and intuitive task definitions - Simple configuration – Use
asynctasq.init()andConfig.get()for all configuration needs
CI & Contributing (short)
- One-line setup:
just init— install deps and pre-commit hooks - Start services:
just docker-up— Redis, PostgreSQL, MySQL, RabbitMQ, LocalStack (SQS) for local integration tests - Run tests:
just test— usejust test-unit/just test-integrationto scope - Run with coverage:
just test-covorpytest --cov=src/asynctasq --cov-report=html - Run the worker locally:
python -m asynctasq worker - Pre-commit hooks:
./setup-pre-commit.shorjust setup-hooks - Format / lint / typecheck:
just format,just lint,just typecheck - CI runs on PRs and pushes to
mainand includes lint, type checks and tests across Python 3.12–3.14. - Pre-commit hooks enforce formatting and static checks locally before commits (see
./setup-pre-commit.sh). - Branch protection: enable required status checks (CI success, lint, unit/integration jobs) for
main. - Coverage badge: the repository updates
.github/coverage.svgautomatically via.github/workflows/coverage-badge.yml. - Run full CI locally:
just ci(runs format/lint/typecheck/tests like the workflow).
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
License
MIT License – see LICENSE file for details.
Support
- Repository: github.com/adamrefaey/asynctasq
- Issues: github.com/adamrefaey/asynctasq/issues
- Discussions: github.com/adamrefaey/asynctasq/discussions
Roadmap
- SQLite driver support
- Oracle driver support
- Task batching support
- Task chaining and workflows (chains, chords, groups)
- Rate limiting
- Task priority within queues
- Scheduled/cron tasks
Credits
Built with ❤️ by Adam Refaey.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file asynctasq-1.2.0.tar.gz.
File metadata
- Download URL: asynctasq-1.2.0.tar.gz
- Upload date:
- Size: 116.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
78410243679b1816e8c35768031fafd0ad8aaac9ce2ba3ed6d9ea6edc8386eba
|
|
| MD5 |
54b79ea5029f88346a747e1ed8268e90
|
|
| BLAKE2b-256 |
cda2220b8dfad2fda327163f038e4a92df7fdcbc67608540f08242da0c2a6ef4
|
File details
Details for the file asynctasq-1.2.0-py3-none-any.whl.
File metadata
- Download URL: asynctasq-1.2.0-py3-none-any.whl
- Upload date:
- Size: 153.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4a26cf8a80b591a9c06763a3e7efc27442aae4a197d0af3faca4acd4c9e5eefb
|
|
| MD5 |
188a752a121da0e5558064405b8308f3
|
|
| BLAKE2b-256 |
d4bcef00118ec07110f0bc19ec5a76a01931ebac2ed0e39e5aacf651b4069ee4
|