# py-cluster-api

A generic Python library for submitting and monitoring jobs on HPC clusters. It supports running arbitrary executables (Nextflow pipelines, Python scripts, Java tools, etc.) on clusters and taking action when jobs complete via async callbacks.
## Executors
- Local Subprocess
- IBM Platform LSF
- We will accept PRs that implement and test additional executors (SLURM, etc.)
## Features

- Async-first — built on `asyncio` for non-blocking job submission and monitoring
- Local executor — run jobs as local subprocesses for development and testing, including array jobs
- Job monitoring — polls the scheduler and fires callbacks on job completion, failure, or cancellation
- Job arrays — submit array jobs with per-element log files
- Zombie detection — jobs that disappear from the scheduler are marked as failed
- YAML config with profiles — Nextflow-style config with per-environment profiles
- Callback chaining — register `on_success`, `on_failure`, or `on_exit` handlers on any job
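The zombie-detection idea can be illustrated with a small sketch (a hedged illustration of the concept, not the library's actual implementation): record when each job was last seen in scheduler output, and treat anything unseen for longer than the timeout as failed.

```python
import time


def find_zombies(last_seen: dict[str, float], timeout_minutes: float, now: float) -> set[str]:
    """Return job IDs not observed in scheduler output within the timeout window.

    Illustrative only — not py-cluster-api internals.
    """
    cutoff = now - timeout_minutes * 60
    return {job_id for job_id, seen in last_seen.items() if seen < cutoff}


now = time.time()
last_seen = {"job-1": now - 5, "job-2": now - 3600}  # job-2 unseen for an hour
print(find_zombies(last_seen, timeout_minutes=30.0, now=now))
# {'job-2'}
```

The corresponding timeout is exposed as the `zombie_timeout_minutes` config option described below.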
## Installation

Requires Python 3.10+.

```bash
pip install py-cluster-api
```

Or with Pixi:

```bash
pixi add --pypi py-cluster-api
```
## Quick Start

### Single Job

```python
import asyncio
from cluster_api import create_executor, ResourceSpec, JobMonitor


async def main():
    executor = create_executor(profile="janelia_lsf")
    monitor = JobMonitor(executor)
    await monitor.start()

    job = await executor.submit(
        command="nextflow run nf-core/rnaseq --input samples.csv",
        name="rnaseq-run",
        resources=ResourceSpec(cpus=4, gpus=1, memory="32 GB", walltime="24:00", queue="long"),
        env={"NXF_WORK": "/scratch/work"},
    )

    job.on_success(lambda j: print(f"Done! Job {j.job_id}, peak mem: {j.max_mem}"))
    job.on_failure(lambda j: print(f"FAILED! Job {j.job_id}, exit={j.exit_code}"))

    await monitor.wait_for(job)
    await monitor.stop()


asyncio.run(main())
```
### Job Array

```python
async def run_array():
    executor = create_executor(profile="janelia_lsf")
    monitor = JobMonitor(executor)
    await monitor.start()

    job = await executor.submit_array(
        command="python process.py --index $LSB_JOBINDEX",
        name="batch-process",
        array_range=(1, 50),
        resources=ResourceSpec(cpus=1, memory="4 GB", walltime="01:00"),
    )

    job.on_exit(lambda j: print(f"Array finished: {j.job_id}"))

    await monitor.wait_for(job)
    await monitor.stop()
```

The array index environment variable depends on the executor: LSF uses `$LSB_JOBINDEX`, while the local executor uses `$ARRAY_INDEX`.
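If the same command template needs to run under both executors, one option is to substitute the executor-specific index variable before submission. The helper below is a hypothetical sketch, not part of the library's API:

```python
# Hypothetical helper (not part of py-cluster-api): map an executor name
# to its array-index environment variable and splice it into a template.
INDEX_VARS = {
    "lsf": "$LSB_JOBINDEX",
    "local": "$ARRAY_INDEX",
}


def array_command(template: str, executor_name: str) -> str:
    """Fill the {index} placeholder with the executor's index variable."""
    return template.format(index=INDEX_VARS[executor_name])


print(array_command("python process.py --index {index}", "lsf"))
# python process.py --index $LSB_JOBINDEX
```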
### Reconnecting After Restart

If your process crashes or restarts, `reconnect()` rediscovers running jobs from the scheduler and resumes tracking them. This requires `job_name_prefix` to be set in config.

```python
async def resume():
    executor = create_executor(profile="janelia_lsf")
    monitor = JobMonitor(executor)
    await monitor.start()

    recovered = await executor.reconnect()
    for job in recovered:
        print(f"Reconnected to {job.job_id} ({job.name}), status={job.status}")
        job.on_exit(lambda j: print(f"Job {j.job_id} finished: {j.status}"))

    if recovered:
        await monitor.wait_for(*recovered)
    await monitor.stop()
```
### Local Testing

```python
async def local_test():
    executor = create_executor(executor="local")
    monitor = JobMonitor(executor, poll_interval=1.0)
    await monitor.start()

    job = await executor.submit(command="echo hello world", name="test")
    job.on_success(lambda j: print("It worked!"))

    await monitor.wait_for(job, timeout=10.0)
    await monitor.stop()
```
## Configuration

Configuration is loaded from YAML with optional profiles. The search order is:

1. Explicit `config_path` argument
2. `$CLUSTER_API_CONFIG` environment variable
3. `./cluster_api.yaml`
4. `~/.config/cluster_api/config.yaml`
### Example `cluster_api.yaml`

```yaml
executor: local
poll_interval: 10
job_name_prefix: "capi"

profiles:
  janelia_lsf:
    executor: lsf
    queue: normal
    gpus: 1
    memory: "8 GB"
    walltime: "04:00"
    script_prologue:
      - "module load java/11"
  local_dev:
    executor: local
    poll_interval: 2
```
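Nextflow-style profiles are typically shallow overrides of the top-level settings: a selected profile's keys win, and anything the profile doesn't set falls back to the defaults. A minimal sketch of that merge semantics, using plain dicts in place of parsed YAML (an assumption about behavior, not the library's implementation):

```python
def apply_profile(config: dict, profile: str) -> dict:
    """Merge a named profile over the top-level defaults (shallow merge)."""
    base = {k: v for k, v in config.items() if k != "profiles"}
    overrides = config.get("profiles", {}).get(profile, {})
    return {**base, **overrides}


config = {
    "executor": "local",
    "poll_interval": 10,
    "job_name_prefix": "capi",
    "profiles": {
        "janelia_lsf": {"executor": "lsf", "queue": "normal"},
        "local_dev": {"executor": "local", "poll_interval": 2},
    },
}

merged = apply_profile(config, "janelia_lsf")
# merged["executor"] == "lsf"; merged["poll_interval"] == 10 (inherited default)
```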
### Config Options

| Option | Default | Description |
|---|---|---|
| `executor` | `"local"` | Backend: `lsf` or `local` |
| `cpus` | `None` | Default CPU count |
| `gpus` | `None` | Default GPU count |
| `memory` | `None` | Default memory (e.g. `"8 GB"`) |
| `walltime` | `None` | Default wall time (e.g. `"04:00"`) |
| `queue` | `None` | Default queue/partition |
| `poll_interval` | `10.0` | Seconds between status polls |
| `job_name_prefix` | `None` | Optional prefix prepended to job names. When set, polling filters by `{prefix}-*` and `reconnect()` is available; when unset, the user controls the full job name and polling queries all jobs |
| `shebang` | `"#!/bin/bash"` | Script shebang line |
| `script_prologue` | `[]` | Lines inserted before the command |
| `script_epilogue` | `[]` | Lines inserted after the command |
| `extra_directives` | `[]` | Additional scheduler directive lines appended verbatim to the script header (e.g. `"#BSUB -P myproject"`) |
| `directives_skip` | `[]` | Substrings to filter out of directives |
| `extra_args` | `[]` | Extra CLI args appended to the submit command (e.g. `bsub`) |
| `lsf_units` | `"MB"` | LSF memory units (`KB`, `MB`, `GB`) |
| `suppress_job_email` | `true` | Set `LSB_JOB_REPORT_MAIL=N` |
| `command_timeout` | `100.0` | Timeout in seconds for scheduler commands |
| `zombie_timeout_minutes` | `30.0` | Mark jobs as failed if unseen for this long |
| `completed_retention_minutes` | `10.0` | Keep finished jobs in memory for this long |
## API Reference

See `docs/API.md` for the full API reference and error handling guide.
## Development

See `docs/Development.md` for build instructions, testing, and release process.