MLOps Python SDK for XCloud Service API

MLOps Python SDK

MLOps Python SDK for XCloud Service API. Manage and execute tasks with confidence.

Installation

Install the SDK from PyPI:

pip install mlops-python-sdk

Quick Start

1. Setup Authentication

You can authenticate using either an API Key or an Access Token.

Option 1: API Key (Recommended for programmatic access)

  1. Sign up at MLOps
  2. Create an API key from API Keys
  3. Set environment variables:

export MLOPS_API_KEY=xck_******
export MLOPS_DOMAIN=localhost:8090  # optional, default is localhost:8090

Option 2: Access Token (For user authentication)

export MLOPS_ACCESS_TOKEN=your_access_token
export MLOPS_DOMAIN=localhost:8090  # optional

2. Basic Usage

from client import Task, ConnectionConfig
from client.api.client.models.task_status import TaskStatus

# Initialize Task client (uses environment variables by default)
task = Task()

# Or initialize with explicit configuration
config = ConnectionConfig(
    api_key="xck_******",
    domain="localhost:8090",
    debug=False
)
task = Task(config=config)

# Submit a task with script
result = task.submit(
    name="my-training-task",
    cluster_id=1,
    script="#!/bin/bash\necho 'Hello World'",
    resources={"cpu": 4, "memory": "8GB", "gres": "gpu:1"}
)

# Or submit with command
result = task.submit(
    name="my-task",
    cluster_id=1,
    command="python train.py",
    resources={"cpu": 4, "memory": "8GB"}
)

# Get task details
task_info = task.get(task_id=result.job_id, cluster_id=1)

# List tasks with filters
running_tasks = task.list(
    status=TaskStatus.RUNNING,
    cluster_id=1,
    page=1,
    page_size=20
)

# Cancel a task
task.cancel(task_id=result.job_id, cluster_id=1)

# Delete a task (reuses the job ID returned by submit)
task.delete(task_id=result.job_id, cluster_id=1)

API Reference

Task Class

The Task class provides a high-level interface for managing tasks.

Initialization

from client import Task, ConnectionConfig

# Using environment variables
task = Task()

# With explicit configuration
config = ConnectionConfig(
    api_key="xck_******",           # API key for authentication
    access_token="token_******",     # Access token (alternative to API key)
    domain="localhost:8090",         # API domain
    debug=False,                      # Enable debug mode
    request_timeout=30.0              # Request timeout in seconds
)
task = Task(config=config)

# Or pass parameters directly
task = Task(
    api_key="xck_******",
    domain="localhost:8090"
)

Methods

submit()

Submit a new task to the cluster.

result = task.submit(
    name: str,                    # Task name (required)
    cluster_id: int,              # Cluster ID (required)
    script: Optional[str] = None, # Script content (script or command required)
    command: Optional[str] = None,# Command to execute (script or command required)
    resources: Optional[dict] = None, # Resource requirements
    team_id: Optional[int] = None # Team ID (optional)
) -> TaskSubmitResponse

The resources dictionary can contain the following keys:

  • cpu or cpus_per_task: Number of CPUs
  • memory: Memory requirement (e.g., "8GB", "4096M")
  • nodes: Number of nodes
  • gres: GPU resources (e.g., "gpu:1")
  • time: Time limit (e.g., "1-00:00:00" for 1 day)
  • partition: Partition name
  • tres: TRES specification
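
The time value follows a "D-HH:MM:SS" layout, which is easy to get wrong when written by hand. The sketch below builds that string from a timedelta. `format_time_limit` is a hypothetical helper and not part of the SDK, and it assumes the service accepts the day-prefixed form for every limit, including limits shorter than one day.

```python
from datetime import timedelta


def format_time_limit(limit: timedelta) -> str:
    """Format a timedelta as a "D-HH:MM:SS" time limit string.

    Hypothetical helper for illustration; not part of the SDK.
    """
    total_seconds = int(limit.total_seconds())
    if total_seconds <= 0:
        raise ValueError("time limit must be positive")
    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{days}-{hours:02d}:{minutes:02d}:{seconds:02d}"


print(format_time_limit(timedelta(days=1)))               # 1-00:00:00
print(format_time_limit(timedelta(hours=4, minutes=30)))  # 0-04:30:00
```

The result can be passed as the "time" entry of the resources dictionary.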

Example:

result = task.submit(
    name="ml-training",
    cluster_id=1,
    script="#!/bin/bash\npython train.py --epochs 100",
    resources={
        "cpu": 8,
        "memory": "16GB",
        "gres": "gpu:1",
        "time": "2-00:00:00",  # 2 days
        "partition": "gpu"
    }
)
print(f"Task submitted: Job ID = {result.job_id}")

get()

Get task details by task ID.

task_info = task.get(
    task_id: int,    # Task ID (Slurm job ID)
    cluster_id: int  # Cluster ID (required)
) -> Task

Example:

task_info = task.get(task_id=12345, cluster_id=1)
print(f"Task status: {task_info.status}")
print(f"Task name: {task_info.name}")

list()

List tasks with optional filters and pagination.

tasks = task.list(
    page: int = 1,                           # Page number
    page_size: int = 20,                     # Items per page
    status: Optional[TaskStatus] = None,     # Filter by status
    cluster_id: Optional[int] = None,         # Filter by cluster ID
    team_id: Optional[int] = None,           # Filter by team ID
    user_id: Optional[int] = None            # Filter by user ID
) -> TaskListResponse

Example:

from client.api.client.models.task_status import TaskStatus

# List all running tasks
running_tasks = task.list(status=TaskStatus.RUNNING)

# List tasks in a specific cluster
cluster_tasks = task.list(cluster_id=1, page=1, page_size=10)

# List completed tasks with pagination
completed = task.list(
    status=TaskStatus.COMPLETED,
    cluster_id=1,
    page=1,
    page_size=50
)

cancel()

Cancel a running task.

task.cancel(
    task_id: int,    # Task ID (Slurm job ID)
    cluster_id: int  # Cluster ID (required)
)

Example:

task.cancel(task_id=12345, cluster_id=1)

TaskStatus Enum

Task status values for filtering:

from client.api.client.models.task_status import TaskStatus

TaskStatus.PENDING      # Task is pending
TaskStatus.QUEUED       # Task is queued
TaskStatus.RUNNING      # Task is running
TaskStatus.COMPLETED    # Task completed successfully
TaskStatus.SUCCEEDED    # Task succeeded
TaskStatus.FAILED       # Task failed
TaskStatus.CANCELLED    # Task was cancelled
TaskStatus.CREATED      # Task was created

Configuration

Environment Variables

The SDK reads configuration from environment variables:

  • MLOPS_API_KEY: API key for authentication
  • MLOPS_ACCESS_TOKEN: Access token for authentication (alternative to API key)
  • MLOPS_DOMAIN: API domain (default: localhost:8090)
  • MLOPS_DEBUG: Enable debug mode (true/false, default: false)
  • MLOPS_API_PATH: API path prefix (default: /api/v1)
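
To make the defaults above concrete, here is a minimal standard-library sketch that resolves the same variables into a settings object. `Settings` and `load_settings` are hypothetical names used only for illustration; the SDK's own loader may behave differently, and treating only the literal "true" as enabling debug mode is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Hypothetical container for the MLOPS_* values; not an SDK class."""
    api_key: Optional[str]
    access_token: Optional[str]
    domain: str
    debug: bool
    api_path: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read the MLOPS_* variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return Settings(
        api_key=env.get("MLOPS_API_KEY") or None,
        access_token=env.get("MLOPS_ACCESS_TOKEN") or None,
        domain=env.get("MLOPS_DOMAIN", "localhost:8090"),
        # Assumption: only the literal "true" (any case) enables debug mode.
        debug=env.get("MLOPS_DEBUG", "false").strip().lower() == "true",
        api_path=env.get("MLOPS_API_PATH", "/api/v1"),
    )


print(load_settings({"MLOPS_DOMAIN": "api.example.com"}))
```

Passing a plain dictionary instead of reading os.environ keeps the helper easy to test.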

ConnectionConfig

You can also configure the connection programmatically:

from client import ConnectionConfig

config = ConnectionConfig(
    domain="api.example.com",
    api_key="xck_******",
    debug=True,
    request_timeout=60.0,
    api_path="/api/v1"
)

Error Handling

The SDK provides specific exception types:

from client import Task
from client.exceptions import (
    APIException,             # General API errors
    AuthenticationException,  # Authentication failures
    NotFoundException,        # Resource not found
    RateLimitException,       # Rate limit exceeded
    TimeoutException,         # Request timeout
    InvalidArgumentException  # Invalid arguments
)

task = Task()

try:
    result = task.submit(name="test", cluster_id=1, command="echo hello")
except AuthenticationException as e:
    print(f"Authentication failed: {e}")
except NotFoundException as e:
    print(f"Resource not found: {e}")
except APIException as e:
    print(f"API error: {e}")
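
Rate-limit and timeout errors are often transient, so a caller may want to retry them instead of failing immediately. The following is a sketch of exponential backoff that works with any exception types. `call_with_retry` is a hypothetical helper, not something the SDK provides, and whether the SDK already retries internally is not stated here.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `call`, retrying on `retry_on` with exponential backoff.

    The last failure is re-raised once `attempts` calls have been made.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # base_delay, then 2x, 4x, ...
    raise AssertionError("unreachable")


# Hypothetical usage with the exception types listed above:
# info = call_with_retry(
#     lambda: task.get(task_id=12345, cluster_id=1),
#     retry_on=(RateLimitException, TimeoutException),
# )
```

Read-only calls such as get() are the safer candidates for retrying; repeating submit() after a timeout could create a duplicate task if the first request actually reached the server.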

Examples

Submit a Machine Learning Training Job

from client import Task

task = Task()

result = task.submit(
    name="pytorch-training",
    cluster_id=1,
    script="""#!/bin/bash
#SBATCH --gres=gpu:1
#SBATCH --cpus-per-task=8
#SBATCH --mem=32GB

python train.py --config config.yaml
""",
    resources={
        "cpus_per_task": 8,
        "memory": "32GB",
        "gres": "gpu:1",
        "time": "4-00:00:00",  # 4 days
        "partition": "gpu"
    }
)

print(f"Training job submitted: {result.job_id}")
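
The example above states the same requirements twice, once as #SBATCH lines and once in the resources dictionary, and the two can drift apart. One way to keep them aligned is to generate the script header from the dictionary. This is only a sketch: `build_script` and the key-to-option mapping are assumptions made for illustration, and whether the service needs both forms is not documented here.

```python
from typing import Mapping

# Assumed mapping from resource keys to sbatch long options.
SBATCH_OPTIONS = {
    "nodes": "--nodes",
    "cpus_per_task": "--cpus-per-task",
    "memory": "--mem",
    "gres": "--gres",
    "time": "--time",
    "partition": "--partition",
}


def build_script(resources: Mapping[str, object], body: str) -> str:
    """Return a bash script whose #SBATCH header mirrors `resources`.

    Hypothetical helper for illustration; not part of the SDK.
    """
    lines = ["#!/bin/bash"]
    for key, option in SBATCH_OPTIONS.items():
        if key in resources:
            lines.append(f"#SBATCH {option}={resources[key]}")
    lines.append("")
    lines.append(body.strip())
    return "\n".join(lines) + "\n"


resources = {"cpus_per_task": 8, "memory": "32GB", "gres": "gpu:1"}
print(build_script(resources, "python train.py --config config.yaml"))
```

The returned string can be passed as the script argument alongside the same resources dictionary.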

Monitor Task Status

from client import Task
from client.api.client.models.task_status import TaskStatus
import time

task = Task()
job_id = 12345
cluster_id = 1

while True:
    task_info = task.get(task_id=job_id, cluster_id=cluster_id)
    print(f"Status: {task_info.status}")

    # Stop once the task reaches any terminal state
    if task_info.status in [TaskStatus.COMPLETED, TaskStatus.SUCCEEDED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
        break

    time.sleep(10)  # Check every 10 seconds
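
A polling loop like the one above never returns if the task stays queued indefinitely. The sketch below adds an overall timeout and takes the status lookup as a callable so it can be tried without a server. `wait_until_done` is a hypothetical helper, not an SDK function.

```python
import time
from typing import Callable, Collection, TypeVar

S = TypeVar("S")


def wait_until_done(
    get_status: Callable[[], S],
    terminal: Collection[S],
    interval: float = 10.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> S:
    """Poll `get_status` until it returns a terminal status or `timeout` elapses."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        # Give up if the next poll would happen after the deadline.
        if clock() + interval > deadline:
            raise TimeoutError(f"still {status!r} after {timeout} seconds")
        sleep(interval)


# Hypothetical usage with the SDK objects from the example above:
# final = wait_until_done(
#     lambda: task.get(task_id=job_id, cluster_id=cluster_id).status,
#     terminal={TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED},
# )
```

time.monotonic is used for the deadline so that system clock adjustments do not shorten or extend the wait.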

List and Filter Tasks

from client import Task
from client.api.client.models.task_status import TaskStatus

task = Task()

# Get all running tasks in cluster 1
running = task.list(
    status=TaskStatus.RUNNING,
    cluster_id=1
)

for t in running.tasks:
    print(f"{t.name}: {t.status} (Job ID: {t.job_id})")

# Get failed tasks
failed = task.list(status=TaskStatus.FAILED)

print(f"Total failed tasks: {failed.total}")
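
Each list() call returns a single page, so collecting every match means stepping through the pages. The sketch below does this with a generic generator. `iter_all_tasks` is a hypothetical helper; it assumes pages are numbered from 1 and that each response exposes `tasks` and `total` as used in the example above.

```python
from typing import Any, Callable, Iterator


def iter_all_tasks(
    fetch_page: Callable[[int, int], Any],
    page_size: int = 20,
) -> Iterator[Any]:
    """Yield every task by requesting pages until `total` items were seen.

    `fetch_page(page, page_size)` must return an object with `.tasks`
    (a list) and `.total` (an int), for example a wrapper around task.list().
    """
    page = 1
    seen = 0
    while True:
        response = fetch_page(page, page_size)
        if not response.tasks:
            break  # an empty page ends the loop even if `total` is stale
        for item in response.tasks:
            yield item
        seen += len(response.tasks)
        if seen >= response.total:
            break
        page += 1


# Hypothetical usage with the SDK objects from the example above:
# for t in iter_all_tasks(
#     lambda page, size: task.list(status=TaskStatus.FAILED, page=page, page_size=size)
# ):
#     print(t.name)
```

Stopping on an empty page as well as on the total guards against an endless loop if tasks disappear while the pages are being read.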

License

MIT
