Skip to main content

Async Redis-backed job runner for Python apps. AQWorker orchestrates workers, handlers, and queues with auto-discovery, retries, CLI tooling, and FastAPI-friendly async processing while staying framework agnostic.

Project description

AQWorker - Async Queue Worker

Overview

AQWorker is a Redis-based background job processing system for Python applications. It provides a simple, efficient way to handle asynchronous tasks with support for multiple worker types, queue isolation, and job status tracking. While it works seamlessly with FastAPI, it's a standalone package that can be used with any Python application.

Features

  • Framework Agnostic: Works with FastAPI, Django, Flask, or any Python application
  • Async/Await Support: Built with async/await for modern Python applications
  • Redis-based: Uses Redis for reliable job queue management
  • Worker Registry: Centralized worker management with automatic discovery
  • Handler Registry: Organized handler system for job processing
  • Handler Auto-Discovery: Import handler packages automatically via include_packages
  • CLI Tools: Command-line interface for managing workers and jobs
  • Queue Isolation: Separate queues for different job types
  • Job Status Tracking: Track job lifecycle (PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED)
  • Automatic Retries: Built-in retry mechanism with configurable delays

Architecture

┌─────────────┐    get_next_job()    ┌─────────────┐    dequeue()    ┌─────────────┐
│   Worker    │ ────────────────────►│ JobService  │ ──────────────► │  JobQueue   │
│             │                      │             │                 │             │
│             │ ◄────────────────────│             │ ◄────────────── │             │
└─────────────┘    complete_job()    └─────────────┘   complete_job()└─────────────┘

┌─────────────┐    enqueue_job()     ┌─────────────┐    enqueue()    ┌─────────────┐
│ Application │ ────────────────────►│   AQWorker  │ ──────────────► │  JobQueue   │
│ (FastAPI/   │                      │             │                 │             │
│  Django/    │                      │  - Worker   │                 │             │
│  Flask/     │                      │    Registry │                 │             │
│  Script)    │                      │  - Handler  │                 │             │
│             │                      │    Registry │                 │             │
└─────────────┘                      │  - Job      │                 └─────────────┘
                                     │    Service  │
                                     └─────────────┘

Components

1. AQWorker (core.py)

  • Purpose: Main orchestrator class that manages workers, handlers, and job service
  • Features:
    • Worker and handler registration
    • Worker instance creation
    • Job service integration
    • Centralized configuration

2. WorkerRegistry (worker/registry.py)

  • Purpose: Manages worker class registrations
  • Features:
    • Register worker classes with names
    • List available workers
    • Get worker definitions

3. HandlerRegistry (handler/registry.py)

  • Purpose: Manages handler class registrations
  • Features:
    • Register handler classes
    • Lookup handlers by name
    • Snapshot of all registered handlers

4. JobService (job/service.py)

  • Purpose: High-level service layer for job operations
  • Features:
    • Enqueue jobs
    • Dequeue jobs for workers
    • Get job status
    • Queue statistics
    • Job lifecycle management

5. JobQueue (job/queue.py)

  • Purpose: Low-level Redis operations for job management
  • Features:
    • FIFO queue processing
    • Async/await support
    • Job status tracking
    • Redis operations: enqueue, dequeue, complete_job

6. BaseWorker (worker/base.py)

  • Purpose: Base class for all workers
  • Features:
    • Automatic job polling
    • Concurrent job processing
    • Error handling and retries
    • Health monitoring

7. BaseHandler (handler/base.py)

  • Purpose: Base class for all job handlers
  • Features:
    • Async/sync handler support
    • Standardized job processing interface

Installation

pip install aqworker

Or install from source:

git clone <repository>
cd aqworker
pip install -e .

Quick Start

1. Define Handlers

# handlers.py
from aqworker import BaseHandler

class EmailHandler(BaseHandler):
    name = "email"
    
    async def handle(self, data: dict) -> bool:
        recipient = data.get("recipient")
        subject = data.get("subject")
        body = data.get("body")
        
        # Send email logic here
        print(f"Sending email to {recipient}: {subject}")
        return True

You can still use the @aq_worker.handler decorator for function-based handlers—just make sure the module containing the decorator is imported so the handler gets registered.

