Slurm job workflow management

srunx

Requires Python 3.12+.

A modern Python library for SLURM workload manager integration with workflow orchestration capabilities.

Features

  • 🧩 Workflow Orchestration: YAML-based workflow definitions with Prefect integration
  • Fine-Grained Parallel Execution: Jobs execute immediately when their specific dependencies complete, not entire workflow phases
  • 🔗 Branched Dependency Control: Independent branches in dependency graphs run simultaneously without false dependencies
  • 📝 Template System: Customizable Jinja2 templates for SLURM scripts
  • 🛡️ Type Safe: Full type hints and mypy compatibility
  • 🖥️ CLI Tools: Command-line interfaces for both job management and workflows
  • 🚀 Simple Job Submission: Easy-to-use API for submitting SLURM jobs
  • ⚙️ Flexible Configuration: Support for various environments (conda, venv, sqsh)
  • 📋 Job Management: Submit, monitor, cancel, and list jobs

Installation

Using uv (Recommended)

uv add srunx

Using pip

pip install srunx

Development Installation

git clone https://github.com/ksterx/srunx.git
cd srunx
uv sync --dev

Quick Start

You can try the workflow example:

cd examples
srunx flow run sample_workflow.yaml

The sample workflow has the following dependency graph:

graph TD
    A["Job A"]
    B1["Job B1"]
    B2["Job B2"]
    C["Job C"]
    D["Job D"]

    A --> B1
    A --> C
    B1 --> B2
    B2 --> D
    C --> D

Jobs run precisely when they're ready, minimizing wasted compute hours. The workflow engine provides fine-grained dependency control: when Job A completes, B1 and C start immediately in parallel. As soon as B1 finishes, B2 starts regardless of C's status. Job D waits only for both B2 and C to complete, enabling maximum parallelization.
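The scheduling rule described above can be reproduced with the standard library alone. The following is a minimal sketch, not srunx's implementation: it uses Python's `graphlib.TopologicalSorter` to report which jobs become ready after each completion in the example graph. The `ready_after` helper and the chosen completion order are illustrative assumptions.

```python
from graphlib import TopologicalSorter

# Each job maps to the set of jobs it depends on (the graph shown above).
DEPENDENCIES = {
    "A": set(),
    "B1": {"A"},
    "C": {"A"},
    "B2": {"B1"},
    "D": {"B2", "C"},
}


def ready_after(completion_order):
    """Return (finished_job, newly_ready_jobs) pairs for a completion order."""
    sorter = TopologicalSorter(DEPENDENCIES)
    sorter.prepare()
    # Jobs with no dependencies are ready before anything has finished.
    trace = [("start", sorted(sorter.get_ready()))]
    for job in completion_order:
        sorter.done(job)  # mark the job as finished
        trace.append((job, sorted(sorter.get_ready())))
    return trace


trace = ready_after(["A", "B1", "C", "B2", "D"])
for finished, newly_ready in trace:
    print(f"{finished}: {newly_ready}")
```

In this trace, finishing B1 releases B2 even though C is still running, and finishing C releases nothing because D still waits on B2.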

Workflow Orchestration

Create a workflow YAML file:

# workflow.yaml
name: ml_pipeline
jobs:
  - name: preprocess
    command: ["python", "preprocess.py"]
    nodes: 1
    memory_per_node: "16GB"

  - name: train
    command: ["python", "train.py"]
    depends_on: [preprocess]
    nodes: 1
    gpus_per_node: 2
    memory_per_node: "32GB"
    time_limit: "8:00:00"
    conda: ml_env

  - name: evaluate
    command: ["python", "evaluate.py"]
    depends_on: [train]
    nodes: 1

  - name: notify
    command: ["python", "notify.py"]
    depends_on: [train, evaluate]

Execute the workflow:

# Run workflow
srunx flow run workflow.yaml

# Validate workflow without execution
srunx flow validate workflow.yaml

# Show execution plan
srunx flow run workflow.yaml --dry-run
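What `srunx flow validate` checks internally is not described here. As a rough illustration of the kind of checks a workflow validator could perform, the sketch below looks for duplicate job names, references to unknown jobs, and dependency cycles. The `validate_workflow` helper is hypothetical and works on plain dictionaries shaped like the `jobs:` entries above, so it does not need a YAML parser.

```python
from graphlib import CycleError, TopologicalSorter


def validate_workflow(jobs):
    """Return a list of human-readable problems (empty if none were found).

    Hypothetical helper for illustration; not part of srunx.
    """
    problems = []
    names = set()
    for job in jobs:
        if job["name"] in names:
            problems.append(f"duplicate job name: {job['name']}")
        names.add(job["name"])

    for job in jobs:
        for dep in job.get("depends_on", []):
            if dep not in names:
                problems.append(f"{job['name']} depends on unknown job: {dep}")

    graph = {job["name"]: set(job.get("depends_on", [])) for job in jobs}
    try:
        # static_order() raises CycleError if the graph is not a DAG.
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        problems.append("dependency cycle detected")
    return problems


# The ml_pipeline workflow from above, reduced to names and dependencies.
ml_pipeline = [
    {"name": "preprocess"},
    {"name": "train", "depends_on": ["preprocess"]},
    {"name": "evaluate", "depends_on": ["train"]},
    {"name": "notify", "depends_on": ["train", "evaluate"]},
]
print(validate_workflow(ml_pipeline))
```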

Advanced Usage

Custom Templates

Create a custom SLURM template:

#!/bin/bash
#SBATCH --job-name={{ job_name }}
#SBATCH --nodes={{ nodes }}
{% if gpus_per_node > 0 -%}
#SBATCH --gpus-per-node={{ gpus_per_node }}
{% endif -%}
#SBATCH --time={{ time_limit }}
#SBATCH --output={{ log_dir }}/%x_%j.out

{{ environment_setup }}

srun {{ command }}

Use it with your job:

# `client` is a Slurm instance and `job` is a Job (see Examples)
job = client.run(job, template_path="custom_template.slurm.jinja")

Environment Configuration

Conda Environment

from srunx import JobEnvironment

environment = JobEnvironment(
    conda="my_env",
    env_vars={"CUDA_VISIBLE_DEVICES": "0,1"},
)

Programmatic Workflow Execution

from srunx.workflows import WorkflowRunner

runner = WorkflowRunner.from_yaml("workflow.yaml")
results = runner.run()

print("Job IDs:")
for task_name, job_id in results.items():
    print(f"  {task_name}: {job_id}")

Job Submission

# Submit job without waiting
job = client.submit(job)

# Later, wait for completion
completed_job = client.monitor(job, poll_interval=30)
print(f"Job completed with status: {completed_job.status}")

# Submit and wait for completion
completed_job = client.run(job)
print(f"Job completed with status: {completed_job.status}")

Slack Integration

from srunx.callbacks import SlackCallback
from srunx.workflows import WorkflowRunner

# Replace with your Slack incoming-webhook URL
slack_callback = SlackCallback(webhook_url="your_webhook_url")
runner = WorkflowRunner.from_yaml("workflow.yaml", callbacks=[slack_callback])

Alternatively, use the CLI:

srunx flow run workflow.yaml --slack
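Slack incoming webhooks accept a JSON body with a `text` field sent via HTTP POST. How `SlackCallback` formats its messages is not shown in this README, so the following is only a sketch of the underlying request. The `build_slack_request` helper, the message wording, and the placeholder URL are assumptions, and the request is built but never sent.

```python
import json
import urllib.request


def build_slack_request(webhook_url: str, job_name: str, status: str) -> urllib.request.Request:
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    payload = {"text": f"Job {job_name} finished with status {status}"}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL; a real webhook URL comes from your Slack workspace settings.
request = build_slack_request("https://example.com/webhook", "train", "COMPLETED")
print(request.get_method(), request.data.decode("utf-8"))
# Sending would be: urllib.request.urlopen(request)
```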

API Reference

Core Classes

Job

Main job configuration class with resources and environment settings.

JobResource

Resource allocation specification (nodes, GPUs, memory, time).

JobEnvironment

Environment setup (conda, venv, sqsh, environment variables).

Slurm

Main interface for SLURM operations (submit, status, cancel, list).

WorkflowRunner

Workflow execution engine with YAML support.

CLI Commands

Main CLI (srunx)

  • submit - Submit SLURM jobs
  • status - Check job status
  • queue - List jobs
  • cancel - Cancel jobs

Workflow CLI (srunx flow)

  • run - Execute YAML-defined workflows
  • validate - Validate workflow files
  • run --dry-run - Show execution plans

Configuration

Environment Variables

  • SLURM_LOG_DIR: Default directory for SLURM logs (default: logs)
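The "environment variable with a default" behavior described above corresponds to a simple lookup. The sketch below shows that pattern; `resolve_log_dir` is a hypothetical helper, the example path is made up, and srunx's own lookup code may be organized differently.

```python
import os
from pathlib import Path


def resolve_log_dir(environ=None) -> Path:
    """Return the log directory, falling back to "logs" when the variable is unset."""
    environ = os.environ if environ is None else environ
    return Path(environ.get("SLURM_LOG_DIR", "logs"))


print(resolve_log_dir({}))
print(resolve_log_dir({"SLURM_LOG_DIR": "/scratch/slurm_logs"}))
```

Passing a mapping explicitly makes the helper easy to test without modifying the real process environment.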

Template Locations

srunx includes built-in templates:

  • base.slurm.jinja: Basic job template
  • advanced.slurm.jinja: Full-featured template with all options

Development

Setup Development Environment

git clone https://github.com/ksterx/srunx.git
cd srunx
uv sync --dev

Run Tests

uv run pytest

Type Checking

uv run mypy .

Code Formatting

uv run ruff check .
uv run ruff format .

Examples

Machine Learning Pipeline

# Complete ML pipeline example
from srunx import Job, JobResource, JobEnvironment, Slurm

def create_ml_job(script: str, **kwargs) -> Job:
    return Job(
        name=f"ml_{script.replace('.py', '')}",
        command=["python", script] + [f"--{k}={v}" for k, v in kwargs.items()],
        resources=JobResource(
            nodes=1,
            gpus_per_node=1,
            memory_per_node="32GB",
            time_limit="4:00:00"
        ),
        environment=JobEnvironment(conda="pytorch")
    )

client = Slurm()

# Submit the preprocessing job without blocking
prep_job = create_ml_job("preprocess.py", data_path="/data", output_path="/processed")
prep_job = client.submit(prep_job)

# Wait for preprocessing to complete
client.monitor(prep_job)

# Submit the training job without blocking
train_job = create_ml_job("train.py", data_path="/processed", model_path="/models")
train_job = client.submit(train_job)

print(f"Training job {train_job.job_id} submitted")
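To see what `create_ml_job` passes to SLURM, the name and command construction can be isolated from the srunx classes. This sketch copies the two expressions from the example into hypothetical standalone helpers (`build_name` and `build_command`) so they can be run without srunx installed.

```python
def build_name(script: str) -> str:
    """Mirror the job name expression used in create_ml_job."""
    return f"ml_{script.replace('.py', '')}"


def build_command(script: str, **kwargs) -> list[str]:
    """Mirror the command expression used in create_ml_job."""
    return ["python", script] + [f"--{k}={v}" for k, v in kwargs.items()]


name = build_name("preprocess.py")
command = build_command("preprocess.py", data_path="/data", output_path="/processed")
print(name)
print(command)
```

Keyword names are passed through unchanged, so the target script must accept flags spelled with underscores, such as `--data_path`.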

Distributed Computing

# Multi-node distributed job (reuses the imports and `client` from the previous example)
distributed_job = Job(
    name="distributed_training",
    command=[
        "mpirun", "-np", "16",
        "python", "distributed_train.py"
    ],
    resources=JobResource(
        nodes=4,
        ntasks_per_node=4,
        cpus_per_task=8,
        gpus_per_node=2,
        memory_per_node="128GB",
        time_limit="12:00:00"
    ),
    environment=JobEnvironment(
        conda="distributed_ml"
    )
)

job = client.run(distributed_job)
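The `-np 16` in the command matches the resource request: 4 nodes with 4 tasks per node. A small sketch of that arithmetic is shown below; `allocation_totals` is a hypothetical helper, and deriving the `mpirun` argument this way is a suggestion rather than something srunx does for you.

```python
def allocation_totals(nodes: int, ntasks_per_node: int, cpus_per_task: int, gpus_per_node: int) -> dict[str, int]:
    """Return aggregate task, CPU, and GPU counts for a multi-node request."""
    tasks = nodes * ntasks_per_node
    return {
        "tasks": tasks,
        "cpus": tasks * cpus_per_task,
        "gpus": nodes * gpus_per_node,
    }


totals = allocation_totals(nodes=4, ntasks_per_node=4, cpus_per_task=8, gpus_per_node=2)

# Derive the process count instead of hard-coding it next to the resources.
mpirun_command = ["mpirun", "-np", str(totals["tasks"]), "python", "distributed_train.py"]
print(totals)
print(mpirun_command)
```

Computing the process count from the resource values keeps the two from drifting apart when the node count changes.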

Development Workflow

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run type checking and tests
  6. Submit a pull request

License

This project is licensed under the Apache-2.0 License.

Acknowledgments

  • Built with Pydantic for data validation
  • Template rendering with Jinja2
  • Package management with uv
