
Python client for the Kaio multi-tenant machine learning platform


Kaio Python Client

A Python client for the Kaio multi-tenant machine learning platform. It lets developers run SageMaker Processing and Training jobs through a simple API, with automatic base Docker image resolution, secure file uploads, and multi-instance parallel processing.

Installation

pip install kaio

Quick Start

from kaio import Client

# Initialize client
client = Client("https://api.kaion5.com")

# Login with your API key
client.login("your-api-key")

# Submit a job
result = client.submit_job(
    directory="./my_code",
    job_name="training-job",
    instance_type="ml.g4dn.xlarge",
    entrypoint="train.py"
)

Features

  • SageMaker Processing & Training Jobs: Runs data processing, ML workloads, and full training jobs on AWS SageMaker
  • Multi-Instance Processing & Training: Parallel processing and distributed training across multiple instances
  • Distributed Training Support: MPI, Parameter Server, and SageMaker Distributed training configurations
  • Spot Instance Support: Cost optimization with managed spot instances for training jobs
  • Automatic Image Resolution: Detects your local ML framework and selects appropriate Docker images
  • Secure File Uploads: Handles code packaging and S3 uploads automatically
  • Job Management: Submit, monitor, and download results from SageMaker jobs
  • Web Dashboard: Access your jobs dashboard at https://www.kaion5.com/home/kaio-platform.html to monitor jobs and download outputs
  • Multi-Framework Support: Works with PyTorch, TensorFlow, and Scikit-learn
  • GPU/CPU Instance Matching: Automatically selects GPU or CPU optimized containers
  • Automatic Dependencies: Adds required packages (nbconvert, psutil, GPUtil) to requirements.txt
  • JWT Token Management: Handles authentication token refresh automatically
  • Environment Variable Access: Provides SM_CURRENT_HOST and SM_HOSTS for custom parallelization logic

API Reference

Client

Client(api_base, verbose=False)

Initialize the Kaio client.

Parameters:

  • api_base (str): Base URL of the Kaio API endpoint
  • verbose (bool): Enable verbose logging for debugging. Defaults to False.

Example:

client = Client("https://api.kaion5.com", verbose=True)

login(api_key)

Authenticate with your API key and obtain a JWT token.

Parameters:

  • api_key (str): Your Kaio platform API key

Returns:

  • Client: Self for method chaining

Raises:

  • requests.HTTPError: If authentication fails
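Because login() returns the client itself, construction and login can be chained. One way to keep the key out of source code is to read it from an environment variable. The sketch below assumes a variable named KAIO_API_KEY, a hypothetical name chosen for illustration; api_key_from_env() is a helper you would write yourself, not part of kaio.

```python
import os


def api_key_from_env(var_name: str = "KAIO_API_KEY") -> str:
    """Read the Kaio API key from an environment variable (hypothetical name)."""
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(f"Set {var_name} to your Kaio API key before calling login()")
    return key
```

Usage, assuming the variable is set: client = Client("https://api.kaion5.com").login(api_key_from_env())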

submit_job(**kwargs)

Submit a SageMaker Processing or Training job with automatic image resolution.

Parameters:

  • directory (str): Path to code directory containing your code and data. All files in this directory will be packaged and uploaded to SageMaker. Defaults to current directory.
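  • job_name (str): Name for the job. Defaults to "job". The submit_job result includes the unique_job_name used to refer to the job in get_job() and download_output().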
  • instance_type (str): SageMaker instance type. Defaults to "ml.m5.large".
  • instance_count (int): Number of instances for parallel processing. Defaults to 1.
    • Multi-Instance Processing: When instance_count > 1, SageMaker launches multiple instances to process large datasets in parallel
    • Environment Variables: Each instance receives SM_CURRENT_HOST (current instance identifier) and SM_HOSTS (comma-separated list of all instance identifiers)
    • No Inter-Instance Networking: Instances run independently without network communication between them
    • Custom Parallelization Logic: Use SM_CURRENT_HOST and SM_HOSTS to implement data partitioning and parallel processing
  • volume_size_gb (int): EBS volume size in GB. Defaults to 5; maximum 50. The code package must not exceed half this size.
  • entrypoint (str): Main script to execute (.py or .ipynb). Defaults to "train.py".
  • input_data (str, optional): S3 URI for input data (not implemented yet).
  • framework (str, optional): ML framework ("pytorch", "tensorflow", "sklearn").
  • framework_version (str, optional): Framework version.
  • image_uri (str, optional): Custom Docker image URI. If provided, overrides framework resolution.
  • workflow (str): Workflow type - "processing" or "training". Defaults to "processing".
  • training_args (dict, optional): Advanced training configuration for training workflow.

Returns:

  • dict: Job submission result containing status, job_name, and entrypoint, plus the unique_job_name to pass to get_job() and download_output()

Raises:

  • requests.HTTPError: If API calls fail
  • FileNotFoundError: If directory or entrypoint doesn't exist
  • ValueError: If the code package exceeds half the volume size
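The constraints above can be checked locally before any upload. check_submit_args() below is a hypothetical helper, not part of kaio. It encodes only the documented limits (workflow values, .py or .ipynb entrypoints, instance_count of at least 1, volume_size_gb of at most 50) plus the entrypoint-exists condition behind FileNotFoundError, and it returns a list of problems instead of raising.

```python
from pathlib import Path

# Limits taken from the submit_job parameter list; this helper is not part of kaio
VALID_WORKFLOWS = {"processing", "training"}
VALID_ENTRYPOINT_SUFFIXES = {".py", ".ipynb"}
MAX_VOLUME_GB = 50


def check_submit_args(directory=".", entrypoint="train.py", workflow="processing",
                      instance_count=1, volume_size_gb=5):
    """Return a list of problems; an empty list means the arguments look valid."""
    problems = []
    if workflow not in VALID_WORKFLOWS:
        problems.append(f"workflow must be one of {sorted(VALID_WORKFLOWS)}, got {workflow!r}")
    if Path(entrypoint).suffix not in VALID_ENTRYPOINT_SUFFIXES:
        problems.append(f"entrypoint must be a .py or .ipynb file, got {entrypoint!r}")
    if instance_count < 1:
        problems.append("instance_count must be at least 1")
    if not 0 < volume_size_gb <= MAX_VOLUME_GB:
        problems.append(f"volume_size_gb must be > 0 and <= {MAX_VOLUME_GB}")
    if not (Path(directory) / entrypoint).is_file():
        problems.append(f"entrypoint {entrypoint!r} not found in {directory!r}")
    return problems
```

For example, check_submit_args("./my_code", "train.py", volume_size_gb=60) reports the volume limit without contacting the API.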

get_jobs()

Get all jobs for the authenticated user.

Returns:

  • dict: A dictionary with a jobs list and a count

Raises:

  • requests.HTTPError: If the API call fails

Example:

jobs = client.get_jobs()
print(f"Total jobs: {jobs['count']}")

get_job(unique_job_name)

Get specific job status and details.

Parameters:

  • unique_job_name (str): Unique job name returned from submit_job

Returns:

  • dict: Job details including status and timestamp

Raises:

  • requests.HTTPError: If the job is not found or the API call fails

Example:

job = client.get_job("unique-job-name-123")
print(f"Status: {job['status']}")

download_output(unique_job_name, output_dir=".")

Download completed job output files.

Parameters:

  • unique_job_name (str): Unique job name returned from submit_job
  • output_dir (str): Local directory to save output. Defaults to current directory.

Returns:

  • list: List of downloaded file paths

Raises:

  • RuntimeError: If the job has not completed
  • requests.HTTPError: If the download fails

Example:

files = client.download_output("unique-job-name-123", "./results")
for file_path in files:
    print(f"Downloaded: {file_path}")

Supported Instance Types

CPU Instances

  • ml.m5.large, ml.m5.xlarge, ml.m5.2xlarge, ml.m5.4xlarge
  • ml.c5.large, ml.c5.xlarge, ml.c5.2xlarge, ml.c5.4xlarge

GPU Instances

  • ml.g4dn.xlarge, ml.g4dn.2xlarge, ml.g4dn.4xlarge, ml.g4dn.8xlarge
  • ml.p3.2xlarge, ml.p3.8xlarge, ml.p3.16xlarge
  • ml.g5.xlarge, ml.g5.2xlarge, ml.g5.4xlarge, ml.g5.8xlarge
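The two lists above can be encoded as sets to check an instance type before submitting and to see whether a GPU or CPU container would be expected. This is a minimal sketch built only from the lists in this section; instance_kind() is a hypothetical helper, and the SDK's own matching rules may differ. Note that ml.p4d.24xlarge, used in a later example, is not in these lists, so this helper rejects it.

```python
# Instance types copied from the lists above (the SDK's own matching rules may differ)
CPU_INSTANCES = {
    f"ml.{family}.{size}"
    for family in ("m5", "c5")
    for size in ("large", "xlarge", "2xlarge", "4xlarge")
}
GPU_INSTANCES = (
    {f"ml.g4dn.{size}" for size in ("xlarge", "2xlarge", "4xlarge", "8xlarge")}
    | {f"ml.p3.{size}" for size in ("2xlarge", "8xlarge", "16xlarge")}
    | {f"ml.g5.{size}" for size in ("xlarge", "2xlarge", "4xlarge", "8xlarge")}
)


def instance_kind(instance_type: str) -> str:
    """Return "gpu" or "cpu" for a listed instance type; raise ValueError otherwise."""
    if instance_type in GPU_INSTANCES:
        return "gpu"
    if instance_type in CPU_INSTANCES:
        return "cpu"
    raise ValueError(f"{instance_type!r} is not in the supported instance list")
```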

Training Arguments

For training workflow jobs, use the training_args parameter to configure advanced training features:

Spot Instances

Reduce training costs by up to 90% with managed spot instances:

training_args = {
    "spot_instances": {
        "enabled": True,
        "max_wait": 3600  # Max wait time in seconds
    }
}

Distributed Training

MPI (Message Passing Interface)

For multi-node communication in distributed training:

training_args = {
    "distributed_training": {
        "mpi": {
            "enabled": True,
            "processes_per_host": 8,  # Usually = GPUs per instance
            "custom_mpi_options": "-verbose"
        }
    }
}
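Since processes_per_host usually equals the number of GPUs per instance, the MPI block can be derived from the instance type. The GPU counts below are AWS's published figures for the GPU instances listed earlier, plus ml.p4d.24xlarge, which appears in the examples; mpi_training_args() is a hypothetical helper, not part of kaio.

```python
# GPUs per instance for the listed GPU types, plus ml.p4d.24xlarge from the examples
GPUS_PER_INSTANCE = {
    "ml.g4dn.xlarge": 1, "ml.g4dn.2xlarge": 1, "ml.g4dn.4xlarge": 1, "ml.g4dn.8xlarge": 1,
    "ml.g5.xlarge": 1, "ml.g5.2xlarge": 1, "ml.g5.4xlarge": 1, "ml.g5.8xlarge": 1,
    "ml.p3.2xlarge": 1, "ml.p3.8xlarge": 4, "ml.p3.16xlarge": 8,
    "ml.p4d.24xlarge": 8,
}


def mpi_training_args(instance_type: str, custom_mpi_options: str = "") -> dict:
    """Build an MPI training_args dict that runs one process per GPU."""
    processes = GPUS_PER_INSTANCE.get(instance_type)
    if processes is None:
        raise ValueError(f"No GPU count known for {instance_type!r}")
    mpi = {"enabled": True, "processes_per_host": processes}
    if custom_mpi_options:
        mpi["custom_mpi_options"] = custom_mpi_options
    return {"distributed_training": {"mpi": mpi}}
```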

Parameter Server

For TensorFlow distributed training:

training_args = {
    "distributed_training": {
        "parameter_server": {"enabled": True}
    }
}

SageMaker Distributed

Optimized distributed training for large models:

training_args = {
    "distributed_training": {
        "smdistributed": {
            "dataparallel": {"enabled": True},  # Faster gradient sync
            "modelparallel": {"enabled": True, "parameters": {}}  # Large model splitting
        }
    }
}
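Spot and distributed settings sit side by side in the same training_args dictionary, as the large-model example later in this document shows. The hypothetical build_training_args() helper below merges them; it assumes only the keys shown in this section and does not validate the distributed block itself.

```python
def build_training_args(spot_max_wait=None, distributed=None):
    """Merge optional spot and distributed-training settings into one training_args dict.

    spot_max_wait: maximum wait for spot capacity in seconds, or None to leave spot disabled.
    distributed: a "distributed_training" value like those shown above, or None.
    """
    args = {}
    if spot_max_wait is not None:
        if spot_max_wait <= 0:
            raise ValueError("spot_max_wait must be a positive number of seconds")
        args["spot_instances"] = {"enabled": True, "max_wait": int(spot_max_wait)}
    if distributed is not None:
        args["distributed_training"] = distributed
    return args
```

For example, build_training_args(spot_max_wait=10800, distributed={"smdistributed": {"dataparallel": {"enabled": True}}}) produces a dictionary with the same shape as the large-model example.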

Framework Auto-Detection

The SDK automatically detects your local ML framework and selects appropriate Docker images:

  • PyTorch: Detects version and selects matching SageMaker PyTorch container
  • TensorFlow: Detects version and selects matching SageMaker TensorFlow container
  • Scikit-learn: Falls back to scikit-learn container for general ML workloads
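The SDK does not document how it detects your framework. As an illustration only, installed distributions can be checked with the standard library's importlib.metadata, in the same priority order as the list above; detect_local_framework() is hypothetical. You could pass its result to submit_job(framework=..., framework_version=...) to make the choice explicit, though local version strings such as 2.1.0+cu121 may need trimming before they match a container version.

```python
from importlib.metadata import PackageNotFoundError, version

# (framework name for submit_job, pip distribution name), checked in this order
CANDIDATES = [
    ("pytorch", "torch"),
    ("tensorflow", "tensorflow"),
    ("sklearn", "scikit-learn"),
]


def detect_local_framework():
    """Return (framework, version) for the first installed candidate, or (None, None)."""
    for framework, dist_name in CANDIDATES:
        try:
            return framework, version(dist_name)
        except PackageNotFoundError:
            continue
    return None, None
```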

Code Requirements

Data and Code Structure

Important: All data needed for your job must be included in the directory you submit with client.submit_job(). The entire directory is packaged and uploaded to SageMaker.

Security Notice: Jobs run securely in Kaion5 Compute's AWS account. Do not upload sensitive or confidential data. All uploaded data and job outputs are automatically deleted after 7 days. Future development will include Bring-Your-Own-Account workflows for enhanced security.

Telemetry Data: Kaion5 Compute retains job telemetry data (job names, instance types, status, runtime, compute metrics, storage configurations) for platform optimization. Code, data, and logs are permanently deleted after 7 days.

File Size Limits

Code packages must not exceed half your volume size:

  • 5GB volume → 2.5GB max code package
  • 10GB volume → 5GB max code package
  • Maximum storage per job: 50GB
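To check the limit before submitting, you can measure the directory locally. This sketch assumes GB means 1024**3 bytes and compares the uncompressed size of all files against half the volume; the SDK measures its own package, so its result may differ slightly. directory_size_bytes() and fits_volume() are hypothetical helpers.

```python
import os

GB = 1024 ** 3  # assumption: the documented limits use binary gigabytes


def directory_size_bytes(path: str) -> int:
    """Sum the sizes of regular files under path, skipping symlinks."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            full_path = os.path.join(root, name)
            if os.path.isfile(full_path) and not os.path.islink(full_path):
                total += os.path.getsize(full_path)
    return total


def fits_volume(path: str, volume_size_gb: int = 5) -> bool:
    """True if the directory is at most half of the requested volume size."""
    return directory_size_bytes(path) <= volume_size_gb * GB / 2
```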

Multi-Instance Code Structure

For multi-instance jobs, your code should use SageMaker environment variables:

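import json
import os


def get_hosts():
    # SM_HOSTS may be a JSON list in Training jobs (e.g. '["algo-1","algo-2"]')
    # or a comma-separated string, as described under instance_count above
    raw = os.environ.get('SM_HOSTS', 'algo-1').strip()
    if raw.startswith('['):
        return json.loads(raw)
    return [host.strip() for host in raw.split(',') if host.strip()]


def setup_distributed_training(host_rank, total_hosts):
    # Placeholder: initialize your framework's distributed backend here
    print(f"Training setup on rank {host_rank} of {total_hosts}")


def process_data_partition(host_rank, total_hosts):
    # Placeholder: process the slice of data assigned to this host
    print(f"Processing partition {host_rank} of {total_hosts}")


def main():
    # Get SageMaker environment variables
    current_host = os.environ.get('SM_CURRENT_HOST', 'algo-1')
    all_hosts = get_hosts()

    # Determine this instance's role
    host_rank = all_hosts.index(current_host)
    total_hosts = len(all_hosts)

    print(f"Running on {current_host} (rank {host_rank}/{total_hosts})")

    # SM_TRAINING_ENV is set in SageMaker Training containers; otherwise assume Processing
    if os.environ.get('SM_TRAINING_ENV'):
        setup_distributed_training(host_rank, total_hosts)
    else:
        process_data_partition(host_rank, total_hosts)


if __name__ == "__main__":
    main()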

Requirements File

Important: Include a requirements.txt file in your code directory that lists every Python package your job needs. The packages it lists are installed automatically in the job environment.

Automatic Dependencies

The SDK automatically adds these packages to your requirements.txt:

  • nbconvert - For Jupyter notebook execution
  • psutil - For system monitoring
  • GPUtil - For GPU monitoring
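The SDK does not document exactly how it edits requirements.txt. The sketch below shows one idempotent way to get the same effect: append each package only if no existing line already names it, ignoring version specifiers. ensure_requirements() is hypothetical; it can be useful if you want the file in your directory to match what runs in the job.

```python
import re
from pathlib import Path

AUTO_PACKAGES = ("nbconvert", "psutil", "GPUtil")


def ensure_requirements(directory: str, packages=AUTO_PACKAGES) -> list:
    """Append any missing packages to directory/requirements.txt; return the names added."""
    req = Path(directory) / "requirements.txt"
    lines = req.read_text().splitlines() if req.exists() else []
    # Compare bare package names, ignoring specifiers such as "psutil>=5.9"
    present = {
        re.split(r"[<>=!~\[; ]", line.strip(), maxsplit=1)[0].lower()
        for line in lines
        if line.strip() and not line.strip().startswith("#")
    }
    added = [p for p in packages if p.lower() not in present]
    if added:
        req.write_text("\n".join(lines + added) + "\n")
    return added
```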

Workflows

Processing Workflow

SageMaker Processing Jobs are designed for data processing, feature engineering, model evaluation, and basic model training tasks. When using instance_count > 1, you can parallelize processing of large datasets across multiple instances.

Environment Variables: Each instance receives SM_CURRENT_HOST and SM_HOSTS for custom parallelization logic.

Limitations: No inter-instance communication or distributed training support.

Training Workflow

SageMaker Training Jobs provide full distributed training capabilities with advanced features:

  • Distributed Training: MPI, Parameter Server, and SageMaker Distributed training
  • Spot Instance Support: Cost optimization with managed spot instances
  • Checkpointing: Automatic model checkpointing and recovery
  • Advanced ML Features: Framework-specific distributed training optimizations
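With spot instances a job can be interrupted and restarted, so training code should be able to resume. A minimal sketch, assuming SageMaker's default local checkpoint path /opt/ml/checkpoints (whether Kaio syncs this directory to S3 is not documented here) and a hypothetical CHECKPOINT_DIR override; save_checkpoint() and load_checkpoint() are illustrative helpers that write state atomically with os.replace.

```python
import json
import os

# Assumption: SageMaker's default local checkpoint path; CHECKPOINT_DIR is a hypothetical override
CHECKPOINT_DIR = os.environ.get("CHECKPOINT_DIR", "/opt/ml/checkpoints")


def save_checkpoint(state: dict, directory: str = CHECKPOINT_DIR) -> None:
    """Write state to a temp file, then rename it, so an interruption never leaves a partial file."""
    os.makedirs(directory, exist_ok=True)
    tmp_path = os.path.join(directory, "state.json.tmp")
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, os.path.join(directory, "state.json"))


def load_checkpoint(directory: str = CHECKPOINT_DIR) -> dict:
    """Return the last saved state, or an empty dict on a fresh start."""
    path = os.path.join(directory, "state.json")
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return json.load(f)
```

In a training loop, call load_checkpoint() once at startup, begin from state.get("epoch", 0), and call save_checkpoint({"epoch": epoch + 1}) at the end of each epoch.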

Multi-Instance Processing Example

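import json
import os

# Placeholder dataset and processing step: replace with your own
dataset = list(range(100))


def process_data_partition(data):
    print(f"Processing {len(data)} items")


# Get instance information; SM_HOSTS may be a JSON list or a comma-separated string
current_host = os.environ.get('SM_CURRENT_HOST', 'algo-1')
raw_hosts = os.environ.get('SM_HOSTS', 'algo-1').strip()
if raw_hosts.startswith('['):
    all_hosts = json.loads(raw_hosts)
else:
    all_hosts = [host.strip() for host in raw_hosts.split(',')]
host_rank = all_hosts.index(current_host)
total_hosts = len(all_hosts)

print(f"Instance {host_rank + 1} of {total_hosts} (Host: {current_host})")

# Partition data based on host rank; the last host also takes any remainder
data_partition_size = len(dataset) // total_hosts
start_idx = host_rank * data_partition_size
end_idx = start_idx + data_partition_size if host_rank < total_hosts - 1 else len(dataset)

# Process assigned data partition
my_data = dataset[start_idx:end_idx]
process_data_partition(my_data)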
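When the dataset is a set of files rather than an in-memory sequence, assigning files round-robin keeps per-host file counts within one of each other. In this hypothetical helper, sorting first matters: every host computes the same order independently, so no two hosts pick the same file and none is skipped. It could be called with the host_rank and total_hosts computed above and a list of paths from glob.glob().

```python
def files_for_host(paths, host_rank, total_hosts):
    """Return this host's share of paths, assigned round-robin after sorting."""
    if not 0 <= host_rank < total_hosts:
        raise ValueError("host_rank must satisfy 0 <= host_rank < total_hosts")
    # Sorting gives every host the same order, so shares never overlap or miss a file
    return sorted(paths)[host_rank::total_hosts]
```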

Examples

Processing Workflow Examples

from kaio import Client

client = Client("https://api.kaion5.com")
client.login("your-api-key")

# Single instance data processing
result = client.submit_job(
    directory="./data_processing",
    job_name="data-preprocessing",
    workflow="processing",
    instance_type="ml.m5.xlarge",
    entrypoint="preprocess.py"
)

# Multi-instance parallel processing
result = client.submit_job(
    directory="./parallel_processing",
    job_name="multi-instance-processing",
    workflow="processing",
    instance_type="ml.m5.2xlarge",
    instance_count=4,
    entrypoint="process_data.py",
    volume_size_gb=50
)

Training Workflow Examples

# Basic training job
result = client.submit_job(
    directory="./training_code",
    job_name="model-training",
    workflow="training",
    instance_type="ml.g4dn.xlarge",
    entrypoint="train.py"
)

# Training with spot instances (cost optimization)
result = client.submit_job(
    directory="./training_code",
    job_name="spot-training",
    workflow="training",
    instance_type="ml.p3.2xlarge",
    training_args={
        "spot_instances": {
            "enabled": True,
            "max_wait": 7200  # 2 hours max wait
        }
    }
)

# Distributed training with MPI
result = client.submit_job(
    directory="./distributed_training",
    job_name="distributed-training",
    workflow="training",
    instance_type="ml.p3.8xlarge",
    instance_count=4,
    training_args={
        "distributed_training": {
            "mpi": {
                "enabled": True,
                "processes_per_host": 4  # 4 GPUs per p3.8xlarge
            }
        }
    }
)

# Large model training with SageMaker distributed
result = client.submit_job(
    directory="./large_model",
    job_name="large-model-training",
    workflow="training",
    instance_type="ml.p4d.24xlarge",
    instance_count=2,
    training_args={
        "spot_instances": {"enabled": True, "max_wait": 10800},
        "distributed_training": {
            "smdistributed": {
                "modelparallel": {"enabled": True},
                "dataparallel": {"enabled": True}
            }
        }
    }
)

Framework-Specific Examples

# TensorFlow with an explicit framework and version
result = client.submit_job(
    directory="./tensorflow_training",
    job_name="tf-training",
    workflow="training",
    instance_type="ml.p3.2xlarge",
    framework="tensorflow",
    framework_version="2.13.0",
    entrypoint="train.py"
)

# Jupyter notebook execution
result = client.submit_job(
    directory="./notebooks",
    job_name="notebook-analysis",
    workflow="processing",
    instance_type="ml.m5.xlarge",
    entrypoint="analysis.ipynb"
)

Job Management

Getting All Jobs

# Get list of all your jobs
jobs = client.get_jobs()
print(f"Total jobs: {jobs['count']}")
for job in jobs['jobs']:
    print(f"Job: {job['job_name']} - Status: {job['status']}")
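For a quick overview, the same response can be tallied by status. This assumes the response shape used above (a jobs list whose entries carry a status key); count_by_status() is a hypothetical helper. Usage: print(count_by_status(client.get_jobs()))

```python
from collections import Counter


def count_by_status(jobs_response: dict) -> Counter:
    """Tally jobs by their status field, e.g. Counter({"Completed": 2, "Failed": 1})."""
    return Counter(job["status"] for job in jobs_response.get("jobs", []))
```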

Getting Job Status

# Get specific job details (use unique_job_name from submit_job response)
job_details = client.get_job("unique-job-name-123")
print(f"Status: {job_details['status']}")
print(f"Started: {job_details['start_time']}")
if job_details['status'] == 'Completed':
    print(f"Completed: {job_details['end_time']}")

Downloading Job Outputs

# Download all output files from a completed job
downloaded_files = client.download_output("unique-job-name-123", "./results")
for file_path in downloaded_files:
    print(f"Downloaded: {file_path}")

Complete Job Workflow

import time

# Submit training job
result = client.submit_job(
    directory="./my_training",
    job_name="model-training",
    workflow="training",
    instance_type="ml.g4dn.xlarge"
)

unique_job_name = result['unique_job_name']
print(f"Job submitted: {unique_job_name}")

# Monitor job status
while True:
    job = client.get_job(unique_job_name)
    status = job['status']
    print(f"Job status: {status}")
    
    if status in ['Completed', 'Failed', 'Stopped']:
        break
    
    time.sleep(30)  # Check every 30 seconds

# Download results if completed
if status == 'Completed':
    files = client.download_output(unique_job_name, "./results")
    print(f"Downloaded {len(files)} files")
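The loop above polls forever if a job never reaches a terminal status. A variant with a deadline, written as a reusable function, is sketched below. wait_for_job() and its six-hour default timeout are illustrative choices; it works with any object that has a get_job() method returning a status key, which also makes it easy to test with a stub. Usage: status = wait_for_job(client, unique_job_name)

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_job(client, unique_job_name, poll_seconds=30, timeout_seconds=6 * 3600):
    """Poll client.get_job() until a terminal status or the deadline; return the final status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_job(unique_job_name)["status"]
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{unique_job_name} still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```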

Error Handling

import requests

try:
    result = client.submit_job(
        directory="./code",
        job_name="my-job",
        instance_type="ml.g4dn.xlarge"
    )
except requests.HTTPError as e:
    if e.response.status_code == 403:
        print("Access denied - check your API key")
    else:
        print(f"API error: {e}")
except ValueError as e:
    print(f"Configuration error: {e}")
except FileNotFoundError as e:
    print(f"File not found: {e}")

Job Dashboard

Access your jobs at: https://www.kaion5.com/home/kaio-platform.html

After signing in to the Kaio Platform, you can:

  • Monitor job status and progress
  • View job logs and details
  • Download job outputs and results
  • Manage your job history

License

MIT License

Download files

Source Distribution

kaio-0.1.7.tar.gz (10.7 kB)

Uploaded Source

Built Distribution


kaio-0.1.7-py3-none-any.whl (7.1 kB)

Uploaded Python 3

File details

Details for the file kaio-0.1.7.tar.gz.

File metadata

  • Download URL: kaio-0.1.7.tar.gz
  • Size: 10.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.4

File hashes

Hashes for kaio-0.1.7.tar.gz
Algorithm Hash digest
SHA256 4590185c6c2e807ca0c9dfd1e6626650ad1be82a3585eb57081c62ad6ac57555
MD5 aed9f4840483ff36df1e3b3d46329ffc
BLAKE2b-256 1dcbc3802c2012d4d62071a1e9b91911e0f7ee54ba7aa0f33ebb7ec1f23271c8


File details

Details for the file kaio-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: kaio-0.1.7-py3-none-any.whl
  • Size: 7.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.4

File hashes

Hashes for kaio-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 2adbfcdba13d1358e80cfb41b48ad76919af704ad7d52481b244874da0eb6840
MD5 3aa1f000a01e8161b710d556288faa47
BLAKE2b-256 160981af2e4eb8a716d47bcafbbf4c42a4c5335753488c9d60340f45273d9785