2. Define Workers

# workers.py
from aqworker import BaseWorker, WorkerConfig

class EmailWorker(BaseWorker):
    worker_name = "email"
    worker_config = WorkerConfig(
        queue_names=["emails"],
        max_concurrent_jobs=3,
        poll_interval=0.5,
    )

3. Initialize AQWorker

# aq_worker.py
from aqworker import AQWorker
from aqworker.job.service import JobService
from workers import EmailWorker
from handlers import EmailHandler  # optional when using include_packages

# Create JobService
job_service = JobService()

# Initialize AQWorker and auto-import handlers package
aq_worker = AQWorker(include_packages=["handlers"])

# Register workers and handlers
aq_worker.register_worker(EmailWorker)
# Handlers inside "handlers" will be discovered automatically, but you can still
# register manually if desired:
# aq_worker.register_handler(EmailHandler)

# Connect job service
aq_worker.listen(job_service)

Automatic handler discovery

If your handlers are spread across multiple modules, you can tell AQWorker to import them automatically by passing include_packages. Every module inside those packages will be imported once at startup, so any @aq_worker.handler decorators (or subclasses of BaseHandler) run and register themselves:

aq_worker = AQWorker(include_packages=[
    "my_project.workers",
    "examples.simple.aq_worker",
])

You can also call aq_worker.autodiscover_handlers([...]) later if you need to load additional packages dynamically. After discovery completes AQWorker logs the full list of available handlers so you can confirm everything loaded correctly.

4. Enqueue Jobs

# In your application
from worker import aq_worker

# Enqueue a job
job = await aq_worker.job_service.enqueue_job(
    queue_name="emails",
    handler="email",
    data={
        "recipient": "user@example.com",
        "subject": "Welcome!",
        "body": "Welcome to our service!"
    }
)

5. Run Workers

Using CLI:

# Option 1: Pass file path
aqworker start email aq_worker.py

# Option 2: Use environment variable
export AQWORKER_FILE=aq_worker.py
aqworker start email

Using Python:

import asyncio
from worker import aq_worker

async def main():
    worker = aq_worker.create_worker("email")
    await worker.run()

asyncio.run(main())

Usage Examples

Standalone Python Application

# main.py
import asyncio
from worker import aq_worker

async def main():
    # Enqueue jobs
    for i in range(10):
        job = await aq_worker.job_service.enqueue_job(
            queue_name="emails",
            handler="email",
            data={"recipient": f"user{i}@example.com", "subject": f"Email {i}"}
        )
        print(f"Enqueued job: {job.id}")
    
    # Start aq_worker
    worker = aq_worker.create_worker("email")
    await worker.run()

asyncio.run(main())

FastAPI Integration

The repo ships with a complete FastAPI + AQWorker demo under examples/simple_fastapi/. Key files:

  • worker.py – configures AQWorker, registers the example workers, and wires up the shared JobService.
  • main.py – FastAPI app that exposes /jobs/email, /jobs/notification, queue stats, and discovery endpoints.
  • client.py – small CLI client that calls the HTTP endpoints so you can watch jobs flow through the queues.

Minimal excerpt of the worker wiring:

# examples/simple_fastapi/worker.py
from aqworker import AQWorker
from workers import EmailWorker, NotificationWorker
from job_service import job_service

aq_worker = AQWorker(include_packages=["handlers"])
aq_worker.register_worker(EmailWorker)
aq_worker.register_worker(NotificationWorker)
aq_worker.listen(job_service)

To run the demo API and fire sample requests:

cd examples/simple_fastapi
uv run python main.py            # start FastAPI

# in another shell: send a job via the bundled client
uv run python client.py --recipient user@example.com --subject Test --body "Hello!"

Django Integration

# views.py
from django.http import JsonResponse
from worker import aq_worker
import asyncio

async def send_email_view(request):
    job = await aq_worker.job_service.enqueue_job(
        queue_name="emails",
        handler="email",
        data={
            "recipient": request.POST.get("recipient"),
            "subject": request.POST.get("subject"),
            "body": request.POST.get("body")
        }
    )
    return JsonResponse({"job_id": job.id})

