Python SDK for DistWorker distributed task processing system
DistWorker Python SDK
A Python SDK for connecting workers to the DistWorker distributed task processing system.
Features
- WebSocket Communication: Real-time communication with the DistWorker controller using WebSocket + Protocol Buffers
- HMAC Authentication: Secure authentication using HMAC-SHA256 signatures
- Automatic Reconnection: Built-in reconnection logic with configurable retry policies
- Task Progress Reporting: Send progress updates during task execution
- Resource Monitoring: Report worker resource information and usage
- Async/Await Support: Fully asynchronous API using Python's asyncio
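The HMAC-SHA256 authentication listed above can be illustrated with Python's standard hmac module. This is a minimal sketch only. The message layout (worker ID and a timestamp joined by a colon) and the helper names sign_auth_message and verify_auth_message are hypothetical, and the actual DistWorker handshake may sign different fields.

```python
import hashlib
import hmac
import time


def sign_auth_message(worker_id: str, worker_token: str, timestamp: int) -> str:
    """Return a hex HMAC-SHA256 signature over a hypothetical '<worker_id>:<timestamp>' message.

    The message layout is an assumption for illustration; the real DistWorker
    handshake may sign different fields.
    """
    message = f"{worker_id}:{timestamp}".encode("utf-8")
    return hmac.new(worker_token.encode("utf-8"), message, hashlib.sha256).hexdigest()


def verify_auth_message(worker_id: str, worker_token: str, timestamp: int, signature: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign_auth_message(worker_id, worker_token, timestamp)
    return hmac.compare_digest(expected, signature)


if __name__ == "__main__":
    ts = int(time.time())
    sig = sign_auth_message("math-worker-001", "your-worker-token", ts)
    print(verify_auth_message("math-worker-001", "your-worker-token", ts, sig))  # True
```

The shared secret (the worker token) never travels over the connection; only the signature does, and hmac.compare_digest avoids leaking timing information during verification.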
Installation
```bash
pip install distworker-sdk
```
Development Installation
```bash
git clone https://github.com/jc-lab/distworker.git
cd distworker/python
pip install -e .
```
Quick Start
Here's a simple worker that handles mathematical operations:
```python
import asyncio

from distworker import Worker, Task


async def handle_math_task(task: Task):
    """Handle mathematical operations"""
    operation = task.get_input('operation')
    a = task.get_input('a')
    b = task.get_input('b')

    if operation == 'add':
        result = a + b
    elif operation == 'multiply':
        result = a * b
    else:
        raise ValueError(f"Unknown operation: {operation}")

    return {'result': result}


async def main():
    # Create worker
    worker = Worker(
        controller_url='ws://localhost:8080/ws',
        worker_id='math-worker-001',
        worker_token='your-worker-token',
        queue_patterns=['math.*'],  # required by the constructor signature in the API Reference
        resource_info={'cpu_cores': 4, 'memory_mb': 8192}
    )

    # Register task handler
    worker.register_handler('math.*', handle_math_task)

    # Start worker
    await worker.start()

    # Keep running until interrupted. On Ctrl+C, asyncio.run() (Python 3.11+)
    # cancels main(), so stop the worker in a finally block instead of
    # relying on catching KeyboardInterrupt inside the coroutine.
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await worker.stop()


if __name__ == '__main__':
    asyncio.run(main())
```
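The handler above is registered under the pattern 'math.*'. The exact matching rules the SDK applies are not documented here; assuming glob-style patterns, routing a task's queue name to a handler could look like the sketch below. HandlerRegistry and find_handler are hypothetical names, and the "first registered match wins" rule is an assumption.

```python
from fnmatch import fnmatchcase
from typing import Any, Callable, List, Optional, Tuple

Handler = Callable[..., Any]


class HandlerRegistry:
    """Hypothetical stand-in for how a worker could route tasks to handlers by queue name."""

    def __init__(self) -> None:
        self._handlers: List[Tuple[str, Handler]] = []

    def register_handler(self, pattern: str, handler: Handler) -> None:
        # Patterns are kept in registration order.
        self._handlers.append((pattern, handler))

    def find_handler(self, queue: str) -> Optional[Handler]:
        # First registered pattern that matches wins (an assumption).
        # fnmatchcase is case-sensitive and its '*' also matches dots.
        for pattern, handler in self._handlers:
            if fnmatchcase(queue, pattern):
                return handler
        return None
```

With this scheme, 'math.*' matches queues such as 'math.add' and 'math.multiply' but not a bare 'math', and a queue with no matching pattern yields None so the caller can reject the task.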
Configuration
Environment Variables
You can configure the worker using environment variables:
```bash
export DISTWORKER_CONTROLLER_URL="ws://localhost:8080/ws"
export DISTWORKER_WORKER_ID="my-worker-001"
export DISTWORKER_WORKER_TOKEN="your-secret-token"
```
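If you prefer to read these variables yourself and pass them to the constructor explicitly, a small helper like the one below works. config_from_env is a hypothetical name, and mapping each variable to the matching Worker keyword argument is an assumption based on the parameter names listed in this README.

```python
import os
from typing import Dict, Mapping, Optional

# Environment variable names as listed above, mapped to the Worker keyword
# arguments they are assumed to correspond to.
_ENV_TO_KWARG = {
    "DISTWORKER_CONTROLLER_URL": "controller_url",
    "DISTWORKER_WORKER_ID": "worker_id",
    "DISTWORKER_WORKER_TOKEN": "worker_token",
}


def config_from_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect Worker keyword arguments from environment variables.

    Raises ValueError naming every missing variable so misconfiguration fails fast.
    """
    env = os.environ if environ is None else environ
    missing = [name for name in _ENV_TO_KWARG if not env.get(name)]
    if missing:
        raise ValueError(f"missing environment variables: {', '.join(missing)}")
    return {kwarg: env[name] for name, kwarg in _ENV_TO_KWARG.items()}
```

Usage: worker = Worker(**config_from_env(), queue_patterns=['math.*']). Reporting all missing variables at once saves a restart cycle per variable.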
Worker Parameters
- controller_url: WebSocket URL of the DistWorker controller
- worker_id: Unique identifier for this worker instance
- worker_token: Secret token for authentication
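- queue_patterns: List of queue name patterns this worker accepts tasks from (required by the constructor signature under API Reference)
- resource_info: Dictionary describing the worker's resources, e.g. {'cpu_cores': 4, 'memory_mb': 8192}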
- reconnect_interval: Seconds between reconnection attempts (default: 5.0)
- heartbeat_interval: Seconds between heartbeat messages (default: 30.0)
- max_reconnect_attempts: Maximum reconnection attempts, -1 for unlimited (default: -1)
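The reconnect_interval and max_reconnect_attempts parameters describe a fixed-interval retry policy where -1 means "retry forever". The sketch below shows that policy in isolation; it is not the SDK's internal code. connect_with_retry is a hypothetical name, treating OSError (which includes ConnectionError) as the retryable failure is an assumption, and this version counts max_reconnect_attempts as retries after the first failed attempt.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    reconnect_interval: float = 5.0,
    max_reconnect_attempts: int = -1,
) -> T:
    """Call `connect` until it succeeds, sleeping `reconnect_interval` seconds between tries.

    max_reconnect_attempts == -1 means retry forever; otherwise the last error is
    re-raised once that many retries have failed.
    """
    retries = 0
    while True:
        try:
            return await connect()
        except OSError:
            # Give up once the retry budget is spent (never, when it is -1).
            if max_reconnect_attempts != -1 and retries >= max_reconnect_attempts:
                raise
            retries += 1
            await asyncio.sleep(reconnect_interval)
```

A fixed interval keeps behavior predictable; if many workers reconnect to one controller at once, adding jitter or exponential backoff would spread the load, but that goes beyond the documented parameters.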
Task Handling
Basic Handler
```python
from distworker import Task


async def my_task_handler(task: Task):
    # Access input data, falling back to a default when the key is absent
    input_value = task.get_input('key', 'default')

    # Access metadata
    priority = task.get_metadata('priority', 'normal')

    # Access files attached to the task
    for file_info in task.files:
        file_id = file_info['file_id']
        filename = file_info['filename']
        storage_url = file_info['storage_url']

    # Return results
    return {'status': 'completed', 'result': 'success'}
```
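Because handlers only use the Task accessors, you can exercise them without a running controller by passing a stand-in object. StubTask below is a hypothetical test double that mirrors the Task properties and methods listed in the API Reference; it is not part of the SDK. The handler is the Quick Start math handler, repeated so the block is self-contained.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class StubTask:
    """Hypothetical test double exposing the Task members documented in the API Reference."""

    task_id: str = "test-task"
    queue: str = "math.add"
    timeout_ms: int = 30_000
    metadata: Dict[str, Any] = field(default_factory=dict)
    input_data: Dict[str, Any] = field(default_factory=dict)
    files: List[Dict[str, Any]] = field(default_factory=list)

    def get_input(self, key: str, default: Any = None) -> Any:
        return self.input_data.get(key, default)

    def get_metadata(self, key: str, default: Any = None) -> Any:
        return self.metadata.get(key, default)


async def handle_math_task(task) -> Dict[str, Any]:
    """Same logic as the Quick Start handler."""
    operation = task.get_input("operation")
    a = task.get_input("a")
    b = task.get_input("b")
    if operation == "add":
        result = a + b
    elif operation == "multiply":
        result = a * b
    else:
        raise ValueError(f"Unknown operation: {operation}")
    return {"result": result}


if __name__ == "__main__":
    task = StubTask(input_data={"operation": "multiply", "a": 6, "b": 7})
    print(asyncio.run(handle_math_task(task)))  # {'result': 42}
```

This keeps handler logic testable under pytest while the transport (WebSocket, authentication, reconnection) stays inside the SDK.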
Progress Reporting
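```python
import asyncio

from distworker import Task


async def long_running_task(task: Task):
    # `worker` is the Worker instance running this handler; it must be in
    # scope here (e.g. created at module level rather than inside main())
    await worker.send_task_progress(25.0, "Processing started")
    await asyncio.sleep(2)

    await worker.send_task_progress(50.0, "Halfway done")
    await asyncio.sleep(2)

    await worker.send_task_progress(90.0, "Almost finished")
    await asyncio.sleep(1)

    return {'status': 'completed'}
```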
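For work split into discrete items, progress can be derived from the item count instead of hard-coded percentages. In the sketch below, process_with_progress is a hypothetical helper; it takes the progress sender as a parameter so that, inside a handler, you could pass worker.send_task_progress (called with a percentage and a message, as in the example above) and, in tests, a fake.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
ProgressSender = Callable[[float, str], Awaitable[None]]


async def process_with_progress(
    items: Iterable[T],
    process: Callable[[T], Awaitable[None]],
    send_progress: ProgressSender,
) -> int:
    """Process items one at a time, reporting percent complete after each one.

    Returns the number of items processed; an empty input sends no updates.
    """
    item_list: List[T] = list(items)
    total = len(item_list)
    for index, item in enumerate(item_list, start=1):
        await process(item)
        percent = 100.0 * index / total
        await send_progress(percent, f"Processed {index}/{total}")
    return total
```

For very large inputs, consider reporting only every N items so progress messages do not dominate WebSocket traffic.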
Error Handling
```python
from distworker import Task


async def safe_task_handler(task: Task):
    try:
        # Task processing logic (process_data is your own function)
        result = process_data(task.get_input('data'))
        return {'result': result}
    except ValueError:
        # Re-raise unchanged; the task is marked as failed with this error
        raise
    except Exception as e:
        # Wrap other errors, keeping the original exception as __cause__
        raise RuntimeError(f"Processing failed: {e}") from e
```
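Task exposes timeout_ms, but this README does not say whether the controller or the SDK cancels a handler once that deadline passes. If you want a handler to fail with a clear error shortly before the deadline, one option is asyncio.wait_for. run_within_timeout and its margin_ms safety margin are hypothetical; the raised RuntimeError would then mark the task as failed like any other exception.

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


async def run_within_timeout(work: Awaitable[T], timeout_ms: int, margin_ms: int = 500) -> T:
    """Await `work`, failing with a descriptive error shortly before `timeout_ms` elapses."""
    # Leave a safety margin so the failure is reported before the task deadline.
    budget_s = max(timeout_ms - margin_ms, 0) / 1000.0
    try:
        return await asyncio.wait_for(work, timeout=budget_s)
    except asyncio.TimeoutError as exc:
        raise RuntimeError(f"Processing exceeded {timeout_ms} ms task timeout") from exc
```

Inside a handler this would read result = await run_within_timeout(do_work(task), task.timeout_ms), where do_work is your own coroutine. asyncio.wait_for cancels the inner work on timeout, so any cleanup belongs in that coroutine's finally blocks.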
Examples
The SDK includes example workers:
Basic Worker
```bash
python -m distworker.examples.basic_worker
```
Handles mathematical operations, text processing, and data transformations.
File Processing Worker
```bash
python -m distworker.examples.file_worker
```
Processes files, including image resizing and document conversion.
API Reference
Worker Class
Constructor
```python
Worker(
    controller_url: str,
    worker_id: str,
    worker_token: str,
    queue_patterns: List[str],
    resource_info: Optional[Dict[str, Any]] = None,
    reconnect_interval: float = 5.0,
    heartbeat_interval: float = 30.0,
    max_reconnect_attempts: int = -1
)
```
Methods
- register_handler(pattern: str, handler: Callable) - Register a task handler
- start() - Start the worker and connect to the controller
- stop() - Stop the worker and disconnect
- send_task_progress(progress: float, message: str, data: Dict) - Send a progress update
Task Class
Properties
- task_id: str - Unique task identifier
- queue: str - Queue name where the task was submitted
- timeout_ms: int - Task timeout in milliseconds
- metadata: Dict[str, Any] - Task metadata
- input_data: Dict[str, Any] - Task input data
- files: List[Dict[str, Any]] - List of file information
Methods
- get_input(key: str, default: Any = None) - Get an input value by key
- get_metadata(key: str, default: Any = None) - Get a metadata value by key
- get_file_by_id(file_id: str) - Get file info by ID
- get_files_by_name(filename: str) - Get files by filename
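The file lookup methods operate on the files list, whose entries carry file_id, filename and storage_url keys (as used in the Basic Handler example). The sketch below shows equivalent lookups over a plain list; the behavior when nothing matches (None for an ID, an empty list for a name) is an assumption, since this README does not specify it, and the storage URLs in the usage test are made up.

```python
from typing import Any, Dict, List, Optional

FileInfo = Dict[str, Any]


def get_file_by_id(files: List[FileInfo], file_id: str) -> Optional[FileInfo]:
    """Return the first file whose 'file_id' matches, or None (assumed behavior)."""
    return next((f for f in files if f.get("file_id") == file_id), None)


def get_files_by_name(files: List[FileInfo], filename: str) -> List[FileInfo]:
    """Return every file whose 'filename' matches; names need not be unique."""
    return [f for f in files if f.get("filename") == filename]
```

The plural name get_files_by_name reflects that several uploaded files can share a filename, while file IDs are expected to be unique.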
Development
Running Tests
```bash
pip install -e ".[dev]"
pytest
```
Code Formatting
```bash
black distworker/
flake8 distworker/
```
Type Checking
```bash
mypy distworker/
```
License
Apache-2.0 License - see LICENSE file for details.
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Run the test suite
- Submit a pull request
Support
- GitHub Issues: https://github.com/jc-lab/distworker/issues
- Documentation: https://github.com/jc-lab/distworker/blob/main/README.md