
Python SDK for DistWorker distributed task processing system


DistWorker Python SDK

A Python SDK for connecting workers to the DistWorker distributed task processing system.

Features

  • WebSocket Communication: Real-time communication with the DistWorker controller using WebSocket + Protocol Buffers
  • HMAC Authentication: Secure authentication using HMAC-SHA256 signatures
  • Automatic Reconnection: Built-in reconnection logic with configurable retry policies
  • Task Progress Reporting: Send progress updates during task execution
  • Resource Monitoring: Report worker resource information and usage
  • Async/Await Support: Fully asynchronous API using Python's asyncio
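
The HMAC-SHA256 authentication listed above can be illustrated with Python's standard library. This is a minimal sketch of the general technique only. The message the DistWorker controller actually expects to be signed is not described in this README, so the `worker_id:timestamp` payload and the helper names `sign_message` and `verify_signature` are assumptions, not the SDK's protocol.

```python
import hashlib
import hmac


def sign_message(token: str, message: str) -> str:
    """Return the hex-encoded HMAC-SHA256 of `message`, keyed with `token`."""
    return hmac.new(
        token.encode("utf-8"), message.encode("utf-8"), hashlib.sha256
    ).hexdigest()


def verify_signature(token: str, message: str, signature: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign_message(token, message)
    return hmac.compare_digest(expected, signature)


# Hypothetical payload; the real wire format is defined by the controller.
payload = "math-worker-001:1700000000"
signature = sign_message("your-worker-token", payload)
```

Using `hmac.compare_digest` rather than `==` avoids leaking information through comparison timing.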

Installation

pip install distworker-sdk

Development Installation

git clone https://github.com/jc-lab/distworker.git
cd distworker/python
pip install -e .

Quick Start

Here's a simple worker that handles mathematical operations:

import asyncio
from distworker import Worker, Task

async def handle_math_task(task: Task):
    """Handle mathematical operations"""
    operation = task.get_input('operation')
    a = task.get_input('a')
    b = task.get_input('b')
    
    if operation == 'add':
        result = a + b
    elif operation == 'multiply':
        result = a * b
    else:
        raise ValueError(f"Unknown operation: {operation}")
    
    return {'result': result}

async def main():
    # Create worker
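    worker = Worker(
        controller_url='ws://localhost:8080/ws',
        worker_id='math-worker-001',
        worker_token='your-worker-token',
        # queue_patterns has no default in the constructor signature
        # (see API Reference); the value below is an assumed example
        queue_patterns=['math.*'],
        resource_info={'cpu_cores': 4, 'memory_mb': 8192}
    )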
    
    # Register task handler
    worker.register_handler('math.*', handle_math_task)
    
    # Start worker
    await worker.start()
    
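    # Keep running until interrupted. On Ctrl+C, asyncio.run() cancels this
    # coroutine, so the shutdown call belongs in a finally block.
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await worker.stop()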

if __name__ == '__main__':
    asyncio.run(main())

Configuration

Environment Variables

You can configure the worker using environment variables:

export DISTWORKER_CONTROLLER_URL="ws://localhost:8080/ws"
export DISTWORKER_WORKER_ID="my-worker-001"
export DISTWORKER_WORKER_TOKEN="your-secret-token"
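
The README does not say whether the SDK reads these variables on its own, so the following is a minimal sketch that reads them explicitly and turns them into constructor keyword arguments. The helper name `load_worker_settings` is hypothetical and not part of the SDK.

```python
import os
from typing import Dict, Mapping, Optional


def load_worker_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Map the DISTWORKER_* variables to Worker keyword arguments."""
    env = os.environ if env is None else env
    names = {
        "controller_url": "DISTWORKER_CONTROLLER_URL",
        "worker_id": "DISTWORKER_WORKER_ID",
        "worker_token": "DISTWORKER_WORKER_TOKEN",
    }
    # Fail early with a clear message instead of connecting with empty values.
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {param: env[var] for param, var in names.items()}
```

The returned dictionary can be unpacked into the constructor, for example `Worker(**load_worker_settings(), ...)`, together with the remaining arguments.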

Worker Parameters

  • controller_url: WebSocket URL of the DistWorker controller
  • worker_id: Unique identifier for this worker instance
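  • worker_token: Secret token for authentication
  • queue_patterns: List of queue patterns for this worker (required by the constructor; see API Reference)
  • resource_info: Dictionary with worker resource information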
  • reconnect_interval: Seconds between reconnection attempts (default: 5.0)
  • heartbeat_interval: Seconds between heartbeat messages (default: 30.0)
  • max_reconnect_attempts: Maximum reconnection attempts, -1 for unlimited (default: -1)
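
To make the meaning of reconnect_interval and max_reconnect_attempts concrete, here is a minimal sketch of a retry loop with the same two knobs. It is not the SDK's reconnection code. The `connect` callable and the `connect_with_retries` helper are hypothetical, and the sketch assumes the limit counts failed attempts.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retries(
    connect: Callable[[], Awaitable[None]],
    reconnect_interval: float = 5.0,
    max_reconnect_attempts: int = -1,
) -> int:
    """Call `connect` until it succeeds; return the number of failed attempts.

    A negative `max_reconnect_attempts` means retry without limit.
    """
    failures = 0
    while True:
        try:
            await connect()
            return failures
        except ConnectionError:
            failures += 1
            # Give up once the number of failures reaches a non-negative limit.
            if 0 <= max_reconnect_attempts <= failures:
                raise
            await asyncio.sleep(reconnect_interval)
```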

Task Handling

Basic Handler

from distworker import Task

async def my_task_handler(task: Task):
    # Access input data; the second argument is the default for a missing key
    input_value = task.get_input('key', 'fallback')
    
    # Access metadata
    priority = task.get_metadata('priority', 'normal')
    
    # Access files
    for file_info in task.files:
        file_id = file_info['file_id']
        filename = file_info['filename']
        storage_url = file_info['storage_url']
    
    # Return results
    return {'status': 'completed', 'result': 'success'}

Progress Reporting

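import asyncio
from distworker import Task

async def long_running_task(task: Task):
    # `worker` is the Worker instance created at startup (see Quick Start);
    # it must be in scope here, for example as a module-level variable
    # Send progress updates
    await worker.send_task_progress(25.0, "Processing started")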
    await asyncio.sleep(2)
    
    await worker.send_task_progress(50.0, "Half way done")
    await asyncio.sleep(2)
    
    await worker.send_task_progress(90.0, "Almost finished")
    await asyncio.sleep(1)
    
    return {'status': 'completed'}
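
When a task works through a list of items, the percentage can be computed rather than hard-coded. The sketch below assumes a `report` coroutine that takes a progress value and a message, as `send_task_progress` is called above. The helper name `process_with_progress` and the doubling step are placeholders for illustration.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

ProgressReporter = Callable[[float, str], Awaitable[None]]


async def process_with_progress(
    items: Sequence[int], report: ProgressReporter
) -> List[int]:
    """Process items one by one and report the completed percentage."""
    results: List[int] = []
    total = len(items)
    for index, item in enumerate(items, start=1):
        results.append(item * 2)  # placeholder for the real per-item work
        await report(100.0 * index / total, f"Processed {index}/{total} items")
    return results
```

Inside a handler, `worker.send_task_progress` could be passed as `report`, while tests can pass a stub that records the calls.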

Error Handling

async def safe_task_handler(task: Task):
    try:
        # Task processing logic (process_data stands in for your own function)
        result = process_data(task.get_input('data'))
        return {'result': result}
    except ValueError:
        # Re-raise unchanged; the task will be marked as failed with this error
        raise
    except Exception as e:
        # Wrap in a more specific error, keeping the original as the cause
        raise RuntimeError(f"Processing failed: {e}") from e

Examples

The SDK includes example workers:

Basic Worker

python -m distworker.examples.basic_worker

Handles mathematical operations, text processing, and data transformations.

File Processing Worker

python -m distworker.examples.file_worker

Processes files, including image resizing and document conversion.

API Reference

Worker Class

Constructor

Worker(
    controller_url: str,
    worker_id: str,
    worker_token: str,
    queue_patterns: List[str],
    resource_info: Optional[Dict[str, Any]] = None,
    reconnect_interval: float = 5.0,
    heartbeat_interval: float = 30.0,
    max_reconnect_attempts: int = -1
)

Methods

  • register_handler(pattern: str, handler: Callable) - Register a task handler
  • start() - Start the worker and connect to the controller (awaitable)
  • stop() - Stop the worker and disconnect (awaitable)
  • send_task_progress(progress: float, message: str, data: Dict) - Send a progress update (awaitable)

Task Class

Properties

  • task_id: str - Unique task identifier
  • queue: str - Queue name where task was submitted
  • timeout_ms: int - Task timeout in milliseconds
  • metadata: Dict[str, Any] - Task metadata
  • input_data: Dict[str, Any] - Task input data
  • files: List[Dict[str, Any]] - List of file information

Methods

  • get_input(key: str, default: Any = None) - Get input value by key
  • get_metadata(key: str, default: Any = None) - Get metadata value by key
  • get_file_by_id(file_id: str) - Get file info by ID
  • get_files_by_name(filename: str) - Get files by filename
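
The two file lookup methods can be pictured as simple searches over the files list, using the `file_id` and `filename` keys shown in the Basic Handler example. The functions below are stand-ins written for illustration, not the SDK's implementation. Returning None for an unknown ID is an assumption, and the sample entries are made up.

```python
from typing import Any, Dict, List, Optional

FileInfo = Dict[str, Any]


def find_file_by_id(files: List[FileInfo], file_id: str) -> Optional[FileInfo]:
    """Return the first entry with a matching file_id, or None."""
    return next((f for f in files if f.get("file_id") == file_id), None)


def find_files_by_name(files: List[FileInfo], filename: str) -> List[FileInfo]:
    """Return every entry with a matching filename (names may repeat)."""
    return [f for f in files if f.get("filename") == filename]


# Made-up sample data in the shape used by the Basic Handler example.
sample_files = [
    {"file_id": "f1", "filename": "input.csv", "storage_url": "https://storage.example/f1"},
    {"file_id": "f2", "filename": "input.csv", "storage_url": "https://storage.example/f2"},
    {"file_id": "f3", "filename": "notes.txt", "storage_url": "https://storage.example/f3"},
]
```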

Development

Running Tests

pip install -e ".[dev]"
pytest
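
Because handlers are plain coroutines, they can be unit-tested without a running controller. The sketch below is one possible pytest-style test file. `FakeTask` is a hypothetical test double that implements only `get_input` and is not the SDK's Task class. The handler repeats the Quick Start logic so the file is self-contained.

```python
import asyncio
from typing import Any, Dict


class FakeTask:
    """Test double exposing only the get_input method used by the handler."""

    def __init__(self, input_data: Dict[str, Any]) -> None:
        self.input_data = input_data

    def get_input(self, key: str, default: Any = None) -> Any:
        return self.input_data.get(key, default)


async def handle_math_task(task: FakeTask) -> Dict[str, Any]:
    """Same logic as the Quick Start handler."""
    operation = task.get_input("operation")
    a = task.get_input("a")
    b = task.get_input("b")
    if operation == "add":
        return {"result": a + b}
    if operation == "multiply":
        return {"result": a * b}
    raise ValueError(f"Unknown operation: {operation}")


def test_add() -> None:
    task = FakeTask({"operation": "add", "a": 2, "b": 3})
    assert asyncio.run(handle_math_task(task)) == {"result": 5}


def test_unknown_operation() -> None:
    task = FakeTask({"operation": "divide", "a": 1, "b": 2})
    try:
        asyncio.run(handle_math_task(task))
    except ValueError as exc:
        assert "Unknown operation" in str(exc)
    else:
        raise AssertionError("expected ValueError")
```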

Code Formatting

black distworker/
flake8 distworker/

Type Checking

mypy distworker/

License

Apache-2.0 License - see the LICENSE file for details.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run the test suite
  6. Submit a pull request

Support