# Use asyncio.run or Django's async support

CLI Commands

AQWorker provides a CLI tool for managing workers and jobs:

List Commands

# List available workers
aqworker list:aq_worker [file_path]

# List registered handlers
aqworker list:handlers [file_path]

# List queues
aqworker list:queue [file_path]

Worker Commands

# Start a aq_worker
aqworker start <worker_name> [file_path]

# Get queue statistics
aqworker stats <queue_name> [file_path]

Using Environment Variable

# Set AQWorker file path
export AQWORKER_FILE=aq_worker.py

# Use commands without file path
aqworker list:aq_worker
aqworker start email
aqworker stats emails

Note: The file path should point to a Python file containing an AQWorker instance. The CLI automatically finds the instance (looking for variables named aq_worker, worker, aqworker, or aq).

Configuration

Environment Variables

# Redis Configuration (used when creating JobService)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0

# AQWorker CLI
AQWORKER_FILE=aq_worker.py  # Path to file containing AQWorker instance

Worker Configuration

from aqworker import WorkerConfig

class MyWorker(BaseWorker):
    worker_name = "my_worker"
    worker_config = WorkerConfig(
        queue_names=["my_queue"],
        max_concurrent_jobs=5,      # Max jobs processed simultaneously
        poll_interval=0.5,          # Seconds between queue polls
        job_timeout=300,            # Job timeout in seconds
    )

Job Lifecycle

1. ENQUEUE: Job created via JobService.enqueue_job()
2. QUEUE: Job added to Redis queue (aqw:{queue_name})
3. DEQUEUE: Worker gets job via JobService.get_next_job()
4. PROCESS: Job moved to processing queue, handler executed
5. COMPLETE: Job marked as completed/failed
6. CLEANUP: Old jobs cleaned up automatically

Queue System

Redis key prefixes

AQWorker keeps every Redis key namespaced with the aqw prefix so multiple apps can share the same Redis without collisions. The most important keys are:

  • aqw:{queue_name} – pending FIFO queue for a specific worker queue
  • aqw:processing, aqw:completed, aqw:failed – global processing/completed/failed lists
  • aqw:job:{job_id} – job status hash (timestamps, error state, etc.)
  • aqw:jl:{job_id} – per-job lock key used to ensure only one worker processes the job

All public APIs use helpers from aqworker.constants (get_queue_name, get_job_status_key, get_job_lock_key) so you rarely need to construct these strings manually.

Queue Structure

Redis Keys:
├── aqw:emails        # Email jobs (FIFO)
├── aqw:notifications # Notification jobs (FIFO)
├── aqw:processing    # Jobs currently being processed
├── aqw:completed     # Successfully completed jobs
└── aqw:failed        # Failed jobs

Queue Isolation

  • Each worker processes specific queue names
  • Jobs are isolated by queue name
  • No cross-queue interference
  • Use different queues for different job types

FIFO Processing

  • Simple first-in-first-out queue processing
  • Jobs processed in order of arrival
  • All jobs in a queue are processed equally
  • Use different queue names for different priority levels

Advanced Usage

Custom Job Data

job = await aq_worker.job_service.enqueue_job(
    queue_name="emails",
    handler="email",
    data={
        "recipient": "user@example.com",
        "subject": "Welcome",
        "body": "Welcome!",
        "attachments": ["file1.pdf", "file2.jpg"],
        "priority": "high"
    },
    metadata={
        "source": "api",
        "user_id": "12345",
        "campaign_id": "summer2024"
    },
    max_retries=5,
    retry_delay=60
)

Multiple Workers

# Register multiple workers
aq_worker.register_worker(EmailWorker)
aq_worker.register_worker(NotificationWorker)
aq_worker.register_worker(ReportWorker)

# Start specific aq_worker
worker = aq_worker.create_worker("email")
await worker.run()

Handler with Async Support

class AsyncHandler(BaseHandler):
    name = "async_task"
    
    async def handle(self, data: dict) -> bool:
        # Async operations
        result = await some_async_operation(data)
        return result is not None

Best Practices

1. Job Design

  • Keep jobs small and focused
  • Use descriptive handler names
  • Include all necessary data in data parameter
  • Use metadata for tracking/debugging information

