A lightweight message queue for Python with no dependencies

LiteQ

License: MIT

Translations: 🇷🇺 Русский (Russian)

A lightweight, minimalist task queue for Python with zero dependencies.

LiteQ is a pure Python task queue built on SQLite, perfect for background job processing without the complexity of Celery or Redis. Just decorate your functions and call .delay() - that's it!

Features

✨ Zero Dependencies - Pure Python 3.10+ with only SQLite
⚡ Dead Simple API - Just @task decorator and .delay()
🔄 Async & Sync - Works with both async and regular functions
📦 Multiple Queues - Organize tasks by queue name
🎯 Task Priorities - Control execution order
🔁 Auto Retry - Configurable retry logic
👷 Multiple Workers - Process tasks in parallel
⏰ Task Scheduling - Cron-like scheduled tasks
⏱️ Task Timeouts - Automatic timeout and stuck task handling
🚀 FastAPI Integration - Built-in FastAPI support
📊 Monitoring - Track stats, workers, and task status
💾 Persistent - SQLite-backed for reliability
🧪 Production Ready - test coverage >80%

Installation

pip install liteq

Quick Start

1. Define your tasks

Create a file tasks.py:

from liteq import task
import time

@task()
def send_email(to: str, subject: str):
    print(f"Sending email to {to}: {subject}")
    time.sleep(1)
    return f"Email sent to {to}"

@task(queue="reports", max_retries=5)
def generate_report(report_id: int):
    print(f"Generating report {report_id}")
    time.sleep(2)
    return {"report_id": report_id, "status": "completed"}

2. Enqueue tasks

from tasks import send_email, generate_report

# Enqueue tasks - they return task IDs
task_id = send_email.delay(to="user@example.com", subject="Hello!")
print(f"Enqueued task: {task_id}")

# Enqueue to different queue
report_id = generate_report.delay(report_id=123)

3. Check task status

from liteq import get_task_status

# Get task status
status = get_task_status(task_id)
if status:
    print(f"Status: {status['status']}")  # pending/running/done/failed
    print(f"Attempts: {status['attempts']}/{status['max_retries']}")
    
    if status['status'] == 'done':
        print(f"Result: {status['result']}")
    elif status['status'] == 'failed':
        print(f"Error: {status['error']}")
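
Because .delay() returns immediately, callers that need the result have to poll. The helper below is a minimal sketch of such a loop, not part of LiteQ: wait_for_task, TERMINAL_STATES, and the fetch_status parameter are hypothetical names. It only assumes the status dict shape shown above (a "status" key with pending/running/done/failed).

```python
import time
from typing import Callable, Optional

# Statuses after which a task will not change any more (from the README's status list).
TERMINAL_STATES = {"done", "failed"}


def wait_for_task(
    task_id: int,
    fetch_status: Callable[[int], Optional[dict]],
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> Optional[dict]:
    """Return the status dict once the task is done or failed.

    On timeout, return the last status seen (possibly None if the task was never found).
    """
    deadline = time.monotonic() + timeout
    status = fetch_status(task_id)
    while time.monotonic() < deadline:
        if status is not None and status["status"] in TERMINAL_STATES:
            return status
        time.sleep(poll_interval)
        status = fetch_status(task_id)
    return status
```

With LiteQ installed, this would presumably be called as wait_for_task(task_id, get_task_status); passing the lookup function in keeps the helper testable without a database.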

4. Run worker

# Start a worker to process tasks
liteq worker --app tasks.py --queues default,reports --concurrency 4

That's it! Your tasks will be processed in the background.

Examples

FastAPI Integration

from fastapi import FastAPI, HTTPException
from liteq import task, get_task_status
from liteq.fastapi import LiteQBackgroundTasks, enqueue_task

app = FastAPI()

@task(queue="emails", timeout=60)
async def send_email(to: str, subject: str):
    # Send email logic
    return {"sent": True}

# Method 1: Simple .delay()
@app.post("/send-email")
async def api_send_email(to: str, subject: str):
    task_id = send_email.delay(to, subject)
    return {"task_id": task_id}

# Method 2: FastAPI-like BackgroundTasks with status checking
@app.post("/send-email-bg")
async def api_send_email_bg(to: str, background: LiteQBackgroundTasks):
    task_id = background.add_task(send_email, to, "Hello!")
    return {"message": "queued", "task_id": task_id}

# Method 3: Helper function
@app.post("/send-email-helper")
async def api_send_email_helper(to: str):
    task_id = enqueue_task(send_email, to, "Welcome")
    return {"task_id": task_id}

# Check task status
@app.get("/tasks/{task_id}")
async def check_task_status(task_id: int):
    status = get_task_status(task_id)
    if not status:
        # FastAPI ignores a Flask-style (body, 404) tuple, so raise instead
        raise HTTPException(status_code=404, detail="Task not found")
    return {
        "task_id": status["id"],
        "status": status["status"],
        "result": status.get("result"),
        "error": status.get("error")
    }

Scheduled Tasks (Cron)

from liteq import task, register_schedule
from liteq.scheduler import Scheduler

@task()
def daily_backup():
    print("Running backup...")
    return {"status": "success"}

@task()
def cleanup():
    print("Cleaning up...")

# Register schedules
register_schedule(daily_backup, "0 2 * * *")  # Every day at 2 AM
register_schedule(cleanup, "*/5 * * * *")  # Every 5 minutes

# Run the scheduler in-process (blocks)
scheduler = Scheduler(check_interval=60)
scheduler.run()

# Or run it from the CLI instead:
# liteq scheduler --app tasks.py --interval 60

Task Timeouts

from liteq import task

# Timeout on task level
@task(timeout=30)  # 30 seconds
def slow_task():
    import time
    time.sleep(100)  # Will be killed after 30s

# Timeout on worker level
# liteq worker --app tasks.py --timeout 60
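
This README does not say how LiteQ enforces a timeout internally. For async tasks, one standard way to get the same effect is asyncio.wait_for, which cancels the coroutine when the limit passes. The sketch below illustrates that idea only; run_with_timeout and the returned dict shape are hypothetical, and a blocking sync call such as time.sleep() cannot be cancelled this way.

```python
import asyncio


async def run_with_timeout(coro_func, timeout: float, *args, **kwargs) -> dict:
    """Run an async callable; report 'failed' if it exceeds `timeout` seconds."""
    try:
        result = await asyncio.wait_for(coro_func(*args, **kwargs), timeout=timeout)
        return {"status": "done", "result": result, "error": None}
    except asyncio.TimeoutError:
        # wait_for has already cancelled the coroutine at this point.
        return {"status": "failed", "result": None, "error": f"timed out after {timeout}s"}


async def slow_task():
    await asyncio.sleep(1)  # longer than the limit used below
    return "finished"


async def quick_task():
    return "finished"


outcome_slow = asyncio.run(run_with_timeout(slow_task, 0.05))
outcome_quick = asyncio.run(run_with_timeout(quick_task, 0.05))
```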

Delayed Execution

from liteq import task
from datetime import datetime, timedelta

@task()
def reminder(message: str):
    print(f"Reminder: {message}")

# Schedule for later
run_time = datetime.now() + timedelta(hours=1)
task_id = reminder.schedule(run_time, "Meeting in 1 hour")

Async Tasks

import asyncio
from liteq import task

@task()
async def fetch_data(url: str):
    print(f"Fetching {url}")
    await asyncio.sleep(1)
    return {"url": url, "data": "..."}

# Enqueue
task_id = fetch_data.delay(url="https://api.example.com")

Multiple Queues

from liteq import task

@task(queue="emails")
def send_email(to: str):
    print(f"Email to {to}")

@task(queue="reports")
def generate_report(id: int):
    print(f"Report {id}")

@task(queue="notifications")
def send_push(user_id: int, message: str):
    print(f"Push to {user_id}: {message}")

# Enqueue to different queues
send_email.delay(to="user@example.com")
generate_report.delay(id=42)
send_push.delay(user_id=1, message="Hello!")

Task Priorities

from liteq import task

@task()
def process_item(item_id: int):
    return f"Processed {item_id}"

# Higher priority number = runs first
# These are enqueued to the same queue but with different priorities
# (Note: priority is set in the task definition or database, 
# not in .delay() call in current version)
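
To make "higher priority number = runs first" concrete, here is a small sketch using the standard sqlite3 module on an in-memory table. The reduced table layout, the next_pending helper, and the tie-break on id are assumptions for illustration; the query LiteQ actually runs may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE tasks (
        id INTEGER PRIMARY KEY,
        name TEXT,
        queue TEXT,
        status TEXT,
        priority INTEGER
    )
    """
)
conn.executemany(
    "INSERT INTO tasks (name, queue, status, priority) VALUES (?, ?, ?, ?)",
    [
        ("process_item", "default", "pending", 0),
        ("process_item", "default", "pending", 10),
        ("process_item", "default", "pending", 5),
        ("process_item", "default", "done", 99),  # already finished, never picked
    ],
)


def next_pending(conn, queue):
    """Return (id, priority) of the pending task that would run first, or None."""
    return conn.execute(
        "SELECT id, priority FROM tasks "
        "WHERE queue = ? AND status = 'pending' "
        "ORDER BY priority DESC, id ASC LIMIT 1",  # assumed tie-break: oldest first
        (queue,),
    ).fetchone()


picked = next_pending(conn, "default")  # the row with priority 10
```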

Custom Task Names and Retries

from liteq import task

@task(name="custom_email_task", max_retries=5)
def send_email(to: str):
    # This task will retry up to 5 times on failure
    print(f"Sending to {to}")

@task(max_retries=0)  # No retries
def one_time_task():
    print("This runs only once")

CLI Usage

# Run worker
liteq worker --app tasks.py

# Multiple queues
liteq worker --app tasks.py --queues emails,reports,notifications

# Custom concurrency
liteq worker --app tasks.py --concurrency 8

# With timeout (kills stuck tasks)
liteq worker --app tasks.py --timeout 300

# Run scheduler
liteq scheduler --app tasks.py --interval 60

# Monitor dashboard
liteq monitor --port 5151

Programmatic Worker

from liteq.db import init_db
from liteq.worker import Worker

# Initialize database
init_db()

# Create and run worker
worker = Worker(queues=["default", "emails"], concurrency=4)
worker.run()  # This blocks

Monitoring

from liteq.monitoring import (
    get_queue_stats,
    get_recent_tasks,
    list_queues,
    get_failed_tasks,
    get_active_workers,
)

# Get queue statistics
stats = get_queue_stats()
for stat in stats:
    print(f"{stat['queue']}: {stat['count']} tasks ({stat['status']})")

# List all queues
queues = list_queues()
print(f"Queues: {queues}")

# Get recent tasks
recent = get_recent_tasks(limit=10)

# Get failed tasks
failed = get_failed_tasks(limit=5)
for task in failed:
    print(f"Task {task['id']} failed: {task['error']}")

# Get active workers
workers = get_active_workers()
for worker in workers:
    print(f"Worker {worker['worker_id']}: {worker['active_tasks']} active tasks")
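
The rows printed from get_queue_stats() above carry "queue", "status", and "count" keys. For dashboards or health checks it can help to regroup them per queue. This is a minimal sketch on sample data; summarize_stats is a hypothetical helper, not a LiteQ function.

```python
from collections import defaultdict


def summarize_stats(stats: list[dict]) -> dict[str, dict[str, int]]:
    """Turn flat (queue, status, count) rows into {queue: {status: count}}."""
    summary: dict[str, dict[str, int]] = defaultdict(dict)
    for row in stats:
        summary[row["queue"]][row["status"]] = row["count"]
    return dict(summary)


# Sample rows in the shape used by the loop above (values are made up).
sample = [
    {"queue": "default", "status": "pending", "count": 3},
    {"queue": "default", "status": "done", "count": 12},
    {"queue": "emails", "status": "failed", "count": 1},
]
summary = summarize_stats(sample)
```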

More Examples

Check out the examples/ directory for complete working examples.

Run any example:

python examples/basic.py

API Reference

Core Functions

get_task_status(task_id: int) -> dict | None

Get task status and details by task ID.

Arguments:

  • task_id (int): Task ID returned by .delay() or .schedule()

Returns: Dictionary with task information or None if not found

Example:

from liteq import task, get_task_status

@task()
def process_data(x: int):
    return x * 2

task_id = process_data.delay(5)

# Check status
status = get_task_status(task_id)
if status:
    print(f"Status: {status['status']}")  # pending/running/done/failed
    print(f"Attempts: {status['attempts']}/{status['max_retries']}")
    if status['status'] == 'done':
        print(f"Result: {status['result']}")

Decorators

@task(queue='default', max_retries=3, name=None, timeout=None)

Decorate a function to make it a task.

Arguments:

  • queue (str): Queue name (default: "default")
  • max_retries (int): Maximum retry attempts (default: 3)
  • name (str, optional): Custom task name (defaults to function name)
  • timeout (int, optional): Task timeout in seconds (default: None)

Returns: A callable with .delay(*args, **kwargs) and .schedule(run_at, *args, **kwargs) methods

Example:

@task(queue="emails", max_retries=5, timeout=60)
def send_email(to: str):
    ...

# Enqueue task
task_id = send_email.delay(to="user@example.com")

# Schedule for later
from datetime import datetime, timedelta
run_time = datetime.now() + timedelta(hours=1)
task_id = send_email.schedule(run_time, to="user@example.com")
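
For readers curious how a decorator can add .delay() to a function, the toy below shows the general pattern: serialize the call to JSON and insert a pending row into SQLite. It is a sketch under assumptions, not LiteQ's implementation; toy_task, the reduced table, and the payload layout ({"args": ..., "kwargs": ...}) are all hypothetical.

```python
import json
import sqlite3
from functools import wraps

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT, payload TEXT, "
    "queue TEXT, status TEXT, max_retries INTEGER)"
)


def toy_task(queue="default", max_retries=3, name=None):
    """Toy stand-in for @task: adds .delay() that stores the call as a pending row."""

    def decorator(func):
        task_name = name or func.__name__

        @wraps(func)
        def wrapper(*args, **kwargs):
            # In this toy, a direct call simply runs the function right away.
            return func(*args, **kwargs)

        def delay(*args, **kwargs):
            payload = json.dumps({"args": args, "kwargs": kwargs})
            cur = conn.execute(
                "INSERT INTO tasks (name, payload, queue, status, max_retries) "
                "VALUES (?, ?, ?, 'pending', ?)",
                (task_name, payload, queue, max_retries),
            )
            conn.commit()
            return cur.lastrowid  # plays the role of the task ID

        wrapper.delay = delay
        return wrapper

    return decorator


@toy_task(queue="emails", max_retries=5)
def send_email(to: str):
    return f"sent to {to}"


task_id = send_email.delay(to="user@example.com")
row = conn.execute(
    "SELECT name, payload, queue, status FROM tasks WHERE id = ?", (task_id,)
).fetchone()
```

A worker would then read pending rows, look the function up by name, and call it with the decoded arguments.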

Worker

Worker(queues, concurrency, task_timeout=None)

Create a worker to process tasks.

Arguments:

  • queues (list[str]): List of queue names to process
  • concurrency (int): Number of concurrent threads
  • task_timeout (int, optional): Timeout in seconds for stuck tasks (default: None)

Methods:

  • run(): Start processing tasks (blocks)

Example:

from liteq.worker import Worker

# Basic worker
worker = Worker(queues=["default", "emails"], concurrency=4)
worker.run()

# Worker with timeout
worker = Worker(queues=["default"], concurrency=4, task_timeout=300)
worker.run()  # Kills tasks running longer than 5 minutes

Monitoring Functions

All available in liteq.monitoring:

get_queue_stats() -> list[dict]

Get statistics grouped by queue and status.

get_recent_tasks(limit=50) -> list[dict]

Get recent tasks ordered by creation time.

list_queues() -> list[str]

Get list of all unique queue names.

get_failed_tasks(limit=50) -> list[dict]

Get recent failed tasks.

get_active_workers() -> list[dict]

Get currently active workers (heartbeat < 15 seconds ago).

Scheduler Functions

All available in liteq.scheduler:

register_schedule(task_func, cron_expr, queue='default', **kwargs) -> int

Register a task to run on a schedule.

Example:

from liteq import task, register_schedule

@task()
def backup():
    print("Backup running")

# Every day at 2 AM
schedule_id = register_schedule(backup, "0 2 * * *")

# Every 5 minutes
schedule_id = register_schedule(backup, "*/5 * * * *")
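
The cron strings above use the usual five fields: minute, hour, day of month, month, day of week. The matcher below is a simplified sketch for checking what an expression means before registering it. It handles only "*", "*/n", single numbers, and comma lists (no ranges, names, or 7 for Sunday), and it is not LiteQ's parser, which may accept more or behave differently; cron_matches and _field_matches are hypothetical names.

```python
from datetime import datetime


def _field_matches(field: str, value: int, low: int) -> bool:
    """Match one cron field; `low` is the smallest legal value for that field."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps are counted from the start of the field's range.
            if (value - low) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, moment: datetime) -> bool:
    """Check a 5-field expression: minute hour day-of-month month day-of-week."""
    minute, hour, dom, month, dow = expr.split()
    # cron counts Sunday as 0; datetime.weekday() counts Monday as 0.
    cron_dow = (moment.weekday() + 1) % 7
    return (
        _field_matches(minute, moment.minute, 0)
        and _field_matches(hour, moment.hour, 0)
        and _field_matches(dom, moment.day, 1)
        and _field_matches(month, moment.month, 1)
        and _field_matches(dow, cron_dow, 0)
    )


daily_hit = cron_matches("0 2 * * *", datetime(2024, 1, 1, 2, 0))     # 02:00 exactly
every_five = cron_matches("*/5 * * * *", datetime(2024, 1, 1, 9, 10))  # minute 10
```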

Scheduler(check_interval=60)

Scheduler daemon that processes scheduled tasks.

Example:

from liteq.scheduler import Scheduler

scheduler = Scheduler(check_interval=60)
scheduler.run()  # Blocks

FastAPI Integration

All available in liteq.fastapi:

LiteQBackgroundTasks

FastAPI-like background tasks using LiteQ.

Methods:

  • add_task(func, *args, **kwargs) -> int - Add task and return task ID
  • get_task_status(task_id: int) -> dict | None - Get status of specific task
  • get_all_statuses() -> list[dict] - Get statuses of all tasks in this background instance
  • task_ids (property) - List of all task IDs

Example:

from fastapi import FastAPI
from liteq.fastapi import LiteQBackgroundTasks
from liteq import task

app = FastAPI()

@task()
def send_email_task(to: str):
    # Send email
    return {"sent": True}

@app.post("/send-email")
async def send_email(to: str, background: LiteQBackgroundTasks):
    task_id = background.add_task(send_email_task, to)
    return {"message": "queued", "task_id": task_id}

@app.post("/batch-emails")
async def batch_emails(recipients: list[str], background: LiteQBackgroundTasks):
    for recipient in recipients:
        background.add_task(send_email_task, recipient)
    
    return {
        "message": f"Queued {len(recipients)} emails",
        "task_ids": background.task_ids
    }

@app.get("/batch-status")
async def batch_status(background: LiteQBackgroundTasks):
    statuses = background.get_all_statuses()
    return {"tasks": statuses}

enqueue_task(task_func, *args, **kwargs) -> int

Helper to enqueue a task.

Database

init_db()

Initialize the database schema. Called automatically by CLI.

Example:

from liteq.db import init_db

init_db()

│   ├── core.py           # Task decorator and registry
│   ├── db.py             # Database layer (SQLite)
│   ├── worker.py         # Worker implementation
│   ├── cli.py            # Command-line interface
│   ├── monitoring.py     # Stats and monitoring
│   └── web.py            # Web dashboard (optional)
├── examples/             # Complete examples
├── tests/                # >80% coverage
├── README.md

Project Structure

liteq/
├── liteq/
│   ├── __init__.py       # Main exports
│   ├── db.py             # Database layer
│   ├── decorators.py     # @task decorator
│   ├── worker.py         # Worker implementation
│   ├── manager.py        # QueueManager
│   ├── producer.py       # Task enqueueing
│   ├── monitoring.py     # Stats and monitoring
│   ├── recovery.py       # Recovery functions
│   ├── registry.py       # Task registry
│   └── signals.py        # Signal handling
├── examples/
├── tests/
├── README.md
├── LICENSE
├── pyproject.toml
└── setup.py


Development

Setup

git clone https://github.com/ddreamboy/liteq.git
cd liteq
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -e ".[dev]"

Or install only test dependencies:

pip install -r requirements-test.txt

Run Tests

# Run all tests
pytest

# With coverage
pytest --cov=liteq --cov-report=html

# Verbose output
pytest -v

Code Quality

# Format code
black .

# Lint code
ruff check .

Publish to PyPI

# Build
python -m build

# Upload to TestPyPI
python -m twine upload --repository testpypi dist/*

# Upload to PyPI
python -m twine upload dist/*

Environment Variables

  • LITEQ_DB - Database file path (default: liteq.db)

export LITEQ_DB=/path/to/tasks.db
liteq worker --app tasks.py
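
Application code that wants to open the same database file (for example, a debugging script) can mirror this lookup. The sketch below only reproduces the documented rule, LITEQ_DB with a default of liteq.db; resolve_db_path is a hypothetical helper, and when LiteQ itself reads the variable is not stated here.

```python
import os
from pathlib import Path


def resolve_db_path(environ=os.environ) -> Path:
    """Return the path from LITEQ_DB, falling back to the documented default."""
    return Path(environ.get("LITEQ_DB", "liteq.db"))


# Plain dicts stand in for the environment so the example has no side effects.
default_path = resolve_db_path({})
custom_path = resolve_db_path({"LITEQ_DB": "/path/to/tasks.db"})
```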

Database Schema

LiteQ uses a simple SQLite database with two tables:

tasks:

  • id - Primary key
  • name - Task function name
  • payload - JSON args/kwargs
  • queue - Queue name
  • status - pending/running/done/failed
  • priority - Integer (higher = first)
  • attempts - Current attempt count
  • max_retries - Max retry limit
  • worker_id - Processing worker
  • run_at - Scheduled run time
  • created_at - Creation timestamp
  • finished_at - Completion timestamp
  • result - JSON result
  • error - Error message

workers:

  • worker_id - Primary key
  • hostname - Worker hostname
  • queues - Comma-separated queues
  • concurrency - Process count
  • last_heartbeat - Last ping time
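
Since everything lives in SQLite, the tables can be inspected with the standard sqlite3 module. The sketch below recreates the tasks table in memory from the column list above and runs two typical queries. The column types and sample rows are assumptions; against a real deployment you would connect to the LITEQ_DB file instead and skip the CREATE and INSERT steps.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # for a real file: sqlite3.connect("liteq.db")
conn.row_factory = sqlite3.Row
conn.execute(
    """
    CREATE TABLE tasks (
        id INTEGER PRIMARY KEY, name TEXT, payload TEXT, queue TEXT,
        status TEXT, priority INTEGER, attempts INTEGER, max_retries INTEGER,
        worker_id TEXT, run_at TEXT, created_at TEXT, finished_at TEXT,
        result TEXT, error TEXT
    )
    """
)
conn.executemany(
    "INSERT INTO tasks (name, queue, status, attempts, max_retries, error) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("send_email", "emails", "done", 1, 3, None),
        ("send_email", "emails", "failed", 3, 3, "SMTP connection refused"),
        ("generate_report", "reports", "pending", 0, 5, None),
    ],
)

# How many tasks sit in each queue, per status.
counts = [
    (r["queue"], r["status"], r["n"])
    for r in conn.execute(
        "SELECT queue, status, COUNT(*) AS n FROM tasks "
        "GROUP BY queue, status ORDER BY queue, status"
    )
]

# Which tasks failed, after how many attempts, and why.
failed = [
    (r["name"], r["attempts"], r["error"])
    for r in conn.execute(
        "SELECT name, attempts, error FROM tasks WHERE status = 'failed'"
    )
]
```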

Use Cases

  • 📧 Email sending queues
  • 📊 Report generation
  • 🖼️ Image/video processing
  • 📱 Push notifications
  • 🧹 Cleanup/maintenance tasks
  • 📈 Analytics pipelines
  • 🔄 Webhook delivery
  • 📦 Batch operations
  • 🔍 Web scraping
  • 💾 Data imports

Why LiteQ?

Simple - Minimal API, zero configuration
Lightweight - No dependencies, small codebase
Fast - SQLite is surprisingly performant
Reliable - WAL mode, ACID transactions
Debuggable - It's just SQLite, inspect with any SQL tool
Pythonic - Feels natural, not enterprise-y

When NOT to use LiteQ

  • Millions of tasks per second
  • Distributed/multi-node setups
  • Network filesystems (NFS, SMB)
  • Tasks larger than a few MB
  • Real-time streaming

For these, use RabbitMQ, Redis, Kafka, or a cloud service.

Contributing

Contributions welcome! Please feel free to submit a Pull Request.
