Skip to main content

Python SDK for DistWorker distributed task processing system

Project description

DistWorker Python SDK

A Python SDK for connecting workers to the DistWorker distributed task processing system.

Features

  • WebSocket Communication: Real-time communication with the DistWorker controller using WebSocket + Protocol Buffers
  • HMAC Authentication: Secure authentication using HMAC-SHA256 signatures
  • Automatic Reconnection: Built-in reconnection logic with configurable retry policies
  • Task Progress Reporting: Send progress updates during task execution
  • Queue Pattern Matching: Support for MQTT-style wildcards (* and #) in queue patterns
  • Resource Monitoring: Report worker resource information and usage
  • Async/Await Support: Fully asynchronous API using Python's asyncio

Installation

pip install distworker-sdk

Development Installation

git clone https://github.com/jc-lab/distworker.git
cd distworker/python-worker-sdk
pip install -e .

Quick Start

Here's a simple worker that handles mathematical operations:

import asyncio
from distworker import Worker, Task

async def handle_math_task(task: Task):
    """Handle mathematical operations"""
    operation = task.get_input('operation')
    a = task.get_input('a')
    b = task.get_input('b')
    
    if operation == 'add':
        result = a + b
    elif operation == 'multiply':
        result = a * b
    else:
        raise ValueError(f"Unknown operation: {operation}")
    
    return {'result': result}

async def main():
    # Create worker
    worker = Worker(
        controller_url='ws://localhost:8080/ws',
        worker_id='math-worker-001',
        worker_token='your-worker-token',
        resource_info={'cpu_cores': 4, 'memory_mb': 8192}
    )
    
    # Register task handler
    worker.register_handler('math.*', handle_math_task)
    
    # Start worker
    await worker.start()
    
    # Keep running
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        await worker.stop()

if __name__ == '__main__':
    asyncio.run(main())

Configuration

Environment Variables

You can configure the worker using environment variables:

export DISTWORKER_CONTROLLER_URL="ws://localhost:8080/ws"
export DISTWORKER_WORKER_ID="my-worker-001"
export DISTWORKER_WORKER_TOKEN="your-secret-token"

Worker Parameters

  • controller_url: WebSocket URL of the DistWorker controller
  • worker_id: Unique identifier for this worker instance
  • worker_token: Secret token for authentication
  • resource_info: Dictionary with worker resource information
  • reconnect_interval: Seconds between reconnection attempts (default: 5.0)
  • heartbeat_interval: Seconds between heartbeat messages (default: 30.0)
  • max_reconnect_attempts: Maximum reconnection attempts, -1 for unlimited (default: -1)

Task Handling

Basic Handler

async def my_task_handler(task: Task):
    # Access input data
    input_value = task.get_input('key', default_value)
    
    # Access metadata
    priority = task.get_metadata('priority', 'normal')
    
    # Access files
    for file_info in task.files:
        file_id = file_info['file_id']
        filename = file_info['filename']
        storage_url = file_info['storage_url']
    
    # Return results
    return {'status': 'completed', 'result': 'success'}

Progress Reporting

async def long_running_task(task: Task):
    # Send progress updates
    await worker.send_task_progress(25.0, "Processing started")
    await asyncio.sleep(2)
    
    await worker.send_task_progress(50.0, "Half way done")
    await asyncio.sleep(2)
    
    await worker.send_task_progress(90.0, "Almost finished")
    await asyncio.sleep(1)
    
    return {'status': 'completed'}

Error Handling

async def safe_task_handler(task: Task):
    try:
        # Task processing logic
        result = process_data(task.get_input('data'))
        return {'result': result}
    except ValueError as e:
        # Task will be marked as failed with this error
        raise e
    except Exception as e:
        # Convert to a more specific error
        raise RuntimeError(f"Processing failed: {e}")

Examples

The SDK includes example workers:

Basic Worker

python -m distworker.examples.basic_worker

Handles mathematical operations, text processing, and data transformations.

File Processing Worker

python -m distworker.examples.file_worker

Processes files, including image resizing and document conversion.

API Reference

Worker Class

Constructor

Worker(
    controller_url: str,
    worker_id: str,
    worker_token: str,
    queue_patterns: List[str],
    resource_info: Optional[Dict[str, Any]] = None,
    reconnect_interval: float = 5.0,
    heartbeat_interval: float = 30.0,
    max_reconnect_attempts: int = -1
)

Methods

  • register_handler(pattern: str, handler: Callable) - Register a task handler
  • start() - Start the worker and connect to controller
  • stop() - Stop the worker and disconnect
  • send_task_progress(progress: float, message: str, data: Dict) - Send progress update

Task Class

Properties

  • task_id: str - Unique task identifier
  • queue: str - Queue name where task was submitted
  • timeout_ms: int - Task timeout in milliseconds
  • metadata: Dict[str, Any] - Task metadata
  • input_data: Dict[str, Any] - Task input data
  • files: List[Dict[str, Any]] - List of file information

Methods

  • get_input(key: str, default: Any = None) - Get input value by key
  • get_metadata(key: str, default: Any = None) - Get metadata value by key
  • get_file_by_id(file_id: str) - Get file info by ID
  • get_files_by_name(filename: str) - Get files by filename

Development

Running Tests

pip install -e ".[dev]"
pytest

Code Formatting

black distworker/
flake8 distworker/

Type Checking

mypy distworker/

License

MIT License - see LICENSE file for details.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run the test suite
  6. Submit a pull request

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

distworker_sdk-0.0.2rc1.tar.gz (40.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

distworker_sdk-0.0.2rc1-py3-none-any.whl (46.4 kB view details)

Uploaded Python 3

File details

Details for the file distworker_sdk-0.0.2rc1.tar.gz.

File metadata

  • Download URL: distworker_sdk-0.0.2rc1.tar.gz
  • Upload date:
  • Size: 40.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for distworker_sdk-0.0.2rc1.tar.gz
Algorithm Hash digest
SHA256 e94d0c4f9c8235466653804e73c966b82e41fa5f2f8ee877f3bfcdd25fe2a67d
MD5 ecd407461f849ed828e86a110eb01768
BLAKE2b-256 1930390be33a41c720232010a1b1a6eed821c22a0b901f532e8fa931dd6655b9

See more details on using hashes here.

File details

Details for the file distworker_sdk-0.0.2rc1-py3-none-any.whl.

File metadata

File hashes

Hashes for distworker_sdk-0.0.2rc1-py3-none-any.whl
Algorithm Hash digest
SHA256 ef94268164ff3fc8e4067956535ac566459d86688b1b22218dbb27c22d57a708
MD5 db87e2a1dfd65346e2d35ca7873da342
BLAKE2b-256 eefc111e81319d920481c510c65627d4a9f16767c354d957495453eb1138a047

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page