2. Worker Design

  • Implement proper error handling in handlers
  • Use appropriate concurrency levels
  • Monitor job processing time
  • Log important events

3. Queue Management

  • Use descriptive queue names
  • Monitor queue depths
  • Clean up old jobs regularly
  • Balance load across workers

4. Handler Design

  • Make handlers idempotent when possible
  • Handle errors gracefully
  • Return True on success, False on failure
  • Use async handlers for I/O operations

Troubleshooting

Common Issues

  1. Jobs not processing

    • Check worker is running
    • Verify queue names match between enqueue and worker
    • Check Redis connection
    • Verify handler is registered
  2. High memory usage

    • Clean up old jobs: await job_service.cleanup_old_jobs(days=7)
    • Reduce queue depths
    • Optimize job data size
  3. Slow processing

    • Increase worker concurrency (max_concurrent_jobs)
    • Optimize handler logic
    • Check Redis performance
    • Consider multiple workers

Debugging

# Check queue statistics
stats = await aq_worker.job_service.get_queue_stats(["emails"])
print(f"Queue stats: {stats}")

# Get job status
job = await aq_worker.job_service.get_job(job_id)
print(f"Job status: {job.status}")

# List registered workers
workers = aq_worker.get_available_workers()
print(f"Available workers: {workers}")

# List registered handlers
handlers = aq_worker.handler_registry.snapshot()
print(f"Registered handlers: {list(handlers.keys())}")

Examples

The examples/ directory contains end-to-end demos:

examples/simple/

  • Pure Python example with a background thread enqueuing email jobs while a worker (created via create_worker("email")) processes them.
  • Handlers live in examples.simple.aq_worker.handlers and are picked up through AQWorker(include_packages=["examples.simple.aq_worker"]), so you can keep job logic in separate modules without manual registration.
  • Run python -m examples.simple.main to start the enqueue loop and worker concurrently or use the CLI (aqworker start email examples/simple/worker.py) in separate shells.

examples/simple_fastapi/

  • FastAPI service exposing REST endpoints for enqueuing jobs plus health/metadata routes (/handlers, /workers, /jobs/queues/{queue}/stats).
  • Worker definition in examples/simple_fastapi/worker.py registers worker classes and auto-discovers handlers via include_packages=["examples.simple_fastapi.handlers"].
  • examples/simple_fastapi/handlers.py simulates real work with 0.5–2 s asyncio.sleep delays so you can observe concurrent processing.
  • examples/simple_fastapi/client.py is an httpx script that continuously enqueues email + notification jobs every ~0.2 s—perfect for smoke-testing the API while watching worker logs scroll.

Contributing

When adding new features:

  1. Follow async/await patterns
  2. Add proper error handling
  3. Update documentation
  4. Add tests
  5. Consider performance impact

License

This project is licensed under the terms specified in the LICENSE file.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aqworker-0.1.2.tar.gz (24.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aqworker-0.1.2-py3-none-any.whl (32.1 kB view details)

Uploaded Python 3

File details

Details for the file aqworker-0.1.2.tar.gz.

File metadata

  • Download URL: aqworker-0.1.2.tar.gz
  • Upload date:
  • Size: 24.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.12 {"installer":{"name":"uv","version":"0.9.12"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for aqworker-0.1.2.tar.gz
Algorithm Hash digest
SHA256 1b718c3da41047e2f41304754b69aef095d99c03b0b82645219904ece0b4db00
MD5 b62c0a4084eb6f3bb1c6190136ddeb14
BLAKE2b-256 44245abbbd746e7f5a38ec44a069231a2340d8e21547670617b64ffab0cd0b05

See more details on using hashes here.

File details

Details for the file aqworker-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: aqworker-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 32.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.12 {"installer":{"name":"uv","version":"0.9.12"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for aqworker-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 634cd6da00182669afed86de8c7b2f01d13c5b322ef3b14a0bc9da4b5a5a652e
MD5 3879c30dd5e61900dee5fa1222406585
BLAKE2b-256 8115f0576d36b993d0b346b7ddbc43c7171d050fbf73e07dcfcf0c6ae68348e8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page