A Python library for task management with RabbitMQ and polling support

Task Worker Library

This library provides a unified interface for consuming tasks from a task management API, with automatic fallback between RabbitMQ and polling modes.

Features

  • Dual Mode Support: RabbitMQ with manual acknowledgment/nack and polling fallback
  • Automatic Fallback: Seamlessly switches to polling if RabbitMQ is unavailable
  • Manual Acknowledgment: Proper message handling with ack/nack for RabbitMQ
  • Configurable: Flexible configuration for different environments
  • Error Handling: Failed API calls are retried up to max_retry_attempts times (3 by default); RabbitMQ messages that fail processing are nacked and requeued
  • Logging: Comprehensive logging for debugging and monitoring
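
The automatic fallback can be pictured as a small mode-selection step. The sketch below is not the library's implementation: `select_mode` and the `connect` callable are hypothetical names, used only to illustrate trying RabbitMQ first and falling back to polling otherwise.

```python
from typing import Callable, Optional


def select_mode(rabbitmq_url: Optional[str], connect: Callable[[str], bool]) -> str:
    """Return "rabbitmq" if a broker connection succeeds, else "polling".

    Hypothetical helper: `connect` stands in for whatever connection
    attempt the worker makes; it is not part of the task_worker API.
    """
    if not rabbitmq_url:
        # No broker configured, so polling is the only option.
        return "polling"
    try:
        return "rabbitmq" if connect(rabbitmq_url) else "polling"
    except Exception:
        # Any connection error falls back to polling.
        return "polling"
```

Passing the connection attempt in as a callable keeps the decision logic testable without a running broker.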

Installation

pip install qode-task-worker

For development:

pip install "qode-task-worker[dev]"

Quick Start

import logging
from task_worker import TaskWorker, TaskWorkerConfig

# Configure logging
logging.basicConfig(level=logging.INFO)

# Define your task processing function
def process_my_task(task):
    """Your custom task processing logic"""
    task_id = task.get('id')
    metadata = task.get('metadata', {})
    
    print(f"Processing task {task_id}: {metadata}")
    
    # Your business logic here
    # ...
    
    # The library handles task registration and result saving
    # You just need to process the task data

# Configure the worker
config = TaskWorkerConfig(
    task_management_endpoint="https://your-task-api.com",
    rabbitmq_consumer_host="amqps://user:pass@your-rabbitmq.com/vhost",
    task_type="your-task-type",
    worker_name="my-worker",
    polling_interval=5
)

# Create and start the worker
worker = TaskWorker(config, process_my_task)
worker.start()  # This will run indefinitely

Configuration

TaskWorkerConfig Parameters

Parameter                 Type  Default             Description
task_management_endpoint  str   ENV or default URL  Task management API endpoint
rabbitmq_consumer_host    str   ENV or None         RabbitMQ connection URL
task_type                 str   "default"           Type of tasks to process
worker_name               str   hostname            Worker identifier
polling_interval          int   5                   Seconds between polls in polling mode
rabbitmq_queue_prefix     str   "task.available"    RabbitMQ queue prefix
prefetch_count            int   1                   RabbitMQ prefetch count
max_retry_attempts        int   3                   Max retry attempts for API calls

Environment Variables

The library automatically reads from these environment variables:

  • TASK_MANAGEMENT_ENDPOINT: Task management API URL
  • RABBITMQ_CONSUMER_HOST: RabbitMQ connection string
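
One plausible way to read "ENV or default" is: an explicit argument wins, then the environment variable, then a built-in default. That precedence is an assumption, not something this page confirms, and `resolve_setting` is a hypothetical helper rather than part of the library.

```python
import os
from typing import Optional


def resolve_setting(explicit: Optional[str], env_name: str,
                    default: Optional[str] = None) -> Optional[str]:
    """Prefer an explicit argument, then the environment, then a default.

    Assumed precedence for illustration only.
    """
    if explicit is not None:
        return explicit
    return os.environ.get(env_name, default)


# Simulate a deployment that sets only the API endpoint.
os.environ["TASK_MANAGEMENT_ENDPOINT"] = "https://your-task-api.com"
endpoint = resolve_setting(None, "TASK_MANAGEMENT_ENDPOINT")
override = resolve_setting("http://localhost:4000", "TASK_MANAGEMENT_ENDPOINT")
```

Under this reading, leaving RABBITMQ_CONSUMER_HOST unset yields None, which matches the "ENV or None" default in the table.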

Advanced Usage

Custom Task Result Saving

import logging

from task_worker import TaskWorker, TaskWorkerConfig, TaskClient

config = TaskWorkerConfig(
    task_management_endpoint="https://your-api.com",
    task_type="processing"
)

task_client = TaskClient(config)

def advanced_task_processor(task):
    task_id = task.get('id')
    
    try:
        # Your processing logic
        result_data = {
            "meetingId": task.get('metadata', {}).get('meetingId'),
            "taskName": task.get('taskType'),
            "taskId": task_id,
            "moduleName": "YOUR_MODULE",
            "result": {"status": "completed", "data": "..."},
            "count": 1,
        }
        
        # Save result manually if needed
        task_client.save_task_result(
            task_id=task_id,
            task_data=result_data,
            task_type=task.get('taskType', ''),
            task_webhook_api=task.get('webhookApi', '')
        )
        
    except Exception as e:
        logging.error(f"Processing failed for task {task_id}: {e}")
        raise  # Re-raise to trigger nack in RabbitMQ mode

worker = TaskWorker(config, advanced_task_processor)
worker.start()

RabbitMQ Only Mode

from task_worker import RabbitMQWorker, TaskWorkerConfig

config = TaskWorkerConfig(
    task_management_endpoint="https://your-api.com",
    rabbitmq_consumer_host="amqps://user:pass@rabbitmq.com/vhost",
    task_type="urgent-tasks"
)

def process_urgent_task(task):
    # Process urgent tasks only via RabbitMQ
    pass

# Use RabbitMQ worker directly (no polling fallback)
rabbitmq_worker = RabbitMQWorker(config, process_urgent_task)

if rabbitmq_worker.connect():
    rabbitmq_worker.start_consuming()
else:
    print("Failed to connect to RabbitMQ")

Status Monitoring

# Reuses `config` and `process_my_task` from the Quick Start example
worker = TaskWorker(config, process_my_task)

# Get worker status
status = worker.get_status()
print(f"Mode: {status['mode']}")
print(f"Task Type: {status['task_type']}")
print(f"RabbitMQ Connected: {status['rabbitmq_connected']}")

# Check if running in RabbitMQ mode
if worker.is_running_rabbitmq_mode():
    print("Running with RabbitMQ")
else:
    print("Running in polling mode")

Error Handling

How an error is handled depends on the active mode:

RabbitMQ Mode

  • Message Parsing Errors: Messages are nacked without requeue to prevent infinite loops
  • Task Processing Errors: Messages are nacked with requeue for retry by other workers
  • Connection Failures: Automatic fallback to polling mode
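
The first two rules can be expressed as a pure decision function. This is a minimal sketch under stated assumptions, not the library's code: `handle_message` is a hypothetical name, and it returns the decision instead of calling a channel. With a client such as pika, the results would map to `basic_ack` and `basic_nack(requeue=...)`.

```python
import json
from typing import Callable, Tuple


def handle_message(body: bytes, process: Callable[[dict], None]) -> Tuple[str, bool]:
    """Return (action, requeue) for one message, following the rules above."""
    try:
        task = json.loads(body)
    except ValueError:
        # Unparseable payload: nack without requeue so it cannot loop forever.
        # (JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.)
        return ("nack", False)
    try:
        process(task)
    except Exception:
        # Processing failed: nack with requeue so another worker can retry.
        return ("nack", True)
    return ("ack", False)
```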

Polling Mode

  • API Failures: Logged and retried after polling interval
  • Processing Errors: Logged (no requeue mechanism in polling)

Custom Error Handling

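import logging

# TemporaryError, PermanentError and process_task_logic are placeholders
# for exception classes and business logic defined in your own code.
def robust_task_processor(task):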
    try:
        # Your processing logic
        process_task_logic(task)
        
    except TemporaryError as e:
        logging.warning(f"Temporary error, will retry: {e}")
        raise  # This will trigger nack with requeue in RabbitMQ
        
    except PermanentError as e:
        logging.error(f"Permanent error, not retrying: {e}")
        # Don't raise - this will ack the message
        
    except Exception as e:
        logging.error(f"Unknown error: {e}")
        raise  # Let the library handle it
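
The exception classes and the logic function used in this pattern come from your own application. A minimal sketch of how they might be defined follows; the class bodies, the `upstream` metadata key and the failure conditions are all made up for illustration.

```python
class TemporaryError(Exception):
    """Transient failure (for example a timeout); retrying may succeed."""


class PermanentError(Exception):
    """Failure that retrying cannot fix (for example invalid input)."""


def process_task_logic(task: dict) -> str:
    """Hypothetical business logic that classifies its own failures."""
    metadata = task.get("metadata")
    if not isinstance(metadata, dict):
        # Malformed task: a retry would fail the same way.
        raise PermanentError("task has no metadata object")
    if metadata.get("upstream") == "unavailable":
        # Simulated outage of a dependency: worth retrying later.
        raise TemporaryError("upstream service unavailable")
    return "done"
```

Classifying failures at the point where they occur lets the processor decide between re-raising (requeue) and swallowing the error (ack).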

Message Format

Expected RabbitMQ message format:

{
  "taskType": "your-task-type",
  "metadata": {
    "additional": "data"
  }
}

Task API response format:

{
  "data": {
    "id": "task-id-123",
    "taskType": "your-task-type", 
    "metadata": {
      "meetingId": "meeting-123",
      "cheatingType": "browser",
      "batchTranscript": "..."
    },
    "webhookApi": "https://callback-url.com"
  }
}
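
Given the response shape above, a processor-side helper might unwrap the `data` envelope before use. This is a sketch: `extract_task` is a hypothetical function, and treating `id` and `taskType` as required is an assumption based only on the example payload.

```python
from typing import Any, Dict


def extract_task(response: Dict[str, Any]) -> Dict[str, Any]:
    """Unwrap the "data" envelope and check the fields the examples rely on."""
    task = response.get("data")
    if not isinstance(task, dict):
        raise ValueError('response has no "data" object')
    # Assumed required fields; adjust to what your task API guarantees.
    missing = [key for key in ("id", "taskType") if key not in task]
    if missing:
        raise ValueError(f"task is missing fields: {', '.join(missing)}")
    # Normalise so callers can always read task["metadata"].
    task.setdefault("metadata", {})
    return task
```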

Testing

from task_worker import TaskWorker, TaskWorkerConfig

# For testing, you can mock the task processor
def mock_task_processor(task):
    print(f"Mock processing task: {task}")

config = TaskWorkerConfig(
    task_management_endpoint="http://localhost:4000",
    task_type="test",
    polling_interval=1  # Faster polling for testing
)

worker = TaskWorker(config, mock_task_processor)
worker.start()
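
Because the processor is a plain function, it can also be unit tested without starting a worker at all. The sketch below uses only the standard library; the `save` parameter is a hypothetical injection point added for testability and is not part of the library's API.

```python
import unittest
from unittest import mock


def process_my_task(task, save=print):
    """Example processor; `save` is injected so tests can replace it."""
    task_id = task.get("id")
    metadata = task.get("metadata", {})
    save({"taskId": task_id, "meetingId": metadata.get("meetingId")})


class ProcessMyTaskTest(unittest.TestCase):
    def test_saves_result_with_ids(self):
        save = mock.Mock()
        task = {"id": "task-id-123", "metadata": {"meetingId": "meeting-123"}}
        process_my_task(task, save=save)
        save.assert_called_once_with(
            {"taskId": "task-id-123", "meetingId": "meeting-123"}
        )

    def test_missing_metadata_yields_none(self):
        save = mock.Mock()
        process_my_task({"id": "t1"}, save=save)
        save.assert_called_once_with({"taskId": "t1", "meetingId": None})
```

Save the block in a module and run it with `python -m unittest`.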

License

MIT License

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request

Download files

Source Distribution

qode_task_worker-1.0.2.tar.gz (8.7 kB view details)

Uploaded Source

Built Distribution

qode_task_worker-1.0.2-py3-none-any.whl (10.7 kB view details)

Uploaded Python 3

File details

Details for the file qode_task_worker-1.0.2.tar.gz.

File metadata

  • Download URL: qode_task_worker-1.0.2.tar.gz
  • Upload date:
  • Size: 8.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.5

File hashes

Hashes for qode_task_worker-1.0.2.tar.gz
Algorithm    Hash digest
SHA256       25267d61ad262b15bc0532f77464873f7538669003ccc6b3996a11c26f1e20bb
MD5          38e117215f1fde84d0895cc4f0f6dbeb
BLAKE2b-256  6bc8b975fa8612a15a8aeea351ef5dc6fe87ae08eb4dce9d7e56b11f20f24d1a

File details

Details for the file qode_task_worker-1.0.2-py3-none-any.whl.

File hashes

Hashes for qode_task_worker-1.0.2-py3-none-any.whl
Algorithm    Hash digest
SHA256       ddc88055de11cb02fc829034618a3f89d53aec4e6c9d976078c074b45e594a66
MD5          e2500838e569ef624e718143c518dbb0
BLAKE2b-256  5a6b09ca327d3dc1f3f45011afa75ac6eb3948028720c36f6c8651b4ed1fc9d8
