Task Worker Library

A Python library for consuming tasks from a task management API. It provides a single interface over two delivery modes, RabbitMQ and polling, and falls back to polling automatically when RabbitMQ is unavailable.

Features

  • Dual Mode Support: Consumes tasks from RabbitMQ or by polling the task management API
  • Automatic Fallback: Seamlessly switches to polling if RabbitMQ is unavailable
  • Manual Acknowledgment: Proper message handling with ack/nack for RabbitMQ
  • Configurable: Flexible configuration for different environments
  • Error Handling: Retries failed API calls (max_retry_attempts, default 3) and nacks failed RabbitMQ messages
  • Logging: Comprehensive logging for debugging and monitoring
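
The automatic fallback listed above can be pictured with a small sketch. This is not the library's implementation: `choose_mode`, `broken_connect`, and the `connect` callable are hypothetical names used only for illustration, and the sketch assumes that any exception raised while connecting counts as "RabbitMQ unavailable".

```python
import logging
from typing import Callable

logger = logging.getLogger("fallback_sketch")


def choose_mode(connect: Callable[[], object]) -> str:
    """Return "rabbitmq" if connect() succeeds, otherwise "polling".

    `connect` is a hypothetical stand-in for whatever opens the broker
    connection; it is not part of the library's documented API.
    """
    try:
        connect()
    except Exception as exc:  # assumption: any failure triggers the fallback
        logger.warning("RabbitMQ unavailable (%s); falling back to polling", exc)
        return "polling"
    return "rabbitmq"


def broken_connect() -> None:
    """Simulate a broker that cannot be reached."""
    raise ConnectionError("broker unreachable")


print(choose_mode(lambda: None))    # rabbitmq
print(choose_mode(broken_connect))  # polling
```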

Installation

pip install qode-task-worker

For development (the quotes keep shells such as zsh from expanding the brackets):

pip install "qode-task-worker[dev]"

Quick Start

import logging
from task_worker import TaskWorker, TaskWorkerConfig

# Configure logging
logging.basicConfig(level=logging.INFO)

# Define your task processing function
def process_my_task(task):
    """Your custom task processing logic"""
    task_id = task.get('id')
    metadata = task.get('metadata', {})

    print(f"Processing task {task_id}: {metadata}")

    # Your business logic here; this result is only a placeholder
    result = {'task_id': task_id, 'status': 'done'}

    # The library handles task registration and result saving
    # You just need to process the task data
    return result

# Configure the worker
config = TaskWorkerConfig(
    task_management_endpoint="https://your-task-api.com",
    rabbitmq_consumer_host="amqps://user:pass@your-rabbitmq.com/vhost",
    task_type="your-task-type",
    worker_name="my-worker",
    polling_interval=5
)

# Create and start the worker
worker = TaskWorker(config, process_my_task)
worker.start()  # This will run indefinitely

Configuration

TaskWorkerConfig Parameters

Parameter                 Type  Default             Description
task_management_endpoint  str   ENV or default URL  Task management API endpoint
rabbitmq_consumer_host    str   ENV or None         RabbitMQ connection URL
task_type                 str   "default"           Type of tasks to process
worker_name               str   hostname            Worker identifier
polling_interval          int   5                   Seconds between polls in polling mode
rabbitmq_queue_prefix     str   "task.available"    RabbitMQ queue prefix
prefetch_count            int   1                   RabbitMQ prefetch count
max_retry_attempts        int   3                   Max retry attempts for API calls

Environment Variables

The library reads these environment variables to fill in the defaults for task_management_endpoint and rabbitmq_consumer_host:

  • TASK_MANAGEMENT_ENDPOINT: Task management API URL
  • RABBITMQ_CONSUMER_HOST: RabbitMQ connection string
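
To make the relationship between the environment and the configuration explicit, here is a minimal sketch that resolves the same settings by hand. `config_kwargs_from_env` is a hypothetical helper, not part of the library, and the way the library itself combines environment values with explicit arguments is an assumption here.

```python
import os
import socket
from typing import Mapping, Optional


def config_kwargs_from_env(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Build keyword arguments for TaskWorkerConfig from the environment.

    Hypothetical helper: it mirrors the documented defaults, but the
    library's own resolution logic may differ.
    """
    env = os.environ if environ is None else environ
    kwargs = {
        "task_type": "default",
        "worker_name": socket.gethostname(),
        "polling_interval": 5,
        # None means no broker is configured, so only polling is possible.
        "rabbitmq_consumer_host": env.get("RABBITMQ_CONSUMER_HOST"),
    }
    endpoint = env.get("TASK_MANAGEMENT_ENDPOINT")
    if endpoint:
        # Only set the endpoint when provided, so the library default applies otherwise.
        kwargs["task_management_endpoint"] = endpoint
    return kwargs


example = config_kwargs_from_env({"TASK_MANAGEMENT_ENDPOINT": "https://your-task-api.com"})
print(example["task_management_endpoint"])  # https://your-task-api.com
print(example["rabbitmq_consumer_host"])    # None
```

The resulting dictionary could then be passed as `TaskWorkerConfig(**kwargs)`, since every key matches a parameter name from the table above.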

Advanced Usage

Status Monitoring

# Reuses config and process_my_task from the Quick Start
worker = TaskWorker(config, process_my_task)

# Get worker status
status = worker.get_status()
print(f"Mode: {status['mode']}")
print(f"Task Type: {status['task_type']}")
print(f"RabbitMQ Connected: {status['rabbitmq_connected']}")

# Check if running in RabbitMQ mode
if worker.is_running_rabbitmq_mode():
    print("Running with RabbitMQ")
else:
    print("Running in polling mode")

Error Handling

How a failure is handled depends on the active mode:

RabbitMQ Mode

  • Message Parsing Errors: Messages are nacked without requeue to prevent infinite loops
  • Task Processing Errors: Messages are nacked with requeue for retry by other workers
  • Connection Failures: Automatic fallback to polling mode
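
The first two rules above amount to a three-way decision per message. The sketch below shows that decision in isolation; `handle_message` and its string return values are hypothetical and only illustrate the documented behaviour. With a client such as pika, the outcomes would typically map to `basic_ack` and to `basic_nack` with `requeue=False` or `requeue=True`, although which client the library uses is an assumption.

```python
import json
from typing import Callable


def handle_message(body: bytes, process: Callable[[dict], object]) -> str:
    """Decide how a message should be acknowledged (illustrative only)."""
    try:
        task = json.loads(body)
        if not isinstance(task, dict):
            raise ValueError("message must be a JSON object")
    except ValueError:
        # Unparseable message: requeueing would just redeliver the same bad payload.
        return "nack_no_requeue"
    try:
        process(task)
    except Exception:
        # Processing failed: requeue so that another worker can retry.
        return "nack_requeue"
    return "ack"


def failing_processor(task: dict) -> None:
    """Simulate business logic that raises."""
    raise RuntimeError("processing failed")


print(handle_message(b"not json", print))                          # nack_no_requeue
print(handle_message(b'{"taskType": "demo"}', failing_processor))  # nack_requeue
print(handle_message(b'{"taskType": "demo"}', lambda task: None))  # ack
```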

Polling Mode

  • API Failures: Logged and retried after polling interval
  • Processing Errors: Logged (no requeue mechanism in polling)
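
A polling loop with these two rules might look like the following sketch. The `poll` function, its `fetch_task` callable, and the `max_iterations` and `sleep` parameters are hypothetical; they exist here to keep the example self-contained and testable and do not describe the library's internals.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger("polling_sketch")


def poll(
    fetch_task: Callable[[], Optional[dict]],
    process: Callable[[dict], object],
    polling_interval: float = 5,
    max_iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll for tasks and return how many were processed successfully."""
    processed = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        try:
            task = fetch_task()
        except Exception as exc:
            # API failure: log it and try again after the polling interval.
            logger.error("Task API call failed: %s", exc)
            sleep(polling_interval)
            continue
        if task is None:
            # No task available yet: wait before asking again.
            sleep(polling_interval)
            continue
        try:
            process(task)
            processed += 1
        except Exception as exc:
            # Processing failure: logged only, since there is nothing to requeue.
            logger.error("Task %s failed: %s", task.get("id"), exc)
    return processed
```

Passing `max_iterations` and a no-op `sleep` makes the loop easy to exercise in tests; a long-running worker would leave both at their defaults.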

Message Format

Expected RabbitMQ message format:

{
  "taskType": "your-task-type",
  "metadata": {
    "additional": "data"
  }
}

Task API response format:

{
  "data": {
    "id": "task-id-123",
    "taskType": "your-task-type",
    "metadata": {
      "metadata": "metadata"
    },
    "webhookApi": "https://callback-url.com"
  }
}
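
As an illustration of how the two payloads differ, the sketch below checks a queue message against a task type and unwraps the task nested under "data" in an API response. `matches_task_type` and `extract_task` are hypothetical helpers written for this example; the library performs its own parsing.

```python
import json
from typing import Union


def matches_task_type(body: Union[str, bytes], expected_type: str) -> bool:
    """Return True if a queue message announces a task of the expected type."""
    message = json.loads(body)
    return isinstance(message, dict) and message.get("taskType") == expected_type


def extract_task(api_response: dict) -> dict:
    """Return the task object nested under "data" in an API response."""
    task = api_response.get("data")
    if not isinstance(task, dict) or "id" not in task:
        raise ValueError("API response has no task under 'data'")
    return task


queue_message = '{"taskType": "your-task-type", "metadata": {"additional": "data"}}'
api_response = {
    "data": {
        "id": "task-id-123",
        "taskType": "your-task-type",
        "metadata": {"metadata": "metadata"},
        "webhookApi": "https://callback-url.com",
    }
}

print(matches_task_type(queue_message, "your-task-type"))  # True
print(extract_task(api_response)["id"])                    # task-id-123
```

Note that the queue message shown carries no id, while the API response does; the dictionary returned by `extract_task` has the `id` and `metadata` keys that the Quick Start's `process_my_task` reads.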

License

MIT License

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request
