Python SDK for go-server distributed task scheduling system

Go-Server Python SDK

Python SDK for the go-server distributed task scheduling system. This SDK provides both client and worker libraries for interacting with the go-server scheduler.

Features

  • 🚀 Easy Integration: Simple APIs for both clients and workers
  • 🔄 Automatic Retry: Built-in retry mechanisms for robust operation
  • 📡 WebSocket Support: Real-time communication with the scheduler
  • 🛡️ Type Safety: Full type hint support for a better development experience
  • 📚 Method Documentation: Support for documenting registered methods
  • ⚖️ Load Balancing: Automatic task distribution across workers

Installation

pip install -r requirements.txt

Or install from source (with current setuptools, `pip install .` replaces the deprecated `python setup.py install`):

pip install .

Quick Start

Client Usage

from python_sdk.schedulersdk import SchedulerClient

# Create a client
client = SchedulerClient("http://localhost:8080")

# Execute a task synchronously
result = client.execute_sync("add", {"a": 1, "b": 2}, timeout=30.0)
print(f"Result: {result.result}")  # Output: Result: 3

# Execute a task asynchronously
response = client.execute("add", {"a": 5, "b": 3})
task_id = response.task_id

# Get the result later
result = client.get_result(task_id)
print(f"Async result: {result.result}")  # Output: Async result: 8

Worker Usage

from python_sdk.workersdk import Worker, Config
import time

# Define your methods
def add_numbers(params):
    """Add two numbers"""
    return params["a"] + params["b"]

def multiply_numbers(params):
    """Multiply two numbers"""
    return params["a"] * params["b"]

def long_running_task(params):
    """Simulate a long-running task"""
    time.sleep(params.get("duration", 5))
    return {"status": "completed", "message": "Task finished"}

# Create worker configuration
config = Config(
    scheduler_url="http://localhost:8080",
    worker_group="math_workers",
    max_retry=3,
    ping_interval=30
)

# Create and configure worker
worker = Worker(config)

# Register methods with documentation
worker.register_method(
    "add", 
    add_numbers,
    "Add two numbers",
    "Parameters: {\"a\": number, \"b\": number}",
    "Returns: number"
)

worker.register_method(
    "multiply", 
    multiply_numbers,
    "Multiply two numbers",
    "Parameters: {\"a\": number, \"b\": number}",
    "Returns: number"
)

worker.register_method(
    "long_task",
    long_running_task,
    "Execute a long-running task",
    "Parameters: {\"duration\": number (optional, default: 5)}",
    "Returns: {\"status\": string, \"message\": string}"
)

import signal
import sys

def signal_handler(sig, frame):
    """Stop the worker cleanly on Ctrl+C"""
    print("\nStopping worker...")
    worker.stop()
    sys.exit(0)

# Install the handler before starting so Ctrl+C is never missed
signal.signal(signal.SIGINT, signal_handler)

# Start the worker
try:
    worker.start()
    print("Worker started. Press Ctrl+C to stop.")

    # Keep the main thread alive until a signal arrives
    # (signal.pause() is not available on Windows)
    signal.pause()

except Exception as e:
    print(f"Error starting worker: {e}")
    worker.stop()
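
The handlers above index `params` directly, so a call with a missing key fails with a bare `KeyError`. The following is a minimal sketch of validating parameters up front, assuming handlers receive a plain dict as in the examples. `require_params` is a hypothetical helper, not part of the SDK, and how the worker reports an exception raised by a handler is not covered here.

```python
from functools import wraps


def require_params(*names):
    """Hypothetical helper (not part of the SDK): reject calls missing required keys."""
    def decorator(handler):
        @wraps(handler)
        def wrapper(params):
            missing = [name for name in names if name not in params]
            if missing:
                raise ValueError(f"missing parameters: {', '.join(missing)}")
            return handler(params)
        return wrapper
    return decorator


@require_params("a", "b")
def add_numbers(params):
    """Add two numbers"""
    return params["a"] + params["b"]


# Handlers are plain functions, so they can be exercised without a scheduler.
print(add_numbers({"a": 1, "b": 2}))  # 3
```

Because the decorator returns an ordinary callable, the wrapped function can presumably be passed to `worker.register_method` in the same way as an undecorated handler.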

Simple Call Function

For quick one-off calls, you can use the simple call function:

from python_sdk.workersdk import call

# Simple synchronous call
result = call("http://localhost:8080", "add", {"a": 1, "b": 2})
print(f"Result: {result}")  # Output: Result: 3

# Call with type hint
result: int = call("http://localhost:8080", "add", {"a": 1, "b": 2}, int)

# Async call
from python_sdk.workersdk import call_async, get_result

task_id = call_async("http://localhost:8080", "long_task", {"duration": 10})
print(f"Task submitted: {task_id}")

# Get result later
result = get_result("http://localhost:8080", task_id)
print(f"Task result: {result}")

Retry Client

To retry failed requests automatically, use the retry client:

from python_sdk.schedulersdk import RetryClient

# Create retry client with custom settings
client = RetryClient(
    base_url="http://localhost:8080",
    max_retries=5,
    retry_delay=2.0,
    timeout=30
)

# Execute with automatic retry
result = client.execute_with_retry("add", {"a": 1, "b": 2})
print(f"Result: {result.result}")

# Synchronous execution with retry
result = client.execute_sync_with_retry("multiply", {"a": 3, "b": 4}, timeout=60.0)
print(f"Result: {result.result}")

API Reference

SchedulerClient

  • execute(method, params): Execute a task asynchronously
  • get_result(task_id): Get result for a task (with polling)
  • execute_sync(method, params, timeout): Execute a task synchronously

RetryClient

  • execute_with_retry(method, params): Execute with automatic retry
  • execute_sync_with_retry(method, params, timeout): Synchronous execution with retry
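
The `max_retries` and `retry_delay` settings can be pictured with a small fixed-delay retry loop. This is only a sketch: `call_with_retry` is a hypothetical function, and whether `RetryClient` counts the first attempt toward `max_retries`, applies backoff, or retries on these particular exceptions is an assumption here.

```python
import time


def call_with_retry(func, max_retries=5, retry_delay=2.0,
                    retry_on=(ConnectionError, TimeoutError)):
    """Run func(); on a listed exception, wait retry_delay seconds and try again.

    Makes one initial attempt plus up to max_retries retries, then re-raises.
    """
    attempt = 0
    while True:
        try:
            return func()
        except retry_on:
            if attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(retry_delay)


# Illustration: a callable that fails twice before succeeding.
calls = []


def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("scheduler unreachable")
    return 3


print(call_with_retry(flaky, max_retries=5, retry_delay=0.01))  # 3
print(len(calls))  # 3
```

Exceptions outside `retry_on` propagate immediately, which keeps programming errors from being retried.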

Worker

  • register_method(name, handler, *docs): Register a method handler
  • start(): Start the worker
  • stop(): Stop the worker
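
The `register_method(name, handler, *docs)` signature maps a method name to a handler plus optional documentation strings. The sketch below mimics that shape with an in-memory table, which can be handy for trying handlers locally. `MethodRegistry`, `dispatch`, and `describe` are hypothetical names for this example and say nothing about how the real `Worker` stores or invokes handlers.

```python
class MethodRegistry:
    """Hypothetical in-memory stand-in for a worker's method table (not the SDK's class)."""

    def __init__(self):
        self._methods = {}

    def register_method(self, name, handler, *docs):
        # Store the handler together with its documentation strings.
        if not callable(handler):
            raise TypeError(f"handler for {name!r} must be callable")
        self._methods[name] = (handler, docs)

    def dispatch(self, name, params):
        # Look up the handler by name and call it with the params dict.
        if name not in self._methods:
            raise KeyError(f"unknown method: {name}")
        handler, _docs = self._methods[name]
        return handler(params)

    def describe(self, name):
        # Return the documentation strings registered for a method.
        return list(self._methods[name][1])


registry = MethodRegistry()
registry.register_method(
    "add",
    lambda params: params["a"] + params["b"],
    "Add two numbers",
    'Parameters: {"a": number, "b": number}',
    "Returns: number",
)

print(registry.dispatch("add", {"a": 1, "b": 2}))  # 3
print(registry.describe("add")[0])  # Add two numbers
```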

Configuration

config = Config(
    scheduler_url="http://localhost:8080",  # Scheduler URL
    worker_group="my_workers",              # Worker group name
    max_retry=3,                            # Connection retry attempts
    ping_interval=30                        # Heartbeat interval (seconds)
)
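
In deployments it is common to read these values from the environment instead of hard-coding them. A minimal sketch follows, assuming the four `Config` keyword arguments shown above. The `GO_SERVER_*` variable names and the `config_kwargs_from_env` helper are inventions for this example, not something the SDK defines.

```python
import os


def config_kwargs_from_env(env=None):
    """Build keyword arguments for Config from environment variables.

    The GO_SERVER_* variable names are assumptions made for this sketch.
    """
    env = os.environ if env is None else env
    return {
        "scheduler_url": env.get("GO_SERVER_URL", "http://localhost:8080"),
        "worker_group": env.get("GO_SERVER_WORKER_GROUP", "my_workers"),
        "max_retry": int(env.get("GO_SERVER_MAX_RETRY", "3")),
        "ping_interval": int(env.get("GO_SERVER_PING_INTERVAL", "30")),
    }


kwargs = config_kwargs_from_env({"GO_SERVER_WORKER_GROUP": "math_workers"})
print(kwargs["worker_group"], kwargs["max_retry"])  # math_workers 3
# With the SDK installed: config = Config(**kwargs)
```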

Error Handling

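Client calls raise standard exceptions for network failures, invalid responses, task errors, and timeouts, so each case can be handled separately: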

from python_sdk.schedulersdk import SchedulerClient
import requests

client = SchedulerClient("http://localhost:8080")

try:
    result = client.execute_sync("nonexistent_method", {})
except requests.RequestException as e:
    print(f"Network error: {e}")
except ValueError as e:
    print(f"Invalid response: {e}")
except RuntimeError as e:
    print(f"Task execution error: {e}")
except TimeoutError as e:
    print(f"Timeout: {e}")
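
When many call sites need the same handling, the `try`/`except` chain can be folded into one helper. The sketch below is an assumption-laden illustration: `describe_failure` and `run_safely` are hypothetical names, and the `requests.RequestException` branch is left out so the example runs with the standard library alone.

```python
def describe_failure(exc):
    """Map an exception to the categories used in the example above.

    Network errors (requests.RequestException) are left out so the sketch
    runs without third-party packages.
    """
    if isinstance(exc, TimeoutError):
        return "timeout"
    if isinstance(exc, ValueError):
        return "invalid response"
    if isinstance(exc, RuntimeError):
        return "task execution error"
    return "unexpected error"


def run_safely(func):
    """Return (True, value) on success or (False, category) on a known failure."""
    try:
        return True, func()
    except (TimeoutError, ValueError, RuntimeError) as exc:
        return False, describe_failure(exc)


def failing_task():
    raise RuntimeError("method not found")


print(run_safely(lambda: 3))     # (True, 3)
print(run_safely(failing_task))  # (False, 'task execution error')
```

In real use, `func` would be something like `lambda: client.execute_sync("add", {"a": 1, "b": 2})`, with an extra branch for `requests.RequestException`.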

Development

Running Tests

pip install -e ".[dev]"
pytest

Code Formatting

black python_sdk/
flake8 python_sdk/
mypy python_sdk/

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
