Skip to main content

A Python library for task management with RabbitMQ and polling support

Project description

Task Worker Library

A Python library for task management with RabbitMQ and polling support. This library provides a unified interface for consuming tasks from a task management API with automatic fallback between RabbitMQ and polling modes.

Features

  • Dual Mode Support: RabbitMQ with manual acknowledgment/nack and polling fallback
  • Automatic Fallback: Seamlessly switches to polling if RabbitMQ is unavailable
  • Manual Acknowledgment: Proper message handling with ack/nack for RabbitMQ
  • Configurable: Flexible configuration for different environments
  • Error Handling: Robust error handling with retry mechanisms
  • Logging: Comprehensive logging for debugging and monitoring

Installation

pip install qode-task-worker

For development:

pip install qode-task-worker[dev]

Quick Start

import logging
from task_worker import TaskWorker, TaskWorkerConfig

# Configure logging
logging.basicConfig(level=logging.INFO)

# Define your task processing function
def process_my_task(task):
    """Your custom task processing logic"""
    task_id = task.get('id')
    metadata = task.get('metadata', {})
    
    print(f"Processing task {task_id}: {metadata}")
    
    # Your business logic here
    
    # The library handles task registration and result saving
    # You just need to process the task data
    return result;

# Configure the worker
config = TaskWorkerConfig(
    task_management_endpoint="https://your-task-api.com",
    rabbitmq_consumer_host="amqps://user:pass@your-rabbitmq.com/vhost",
    task_type="your-task-type",
    worker_name="my-worker",
    polling_interval=5
)

# Create and start the worker
worker = TaskWorker(config, process_my_task)
worker.start()  # This will run indefinitely

Configuration

TaskWorkerConfig Parameters

Parameter Type Default Description
task_management_endpoint str ENV or default URL Task management API endpoint
rabbitmq_consumer_host str ENV or None RabbitMQ connection URL
task_type str "default" Type of tasks to process
worker_name str hostname Worker identifier
polling_interval int 5 Seconds between polls in polling mode
rabbitmq_queue_prefix str "task.available" RabbitMQ queue prefix
prefetch_count int 1 RabbitMQ prefetch count
max_retry_attempts int 3 Max retry attempts for API calls

Environment Variables

The library automatically reads from these environment variables:

  • TASK_MANAGEMENT_ENDPOINT: Task management API URL
  • RABBITMQ_CONSUMER_HOST: RabbitMQ connection string

Advanced Usage

Status Monitoring

worker = TaskWorker(config, process_task)

# Get worker status
status = worker.get_status()
print(f"Mode: {status['mode']}")
print(f"Task Type: {status['task_type']}")
print(f"RabbitMQ Connected: {status['rabbitmq_connected']}")

# Check if running in RabbitMQ mode
if worker.is_running_rabbitmq_mode():
    print("Running with RabbitMQ")
else:
    print("Running in polling mode")

Error Handling

The library provides robust error handling:

RabbitMQ Mode

  • Message Parsing Errors: Messages are nacked without requeue to prevent infinite loops
  • Task Processing Errors: Messages are nacked with requeue for retry by other workers
  • Connection Failures: Automatic fallback to polling mode

Polling Mode

  • API Failures: Logged and retried after polling interval
  • Processing Errors: Logged (no requeue mechanism in polling)

Message Format

Expected RabbitMQ message format:

{
  "taskType": "your-task-type",
  "metadata": {
    "additional": "data"
  }
}

Task API response format:

{
  "data": {
    "id": "task-id-123",
    "taskType": "your-task-type", 
    "metadata": {
        "metadata": "metadata"
    },
    "webhookApi": "https://callback-url.com"
  }
}

License

MIT License

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

qode_task_worker-1.0.4.tar.gz (9.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

qode_task_worker-1.0.4-py3-none-any.whl (10.1 kB view details)

Uploaded Python 3

File details

Details for the file qode_task_worker-1.0.4.tar.gz.

File metadata

  • Download URL: qode_task_worker-1.0.4.tar.gz
  • Upload date:
  • Size: 9.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.9.6

File hashes

Hashes for qode_task_worker-1.0.4.tar.gz
Algorithm Hash digest
SHA256 04ba3b8f24d431c482ceedf0c83d72a76cf8d6d098056d1353e5e67e328c4f7a
MD5 364a888c9cb198e6a66e7402537163f4
BLAKE2b-256 d3576b49f6309fbd017aae34b3822368f9d6e729fc41fedb7ada348115d618ee

See more details on using hashes here.

File details

Details for the file qode_task_worker-1.0.4-py3-none-any.whl.

File metadata

File hashes

Hashes for qode_task_worker-1.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 a5241940a1c6e267ffc0024010384ea02a320798d1610fe7181a2413fd1b6aa3
MD5 512bd230bc6d3cac2aa93e517f6576f6
BLAKE2b-256 dc0eb6d6db4af392ea6c0fe2d172dcdb6b6b6ca38bdeb9d4cead4d7b35080ff8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page