Async Redis-backed job runner for Python apps. AQWorker orchestrates workers, handlers, and queues with auto-discovery, retries, CLI tooling, and FastAPI-friendly async processing while staying framework agnostic.
AQWorker - Async Queue Worker
Overview
AQWorker is a Redis-based background job processing system for Python applications. It provides a simple, efficient way to handle asynchronous tasks with support for multiple worker types, queue isolation, and job status tracking. While it works seamlessly with FastAPI, it's a standalone package that can be used with any Python application.
Features
- Framework Agnostic: Works with FastAPI, Django, Flask, or any Python application
- Async/Await Support: Built with async/await for modern Python applications
- Redis-based: Uses Redis for reliable job queue management
- Worker Registry: Centralized worker management with automatic discovery
- Handler Registry: Organized handler system for job processing
- Handler Auto-Discovery: Import handler packages automatically via include_packages
- CLI Tools: Command-line interface for managing workers and jobs
- Queue Isolation: Separate queues for different job types
- Job Status Tracking: Track job lifecycle (PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED)
- Automatic Retries: Built-in retry mechanism with configurable delays
Architecture
```
┌─────────────┐  get_next_job()   ┌─────────────┐    dequeue()    ┌─────────────┐
│   Worker    │ ─────────────────►│ JobService  │ ──────────────► │  JobQueue   │
│             │                   │             │                 │             │
│             │ ◄─────────────────│             │ ◄────────────── │             │
└─────────────┘  complete_job()   └─────────────┘  complete_job() └─────────────┘

┌─────────────┐  enqueue_job()    ┌─────────────┐    enqueue()    ┌─────────────┐
│ Application │ ─────────────────►│  AQWorker   │ ──────────────► │  JobQueue   │
│  (FastAPI/  │                   │             │                 │             │
│   Django/   │                   │ - Worker    │                 │             │
│   Flask/    │                   │   Registry  │                 │             │
│   Script)   │                   │ - Handler   │                 │             │
│             │                   │   Registry  │                 │             │
└─────────────┘                   │ - Job       │                 └─────────────┘
                                  │   Service   │
                                  └─────────────┘
```
Components
1. AQWorker (core.py)
- Purpose: Main orchestrator class that manages workers, handlers, and job service
- Features:
- Worker and handler registration
- Worker instance creation
- Job service integration
- Centralized configuration
2. WorkerRegistry (worker/registry.py)
- Purpose: Manages worker class registrations
- Features:
- Register worker classes with names
- List available workers
- Get worker definitions
3. HandlerRegistry (handler/registry.py)
- Purpose: Manages handler class registrations
- Features:
- Register handler classes
- Lookup handlers by name
- Snapshot of all registered handlers
4. JobService (job/service.py)
- Purpose: High-level service layer for job operations
- Features:
- Enqueue jobs
- Dequeue jobs for workers
- Get job status
- Queue statistics
- Job lifecycle management
5. JobQueue (job/queue.py)
- Purpose: Low-level Redis operations for job management
- Features:
- FIFO queue processing
- Async/await support
- Job status tracking
- Redis operations: enqueue, dequeue, complete_job
6. BaseWorker (worker/base.py)
- Purpose: Base class for all workers
- Features:
- Automatic job polling
- Concurrent job processing
- Error handling and retries
- Health monitoring
7. BaseHandler (handler/base.py)
- Purpose: Base class for all job handlers
- Features:
- Async/sync handler support
- Standardized job processing interface
Installation
```bash
pip install aqworker
```

Or install from source:

```bash
git clone <repository>
cd aqworker
pip install -e .
```
Quick Start
1. Define Handlers
```python
# handlers.py
from aqworker import BaseHandler

class EmailHandler(BaseHandler):
    name = "email"

    async def handle(self, data: dict) -> bool:
        recipient = data.get("recipient")
        subject = data.get("subject")
        body = data.get("body")
        # Send email logic here
        print(f"Sending email to {recipient}: {subject}")
        return True
```
You can still use the @aq_worker.handler decorator for function-based handlers; just make
sure the module containing the decorated function is imported so the handler gets registered.
2. Define Workers
```python
# workers.py
from aqworker import BaseWorker, WorkerConfig

class EmailWorker(BaseWorker):
    worker_name = "email"
    worker_config = WorkerConfig(
        queue_names=["emails"],
        max_concurrent_jobs=3,
        poll_interval=0.5,
    )
```
3. Initialize AQWorker
```python
# aq_worker.py
from aqworker import AQWorker
from aqworker.job.service import JobService
from workers import EmailWorker
from handlers import EmailHandler  # optional when using include_packages

# Create JobService
job_service = JobService()

# Initialize AQWorker and auto-import the handlers package
aq_worker = AQWorker(include_packages=["handlers"])

# Register workers and handlers
aq_worker.register_worker(EmailWorker)
# Handlers inside "handlers" will be discovered automatically, but you can still
# register manually if desired:
# aq_worker.register_handler(EmailHandler)

# Connect the job service
aq_worker.listen(job_service)
```
Automatic handler discovery
If your handlers are spread across multiple modules, you can tell AQWorker to
import them automatically by passing include_packages. Every module inside
those packages will be imported once at startup, so any @aq_worker.handler
decorators (or subclasses of BaseHandler) run and register themselves:
```python
aq_worker = AQWorker(include_packages=[
    "my_project.workers",
    "examples.simple.aq_worker",
])
```
You can also call aq_worker.autodiscover_handlers([...]) later if you need to
load additional packages dynamically. After discovery completes AQWorker logs the
full list of available handlers so you can confirm everything loaded correctly.
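Under the hood, this kind of discovery boils down to importing every module in a package so that module-level registration side effects run. The sketch below shows the general mechanism using only the standard library; it is illustrative, not AQWorker's actual implementation:

```python
import importlib
import pkgutil

def autodiscover(package_name: str) -> list:
    """Import every module inside a package so module-level
    registration code (decorators, BaseHandler subclasses) runs.
    Returns the list of imported module names."""
    package = importlib.import_module(package_name)
    imported = []
    # walk_packages recurses into subpackages as well
    for info in pkgutil.walk_packages(package.__path__, package_name + "."):
        importlib.import_module(info.name)
        imported.append(info.name)
    return imported
```

Any decorator or subclass hook that appends to a registry at import time will fire during this walk, which is why simply importing the package is enough to register its handlers.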
4. Enqueue Jobs
```python
# In your application
from worker import aq_worker

# Enqueue a job
job = await aq_worker.job_service.enqueue_job(
    queue_name="emails",
    handler="email",
    data={
        "recipient": "user@example.com",
        "subject": "Welcome!",
        "body": "Welcome to our service!",
    },
)
```
5. Run Workers
Using CLI:
```bash
# Option 1: Pass the file path
aqworker start email aq_worker.py

# Option 2: Use an environment variable
export AQWORKER_FILE=aq_worker.py
aqworker start email
```
Using Python:
```python
import asyncio
from worker import aq_worker

async def main():
    worker = aq_worker.create_worker("email")
    await worker.run()

asyncio.run(main())
```
Usage Examples
Standalone Python Application
```python
# main.py
import asyncio
from worker import aq_worker

async def main():
    # Enqueue jobs
    for i in range(10):
        job = await aq_worker.job_service.enqueue_job(
            queue_name="emails",
            handler="email",
            data={"recipient": f"user{i}@example.com", "subject": f"Email {i}"},
        )
        print(f"Enqueued job: {job.id}")

    # Start a worker
    worker = aq_worker.create_worker("email")
    await worker.run()

asyncio.run(main())
```
FastAPI Integration
The repo ships with a complete FastAPI + AQWorker demo under examples/simple_fastapi/.
Key files:
- `worker.py` – configures `AQWorker`, registers the example workers, and wires up the shared `JobService`.
- `main.py` – FastAPI app that exposes `/jobs/email`, `/jobs/notification`, queue stats, and discovery endpoints.
- `client.py` – a small CLI client that calls the HTTP endpoints so you can watch jobs flow through the queues.
Minimal excerpt of the worker wiring:
```python
# examples/simple_fastapi/worker.py
from aqworker import AQWorker
from workers import EmailWorker, NotificationWorker
from job_service import job_service

aq_worker = AQWorker(include_packages=["handlers"])
aq_worker.register_worker(EmailWorker)
aq_worker.register_worker(NotificationWorker)
aq_worker.listen(job_service)
```
To run the demo API and fire sample requests:
```bash
cd examples/simple_fastapi
uv run python main.py  # start FastAPI

# in another shell: send a job via the bundled client
uv run python client.py --recipient user@example.com --subject Test --body "Hello!"
```
Django Integration
```python
# views.py
import asyncio

from django.http import JsonResponse
from worker import aq_worker

async def send_email_view(request):
    job = await aq_worker.job_service.enqueue_job(
        queue_name="emails",
        handler="email",
        data={
            "recipient": request.POST.get("recipient"),
            "subject": request.POST.get("subject"),
            "body": request.POST.get("body"),
        },
    )
    return JsonResponse({"job_id": job.id})

# Use asyncio.run or Django's async view support
```
CLI Commands
AQWorker provides a CLI tool for managing workers and jobs:
List Commands
```bash
# List available workers
aqworker list:aq_worker [file_path]

# List registered handlers
aqworker list:handlers [file_path]

# List queues
aqworker list:queue [file_path]
```
Worker Commands
```bash
# Start a worker
aqworker start <worker_name> [file_path]

# Get queue statistics
aqworker stats <queue_name> [file_path]
```
Using Environment Variable
```bash
# Set the AQWorker file path
export AQWORKER_FILE=aq_worker.py

# Use commands without a file path
aqworker list:aq_worker
aqworker start email
aqworker stats emails
```
Note: The file path should point to a Python file containing an AQWorker instance. The CLI automatically finds the instance (looking for variables named aq_worker, worker, aqworker, or aq).
Configuration
Environment Variables
```bash
# Redis configuration (used when creating JobService)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0

# AQWorker CLI
AQWORKER_FILE=aq_worker.py  # Path to the file containing the AQWorker instance
```
Worker Configuration
```python
from aqworker import BaseWorker, WorkerConfig

class MyWorker(BaseWorker):
    worker_name = "my_worker"
    worker_config = WorkerConfig(
        queue_names=["my_queue"],
        max_concurrent_jobs=5,  # Max jobs processed simultaneously
        poll_interval=0.5,      # Seconds between queue polls
        job_timeout=300,        # Job timeout in seconds
    )
```
Job Lifecycle
1. ENQUEUE: Job created via JobService.enqueue_job()
2. QUEUE: Job added to Redis queue (aqw:{queue_name})
3. DEQUEUE: Worker gets job via JobService.get_next_job()
4. PROCESS: Job moved to processing queue, handler executed
5. COMPLETE: Job marked as completed/failed
6. CLEANUP: Old jobs cleaned up automatically
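The lifecycle above can be modeled as a small state machine. The sketch below uses an in-memory toy queue to make the transitions concrete; `InMemoryQueue` and its fields are illustrative stand-ins for the Redis-backed `JobQueue`, not AQWorker's real classes:

```python
import enum
from collections import deque
from dataclasses import dataclass
from typing import Optional

class JobStatus(enum.Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class Job:
    id: str
    handler: str
    data: dict
    status: JobStatus = JobStatus.PENDING

class InMemoryQueue:
    """Toy stand-in for the Redis-backed JobQueue."""

    def __init__(self):
        self.pending = deque()  # plays the role of aqw:{queue_name}
        self.processing = []    # plays the role of aqw:processing
        self.completed = []     # plays the role of aqw:completed / aqw:failed

    def enqueue(self, job: Job) -> None:
        self.pending.append(job)              # steps 1-2: ENQUEUE / QUEUE

    def dequeue(self) -> Optional[Job]:
        if not self.pending:
            return None
        job = self.pending.popleft()          # step 3: DEQUEUE (FIFO)
        job.status = JobStatus.PROCESSING     # step 4: PROCESS
        self.processing.append(job)
        return job

    def complete_job(self, job: Job, success: bool) -> None:
        self.processing.remove(job)           # step 5: COMPLETE
        job.status = JobStatus.COMPLETED if success else JobStatus.FAILED
        self.completed.append(job)
```

A job always moves pending → processing → completed/failed, which is why a crashed worker leaves its job visible in the processing list rather than silently lost.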
Queue System
Redis key prefixes
AQWorker keeps every Redis key namespaced with the aqw prefix so multiple apps can
share the same Redis without collisions. The most important keys are:
- `aqw:{queue_name}` – pending FIFO queue for a specific worker queue
- `aqw:processing`, `aqw:completed`, `aqw:failed` – global processing/completed/failed lists
- `aqw:job:{job_id}` – job status hash (timestamps, error state, etc.)
- `aqw:jl:{job_id}` – per-job lock key used to ensure only one worker processes the job
All public APIs use helpers from aqworker.constants (get_queue_name,
get_job_status_key, get_job_lock_key) so you rarely need to construct these strings
manually.
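For illustration, the helpers can be reconstructed from the key formats above. These are hedged re-implementations; the real functions live in aqworker.constants and their exact signatures are an assumption:

```python
# Illustrative versions of the helpers in aqworker.constants,
# reconstructed from the documented key formats.
PREFIX = "aqw"

def get_queue_name(queue_name: str) -> str:
    return f"{PREFIX}:{queue_name}"   # e.g. "aqw:emails"

def get_job_status_key(job_id: str) -> str:
    return f"{PREFIX}:job:{job_id}"   # job status hash

def get_job_lock_key(job_id: str) -> str:
    return f"{PREFIX}:jl:{job_id}"    # per-job lock
```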
Queue Structure
```
Redis Keys:
├── aqw:emails          # Email jobs (FIFO)
├── aqw:notifications   # Notification jobs (FIFO)
├── aqw:processing      # Jobs currently being processed
├── aqw:completed       # Successfully completed jobs
└── aqw:failed          # Failed jobs
```
Queue Isolation
- Each worker processes specific queue names
- Jobs are isolated by queue name
- No cross-queue interference
- Use different queues for different job types
FIFO Processing
- Simple first-in-first-out queue processing
- Jobs processed in order of arrival
- All jobs in a queue are processed equally
- Use different queue names for different priority levels
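One way to get priority levels out of plain FIFO queues is to poll several queues in a fixed order, so higher-priority names are always drained first. This is a sketch of the pattern, not AQWorker's behavior; whether queue_names order implies priority in AQWorker itself is an assumption:

```python
from collections import deque

def next_job(queues):
    """Return (queue_name, job) from the first non-empty queue.
    Earlier entries in the dict win, so insertion order encodes priority."""
    for name, q in queues.items():
        if q:
            return name, q.popleft()
    return None

# Hypothetical queue names: "emails_high" is drained before "emails_low".
queues = {
    "emails_high": deque(["urgent"]),
    "emails_low": deque(["digest"]),
}
```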
Advanced Usage
Custom Job Data
```python
job = await aq_worker.job_service.enqueue_job(
    queue_name="emails",
    handler="email",
    data={
        "recipient": "user@example.com",
        "subject": "Welcome",
        "body": "Welcome!",
        "attachments": ["file1.pdf", "file2.jpg"],
        "priority": "high",
    },
    metadata={
        "source": "api",
        "user_id": "12345",
        "campaign_id": "summer2024",
    },
    max_retries=5,
    retry_delay=60,
)
```
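To make the max_retries/retry_delay semantics concrete, here is a minimal retry loop. AQWorker's actual retry mechanism is internal to BaseWorker, so the function name and exact semantics below are assumptions for illustration:

```python
import asyncio

async def run_with_retries(handler, data, max_retries=5, retry_delay=60):
    """Call an async handler, retrying up to max_retries times with a
    fixed delay between attempts. Illustrative sketch only."""
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return await handler(data)
        except Exception as exc:
            last_error = exc
            if attempt < max_retries:
                await asyncio.sleep(retry_delay)
    raise last_error
```

With retry_delay=60, a job failing its first two attempts would run again after one minute each time, then succeed or eventually be marked FAILED.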
Multiple Workers
```python
# Register multiple workers
aq_worker.register_worker(EmailWorker)
aq_worker.register_worker(NotificationWorker)
aq_worker.register_worker(ReportWorker)

# Start a specific worker
worker = aq_worker.create_worker("email")
await worker.run()
```
Handler with Async Support
```python
from aqworker import BaseHandler

class AsyncHandler(BaseHandler):
    name = "async_task"

    async def handle(self, data: dict) -> bool:
        # Await I/O-bound work directly inside the handler
        result = await some_async_operation(data)
        return result is not None
```
Best Practices
1. Job Design
- Keep jobs small and focused
- Use descriptive handler names
- Include all necessary data in the `data` parameter
- Use `metadata` for tracking/debugging information
2. Worker Design
- Implement proper error handling in handlers
- Use appropriate concurrency levels
- Monitor job processing time
- Log important events
3. Queue Management
- Use descriptive queue names
- Monitor queue depths
- Clean up old jobs regularly
- Balance load across workers
4. Handler Design
- Make handlers idempotent when possible
- Handle errors gracefully
- Return `True` on success, `False` on failure
- Use async handlers for I/O operations
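The idempotency advice can be sketched as a handler that remembers what it has already processed and succeeds quietly on repeats. The class and the `message_id` dedup key are hypothetical; a real implementation would persist seen IDs (e.g. in Redis) rather than in memory:

```python
class IdempotentEmailHandler:
    """Sketch of an idempotent handler: re-delivered jobs are no-ops."""

    def __init__(self):
        self._sent = set()  # in-memory for illustration only

    async def handle(self, data: dict) -> bool:
        message_id = data["message_id"]  # hypothetical dedup key
        if message_id in self._sent:
            return True                  # already processed: succeed quietly
        # ... send the email here ...
        self._sent.add(message_id)
        return True
```

Because retries can re-deliver a job that partially succeeded, making the second delivery a no-op is safer than failing or sending twice.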
Troubleshooting
Common Issues
- Jobs not processing
  - Check that the worker is running
  - Verify queue names match between enqueue and worker
  - Check the Redis connection
  - Verify the handler is registered
- High memory usage
  - Clean up old jobs: `await job_service.cleanup_old_jobs(days=7)`
  - Reduce queue depths
  - Optimize job data size
- Slow processing
  - Increase worker concurrency (`max_concurrent_jobs`)
  - Optimize handler logic
  - Check Redis performance
  - Consider multiple workers
Debugging
```python
# Check queue statistics
stats = await aq_worker.job_service.get_queue_stats(["emails"])
print(f"Queue stats: {stats}")

# Get job status
job = await aq_worker.job_service.get_job(job_id)
print(f"Job status: {job.status}")

# List registered workers
workers = aq_worker.get_available_workers()
print(f"Available workers: {workers}")

# List registered handlers
handlers = aq_worker.handler_registry.snapshot()
print(f"Registered handlers: {list(handlers.keys())}")
```
Examples
The examples/ directory contains end-to-end demos:
examples/simple/
- Pure Python example with a background thread enqueuing email jobs while a worker (created via `create_worker("email")`) processes them.
- Handlers live in `examples.simple.aq_worker.handlers` and are picked up through `AQWorker(include_packages=["examples.simple.aq_worker"])`, so you can keep job logic in separate modules without manual registration.
- Run `python -m examples.simple.main` to start the enqueue loop and worker concurrently, or use the CLI (`aqworker start email examples/simple/worker.py`) in separate shells.
examples/simple_fastapi/
- FastAPI service exposing REST endpoints for enqueuing jobs plus health/metadata routes (`/handlers`, `/workers`, `/jobs/queues/{queue}/stats`).
- Worker definition in `examples/simple_fastapi/worker.py` registers worker classes and auto-discovers handlers via `include_packages=["examples.simple_fastapi.handlers"]`.
- `examples/simple_fastapi/handlers.py` simulates real work with 0.5–2 s `asyncio.sleep` delays so you can observe concurrent processing.
- `examples/simple_fastapi/client.py` is an `httpx` script that continuously enqueues email + notification jobs every ~0.2 s, perfect for smoke-testing the API while watching worker logs scroll.
Contributing
When adding new features:
- Follow async/await patterns
- Add proper error handling
- Update documentation
- Add tests
- Consider performance impact
License
This project is licensed under the terms specified in the LICENSE file.