DistWorker Python SDK

A Python SDK for connecting workers to the DistWorker distributed task processing system.

Features

  • WebSocket Communication: Real-time communication with the DistWorker controller using WebSocket + Protocol Buffers
  • HMAC Authentication: Secure authentication using HMAC-SHA256 signatures
  • Automatic Reconnection: Built-in reconnection logic with configurable retry policies
  • Task Progress Reporting: Send progress updates during task execution
  • Resource Monitoring: Report worker resource information and usage
  • Async/Await Support: Fully asynchronous API using Python's asyncio
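
The feature list mentions HMAC-SHA256 signatures. As a rough illustration of what such a signature involves, the sketch below signs and verifies a message with Python's standard `hmac` module. The payload layout and the function names `sign_message` and `verify_signature` are assumptions made for this example; the actual handshake format used by the DistWorker controller is not described in this README.

```python
import hashlib
import hmac


def sign_message(token: str, message: bytes) -> str:
    """Return the hex HMAC-SHA256 of `message`, keyed by `token`."""
    return hmac.new(token.encode("utf-8"), message, hashlib.sha256).hexdigest()


def verify_signature(token: str, message: bytes, signature: str) -> bool:
    """Check a signature using a constant-time comparison."""
    expected = sign_message(token, message)
    return hmac.compare_digest(expected, signature)


# Hypothetical payload layout, for illustration only.
payload = b"math-worker-001|1700000000|nonce-1234"
signature = sign_message("your-worker-token", payload)
```

The constant-time comparison in `hmac.compare_digest` avoids leaking how many leading characters of a guessed signature were correct.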

Installation

pip install distworker-sdk

Development Installation

git clone https://github.com/jc-lab/distworker.git
cd distworker/python
pip install -e .

Quick Start

Here's a simple worker that handles mathematical operations:

import asyncio
from distworker import Worker, Task

async def handle_math_task(task: Task):
    """Handle mathematical operations"""
    operation = task.get_input('operation')
    a = task.get_input('a')
    b = task.get_input('b')
    
    if operation == 'add':
        result = a + b
    elif operation == 'multiply':
        result = a * b
    else:
        raise ValueError(f"Unknown operation: {operation}")
    
    return {'result': result}

async def main():
    # Create worker
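    worker = Worker(
        controller_url='ws://localhost:8080/ws',
        worker_id='math-worker-001',
        worker_token='your-worker-token',
        # queue_patterns has no default in the constructor shown in the
        # API Reference; the value here is assumed to mirror the handler pattern
        queue_patterns=['math.*'],
        resource_info={'cpu_cores': 4, 'memory_mb': 8192}
    )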
    
    # Register task handler
    worker.register_handler('math.*', handle_math_task)
    
    # Start worker
    await worker.start()
    
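    # Keep running until the task is cancelled (for example by Ctrl+C)
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        # Runs on cancellation too, so the worker always disconnects
        await worker.stop()

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Python 3.11+ Ctrl+C surfaces here rather than inside main()
        pass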
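
Before connecting to a controller, it can help to exercise a handler locally. The sketch below runs the Quick Start handler logic against a small stand-in object. `FakeTask` and `run_handler` are hypothetical helpers written for this example and are not part of the SDK; `FakeTask` implements only `get_input`.

```python
import asyncio
from typing import Any, Dict


class FakeTask:
    """Hypothetical stand-in for the SDK's Task; only get_input is implemented."""

    def __init__(self, input_data: Dict[str, Any]):
        self.input_data = input_data

    def get_input(self, key: str, default: Any = None) -> Any:
        return self.input_data.get(key, default)


async def handle_math_task(task) -> Dict[str, Any]:
    """Same logic as the Quick Start handler."""
    operation = task.get_input('operation')
    a = task.get_input('a')
    b = task.get_input('b')
    if operation == 'add':
        return {'result': a + b}
    if operation == 'multiply':
        return {'result': a * b}
    raise ValueError(f"Unknown operation: {operation}")


def run_handler(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run the handler once against a FakeTask and return its result."""
    return asyncio.run(handle_math_task(FakeTask(input_data)))
```

For example, `run_handler({'operation': 'add', 'a': 2, 'b': 3})` exercises the `add` branch without any network connection.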

Configuration

Environment Variables

You can configure the worker using environment variables:

export DISTWORKER_CONTROLLER_URL="ws://localhost:8080/ws"
export DISTWORKER_WORKER_ID="my-worker-001"
export DISTWORKER_WORKER_TOKEN="your-secret-token"
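
This README does not say whether the SDK reads these variables on its own, so the sketch below reads them explicitly and turns them into keyword arguments for `Worker`. The helper name `load_worker_config` is an assumption made for this example.

```python
import os
from typing import Dict, Mapping, Optional


def load_worker_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect the three DISTWORKER_* variables into Worker keyword arguments."""
    env = os.environ if env is None else env
    names = {
        'controller_url': 'DISTWORKER_CONTROLLER_URL',
        'worker_id': 'DISTWORKER_WORKER_ID',
        'worker_token': 'DISTWORKER_WORKER_TOKEN',
    }
    # Fail early with a clear message instead of connecting with empty values.
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {kwarg: env[var] for kwarg, var in names.items()}
```

The result can be unpacked into the constructor, for example `Worker(**load_worker_config(), ...)`, with any remaining arguments passed alongside it.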

Worker Parameters

  • controller_url: WebSocket URL of the DistWorker controller
  • worker_id: Unique identifier for this worker instance
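  • worker_token: Secret token for authentication
  • queue_patterns: List of queue patterns (listed without a default in the constructor signature under API Reference)
  • resource_info: Optional dictionary with worker resource information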
  • reconnect_interval: Seconds between reconnection attempts (default: 5.0)
  • heartbeat_interval: Seconds between heartbeat messages (default: 30.0)
  • max_reconnect_attempts: Maximum reconnection attempts, -1 for unlimited (default: -1)
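
To make the interaction between `reconnect_interval` and `max_reconnect_attempts` concrete, here is a minimal sketch of a retry loop with the same two knobs. It is not the SDK's internal implementation; `connect_with_retry` and its `connect` callback are hypothetical names, and treating `ConnectionError` as the retryable failure is an assumption.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[None]],
    reconnect_interval: float = 5.0,
    max_reconnect_attempts: int = -1,
) -> int:
    """Call `connect` until it succeeds; return the number of retries used.

    -1 means retry forever. Otherwise the last ConnectionError is re-raised
    once `max_reconnect_attempts` retries have failed.
    """
    retries = 0
    while True:
        try:
            await connect()
            return retries
        except ConnectionError:
            # Give up only when a non-negative limit has been reached.
            if 0 <= max_reconnect_attempts <= retries:
                raise
            retries += 1
            await asyncio.sleep(reconnect_interval)
```

With the defaults, a controller that is down for a minute would be retried roughly every five seconds until it comes back.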

Task Handling

Basic Handler

async def my_task_handler(task: Task):
    # Access input data (the second argument is the default if the key is missing)
    input_value = task.get_input('key', None)
    
    # Access metadata
    priority = task.get_metadata('priority', 'normal')
    
    # Access files
    for file_info in task.files:
        file_id = file_info['file_id']
        filename = file_info['filename']
        storage_url = file_info['storage_url']
    
    # Return results
    return {'status': 'completed', 'result': 'success'}

Progress Reporting

async def long_running_task(task: Task):
    # `worker` is the Worker instance this handler was registered on
    # Send progress updates
    await worker.send_task_progress(25.0, "Processing started")
    await asyncio.sleep(2)
    
    await worker.send_task_progress(50.0, "Half way done")
    await asyncio.sleep(2)
    
    await worker.send_task_progress(90.0, "Almost finished")
    await asyncio.sleep(1)
    
    return {'status': 'completed'}
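
When the work is a loop over items, the percentage can be computed rather than hard-coded. The sketch below takes the reporting coroutine as a parameter so it can be tested without a controller. `process_items` is a hypothetical helper, and the 0 to 100 scale is inferred from the values used in the example above.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

ProgressFn = Callable[[float, str], Awaitable[None]]


async def process_items(items: Sequence[str], report: ProgressFn) -> List[str]:
    """Process items one by one, reporting percentage progress after each."""
    results: List[str] = []
    total = len(items)
    for index, item in enumerate(items, start=1):
        results.append(item.upper())  # placeholder for the real work
        await report(100.0 * index / total, f"Processed {index}/{total}")
    return results
```

Inside a handler this could be called as `await process_items(task.get_input('items'), worker.send_task_progress)`, assuming the input contains a list under the hypothetical key `items`.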

Error Handling

async def safe_task_handler(task: Task):
    try:
        # Task processing logic (process_data is a placeholder for your own function)
        result = process_data(task.get_input('data'))
        return {'result': result}
    except ValueError:
        # Re-raise unchanged; the task will be marked as failed with this error
        raise
    except Exception as e:
        # Wrap in a more specific error, keeping the original as the cause
        raise RuntimeError(f"Processing failed: {e}") from e

Examples

The SDK includes example workers:

Basic Worker

python -m distworker.examples.basic_worker

Handles mathematical operations, text processing, and data transformations.

File Processing Worker

python -m distworker.examples.file_worker

Processes files, including image resizing and document conversion.

API Reference

Worker Class

Constructor

Worker(
    controller_url: str,
    worker_id: str,
    worker_token: str,
    queue_patterns: List[str],
    resource_info: Optional[Dict[str, Any]] = None,
    reconnect_interval: float = 5.0,
    heartbeat_interval: float = 30.0,
    max_reconnect_attempts: int = -1
)

Methods

  • register_handler(pattern: str, handler: Callable) - Register a task handler
  • start() - Start the worker and connect to controller
  • stop() - Stop the worker and disconnect
  • send_task_progress(progress: float, message: str, data: Dict) - Send a progress update (the Progress Reporting example omits data, which suggests it is optional)
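
As an illustration of how a pattern such as `'math.*'` might select a handler, the sketch below matches names against glob-style patterns with the standard `fnmatch` module. Whether the SDK treats patterns as globs, and what exactly it matches them against, is not stated in this README; `HandlerRegistry` and `find_handler` are hypothetical names.

```python
from fnmatch import fnmatchcase
from typing import Callable, Dict, Optional


class HandlerRegistry:
    """Hypothetical registry mapping glob patterns to handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable] = {}

    def register_handler(self, pattern: str, handler: Callable) -> None:
        self._handlers[pattern] = handler

    def find_handler(self, queue: str) -> Optional[Callable]:
        """Return the first registered handler whose pattern matches `queue`."""
        for pattern, handler in self._handlers.items():
            if fnmatchcase(queue, pattern):
                return handler
        return None
```

Patterns are checked in registration order, so a more specific pattern should be registered before a broader one that would also match.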

Task Class

Properties

  • task_id: str - Unique task identifier
  • queue: str - Queue name where task was submitted
  • timeout_ms: int - Task timeout in milliseconds
  • metadata: Dict[str, Any] - Task metadata
  • input_data: Dict[str, Any] - Task input data
  • files: List[Dict[str, Any]] - List of file information

Methods

  • get_input(key: str, default: Any = None) - Get input value by key
  • get_metadata(key: str, default: Any = None) - Get metadata value by key
  • get_file_by_id(file_id: str) - Get file info by ID
  • get_files_by_name(filename: str) - Get files by filename
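
The two file lookups can be pictured as simple searches over `task.files`. The functions below are a stand-alone sketch operating on a plain list of dictionaries with the `file_id` and `filename` keys shown in the Basic Handler example. Returning `None` for an unknown ID is an assumption, not documented SDK behavior.

```python
from typing import Any, Dict, List, Optional

FileInfo = Dict[str, Any]


def get_file_by_id(files: List[FileInfo], file_id: str) -> Optional[FileInfo]:
    """Return the first entry whose 'file_id' matches, or None."""
    return next((f for f in files if f.get('file_id') == file_id), None)


def get_files_by_name(files: List[FileInfo], filename: str) -> List[FileInfo]:
    """Return every entry whose 'filename' matches (names may repeat)."""
    return [f for f in files if f.get('filename') == filename]
```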

Development

Running Tests

pip install -e ".[dev]"
pytest

Code Formatting

black distworker/
flake8 distworker/

Type Checking

mypy distworker/

License

Apache-2.0 License - see LICENSE file for details.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run the test suite
  6. Submit a pull request
