Python client for the Slurm REST API

Slurpy (SLUrm Rest api PYthon client)

Slurpy is a Python client for the Slurm REST API. Slurm is an open-source job scheduler for high-performance computing environments, and its REST API exposes HTTP endpoints for submitting, monitoring, and managing compute jobs. Slurpy is a convenience library for interacting with that API from Python.
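Under the hood, every slurpy call is a plain HTTP request to slurmrestd. As a rough illustration of what the client wraps (the host, user, and token here are placeholders; the endpoint path and header names follow Slurm REST API conventions), this builds a raw ping request using only the standard library:

```python
import urllib.request

def build_ping_request(host: str, user: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a raw request to the Slurm REST API ping endpoint."""
    req = urllib.request.Request(f"{host}/slurm/v0.0.40/ping")
    # slurmrestd reads the caller's identity and JWT from these headers.
    req.add_header("X-SLURM-USER-NAME", user)
    req.add_header("X-SLURM-USER-TOKEN", token)
    return req

req = build_ping_request("http://localhost:6820", "your-username", "your-jwt-token")
# urllib.request.urlopen(req) would return the JSON ping response.
```

Slurpy takes care of this plumbing (plus request/response models, retries, and sync/async transports) so you don't have to.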

Features

  • 🔄 Multiple API versions - Support for Slurm REST API v0.0.40, v0.0.41, and v0.0.42
  • 🔐 JWT Authentication - Built-in support for Slurm JWT token authentication
  • 🚀 Sync & Async - Both synchronous and asynchronous client support
  • 🐍 Type Hints - Complete type annotations for better IDE support
  • 🧪 Well Tested - Comprehensive integration tests with real Slurm clusters
  • 📦 Easy Installation - Simple pip installation with minimal dependencies

Installation

pip install ebi-slurpy

Quick Start

Slurpy supports both synchronous and asynchronous clients. Choose the one that fits your application:

Synchronous Client

import slurpy.v0040 as slurpy

# Configure the client
configuration = slurpy.Configuration(
    host="http://your-slurm-rest-api:6820"
)

# Set up authentication
configuration.api_key['user'] = "your-username"
configuration.api_key['token'] = "your-jwt-token"

# Create API client
with slurpy.ApiClient(configuration) as client:
    api = slurpy.SlurmApi(client)
    
    # Test connectivity
    response = api.get_ping()
    print(f"Cluster status: {response.to_dict()}")
    
    # List all jobs
    jobs_response = api.get_jobs()
    jobs = jobs_response.to_dict()
    print(f"Found {len(jobs['jobs'])} jobs")

Asynchronous Client

import asyncio
import slurpy.v0040.asyncio as slurpy

async def main():
    # Configure the client
    configuration = slurpy.Configuration(
        host="http://your-slurm-rest-api:6820"
    )
    
    # Set up authentication
    configuration.api_key['user'] = "your-username"
    configuration.api_key['token'] = "your-jwt-token"
    
    # Create API client
    async with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        # Test connectivity
        response = await api.get_ping()
        print(f"Cluster status: {response.to_dict()}")
        
        # List all jobs
        jobs_response = await api.get_jobs()
        jobs = jobs_response.to_dict()
        print(f"Found {len(jobs['jobs'])} jobs")

if __name__ == "__main__":
    asyncio.run(main())

Environment Variables

You can use environment variables for configuration with both sync and async clients:

Sync version:

import os
import slurpy.v0040 as slurpy

configuration = slurpy.Configuration(
    host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
)

configuration.api_key['user'] = os.getenv("SLURM_USER_NAME")
configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")

Async version:

import os
import slurpy.v0040.asyncio as slurpy

configuration = slurpy.Configuration(
    host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
)

configuration.api_key['user'] = os.getenv("SLURM_USER_NAME")
configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")
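If your cluster runs Slurm's auth/jwt plugin, the token itself can usually be generated on a cluster node with the standard `scontrol token` command (a Slurm feature, not part of slurpy; the lifespan value is illustrative):

```shell
# scontrol prints a line of the form SLURM_JWT=<token>;
# strip the prefix and export it under the name used above.
export SLURM_USER_TOKEN=$(scontrol token lifespan=3600 | cut -d= -f2-)
```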

API Versions

Slurpy supports multiple Slurm REST API versions, each available in both sync and async variants:

v0.0.40

# Synchronous
import slurpy.v0040 as slurpy

# Asynchronous  
import slurpy.v0040.asyncio as slurpy

v0.0.41

# Synchronous
import slurpy.v0041 as slurpy

# Asynchronous
import slurpy.v0041.asyncio as slurpy

v0.0.42

# Synchronous
import slurpy.v0042 as slurpy

# Asynchronous
import slurpy.v0042.asyncio as slurpy

Note: The API interface is consistent across versions and between sync/async variants, but some features and response formats may differ between Slurm versions. Check the Slurm documentation for version-specific differences.
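Because the module layout is uniform, the module path can be derived mechanically from a version string. A minimal sketch (this helper is ours for illustration, not part of slurpy's API):

```python
import importlib

def slurpy_module_name(version: str, use_async: bool = False) -> str:
    """Map a Slurm REST API version like '0.0.40' to slurpy's module path."""
    name = "slurpy.v" + version.replace(".", "")  # '0.0.40' -> 'slurpy.v0040'
    return name + ".asyncio" if use_async else name

# importlib.import_module(slurpy_module_name("0.0.41")) would load slurpy.v0041.
```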

Common Operations

List Jobs

Sync version:

def list_jobs():
    with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        response = api.get_jobs()
        jobs = response.to_dict()
        
        for job in jobs['jobs']:
            print(f"Job {job['job_id']}: {job['name']} ({job['job_state']})")

Async version:

async def list_jobs():
    async with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        response = await api.get_jobs()
        jobs = response.to_dict()
        
        for job in jobs['jobs']:
            print(f"Job {job['job_id']}: {job['name']} ({job['job_state']})")

Submit a Job

Sync version:

def submit_job():
    job_spec = {
        "script": "#!/bin/bash\\nsleep 60\\necho 'Job completed'",
        "job": {
            "name": "my_job",
            "current_working_directory": "/home/user",
            "environment": ["PATH=/bin:/usr/bin"],
            "ntasks": 1,
            "time_limit": {"set": True, "number": 300},  # 5 minutes
        }
    }
    
    with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        job_request = slurpy.JobSubmitReq.from_dict(job_spec)
        response = api.post_job_submit(job_submit_req=job_request)
        
        result = response.to_dict()
        print(f"Job submitted with ID: {result['job_id']}")

Async version:

async def submit_job():
    job_spec = {
        "script": "#!/bin/bash\\nsleep 60\\necho 'Job completed'",
        "job": {
            "name": "my_job",
            "current_working_directory": "/home/user",
            "environment": ["PATH=/bin:/usr/bin"],
            "ntasks": 1,
            "time_limit": {"set": True, "number": 300},  # 5 minutes
        }
    }
    
    async with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        job_request = slurpy.JobSubmitReq.from_dict(job_spec)
        response = await api.post_job_submit(job_submit_req=job_request)
        
        result = response.to_dict()
        print(f"Job submitted with ID: {result['job_id']}")

Get Job Details

Sync version:

def get_job(job_id: str):
    with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        response = api.get_job(job_id=job_id)
        job = response.to_dict()
        
        print(f"Job {job_id} status: {job['jobs'][0]['job_state']}")

Async version:

async def get_job(job_id: str):
    async with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        response = await api.get_job(job_id=job_id)
        job = response.to_dict()
        
        print(f"Job {job_id} status: {job['jobs'][0]['job_state']}")

Cancel a Job

Sync version:

def cancel_job(job_id: str):
    with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        response = api.delete_job(job_id)
        print(f"Job {job_id} cancellation requested")

Async version:

async def cancel_job(job_id: str):
    async with slurpy.ApiClient(configuration) as client:
        api = slurpy.SlurmApi(client)
        
        response = await api.delete_job(job_id)
        print(f"Job {job_id} cancellation requested")

Error Handling

Error handling works the same for both sync and async clients:

Sync version:

from slurpy.v0040.rest import ApiException

def robust_job_operation():
    try:
        with slurpy.ApiClient(configuration) as client:
            api = slurpy.SlurmApi(client)
            response = api.get_jobs()
            return response.to_dict()
            
    except ApiException as e:
        print(f"API Error: {e.status} - {e.reason}")
        print(f"Response body: {e.body}")
        
    except Exception as e:
        print(f"Unexpected error: {e}")

Async version:

from slurpy.v0040.asyncio.rest import ApiException

async def robust_job_operation():
    try:
        async with slurpy.ApiClient(configuration) as client:
            api = slurpy.SlurmApi(client)
            response = await api.get_jobs()
            return response.to_dict()
            
    except ApiException as e:
        print(f"API Error: {e.status} - {e.reason}")
        print(f"Response body: {e.body}")
        
    except Exception as e:
        print(f"Unexpected error: {e}")

Configuration Options

SSL/TLS Configuration

configuration = slurpy.Configuration(
    host="https://secure-slurm-api:6820",
    ssl_ca_cert="/path/to/ca-cert.pem",  # CA certificate
    cert_file="/path/to/client-cert.pem",  # Client certificate
    key_file="/path/to/client-key.pem",   # Client private key
    verify_ssl=True
)

Complete Example

Here's a complete workflow example available in both sync and async versions:

Synchronous Version

#!/usr/bin/env python3
import os
from typing import Optional

import slurpy.v0040 as slurpy
from slurpy.v0040.rest import ApiException


def slurm_workflow():
    """Complete workflow: ping, submit job, monitor, cleanup."""
    
    # Configuration
    configuration = slurpy.Configuration(
        host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
    )
    configuration.api_key['user'] = os.getenv("SLURM_USER_NAME", "slurm")
    configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")
    
    if not configuration.api_key['token']:
        print("Error: SLURM_USER_TOKEN environment variable is required")
        return
    
    job_id: Optional[str] = None
    
    try:
        with slurpy.ApiClient(configuration) as client:
            api = slurpy.SlurmApi(client)
            
            # 1. Test connectivity
            print("🔍 Testing cluster connectivity...")
            ping_response = api.get_ping()
            print(f"✅ Cluster is responsive: {ping_response.to_dict()}")
            
            # 2. Submit a job
            print("\\n📤 Submitting job...")
            job_spec = {
                "script": "#!/bin/bash\\nsleep 30\\necho 'Hello from Slurm!'",
                "job": {
                    "name": "slurpy_demo",
                    "current_working_directory": "/tmp",
                    "environment": ["PATH=/bin:/usr/bin"],
                    "ntasks": 1,
                    "time_limit": {"set": True, "number": 120},
                }
            }
            
            job_request = slurpy.JobSubmitReq.from_dict(job_spec)
            submit_response = api.post_job_submit(job_submit_req=job_request)
            result = submit_response.to_dict()
            job_id = str(result['job_id'])
            print(f"✅ Job submitted successfully with ID: {job_id}")
            
            # 3. Monitor job
            print(f"\\n👀 Monitoring job {job_id}...")
            job_response = api.get_job(job_id=job_id)
            job_data = job_response.to_dict()
            job_info = job_data['jobs'][0]
            print(f"📊 Job status: {job_info['job_state']}")
            print(f"📋 Job details: {job_info['name']} on {job_info.get('nodes', 'pending')}")
            
            # 4. List all current jobs
            print("\\n📋 Current cluster jobs:")
            jobs_response = api.get_jobs()
            jobs = jobs_response.to_dict()
            print(f"Found {len(jobs['jobs'])} total jobs in the cluster")
            
    except ApiException as e:
        print(f"❌ Slurm API error: {e.status} - {e.reason}")
        if e.body:
            print(f"Response: {e.body}")
            
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        
    finally:
        # Cleanup: Cancel the job if it was created
        if job_id:
            try:
                with slurpy.ApiClient(configuration) as client:
                    api = slurpy.SlurmApi(client)
                    api.delete_job(job_id)
                    print(f"\\n🧹 Cleanup: Job {job_id} cancellation requested")
            except Exception as cleanup_error:
                print(f"⚠️ Cleanup warning: Could not cancel job {job_id}: {cleanup_error}")


if __name__ == "__main__":
    slurm_workflow()

Asynchronous Version

#!/usr/bin/env python3
import asyncio
import os
from typing import Optional

import slurpy.v0040.asyncio as slurpy
from slurpy.v0040.asyncio.rest import ApiException


async def slurm_workflow():
    """Complete workflow: ping, submit job, monitor, cleanup."""
    
    # Configuration
    configuration = slurpy.Configuration(
        host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
    )
    configuration.api_key['user'] = os.getenv("SLURM_USER_NAME", "slurm")
    configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")
    
    if not configuration.api_key['token']:
        print("Error: SLURM_USER_TOKEN environment variable is required")
        return
    
    job_id: Optional[str] = None
    
    try:
        async with slurpy.ApiClient(configuration) as client:
            api = slurpy.SlurmApi(client)
            
            # 1. Test connectivity
            print("🔍 Testing cluster connectivity...")
            ping_response = await api.get_ping()
            print(f"✅ Cluster is responsive: {ping_response.to_dict()}")
            
            # 2. Submit a job
            print("\\n📤 Submitting job...")
            job_spec = {
                "script": "#!/bin/bash\\nsleep 30\\necho 'Hello from Slurm!'",
                "job": {
                    "name": "slurpy_demo",
                    "current_working_directory": "/tmp",
                    "environment": ["PATH=/bin:/usr/bin"],
                    "ntasks": 1,
                    "time_limit": {"set": True, "number": 120},
                }
            }
            
            job_request = slurpy.JobSubmitReq.from_dict(job_spec)
            submit_response = await api.post_job_submit(job_submit_req=job_request)
            result = submit_response.to_dict()
            job_id = str(result['job_id'])
            print(f"✅ Job submitted successfully with ID: {job_id}")
            
            # 3. Monitor job
            print(f"\\n👀 Monitoring job {job_id}...")
            job_response = await api.get_job(job_id=job_id)
            job_data = job_response.to_dict()
            job_info = job_data['jobs'][0]
            print(f"📊 Job status: {job_info['job_state']}")
            print(f"📋 Job details: {job_info['name']} on {job_info.get('nodes', 'pending')}")
            
            # 4. List all current jobs
            print("\\n📋 Current cluster jobs:")
            jobs_response = await api.get_jobs()
            jobs = jobs_response.to_dict()
            print(f"Found {len(jobs['jobs'])} total jobs in the cluster")
            
    except ApiException as e:
        print(f"❌ Slurm API error: {e.status} - {e.reason}")
        if e.body:
            print(f"Response: {e.body}")
            
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        
    finally:
        # Cleanup: Cancel the job if it was created
        if job_id:
            try:
                async with slurpy.ApiClient(configuration) as client:
                    api = slurpy.SlurmApi(client)
                    await api.delete_job(job_id)
                    print(f"\\n🧹 Cleanup: Job {job_id} cancellation requested")
            except Exception as cleanup_error:
                print(f"⚠️ Cleanup warning: Could not cancel job {job_id}: {cleanup_error}")


if __name__ == "__main__":
    asyncio.run(slurm_workflow())

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.

Support

  • 📖 Documentation: Check the docstrings and type hints
  • 🐛 Issues: Report bugs on GitHub Issues
  • 💬 Discussions: Ask questions in GitHub Discussions
  • 📧 Contact: Reach out to the maintainers
