
Python SDK for DistWorker distributed task processing system


DistWorker Python SDK

A Python SDK for connecting workers to the DistWorker distributed task processing system.

Features

  • WebSocket Communication: Real-time communication with the DistWorker controller using WebSocket + Protocol Buffers
  • HMAC Authentication: Secure authentication using HMAC-SHA256 signatures
  • Automatic Reconnection: Built-in reconnection logic with configurable retry policies
  • Task Progress Reporting: Send progress updates during task execution
  • Resource Monitoring: Report worker resource information and usage
  • Async/Await Support: Fully asynchronous API using Python's asyncio
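
The README does not document which message the SDK signs during authentication. The sketch below only illustrates the HMAC-SHA256 primitive with Python's standard `hmac` module. The helper names `sign_message` and `verify_signature` and the `worker_id:timestamp` payload are hypothetical, not part of the SDK.

```python
import hashlib
import hmac
import time


def sign_message(worker_token: str, message: bytes) -> str:
    """Return the hex HMAC-SHA256 of `message`, keyed with the worker token."""
    return hmac.new(worker_token.encode("utf-8"), message, hashlib.sha256).hexdigest()


def verify_signature(worker_token: str, message: bytes, signature: str) -> bool:
    """Recompute the signature and compare in constant time to avoid timing leaks."""
    expected = sign_message(worker_token, message)
    return hmac.compare_digest(expected, signature)


# Hypothetical payload: the real layout is defined by the DistWorker protocol.
payload = f"math-worker-001:{int(time.time())}".encode("utf-8")
sig = sign_message("your-worker-token", payload)
print(len(sig))  # 64 hex characters for SHA-256
```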

Installation

pip install distworker-sdk

Development Installation

git clone https://github.com/jc-lab/distworker.git
cd distworker/python
pip install -e .

Quick Start

Here's a simple worker that handles mathematical operations:

import asyncio
from distworker import Worker, Task

async def handle_math_task(task: Task):
    """Handle mathematical operations"""
    operation = task.get_input('operation')
    a = task.get_input('a')
    b = task.get_input('b')
    
    if operation == 'add':
        result = a + b
    elif operation == 'multiply':
        result = a * b
    else:
        raise ValueError(f"Unknown operation: {operation}")
    
    return {'result': result}

async def main():
    # Create worker
    worker = Worker(
        controller_url='ws://localhost:8080/ws',
        worker_id='math-worker-001',
        worker_token='your-worker-token',
        resource_info={'cpu_cores': 4, 'memory_mb': 8192}
    )
    
    # Register task handler
    worker.register_handler('math.*', handle_math_task)
    
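    # Start worker
    await worker.start()
    
    # Keep running until interrupted. Ctrl+C typically reaches main() as a
    # cancellation rather than a KeyboardInterrupt, so use try/finally to
    # guarantee that the worker is stopped cleanly.
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await worker.stop()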

if __name__ == '__main__':
    asyncio.run(main())
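
Because the handler is a plain coroutine, its logic can be exercised without a running controller. This sketch copies the Quick Start handler (minus the `distworker` import) and feeds it a hypothetical `FakeTask` that exposes only `get_input()`; it is not an SDK class.

```python
import asyncio
from typing import Any, Dict


class FakeTask:
    """Hypothetical stand-in exposing only get_input(), enough for this handler."""

    def __init__(self, input_data: Dict[str, Any]):
        self.input_data = input_data

    def get_input(self, key: str, default: Any = None) -> Any:
        return self.input_data.get(key, default)


async def handle_math_task(task) -> Dict[str, Any]:
    # Same logic as the Quick Start handler
    operation = task.get_input('operation')
    a = task.get_input('a')
    b = task.get_input('b')
    if operation == 'add':
        result = a + b
    elif operation == 'multiply':
        result = a * b
    else:
        raise ValueError(f"Unknown operation: {operation}")
    return {'result': result}


print(asyncio.run(handle_math_task(FakeTask({'operation': 'add', 'a': 2, 'b': 3}))))
# {'result': 5}
```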

Configuration

Environment Variables

You can configure the worker using environment variables:

export DISTWORKER_CONTROLLER_URL="ws://localhost:8080/ws"
export DISTWORKER_WORKER_ID="my-worker-001"
export DISTWORKER_WORKER_TOKEN="your-secret-token"
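
The README does not say whether `Worker` reads these variables on its own. If it does not, a small helper such as the hypothetical `worker_kwargs_from_env` below (not part of the SDK) can collect them and pass them explicitly, failing early when one is missing.

```python
import os
from typing import Any, Dict


def worker_kwargs_from_env() -> Dict[str, Any]:
    """Build Worker keyword arguments from DISTWORKER_* environment variables."""
    mapping = {
        'controller_url': 'DISTWORKER_CONTROLLER_URL',
        'worker_id': 'DISTWORKER_WORKER_ID',
        'worker_token': 'DISTWORKER_WORKER_TOKEN',
    }
    # Report every missing variable at once instead of failing on the first
    missing = [env for env in mapping.values() if not os.environ.get(env)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {arg: os.environ[env] for arg, env in mapping.items()}
```

Usage: `worker = Worker(**worker_kwargs_from_env(), resource_info={'cpu_cores': 4, 'memory_mb': 8192})`.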

Worker Parameters

  • controller_url: WebSocket URL of the DistWorker controller
  • worker_id: Unique identifier for this worker instance
  • worker_token: Secret token for authentication
  • resource_info: Dictionary with worker resource information
  • reconnect_interval: Seconds between reconnection attempts (default: 5.0)
  • heartbeat_interval: Seconds between heartbeat messages (default: 30.0)
  • max_reconnect_attempts: Maximum reconnection attempts, -1 for unlimited (default: -1)
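
The SDK's internal retry loop is not shown in this README. As an illustration of how `reconnect_interval` and `max_reconnect_attempts` plausibly interact, here is a sketch around a hypothetical `connect` coroutine. Not counting the initial connection toward the limit is an assumption.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[None]],
    reconnect_interval: float = 5.0,
    max_reconnect_attempts: int = -1,
) -> int:
    """Await `connect()` until it succeeds; return how many retries were needed.

    max_reconnect_attempts = -1 means retry forever (the documented default).
    """
    attempts = 0  # reconnection attempts so far; the first connect is not counted
    while True:
        try:
            await connect()
            return attempts
        except OSError:  # ConnectionError and similar network failures
            if max_reconnect_attempts != -1 and attempts >= max_reconnect_attempts:
                raise
            attempts += 1
            await asyncio.sleep(reconnect_interval)
```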

Task Handling

Basic Handler

from distworker import Task

async def my_task_handler(task: Task):
    # Access input data, falling back to a default when the key is absent
    input_value = task.get_input('key', 'default')
    
    # Access metadata
    priority = task.get_metadata('priority', 'normal')
    
    # Access attached files; each entry is a dictionary
    for file_info in task.files:
        file_id = file_info['file_id']
        filename = file_info['filename']
        storage_url = file_info['storage_url']
        # Fetch or process the file here
    
    # Return results
    return {'status': 'completed', 'result': 'success'}
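
Since `get_input()` returns `None` for missing keys by default, a missing input and an input explicitly set to `None` look the same. A hypothetical `require_input` helper (not part of the SDK) can tell them apart with a sentinel default and fail the task with a clear message:

```python
from typing import Any


def require_input(task: Any, key: str) -> Any:
    """Return task.get_input(key), raising ValueError if the key is absent."""
    sentinel = object()  # unique default that no real input value can equal
    value = task.get_input(key, sentinel)
    if value is sentinel:
        raise ValueError(f"Missing required input: {key!r}")
    return value
```

Inside a handler: `a = require_input(task, 'a')`.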

Progress Reporting

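import asyncio
from distworker import Task

# `worker` must be reachable from the handler, e.g. a module-level Worker
# instance rather than a local variable inside main().
async def long_running_task(task: Task):
    # Send progress updates (the values here are percentages)
    await worker.send_task_progress(25.0, "Processing started")
    await asyncio.sleep(2)
    
    await worker.send_task_progress(50.0, "Halfway done")
    await asyncio.sleep(2)
    
    await worker.send_task_progress(90.0, "Almost finished")
    await asyncio.sleep(1)
    
    return {'status': 'completed'}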
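
For work split into items, progress can be derived from the number of items done instead of hard-coded milestones. In this sketch, the hypothetical `process_items` takes the reporting coroutine as a parameter, so it can be passed `worker.send_task_progress` in production or a fake in tests. The 0 to 100 scale is inferred from the example values above.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

ProgressFn = Callable[[float, str], Awaitable[None]]


async def process_items(items: Sequence[str], report: ProgressFn) -> List[str]:
    """Process items one at a time, reporting percentage progress after each."""
    results = []
    total = len(items)
    for index, item in enumerate(items, start=1):
        results.append(item.upper())  # placeholder for real per-item work
        await report(100.0 * index / total, f"Processed {index}/{total}")
    return results
```

Inside a handler: `results = await process_items(names, worker.send_task_progress)`.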

Error Handling

from distworker import Task

async def safe_task_handler(task: Task):
    try:
        # Task processing logic; process_data is a placeholder for your own function
        result = process_data(task.get_input('data'))
        return {'result': result}
    except ValueError:
        # Re-raise unchanged: the task will be marked as failed with this error
        raise
    except Exception as e:
        # Wrap other errors in a more descriptive one, keeping the original as the cause
        raise RuntimeError(f"Processing failed: {e}") from e

Examples

The SDK includes example workers:

Basic Worker

python -m distworker.examples.basic_worker

Handles mathematical operations, text processing, and data transformations.

File Processing Worker

python -m distworker.examples.file_worker

Processes files, including image resizing and document conversion.

API Reference

Worker Class

Constructor

Worker(
    controller_url: str,
    worker_id: str,
    worker_token: str,
    queue_patterns: List[str],
    resource_info: Optional[Dict[str, Any]] = None,
    reconnect_interval: float = 5.0,
    heartbeat_interval: float = 30.0,
    max_reconnect_attempts: int = -1
)

Methods

  • register_handler(pattern: str, handler: Callable) - Register a task handler for a pattern such as 'math.*'
  • start() - Coroutine; start the worker and connect to the controller
  • stop() - Coroutine; stop the worker and disconnect
  • send_task_progress(progress: float, message: str, data: Dict) - Coroutine; send a progress update
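
The README does not define how patterns like 'math.*' are matched. If they behave like shell-style globs, the lookup could resemble this sketch using the standard `fnmatch` module. The `find_handler` helper is hypothetical, and the SDK's actual rules may differ; note that with `fnmatch`, `*` also matches across dots, so 'math.*' matches 'math.a.b'.

```python
from fnmatch import fnmatchcase
from typing import Callable, Dict, Optional


def find_handler(handlers: Dict[str, Callable], queue: str) -> Optional[Callable]:
    """Return the first registered handler whose glob pattern matches `queue`."""
    for pattern, handler in handlers.items():  # dicts keep registration order
        if fnmatchcase(queue, pattern):
            return handler
    return None
```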

Task Class

Properties

  • task_id: str - Unique task identifier
  • queue: str - Queue name where task was submitted
  • timeout_ms: int - Task timeout in milliseconds
  • metadata: Dict[str, Any] - Task metadata
  • input_data: Dict[str, Any] - Task input data
  • files: List[Dict[str, Any]] - List of file information

Methods

  • get_input(key: str, default: Any = None) - Get input value by key
  • get_metadata(key: str, default: Any = None) - Get metadata value by key
  • get_file_by_id(file_id: str) - Get file info by ID
  • get_files_by_name(filename: str) - Get files by filename
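
`Task` exposes `timeout_ms`, but the README does not say whether the controller or the SDK enforces it. If a handler should fail fast on its own, `asyncio.wait_for` is one option, as in this sketch. The helper name `run_within_timeout` is hypothetical, and treating a missing or zero `timeout_ms` as "no limit" is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Optional


async def run_within_timeout(work: Awaitable[Any], timeout_ms: Optional[int]) -> Any:
    """Await `work`, cancelling it if it runs longer than timeout_ms."""
    if not timeout_ms:  # assumption: None or 0 means no limit
        return await work
    try:
        return await asyncio.wait_for(work, timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        # Raising from the handler marks the task as failed
        raise RuntimeError(f"Task exceeded its {timeout_ms} ms timeout") from None
```

Inside a handler: `return await run_within_timeout(do_work(task), task.timeout_ms)`, where `do_work` stands for your own coroutine.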

Development

Running Tests

pip install -e ".[dev]"
pytest

Code Formatting

black distworker/
flake8 distworker/

Type Checking

mypy distworker/

License

Licensed under the Apache-2.0 License; see the LICENSE file for details.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run the test suite
  6. Submit a pull request

Support



Download files


Source Distribution

distworker_sdk-0.0.3rc1.tar.gz (23.1 kB)

Uploaded Source

Built Distribution


distworker_sdk-0.0.3rc1-py3-none-any.whl (22.0 kB)

Uploaded Python 3

File details

Details for the file distworker_sdk-0.0.3rc1.tar.gz.

File metadata

  • Download URL: distworker_sdk-0.0.3rc1.tar.gz
  • Size: 23.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for distworker_sdk-0.0.3rc1.tar.gz
Algorithm Hash digest
SHA256 45e8ed29b6798b5a6b2083d0676a32fde87e25e811dd33274d37afc15755a899
MD5 1c613c17f7307ce1f82125d5ad082526
BLAKE2b-256 ad02c7238d463e8ecd84839e09bac886d34b27c0583778b6a4b7a07fefd03cf6


Provenance

The following attestation bundles were made for distworker_sdk-0.0.3rc1.tar.gz:

Publisher: pypi-publish.yml on jc-lab/distworker


File details

Details for the file distworker_sdk-0.0.3rc1-py3-none-any.whl.


File hashes

Hashes for distworker_sdk-0.0.3rc1-py3-none-any.whl
Algorithm Hash digest
SHA256 9b1f19d7b64177507df7a84bdde8cef6b46233933b0b10f0a88fe802352f8ee2
MD5 7a92228bdef868735d0a27262b841573
BLAKE2b-256 31767862072f928278c640c4f868905326c333d7c4a407cf6440d26509216679


Provenance

The following attestation bundles were made for distworker_sdk-0.0.3rc1-py3-none-any.whl:

Publisher: pypi-publish.yml on jc-lab/distworker

