AsyncMQ

Async task queue with BullMQ-like features.
⚡ Supercharge your async applications with tasks so fast, you'll think you're bending time itself. ⚡
Documentation: https://asyncmq.dymmond.com 📚
Source Code: https://github.com/dymmond/asyncmq
The official supported version is always the latest released.
AsyncMQ is an asynchronous Python job queue focused on asyncio/anyio workloads.
It gives you:

- task registration via `@task`
- queue and worker runtime APIs
- delayed jobs, retries/backoff, TTL expiration, and dead-letter routing
- multiple backends (Redis, Postgres, MongoDB, RabbitMQ, in-memory)
- a CLI (`asyncmq`) and a built-in dashboard app
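The retry/backoff behaviour in that list can be sketched in plain asyncio. The delays and the dead-letter hand-off below are illustrative only, not AsyncMQ's actual implementation:

```python
# Conceptual sketch of retry-with-exponential-backoff, as worker runtimes
# like AsyncMQ's apply it. Exhausted jobs are "routed" to a dead letter
# here by returning a marker; a real runtime persists them instead.
import asyncio


async def run_with_retries(handler, *args, retries: int = 2, base_delay: float = 0.01):
    """Run handler; retry with exponential backoff, then dead-letter."""
    for attempt in range(retries + 1):
        try:
            return await handler(*args)
        except Exception:
            if attempt == retries:
                return "dead-letter"  # attempts exhausted
            await asyncio.sleep(base_delay * (2 ** attempt))  # backoff
```

With `retries=2` a job gets three attempts in total before it is given up on.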
What AsyncMQ Is (and Is Not)
AsyncMQ is:
- a library-first queue/worker runtime you embed in Python apps
- backend-pluggable through a shared `BaseBackend` contract
- suitable for both local development and production deployments
AsyncMQ is not:
- a hosted queue service
- a guaranteed exactly-once execution system
- a replacement for domain-level idempotency in your task code
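Because AsyncMQ does not guarantee exactly-once execution, task bodies should tolerate duplicate delivery. A minimal sketch of that idea, where the in-memory `sent` set stands in for durable storage (for example a database unique constraint) in real code:

```python
# Hypothetical handler showing domain-level idempotency: a processed-key
# set guards the side effect so a retried or redelivered job is a no-op.
sent: set[str] = set()


async def send_welcome_idempotent(email: str) -> bool:
    """Return True if the email was actually sent, False if skipped."""
    if email in sent:  # duplicate delivery (retry or redelivery)
        return False
    sent.add(email)  # in real code, record durably before the side effect
    print(f"sent welcome email to {email}")
    return True
```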
Architecture Overview
At runtime, AsyncMQ has four main layers:
- Task registration: `@task(queue=...)` stores handlers in `TASK_REGISTRY` and adds `.enqueue()` helpers.
- Queue API: `Queue` wraps backend operations (enqueue, pause, `list_jobs`, delayed/repeatable APIs).
- Worker runtime: `process_job`/`handle_job` run tasks, manage state transitions, retries, and acknowledgements.
- Backend and store: concrete backends persist job state and queue metadata.
For an end-to-end walkthrough, start with Core Concepts.
Minimal Quickstart (In-Memory)
Use the in-memory backend first so you can run without Redis or Postgres.
```python
# myapp/settings.py
from asyncmq.backends.memory import InMemoryBackend
from asyncmq.conf.global_settings import Settings


class AppSettings(Settings):
    backend = InMemoryBackend()
```

```shell
export ASYNCMQ_SETTINGS_MODULE=myapp.settings.AppSettings
```
```python
# myapp/tasks.py
from asyncmq.tasks import task


@task(queue="emails", retries=2, ttl=300)
async def send_welcome(email: str) -> None:
    print(f"sent welcome email to {email}")
```
```python
# producer.py
import anyio

from asyncmq.queues import Queue
from myapp.tasks import send_welcome


async def main() -> None:
    queue = Queue("emails")
    job_id = await send_welcome.enqueue("alice@example.com", backend=queue.backend)
    print("enqueued", job_id)


anyio.run(main)
```
Then start a worker for the queue:

```shell
asyncmq worker start emails --concurrency 1
```