

TaskMQ


TaskMQ is a modern, developer-friendly Python task queue and job processing framework. It helps you run background jobs, automate workflows, and build scalable systems with ease.

Key Features:

  • Simple CLI and REST API for adding and running jobs
  • Decorator-based handler registration for custom task logic
  • Multiple storage backends (SQLite for development, Redis for production)
  • Retry policies with fixed-interval, exponential-backoff, or no-retry options
  • Job scheduling for future execution and periodic/recurring jobs
  • Dead Letter Queue (DLQ) for failed job inspection and replay
  • JWT-based authentication for API security
  • Prometheus metrics for monitoring and observability
  • Graceful shutdown with in-flight job completion
  • Full async handler support

Requirements

  • Python 3.8 or higher
  • SQLite (included with Python) or Redis 4.0+ for production workloads

Installation

From PyPI (Recommended)

pip install task-mq

# With Redis support
pip install task-mq[redis]

# With development tools
pip install task-mq[dev]

From Source

git clone https://github.com/gvarun01/task-mq.git
cd task-mq
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"

Quick Start

1. Start a Worker

taskmq run-worker --max-workers 2

2. Add a Job

taskmq add-job --payload '{"task": "hello"}' --handler dummy

3. Check Job Status

taskmq get-job 1

4. Start the REST API

taskmq serve-api

Visit http://127.0.0.1:8000/docs for interactive API documentation.


Python Library Usage

TaskMQ can be used directly in Python applications:

from taskmq.jobs.handlers import register_handler
from taskmq.worker import Worker
from taskmq.storage.sqlite_backend import SQLiteBackend

# Define a custom handler
@register_handler("email")
def send_email(job):
    """Process an email sending job."""
    payload = job.payload
    print(f"Sending email to {payload.get('to')}")
    return {"status": "sent", "to": payload.get("to")}

# Create backend and insert a job
backend = SQLiteBackend()
job_id = backend.insert_job(
    payload='{"to": "user@example.com", "subject": "Hello"}',
    handler="email"
)
print(f"Created job: {job_id}")

# Start the worker (blocks until stopped)
worker = Worker(max_workers=2, backend=backend)
worker.start()

Important: Handlers must be imported/registered before starting workers.
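
This requirement follows from how decorator-based registration works in general: the registry is populated as a side effect of importing the handler's module. The sketch below illustrates the pattern with a plain dictionary (it is not TaskMQ's actual internals):

```python
# Minimal sketch of a decorator-populated handler registry.
# (Illustrative only -- not TaskMQ's real implementation.)
HANDLERS = {}

def register_handler(name):
    def decorator(fn):
        HANDLERS[name] = fn  # registration runs at import time
        return fn
    return decorator

@register_handler("email")
def send_email(job):
    return {"status": "sent", "to": job["to"]}

# A worker can only dispatch to names present in the registry,
# which is why handler modules must be imported before it starts.
print(HANDLERS["email"]({"to": "user@example.com"}))
```

If the module defining `send_email` is never imported, the registry stays empty and the worker has nothing to dispatch to.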


CLI Reference

Command                     Description
taskmq run-worker           Start the worker pool to process jobs
taskmq serve-api            Start the REST API server
taskmq add-job              Add a new job to the queue
taskmq get-job <id>         Get job details and result
taskmq inspect <id>         View job execution timeline
taskmq logs --job <id>      Search structured job logs
taskmq list-dead            List jobs in the Dead Letter Queue
taskmq replay <id>          Replay any job
taskmq replay-dead <id>     Replay a job from the DLQ

Backend Selection

# Use SQLite (default)
taskmq run-worker

# Use Redis
taskmq --backend redis --redis-url redis://localhost:6379/0 run-worker

Advanced Features

Retry Policies

Configure how failed jobs are retried:

# Fixed interval retries (default: 60 seconds between attempts)
backend.insert_job(payload, handler="mytask", retry_policy="fixed")

# Exponential backoff (doubles each retry)
backend.insert_job(payload, handler="mytask", retry_policy="exponential")

# No retries (move to DLQ on first failure)
backend.insert_job(payload, handler="mytask", retry_policy="none")
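
The difference between the two retrying policies is just the delay schedule. The sketch below assumes a 60-second base interval (the documented default for fixed retries; the exact base TaskMQ uses for exponential backoff may differ):

```python
# Illustrative retry delay schedules, assuming a 60 s base interval.
def fixed_delay(attempt, base=60):
    return base  # same wait between every attempt

def exponential_delay(attempt, base=60):
    return base * (2 ** attempt)  # doubles each retry: 60, 120, 240, ...

print([exponential_delay(a) for a in range(4)])  # [60, 120, 240, 480]
```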

Job Priority

Higher-priority jobs are processed first:

# Priority levels: 0 (Low), 10 (Normal), 20 (High)
backend.insert_job(payload, handler="urgent", priority=20)
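
"Higher number wins" ordering can be sketched with a standard min-heap by negating the priority (an illustrative ordering sketch, not TaskMQ's storage logic):

```python
import heapq

# heapq is a min-heap, so push the negated priority to pop the
# highest-priority job first.
queue = []
for prio, job in [(0, "low"), (20, "urgent"), (10, "normal")]:
    heapq.heappush(queue, (-prio, job))

print([heapq.heappop(queue)[1] for _ in range(3)])  # ['urgent', 'normal', 'low']
```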

Scheduled Jobs

Execute jobs at a specific time:

from datetime import datetime, timedelta, timezone

# Run 1 hour from now (timezone.utc works on Python 3.8+,
# unlike datetime.UTC, which requires Python 3.11+)
future = datetime.now(timezone.utc) + timedelta(hours=1)
backend.insert_job(payload, handler="mytask", scheduled_for=future)

Periodic Jobs

Create recurring jobs:

# Run every 300 seconds (5 minutes)
backend.insert_job(payload, handler="cleanup", interval_seconds=300)

Async Handlers

TaskMQ supports async handlers natively:

import asyncio
from taskmq.jobs.handlers import register_handler

@register_handler("async_task")
async def async_handler(job):
    await asyncio.sleep(1)
    return {"status": "completed"}
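
Supporting both sync and async handlers behind one call site typically comes down to a coroutine-function check, as in this dispatch sketch (illustrative, not TaskMQ's worker code):

```python
import asyncio
import inspect

# Dispatch that accepts both sync and async handlers.
async def run_handler(handler, job):
    if inspect.iscoroutinefunction(handler):
        return await handler(job)
    return handler(job)

async def async_task(job):
    await asyncio.sleep(0)
    return {"status": "completed"}

print(asyncio.run(run_handler(async_task, {})))  # {'status': 'completed'}
```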

Dead Letter Queue

Jobs that exhaust retries are moved to the DLQ for inspection:

# List failed jobs
taskmq list-dead

# Replay a failed job (resets retry count)
taskmq replay-dead 123

Handler Versioning

Ensure replay uses the exact same handler code:

# Fails if handler code has changed since original execution
taskmq replay 123 --exact
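
A common way to implement this kind of guard is to fingerprint the handler's source at execution time and compare at replay time. The sketch below shows the idea with a SHA-256 hash (illustrative; not necessarily TaskMQ's exact mechanism):

```python
import hashlib

# Fingerprint handler source so a replay can detect code changes.
def handler_fingerprint(source_code: str) -> str:
    return hashlib.sha256(source_code.encode()).hexdigest()

v1 = handler_fingerprint("def send(job): ...")
v2 = handler_fingerprint("def send(job): retry()")
print(v1 == v2)  # False -> an --exact replay would refuse to run
```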

Graceful Shutdown

Workers handle SIGINT/SIGTERM gracefully, completing in-flight jobs before exiting:

^C
Received signal 2. Initiating graceful shutdown...
Waiting for 2 active jobs to complete...
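
The usual shape of such a shutdown is a signal handler that flips a flag, with the worker loop draining in-flight jobs before exiting. A minimal stdlib sketch of the pattern (not TaskMQ's worker code):

```python
import signal
import threading

# Signal handler sets an event; the worker loop checks it between jobs.
stop = threading.Event()

def handle_signal(signum, frame):
    print(f"Received signal {signum}. Initiating graceful shutdown...")
    stop.set()

signal.signal(signal.SIGTERM, handle_signal)

# Worker loop sketch: while not stop.is_set(): process_next_job()
signal.raise_signal(signal.SIGTERM)  # simulate `kill <pid>`
print(stop.is_set())  # True
```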

Job Inspection

View the complete execution timeline:

taskmq inspect 123

Output:

Job ID: 123
Status: SUCCESS
Handler: email
Payload: {'to': 'user@example.com'}
----------------------------------------
Execution Timeline:
[2026-03-15T10:00:00+00:00] Queued
[2026-03-15T10:00:01+00:00] Job started
[2026-03-15T10:00:02+00:00] Job finished successfully

API Usage

Authentication

All API endpoints require JWT authentication:

import httpx

headers = {"Authorization": "Bearer <your_jwt_token>"}
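
For context, a JWT is just two base64url-encoded JSON segments plus an HMAC signature. The stdlib sketch below illustrates HS256 signing; the secret and claims here are made up, and real tokens should come from TaskMQ's own auth flow:

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # base64url without padding, as used in JWTs
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def make_jwt(payload: dict, secret: str) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    sig = hmac.new(secret.encode(), f"{header}.{body}".encode(),
                   hashlib.sha256).digest()
    return f"{header}.{body}.{b64url(sig)}"

token = make_jwt({"sub": "admin"}, "change-me")
print(token.count("."))  # 2 -- header.payload.signature
```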

Add a Job

response = httpx.post(
    "http://127.0.0.1:8000/add-job",
    json={"payload": {"task": "process"}, "handler": "mytask"},
    headers=headers
)
print(response.json())
# {"status": "ok", "job_id": 1}

API Endpoints

Endpoint            Method  Auth          Description
/health             GET     None          Health check
/add-job            POST    admin         Add a new job
/cancel             POST    admin         Cancel a job
/retry              POST    admin/worker  Retry a failed job
/monitor/metrics    GET     None          Prometheus metrics

See http://127.0.0.1:8000/docs for complete OpenAPI documentation.


Docker Deployment

Build and Run

docker build -t taskmq .
docker run --rm -p 8000:8000 taskmq serve-api

Using Docker Compose

docker-compose up

This starts both the API server and a worker process.


Documentation

Full documentation: https://gvarun01.github.io/task-mq/


Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.

# Setup development environment
git clone https://github.com/gvarun01/task-mq.git
cd task-mq
pip install -e ".[dev]"

# Run tests
pytest -v

# Run linting
ruff check taskmq tests

License

TaskMQ is released under the MIT License.


Author: Varun Gupta

Download files

Download the file for your platform.

Source Distribution

task_mq-0.1.2.tar.gz (38.9 kB)


Built Distribution


task_mq-0.1.2-py3-none-any.whl (30.8 kB)


File details

Details for the file task_mq-0.1.2.tar.gz.

File metadata

  • Download URL: task_mq-0.1.2.tar.gz
  • Upload date:
  • Size: 38.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for task_mq-0.1.2.tar.gz
Algorithm Hash digest
SHA256 ba82363daeb34acd2015793707dce44ebb2556fc9ba5e11efc07a2a4811735fe
MD5 6bc25d0a17dd14ca1b60da257d58fcb1
BLAKE2b-256 28a971dc67d960ec5e4f3ad0efaafdc7e20e71ecf856772674055818d1c7acb1


File details

Details for the file task_mq-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: task_mq-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 30.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for task_mq-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 3cce39bca72fef0f2471050d5e813c0c22222075fe713fd14e7c186575e6a005
MD5 4a0603a13c9c40576a8b1644e78a63a1
BLAKE2b-256 0ce658339f3b7c497cb3a5d16bed41e8fa908360e2c448b2cd32d7a159220f8d

