Python client for the Kaio multi-tenant machine learning platform
Kaio Python Client
A Python client for the Kaio multi-tenant machine learning platform that enables developers to run SageMaker Processing jobs through simple APIs with automatic base docker image resolution, secure file uploads, and multi-instance parallel processing capabilities.
Installation
pip install kaio
Quick Start
from kaio import Client

# Initialize client
client = Client("https://api.kaion5.com")

# Login with your API key
client.login("your-api-key")

# Submit a job
result = client.submit_job(
    directory="./my_code",
    job_name="training-job",
    instance_type="ml.g4dn.xlarge",
    entrypoint="train.py"
)
Features
- SageMaker Processing & Training Jobs: Runs data processing, ML workloads, and full training jobs on AWS SageMaker
- Multi-Instance Processing & Training: Parallel processing and distributed training across multiple instances
- Distributed Training Support: MPI, Parameter Server, and SageMaker Distributed training configurations
- Spot Instance Support: Cost optimization with managed spot instances for training jobs
- Automatic Image Resolution: Detects your local ML framework and selects appropriate Docker images
- Secure File Uploads: Handles code packaging and S3 uploads automatically
- Job Management: Submit, monitor, and download results from SageMaker jobs
- Web Dashboard: Access your jobs dashboard at https://www.kaion5.com/home/kaio-platform.html to monitor jobs and download outputs
- Multi-Framework Support: Works with PyTorch, TensorFlow, and Scikit-learn
- GPU/CPU Instance Matching: Automatically selects GPU or CPU optimized containers
- Automatic Dependencies: Adds required packages (nbconvert, psutil, GPUtil) to requirements.txt
- JWT Token Management: Handles authentication token refresh automatically
- Environment Variable Access: Provides SM_CURRENT_HOST and SM_HOSTS for custom parallelization logic
API Reference
Client
Client(api_base, verbose=False)
Initialize the Kaio client.
Parameters:
- api_base(str): Base URL of the Kaio API endpoint
- verbose(bool): Enable verbose logging for debugging. Defaults to False.
Example:
client = Client("https://api.kaion5.com", verbose=True)
login(api_key)
Authenticate with API key and obtain JWT token.
Parameters:
api_key(str): Your Kaio platform API key
Returns:
Client: Self for method chaining
Raises:
requests.HTTPError: If authentication fails
submit_job(**kwargs)
Submit a SageMaker Processing or Training job with automatic image resolution.
Parameters:
- directory(str): Path to code directory containing your code and data. All files in this directory will be packaged and uploaded to SageMaker. Defaults to current directory.
- job_name(str): Unique name for the job. Defaults to "job".
- instance_type(str): SageMaker instance type. Defaults to "ml.m5.large".
- instance_count(int): Number of instances for parallel processing. Defaults to 1.
  - Multi-Instance Processing: When instance_count > 1, SageMaker launches multiple instances to process large datasets in parallel
  - Environment Variables: Each instance receives SM_CURRENT_HOST (current instance identifier) and SM_HOSTS (comma-separated list of all instance identifiers)
  - No Inter-Instance Networking: Instances run independently without network communication between them
  - Custom Parallelization Logic: Use SM_CURRENT_HOST and SM_HOSTS to implement data partitioning and parallel processing
- volume_size_gb(int): EBS volume size in GB. Defaults to 5. Maximum 50 GB.
- entrypoint(str): Main script to execute (.py or .ipynb). Defaults to "train.py".
- input_data(str, optional): S3 URI for input data (not implemented yet).
- framework(str, optional): ML framework ("pytorch", "tensorflow", "sklearn").
- framework_version(str, optional): Framework version.
- image_uri(str, optional): Custom Docker image URI. If provided, overrides framework resolution.
- workflow(str): Workflow type - "processing" or "training". Defaults to "processing".
- training_args(dict, optional): Advanced training configuration for training workflow.
Returns:
dict: Job submission result with status, job_name, and entrypoint
Raises:
- requests.HTTPError: If API calls fail
- FileNotFoundError: If directory or entrypoint doesn't exist
- ValueError: If code package exceeds volume capacity
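The two file-related errors above can also be caught locally before any packaging or upload starts. The following is a minimal sketch of such a pre-flight check, assuming only what the parameter list states (the entrypoint lives inside the submitted directory and ends in .py or .ipynb). `check_submission` is a hypothetical helper, not part of the kaio package, and raising ValueError for an unsupported extension is an assumption about how you might want to report that case.

```python
from pathlib import Path

ALLOWED_SUFFIXES = {".py", ".ipynb"}  # entrypoint types named in the parameter list


def check_submission(directory=".", entrypoint="train.py"):
    """Hypothetical pre-flight check; not part of the kaio package."""
    root = Path(directory)
    if not root.is_dir():
        raise FileNotFoundError(f"Directory not found: {root}")
    script = root / entrypoint
    if not script.is_file():
        raise FileNotFoundError(f"Entrypoint not found: {script}")
    if script.suffix not in ALLOWED_SUFFIXES:
        # Assumption: the SDK may report this case differently.
        raise ValueError(f"Unsupported entrypoint type: {script.suffix}")
    return script
```

Calling `check_submission("./my_code", "train.py")` before `client.submit_job(...)` surfaces a missing file without a round trip to the API.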
get_jobs()
Get all jobs for the authenticated user.
Returns:
dict: List of jobs with count
Raises:
requests.HTTPError: If API error
Example:
jobs = client.get_jobs()
print(f"Total jobs: {jobs['count']}")
get_job(unique_job_name)
Get specific job status and details.
Parameters:
unique_job_name(str): Unique job name returned from submit_job
Returns:
dict: Job details including status and timestamp
Raises:
requests.HTTPError: If job not found or API error
Example:
job = client.get_job("unique-job-name-123")
print(f"Status: {job['status']}")
download_output(unique_job_name, output_dir=".")
Download completed job output files.
Parameters:
- unique_job_name(str): Unique job name returned from submit_job
- output_dir(str): Local directory to save output. Defaults to current directory.
Returns:
list: List of downloaded file paths
Raises:
- RuntimeError: If job is not completed
- requests.HTTPError: If download fails
Example:
files = client.download_output("unique-job-name-123", "./results")
for file_path in files:
    print(f"Downloaded: {file_path}")
Supported Instance Types
CPU Instances
- ml.m5.large, ml.m5.xlarge, ml.m5.2xlarge, ml.m5.4xlarge
- ml.c5.large, ml.c5.xlarge, ml.c5.2xlarge, ml.c5.4xlarge
GPU Instances
- ml.g4dn.xlarge, ml.g4dn.2xlarge, ml.g4dn.4xlarge, ml.g4dn.8xlarge
- ml.p3.2xlarge, ml.p3.8xlarge, ml.p3.16xlarge
- ml.g5.xlarge, ml.g5.2xlarge, ml.g5.4xlarge, ml.g5.8xlarge
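If your own tooling needs to know whether a chosen instance type is a GPU or CPU type (for example, to decide whether to log GPU metrics), the family part of the name is enough. This is a small sketch built only from the families listed above; `is_gpu_instance` is a hypothetical helper and does not reflect how the SDK itself matches containers.

```python
# Instance families copied from the CPU and GPU lists above.
CPU_FAMILIES = {"m5", "c5"}
GPU_FAMILIES = {"g4dn", "p3", "g5"}


def instance_family(instance_type):
    """Return the family part of a name, e.g. 'ml.g4dn.xlarge' gives 'g4dn'."""
    parts = instance_type.split(".")
    if len(parts) != 3 or parts[0] != "ml":
        raise ValueError(f"Unexpected instance type format: {instance_type}")
    return parts[1]


def is_gpu_instance(instance_type):
    """Hypothetical helper: True for GPU families, False for CPU families."""
    family = instance_family(instance_type)
    if family in GPU_FAMILIES:
        return True
    if family in CPU_FAMILIES:
        return False
    raise ValueError(f"Instance family not in the lists above: {family}")
```

Raising for unknown families is a deliberate choice here: it flags typos early instead of silently treating an unlisted type as CPU.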
Training Arguments
For training workflow jobs, use the training_args parameter to configure advanced training features:
Spot Instances
Reduce training costs by up to 90% with managed spot instances:
training_args = {
    "spot_instances": {
        "enabled": True,
        "max_wait": 3600  # Max wait time in seconds
    }
}
Distributed Training
MPI (Message Passing Interface)
For multi-node communication in distributed training:
training_args = {
    "distributed_training": {
        "mpi": {
            "enabled": True,
            "processes_per_host": 8,  # Usually = GPUs per instance
            "custom_mpi_options": "-verbose"
        }
    }
}
Parameter Server
For TensorFlow distributed training:
training_args = {
    "distributed_training": {
        "parameter_server": {"enabled": True}
    }
}
SageMaker Distributed
Optimized distributed training for large models:
training_args = {
    "distributed_training": {
        "smdistributed": {
            "dataparallel": {"enabled": True},  # Faster gradient sync
            "modelparallel": {"enabled": True, "parameters": {}}  # Large model splitting
        }
    }
}
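The fragments in this section are keys of one training_args dict, so spot settings and a distributed training mode can be combined in a single submission. A small sketch of assembling that dict programmatically follows; `build_training_args` is a hypothetical helper (not part of the kaio package) and covers only the spot and MPI keys shown above.

```python
def build_training_args(spot_max_wait=None, mpi_processes_per_host=None):
    """Hypothetical helper that assembles a training_args dict.

    spot_max_wait: max wait in seconds, or None to leave spot instances out.
    mpi_processes_per_host: process count, or None to leave MPI out.
    """
    args = {}
    if spot_max_wait is not None:
        args["spot_instances"] = {"enabled": True, "max_wait": spot_max_wait}
    if mpi_processes_per_host is not None:
        args["distributed_training"] = {
            "mpi": {"enabled": True, "processes_per_host": mpi_processes_per_host}
        }
    return args
```

The result can be passed as `training_args=build_training_args(spot_max_wait=7200, mpi_processes_per_host=4)` in a `workflow="training"` submission.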
Framework Auto-Detection
The SDK automatically detects your local ML framework and selects appropriate Docker images:
- PyTorch: Detects version and selects matching SageMaker PyTorch container
- TensorFlow: Detects version and selects matching SageMaker TensorFlow container
- Scikit-learn: Falls back to scikit-learn container for general ML workloads
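To preview which framework and version would likely be picked up from your local environment, you can query installed package metadata yourself. The sketch below uses the standard library's importlib.metadata; it is not the SDK's actual detection code, and the lookup order (PyTorch, then TensorFlow, then scikit-learn) and the PyPI distribution names are assumptions.

```python
from importlib import metadata

# (framework name accepted by submit_job, assumed distribution name on PyPI)
CANDIDATES = [
    ("pytorch", "torch"),
    ("tensorflow", "tensorflow"),
    ("sklearn", "scikit-learn"),
]


def detect_framework(get_version=metadata.version):
    """Return (framework, version) for the first installed candidate, else None."""
    for framework, distribution in CANDIDATES:
        try:
            return framework, get_version(distribution)
        except metadata.PackageNotFoundError:
            continue  # not installed locally, try the next candidate
    return None
```

If the result is not what you want to run remotely, pass framework and framework_version (or image_uri) to submit_job explicitly.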
Code Requirements
Data and Code Structure
Important: All data needed for your job must be included in the directory you submit with client.submit_job(). The entire directory is packaged and uploaded to SageMaker.
Security Notice: Jobs run securely in Kaion5 Compute's AWS account. Do not upload sensitive or confidential data. All uploaded data and job outputs are automatically deleted after 7 days. Future development will include Bring-Your-Own-Account workflows for enhanced security.
Telemetry Data: Kaion5 Compute retains job telemetry data (job names, instance types, status, runtime, compute metrics, storage configurations) for platform optimization. Code, data, and logs are permanently deleted after 7 days.
File Size Limits
Code packages must not exceed half your volume size:
- 5GB volume → 2.5GB max code package
- 10GB volume → 5GB max code package
- Maximum storage per job: 50GB
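The half-volume rule can be checked locally before submitting. The following sketch sums file sizes under a directory and compares the total with half the requested volume. It is an approximation under stated assumptions: it measures the uncompressed directory (the SDK may measure the packaged archive instead) and treats 1 GB as 10^9 bytes. All function names are hypothetical.

```python
import os

GB = 10**9  # assumption: decimal gigabytes; the platform may count differently
MAX_VOLUME_GB = 50  # maximum storage per job, from the list above


def max_package_bytes(volume_size_gb=5):
    """Half the volume size, per the rule above."""
    if not 0 < volume_size_gb <= MAX_VOLUME_GB:
        raise ValueError(f"volume_size_gb must be between 1 and {MAX_VOLUME_GB}")
    return volume_size_gb * GB // 2


def directory_size_bytes(directory):
    """Sum the sizes of all regular files under directory (symlinks skipped)."""
    total = 0
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            if not os.path.islink(path):
                total += os.path.getsize(path)
    return total


def fits_volume(directory, volume_size_gb=5):
    """True if the directory is no larger than half the requested volume."""
    return directory_size_bytes(directory) <= max_package_bytes(volume_size_gb)
```

If `fits_volume("./my_code", 5)` returns False, either raise volume_size_gb or trim the directory before calling submit_job.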
Multi-Instance Code Structure
For multi-instance jobs, your code should use SageMaker environment variables:
import os

def main():
    # Get SageMaker environment variables
    current_host = os.environ.get('SM_CURRENT_HOST', 'algo-1')
    all_hosts = os.environ.get('SM_HOSTS', 'algo-1').split(',')
    # Determine this instance's role
    host_rank = all_hosts.index(current_host)
    total_hosts = len(all_hosts)
    print(f"Running on {current_host} (rank {host_rank}/{total_hosts})")
    # For processing: implement data partitioning logic
    # For training: use with distributed training frameworks
    # setup_distributed_training and process_data_partition are placeholders
    # for functions you define in your own code
    if os.environ.get('SM_TRAINING_ENV'):  # Training job
        setup_distributed_training(host_rank, total_hosts)
    else:  # Processing job
        process_data_partition(host_rank, total_hosts)

if __name__ == "__main__":
    main()
Requirements File
Important: Include a requirements.txt file in your code directory listing all Python packages your job needs. This file will be automatically installed within the job environment.
Automatic Dependencies
The SDK automatically adds these packages to your requirements.txt:
- nbconvert - For Jupyter notebook execution
- psutil - For system monitoring
- GPUtil - For GPU monitoring
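To see what this step amounts to, here is a rough sketch of appending missing packages to a requirements.txt file. It is an illustration only, not the SDK's implementation: `ensure_requirements` and `requirement_name` are hypothetical names, and the name parsing is deliberately simple (it ignores extras, markers, and URL-style requirements).

```python
import re
from pathlib import Path

AUTO_PACKAGES = ("nbconvert", "psutil", "GPUtil")  # names from the list above


def requirement_name(line):
    """Extract the lowercased package name from one requirements line, or None."""
    line = line.split("#", 1)[0].strip()
    if not line or line.startswith("-"):
        return None  # blank line, comment, or an option such as "-r other.txt"
    match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", line)
    return match.group(0).lower() if match else None


def ensure_requirements(path, packages=AUTO_PACKAGES):
    """Append any missing packages to the file; return the names that were added."""
    req_file = Path(path)
    lines = req_file.read_text().splitlines() if req_file.exists() else []
    present = {name for name in map(requirement_name, lines) if name}
    missing = [pkg for pkg in packages if pkg.lower() not in present]
    if missing:
        req_file.write_text("\n".join(lines + missing) + "\n")
    return missing
```

Existing version pins are left untouched, so a line such as `psutil>=5` counts as already present.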
Workflows
Processing Workflow
SageMaker Processing Jobs are designed for data processing, feature engineering, model evaluation, and basic model training tasks. When using instance_count > 1, you can parallelize processing of large datasets across multiple instances.
Environment Variables: Each instance receives SM_CURRENT_HOST and SM_HOSTS for custom parallelization logic.
Limitations: No inter-instance communication or distributed training support.
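Because instances cannot talk to each other, every instance has to derive its own share of the work from SM_CURRENT_HOST and SM_HOSTS alone. One common way to do that is round-robin assignment over a sorted list of work items (for example, file names). The sketch below assumes the comma-separated SM_HOSTS format described above; `shard_for_host` is a hypothetical helper.

```python
import os


def shard_for_host(items, current_host=None, hosts=None):
    """Return the items this host should process, assigned round-robin."""
    if current_host is None:
        current_host = os.environ.get("SM_CURRENT_HOST", "algo-1")
    if hosts is None:
        hosts = os.environ.get("SM_HOSTS", "algo-1").split(",")
    hosts = sorted(hosts)      # identical ordering on every instance
    rank = hosts.index(current_host)
    ordered = sorted(items)    # identical ordering on every instance
    return ordered[rank::len(hosts)]
```

Sorting both lists is what keeps the shards disjoint and complete without any coordination: each instance computes the same ordering and takes every Nth item starting at its own rank.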
Training Workflow
SageMaker Training Jobs provide full distributed training capabilities with advanced features:
- Distributed Training: MPI, Parameter Server, and SageMaker Distributed training
- Spot Instance Support: Cost optimization with managed spot instances
- Checkpointing: Automatic model checkpointing and recovery
- Advanced ML Features: Framework-specific distributed training optimizations
Multi-Instance Processing Example
import os
# Get instance information
current_host = os.environ.get('SM_CURRENT_HOST', 'algo-1')
all_hosts = os.environ.get('SM_HOSTS', 'algo-1').split(',')
host_rank = all_hosts.index(current_host)
total_hosts = len(all_hosts)
print(f"Instance {host_rank + 1} of {total_hosts} (Host: {current_host})")
# Partition data based on host rank
# (dataset is a placeholder for any sliceable sequence your script has loaded)
data_partition_size = len(dataset) // total_hosts
start_idx = host_rank * data_partition_size
# The last host also takes any remainder
end_idx = start_idx + data_partition_size if host_rank < total_hosts - 1 else len(dataset)
# Process assigned data partition
# (process_data_partition is a placeholder for your own processing function)
my_data = dataset[start_idx:end_idx]
process_data_partition(my_data)
Examples
Processing Workflow Examples
from kaio import Client

client = Client("https://api.kaion5.com")
client.login("your-api-key")

# Single instance data processing
result = client.submit_job(
    directory="./data_processing",
    job_name="data-preprocessing",
    workflow="processing",
    instance_type="ml.m5.xlarge",
    entrypoint="preprocess.py"
)

# Multi-instance parallel processing
result = client.submit_job(
    directory="./parallel_processing",
    job_name="multi-instance-processing",
    workflow="processing",
    instance_type="ml.m5.2xlarge",
    instance_count=4,
    entrypoint="process_data.py",
    volume_size_gb=50
)
Training Workflow Examples
# Basic training job
result = client.submit_job(
    directory="./training_code",
    job_name="model-training",
    workflow="training",
    instance_type="ml.g4dn.xlarge",
    entrypoint="train.py"
)

# Training with spot instances (cost optimization)
result = client.submit_job(
    directory="./training_code",
    job_name="spot-training",
    workflow="training",
    instance_type="ml.p3.2xlarge",
    training_args={
        "spot_instances": {
            "enabled": True,
            "max_wait": 7200  # 2 hours max wait
        }
    }
)

# Distributed training with MPI
result = client.submit_job(
    directory="./distributed_training",
    job_name="distributed-training",
    workflow="training",
    instance_type="ml.p3.8xlarge",
    instance_count=4,
    training_args={
        "distributed_training": {
            "mpi": {
                "enabled": True,
                "processes_per_host": 4  # 4 GPUs per p3.8xlarge
            }
        }
    }
)

# Large model training with SageMaker distributed
result = client.submit_job(
    directory="./large_model",
    job_name="large-model-training",
    workflow="training",
    instance_type="ml.p4d.24xlarge",
    instance_count=2,
    training_args={
        "spot_instances": {"enabled": True, "max_wait": 10800},
        "distributed_training": {
            "smdistributed": {
                "modelparallel": {"enabled": True},
                "dataparallel": {"enabled": True}
            }
        }
    }
)
Framework-Specific Examples
# TensorFlow with an explicit framework version
result = client.submit_job(
    directory="./tensorflow_training",
    job_name="tf-training",
    workflow="training",
    instance_type="ml.p3.2xlarge",
    framework="tensorflow",
    framework_version="2.13.0",
    entrypoint="train.py"
)

# Jupyter notebook execution
result = client.submit_job(
    directory="./notebooks",
    job_name="notebook-analysis",
    workflow="processing",
    instance_type="ml.m5.xlarge",
    entrypoint="analysis.ipynb"
)
Job Management
Getting All Jobs
# Get list of all your jobs
jobs = client.get_jobs()
print(f"Total jobs: {jobs['count']}")
for job in jobs['jobs']:
    print(f"Job: {job['job_name']} - Status: {job['status']}")
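With many jobs, a per-status tally is often more useful than the full listing. The sketch below counts jobs by status using the response shape shown in the example above (a dict with a 'jobs' list whose entries carry a 'status' key). `count_by_status` is a hypothetical helper, and the sample data is made up for illustration.

```python
from collections import Counter


def count_by_status(jobs_response):
    """Tally jobs per status from a get_jobs()-style response dict."""
    return Counter(job["status"] for job in jobs_response.get("jobs", []))


# Hypothetical sample shaped like the response used above.
sample = {
    "count": 3,
    "jobs": [
        {"job_name": "a", "status": "Completed"},
        {"job_name": "b", "status": "Failed"},
        {"job_name": "c", "status": "Completed"},
    ],
}

for status, total in count_by_status(sample).most_common():
    print(f"{status}: {total}")
```

In real use, pass the result of `client.get_jobs()` instead of `sample`.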
Getting Job Status
# Get specific job details (use unique_job_name from submit_job response)
job_details = client.get_job("unique-job-name-123")
print(f"Status: {job_details['status']}")
print(f"Started: {job_details['start_time']}")
if job_details['status'] == 'Completed':
    print(f"Completed: {job_details['end_time']}")
Downloading Job Outputs
# Download all output files from a completed job
downloaded_files = client.download_output("unique-job-name-123", "./results")
for file_path in downloaded_files:
    print(f"Downloaded: {file_path}")
Complete Job Workflow
import time

# Submit training job
result = client.submit_job(
    directory="./my_training",
    job_name="model-training",
    workflow="training",
    instance_type="ml.g4dn.xlarge"
)
unique_job_name = result['unique_job_name']
print(f"Job submitted: {unique_job_name}")

# Monitor job status
while True:
    job = client.get_job(unique_job_name)
    status = job['status']
    print(f"Job status: {status}")
    if status in ['Completed', 'Failed', 'Stopped']:
        break
    time.sleep(30)  # Check every 30 seconds

# Download results if completed
if status == 'Completed':
    files = client.download_output(unique_job_name, "./results")
    print(f"Downloaded {len(files)} files")
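The polling loop above runs until the job reaches a terminal status, which can mean waiting indefinitely if a job stalls. A variant with an upper time bound might look like the sketch below. `wait_for_job` is a hypothetical helper, not part of the kaio package; it takes the status-fetching function as an argument, so it relies only on get_job returning a dict with a 'status' key, and the default timeout of one hour is an arbitrary choice.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}  # from the loop above


def wait_for_job(get_job, unique_job_name, poll_seconds=30,
                 timeout_seconds=3600, sleep=time.sleep, clock=time.monotonic):
    """Poll get_job until a terminal status is seen or timeout_seconds elapse."""
    deadline = clock() + timeout_seconds
    while True:
        status = get_job(unique_job_name)["status"]
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(
                f"{unique_job_name} still {status} after {timeout_seconds}s"
            )
        sleep(poll_seconds)
```

Usage would be `status = wait_for_job(client.get_job, unique_job_name)`, followed by the same download step as above when the status is 'Completed'. The sleep and clock parameters exist only to make the helper easy to test without real waiting.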
Error Handling
import requests

try:
    result = client.submit_job(
        directory="./code",
        job_name="my-job",
        instance_type="ml.g4dn.xlarge"
    )
except requests.HTTPError as e:
    if e.response.status_code == 403:
        print("Access denied - check your API key")
    else:
        print(f"API error: {e}")
except ValueError as e:
    print(f"Configuration error: {e}")
except FileNotFoundError as e:
    print(f"File not found: {e}")
Job Dashboard
Access your jobs at: https://www.kaion5.com/home/kaio-platform.html
After signing in to the Kaio Platform, you can:
- Monitor job status and progress
- View job logs and details
- Download job outputs and results
- Manage your job history
License
MIT License