py-cluster-api

A Python library for submitting and monitoring jobs on HPC clusters. Supports running arbitrary executables (Nextflow pipelines, Python scripts, Java tools, etc.) on LSF clusters and taking action when jobs complete via async callbacks.

Features

  • Async-first — built on asyncio for non-blocking job submission and monitoring
  • LSF executor — submit via bsub, monitor via bjobs -json, cancel via bkill
  • Local executor — run jobs as local subprocesses for development and testing, including array jobs
  • Job monitoring — polls the scheduler and fires callbacks on job completion, failure, or cancellation
  • Job arrays — submit array jobs with per-element log files
  • Zombie detection — jobs that disappear from the scheduler are marked as failed
  • YAML config with profiles — Nextflow-style config with per-environment profiles
  • Callback chaining — register on_success, on_failure, or on_exit handlers on any job

Installation

Requires Python 3.10+.

pip install py-cluster-api

Or with Pixi:

pixi add --pypi py-cluster-api

Quick Start

Single Job

import asyncio
from cluster_api import create_executor, ResourceSpec, JobMonitor

async def main():
    executor = create_executor(profile="janelia_lsf")
    monitor = JobMonitor(executor)
    await monitor.start()

    job = await executor.submit(
        command="nextflow run nf-core/rnaseq --input samples.csv",
        name="rnaseq-run",
        resources=ResourceSpec(cpus=4, memory="32 GB", walltime="24:00", queue="long"),
        env={"NXF_WORK": "/scratch/work"},
    )
    job.on_success(lambda j: print(f"Done! Job {j.job_id}, peak mem: {j.max_mem}"))
    job.on_failure(lambda j: print(f"FAILED! Job {j.job_id}, exit={j.exit_code}"))

    await monitor.wait_for(job)
    await monitor.stop()

asyncio.run(main())

Job Array

async def run_array():
    executor = create_executor(profile="janelia_lsf")
    monitor = JobMonitor(executor)
    await monitor.start()

    job = await executor.submit_array(
        command="python process.py --index $LSB_JOBINDEX",
        name="batch-process",
        array_range=(1, 50),
        resources=ResourceSpec(cpus=1, memory="4 GB", walltime="01:00"),
    )
    job.on_exit(lambda j: print(f"Array finished: {j.job_id}"))

    await monitor.wait_for(job)
    await monitor.stop()

The array index environment variable depends on the executor: LSF uses $LSB_JOBINDEX, while the local executor uses $ARRAY_INDEX.
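
For commands that should run unchanged under either executor, the index can be resolved inside the worker script from whichever variable is set. A minimal sketch (the helper name is hypothetical; the variable names are the ones noted above):

```python
import os

def array_index() -> int:
    """Return this task's array index under either executor.

    LSF sets LSB_JOBINDEX; the local executor sets ARRAY_INDEX.
    """
    for var in ("LSB_JOBINDEX", "ARRAY_INDEX"):
        value = os.environ.get(var)
        if value is not None:
            return int(value)
    raise RuntimeError("not running inside an array job")
```

With a helper like this, process.py can drop the explicit --index flag and stay portable across executors.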

Local Testing

async def local_test():
    executor = create_executor(executor="local")
    monitor = JobMonitor(executor, poll_interval=1.0)
    await monitor.start()

    job = await executor.submit(command="echo hello world", name="test")
    job.on_success(lambda j: print("It worked!"))

    await monitor.wait_for(job, timeout=10.0)
    await monitor.stop()

Configuration

Configuration is loaded from YAML with optional profiles. The search order is:

  1. Explicit config_path argument
  2. $CLUSTER_API_CONFIG environment variable
  3. ./cluster_api.yaml
  4. ~/.config/cluster_api/config.yaml

Example cluster_api.yaml

executor: local
poll_interval: 10
job_name_prefix: "capi"

profiles:
  janelia_lsf:
    executor: lsf
    queue: normal
    memory: "8 GB"
    walltime: "04:00"
    use_stdin: true
    script_prologue:
      - "module load java/11"

  local_dev:
    executor: local
    poll_interval: 2

Config Options

| Option | Default | Description |
| --- | --- | --- |
| executor | "local" | Backend: lsf or local |
| cpus | None | Default CPU count |
| memory | None | Default memory (e.g. "8 GB") |
| walltime | None | Default wall time (e.g. "04:00") |
| queue | None | Default queue/partition |
| poll_interval | 10.0 | Seconds between status polls |
| job_name_prefix | "capi" | Prefix for all job names |
| shebang | "#!/bin/bash" | Script shebang line |
| script_prologue | [] | Lines inserted before the command |
| script_epilogue | [] | Lines inserted after the command |
| extra_directives | [] | Additional scheduler directives |
| directives_skip | [] | Substrings to filter out of directives |
| extra_args | [] | Extra CLI args appended to the submit command (e.g. bsub) |
| use_stdin | false | Submit via stdin (bsub < script.sh) |
| lsf_units | "MB" | LSF memory units (KB, MB, GB) |
| suppress_job_email | true | Set LSB_JOB_REPORT_MAIL=N |
| command_timeout | 100.0 | Timeout in seconds for scheduler commands |
| zombie_timeout_minutes | 30.0 | Mark jobs as failed if unseen for this long |
| completed_retention_minutes | 10.0 | Keep finished jobs in memory for this long |
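
The shebang, script_prologue, and script_epilogue options suggest the submitted job script is assembled in that order around the command. A sketch of such a generator (an illustration of the option semantics, not the library's actual code):

```python
from collections.abc import Sequence

def build_script(
    command: str,
    shebang: str = "#!/bin/bash",
    prologue: Sequence[str] = (),
    epilogue: Sequence[str] = (),
) -> str:
    """Assemble a job script: shebang, prologue lines, command, epilogue lines."""
    lines = [shebang, *prologue, command, *epilogue]
    return "\n".join(lines) + "\n"
```

For example, with the janelia_lsf profile's script_prologue, the command line would be preceded by "module load java/11" in the generated script.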

API Reference

create_executor(profile=None, config_path=None, **overrides)

Factory function that loads config and returns an Executor instance.

Executor

Abstract base class. Key methods:

  • submit(command, name, resources=None, prologue=None, epilogue=None, env=None, metadata=None) — submit a job, returns JobRecord
  • submit_array(command, name, array_range, ...) — submit a job array
  • cancel(job_id) — cancel a job by ID
  • cancel_by_name(name_pattern) — cancel by name pattern (LSF only)
  • cancel_all() — cancel all tracked jobs
  • poll() — query scheduler and update job statuses
  • jobs / active_jobs — properties returning tracked job dicts

JobRecord

Tracks a submitted job. Fields include job_id, name, status, exit_code, exec_host, max_mem, submit_time, start_time, finish_time, and metadata.

  • on_success(callback) — register callback for exit code 0
  • on_failure(callback) — register callback for non-zero exit
  • on_exit(callback, condition=ANY) — register callback for any exit condition
  • is_terminal — whether the job has finished

JobMonitor

Async polling loop that drives status updates and callback dispatch.

  • start() / stop() — control the polling loop
  • wait_for(*records, timeout=None) — block until jobs reach a terminal state

ResourceSpec

Resource requirements: cpus, memory, walltime, queue, work_dir, stdout_path, stderr_path, extra_directives, extra_args.

Development

See docs/Development.md for build instructions, testing, and release process.
