A Python library for task management with RabbitMQ and polling support
Task Worker Library
A Python library for task management with RabbitMQ and polling support. It provides a unified interface for consuming tasks from a task management API and falls back automatically from RabbitMQ to polling when RabbitMQ is unavailable.
Features
- Dual Mode Support: RabbitMQ with manual acknowledgment/nack and polling fallback
- Automatic Fallback: Seamlessly switches to polling if RabbitMQ is unavailable
- Manual Acknowledgment: Proper message handling with ack/nack for RabbitMQ
- Configurable: Flexible configuration for different environments
- Error Handling: Robust error handling with retry mechanisms
- Logging: Comprehensive logging for debugging and monitoring
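The automatic fallback listed above can be pictured as a single decision made at startup. The following is a minimal sketch of that idea, not the library's implementation: `choose_mode` and the `try_connect` callable are hypothetical names introduced here for illustration.

```python
from typing import Callable, Optional


def choose_mode(rabbitmq_url: Optional[str], try_connect: Callable[[str], bool]) -> str:
    """Return "rabbitmq" when a broker URL is set and reachable, else "polling".

    `try_connect` is a hypothetical callable that attempts a connection and
    returns True on success. Any exception it raises counts as a failure.
    """
    if not rabbitmq_url:
        # No broker configured: polling is the only option.
        return "polling"
    try:
        connected = try_connect(rabbitmq_url)
    except Exception:
        connected = False
    return "rabbitmq" if connected else "polling"
```

Passing the connection attempt in as a callable keeps the decision testable without a running broker.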
Installation
pip install qode-task-worker
For development:
pip install "qode-task-worker[dev]"
Quick Start
import logging
from task_worker import TaskWorker, TaskWorkerConfig
# Configure logging
logging.basicConfig(level=logging.INFO)
# Define your task processing function
def process_my_task(task):
    """Your custom task processing logic"""
    task_id = task.get('id')
    metadata = task.get('metadata', {})
    print(f"Processing task {task_id}: {metadata}")
    # Your business logic here
    # ...
    # The library handles task registration and result saving
    # You just need to process the task data
# Configure the worker
config = TaskWorkerConfig(
    task_management_endpoint="https://your-task-api.com",
    rabbitmq_consumer_host="amqps://user:pass@your-rabbitmq.com/vhost",
    task_type="your-task-type",
    worker_name="my-worker",
    polling_interval=5
)
# Create and start the worker
worker = TaskWorker(config, process_my_task)
worker.start()  # This will run indefinitely
Configuration
TaskWorkerConfig Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| task_management_endpoint | str | ENV or default URL | Task management API endpoint |
| rabbitmq_consumer_host | str | ENV or None | RabbitMQ connection URL |
| task_type | str | "default" | Type of tasks to process |
| worker_name | str | hostname | Worker identifier |
| polling_interval | int | 5 | Seconds between polls in polling mode |
| rabbitmq_queue_prefix | str | "task.available" | RabbitMQ queue prefix |
| prefetch_count | int | 1 | RabbitMQ prefetch count |
| max_retry_attempts | int | 3 | Max retry attempts for API calls |
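To illustrate what a bound such as max_retry_attempts usually means in practice, here is a minimal retry sketch. It is an assumption about the general technique, not the library's code: `call_with_retries` and its `delay` parameter are hypothetical names.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retries(func: Callable[[], T], max_attempts: int = 3, delay: float = 0.0) -> T:
    """Call `func` until it succeeds or `max_attempts` calls have failed.

    The last exception is re-raised once the attempts are exhausted.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                # Wait before the next attempt; no wait after the final one.
                time.sleep(delay)
    assert last_error is not None
    raise last_error
```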
Environment Variables
The library automatically reads from these environment variables:
- TASK_MANAGEMENT_ENDPOINT: Task management API URL
- RABBITMQ_CONSUMER_HOST: RabbitMQ connection string
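One common way to get "ENV or default" behavior is a dataclass whose defaults are resolved when the object is created. The sketch below only illustrates that pattern: `ExampleConfig` is a hypothetical stand-in for TaskWorkerConfig, and the fallback URL `http://localhost:4000` is a placeholder assumption, since the library's actual default URL is not stated here.

```python
import os
import socket
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ExampleConfig:
    """Hypothetical config object; not the library's TaskWorkerConfig."""

    # Read from the environment at instantiation time, with a placeholder default.
    task_management_endpoint: str = field(
        default_factory=lambda: os.environ.get(
            "TASK_MANAGEMENT_ENDPOINT", "http://localhost:4000"
        )
    )
    # None means no broker is configured.
    rabbitmq_consumer_host: Optional[str] = field(
        default_factory=lambda: os.environ.get("RABBITMQ_CONSUMER_HOST")
    )
    task_type: str = "default"
    worker_name: str = field(default_factory=socket.gethostname)
    polling_interval: int = 5
```

Explicit keyword arguments still win over the environment, for example `ExampleConfig(task_type="processing")`.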
Advanced Usage
Custom Task Result Saving
import logging
from task_worker import TaskWorker, TaskWorkerConfig, TaskClient
config = TaskWorkerConfig(
    task_management_endpoint="https://your-api.com",
    task_type="processing"
)
task_client = TaskClient(config)
def advanced_task_processor(task):
    task_id = task.get('id')
    try:
        # Your processing logic
        result_data = {
            "meetingId": task.get('metadata', {}).get('meetingId'),
            "taskName": task.get('taskType'),
            "taskId": task_id,
            "moduleName": "YOUR_MODULE",
            "result": {"status": "completed", "data": "..."},
            "count": 1,
        }
        # Save result manually if needed
        task_client.save_task_result(
            task_id=task_id,
            task_data=result_data,
            task_type=task.get('taskType', ''),
            task_webhook_api=task.get('webhookApi', '')
        )
    except Exception as e:
        logging.error(f"Processing failed for task {task_id}: {e}")
        raise  # Re-raise to trigger nack in RabbitMQ mode
worker = TaskWorker(config, advanced_task_processor)
worker.start()
RabbitMQ Only Mode
from task_worker import RabbitMQWorker, TaskWorkerConfig
config = TaskWorkerConfig(
    task_management_endpoint="https://your-api.com",
    rabbitmq_consumer_host="amqps://user:pass@rabbitmq.com/vhost",
    task_type="urgent-tasks"
)
def process_urgent_task(task):
    # Process urgent tasks only via RabbitMQ
    pass
# Use RabbitMQ worker directly (no polling fallback)
rabbitmq_worker = RabbitMQWorker(config, process_urgent_task)
if rabbitmq_worker.connect():
    rabbitmq_worker.start_consuming()
else:
    print("Failed to connect to RabbitMQ")
Status Monitoring
# Assumes `config` and a `process_task` function are already defined
worker = TaskWorker(config, process_task)
# Get worker status
status = worker.get_status()
print(f"Mode: {status['mode']}")
print(f"Task Type: {status['task_type']}")
print(f"RabbitMQ Connected: {status['rabbitmq_connected']}")
# Check if running in RabbitMQ mode
if worker.is_running_rabbitmq_mode():
    print("Running with RabbitMQ")
else:
    print("Running in polling mode")
Error Handling
Error handling differs between the two modes:
RabbitMQ Mode
- Message Parsing Errors: Messages are nacked without requeue to prevent infinite loops
- Task Processing Errors: Messages are nacked with requeue for retry by other workers
- Connection Failures: Automatic fallback to polling mode
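The ack/nack rules above can be summarized as a small decision function. This is a sketch under stated assumptions rather than the library's code: `decide` is a hypothetical name, and it returns the intended action instead of talking to a broker. With a client such as pika (an assumption, the client library is not named here), the outcomes would typically map to `basic_ack` and `basic_nack` with the matching `requeue` flag.

```python
import json
from typing import Callable, Tuple


def decide(body: bytes, process: Callable[[dict], None]) -> Tuple[str, bool]:
    """Return (action, requeue) for one message.

    - Unparseable or non-object payload: nack without requeue, so a bad
      message cannot loop forever.
    - Processing raises: nack with requeue, so another worker can retry.
    - Processing succeeds: ack.
    """
    try:
        task = json.loads(body)
    except ValueError:
        # Covers invalid JSON and undecodable bytes.
        return ("nack", False)
    if not isinstance(task, dict):
        return ("nack", False)
    try:
        process(task)
    except Exception:
        return ("nack", True)
    return ("ack", False)
```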
Polling Mode
- API Failures: Logged and retried after polling interval
- Processing Errors: Logged (no requeue mechanism in polling)
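The polling behavior described above, where failures are logged and the loop simply continues, can be sketched as follows. `poll_loop`, `fetch`, and `max_polls` are hypothetical names used for illustration; a real worker would loop indefinitely rather than for a fixed number of polls.

```python
import logging
import time
from typing import Callable, Optional


def poll_loop(
    fetch: Callable[[], Optional[dict]],
    process: Callable[[dict], None],
    interval: float,
    max_polls: int,
) -> int:
    """Poll `max_polls` times and return how many tasks were processed successfully."""
    processed = 0
    for _ in range(max_polls):
        try:
            task = fetch()
        except Exception as exc:
            # API failure: log it and try again on the next poll.
            logging.warning("Fetch failed, retrying after interval: %s", exc)
            task = None
        if task is not None:
            try:
                process(task)
                processed += 1
            except Exception as exc:
                # No requeue mechanism in polling mode: log and move on.
                logging.error("Processing failed: %s", exc)
        time.sleep(interval)
    return processed
```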
Custom Error Handling
import logging
# TemporaryError, PermanentError and process_task_logic are placeholders for your own code
def robust_task_processor(task):
    try:
        # Your processing logic
        process_task_logic(task)
    except TemporaryError as e:
        logging.warning(f"Temporary error, will retry: {e}")
        raise  # This will trigger nack with requeue in RabbitMQ
    except PermanentError as e:
        logging.error(f"Permanent error, not retrying: {e}")
        # Don't raise - this will ack the message
    except Exception as e:
        logging.error(f"Unknown error: {e}")
        raise  # Let the library handle it
Message Format
Expected RabbitMQ message format:
{
  "taskType": "your-task-type",
  "metadata": {
    "additional": "data"
  }
}
Task API response format:
{
  "data": {
    "id": "task-id-123",
    "taskType": "your-task-type",
    "metadata": {
      "meetingId": "meeting-123",
      "cheatingType": "browser",
      "batchTranscript": "..."
    },
    "webhookApi": "https://callback-url.com"
  }
}
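As an illustration of how the two formats above might be consumed, the sketch below unwraps the `data` envelope of an API response and checks a message's `taskType`. The helper names `extract_task` and `matches_task_type` are hypothetical and not part of the library.

```python
from typing import Optional


def extract_task(response: dict) -> Optional[dict]:
    """Return the task object from an API response, or None if it is missing or has no id."""
    data = response.get("data")
    if not isinstance(data, dict) or "id" not in data:
        return None
    return data


def matches_task_type(message: dict, task_type: str) -> bool:
    """Check whether a RabbitMQ message announces a task of the given type."""
    return message.get("taskType") == task_type


response = {
    "data": {
        "id": "task-id-123",
        "taskType": "your-task-type",
        "metadata": {"meetingId": "meeting-123"},
        "webhookApi": "https://callback-url.com",
    }
}
task = extract_task(response)
```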
Testing
from task_worker import TaskWorker, TaskWorkerConfig
# For testing, you can mock the task processor
def mock_task_processor(task):
    print(f"Mock processing task: {task}")
config = TaskWorkerConfig(
    task_management_endpoint="http://localhost:4000",
    task_type="test",
    polling_interval=1  # Faster polling for testing
)
worker = TaskWorker(config, mock_task_processor)
worker.start()
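A task processor is a plain function, so it can also be exercised without starting a worker at all. The sketch below shows one way to do that with `unittest.mock`. `make_processor` and `save_result` are hypothetical names: `save_result` stands in for whatever result-saving call your processor makes.

```python
from unittest.mock import MagicMock

# Sample task shaped like the Task API response's "data" object.
sample_task = {
    "id": "task-id-123",
    "taskType": "test",
    "metadata": {"meetingId": "meeting-123"},
}


def make_processor(save_result):
    """Build a processor that reports its result through `save_result`."""
    def process(task):
        meeting_id = task.get("metadata", {}).get("meetingId")
        save_result(task_id=task["id"], task_data={"meetingId": meeting_id})
    return process


save_result = MagicMock()
process = make_processor(save_result)
process(sample_task)

# Verify the processor reported exactly one result with the expected payload.
save_result.assert_called_once_with(
    task_id="task-id-123", task_data={"meetingId": "meeting-123"}
)
```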
License
MIT License
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
File details
Details for the file qode_task_worker-1.0.2.tar.gz.
File metadata
- Download URL: qode_task_worker-1.0.2.tar.gz
- Upload date:
- Size: 8.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 25267d61ad262b15bc0532f77464873f7538669003ccc6b3996a11c26f1e20bb |
| MD5 | 38e117215f1fde84d0895cc4f0f6dbeb |
| BLAKE2b-256 | 6bc8b975fa8612a15a8aeea351ef5dc6fe87ae08eb4dce9d7e56b11f20f24d1a |
File details
Details for the file qode_task_worker-1.0.2-py3-none-any.whl.
File metadata
- Download URL: qode_task_worker-1.0.2-py3-none-any.whl
- Upload date:
- Size: 10.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ddc88055de11cb02fc829034618a3f89d53aec4e6c9d976078c074b45e594a66 |
| MD5 | e2500838e569ef624e718143c518dbb0 |
| BLAKE2b-256 | 5a6b09ca327d3dc1f3f45011afa75ac6eb3948028720c36f6c8651b4ed1fc9d8 |