# py-cluster-api
A Python library for submitting and monitoring jobs on HPC clusters. Supports running arbitrary executables (Nextflow pipelines, Python scripts, Java tools, etc.) on LSF clusters and taking action when jobs complete via async callbacks.
## Features

- Async-first — built on `asyncio` for non-blocking job submission and monitoring
- LSF executor — submit via `bsub`, monitor via `bjobs -json`, cancel via `bkill`
- Local executor — run jobs as local subprocesses for development and testing, including array jobs
- Job monitoring — polls the scheduler and fires callbacks on job completion, failure, or cancellation
- Job arrays — submit array jobs with per-element log files
- Zombie detection — jobs that disappear from the scheduler are marked as failed
- YAML config with profiles — Nextflow-style config with per-environment profiles
- Callback chaining — register `on_success`, `on_failure`, or `on_exit` handlers on any job
## Installation

Requires Python 3.10+.

```bash
pip install py-cluster-api
```

Or with Pixi:

```bash
pixi add --pypi py-cluster-api
```
## Quick Start

### Single Job

```python
import asyncio

from cluster_api import create_executor, ResourceSpec, JobMonitor

async def main():
    executor = create_executor(profile="janelia_lsf")
    monitor = JobMonitor(executor)
    await monitor.start()

    job = await executor.submit(
        command="nextflow run nf-core/rnaseq --input samples.csv",
        name="rnaseq-run",
        resources=ResourceSpec(cpus=4, memory="32 GB", walltime="24:00", queue="long"),
        env={"NXF_WORK": "/scratch/work"},
    )

    job.on_success(lambda j: print(f"Done! Job {j.job_id}, peak mem: {j.max_mem}"))
    job.on_failure(lambda j: print(f"FAILED! Job {j.job_id}, exit={j.exit_code}"))

    await monitor.wait_for(job)
    await monitor.stop()

asyncio.run(main())
```
### Job Array

```python
async def run_array():
    executor = create_executor(profile="janelia_lsf")
    monitor = JobMonitor(executor)
    await monitor.start()

    job = await executor.submit_array(
        command="python process.py --index $LSB_JOBINDEX",
        name="batch-process",
        array_range=(1, 50),
        resources=ResourceSpec(cpus=1, memory="4 GB", walltime="01:00"),
    )

    job.on_exit(lambda j: print(f"Array finished: {j.job_id}"))
    await monitor.wait_for(job)
    await monitor.stop()
```
The array index environment variable depends on the executor: LSF uses `$LSB_JOBINDEX`, while the local executor uses `$ARRAY_INDEX`.
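Inside the job script itself, a small helper can hide that difference. A minimal sketch (`array_index` is a hypothetical helper for your own scripts, not part of the library):

```python
import os

def array_index() -> int:
    """Return this element's index, whichever executor set it.

    LSF exports $LSB_JOBINDEX; the local executor exports $ARRAY_INDEX.
    """
    for var in ("LSB_JOBINDEX", "ARRAY_INDEX"):
        value = os.environ.get(var)
        if value is not None:
            return int(value)
    raise RuntimeError("no array index variable set; not running as an array element")
```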
### Local Testing

```python
async def local_test():
    executor = create_executor(executor="local")
    monitor = JobMonitor(executor, poll_interval=1.0)
    await monitor.start()

    job = await executor.submit(command="echo hello world", name="test")
    job.on_success(lambda j: print("It worked!"))

    await monitor.wait_for(job, timeout=10.0)
    await monitor.stop()
```
## Configuration

Configuration is loaded from YAML with optional profiles. The search order is:

1. Explicit `config_path` argument
2. `$CLUSTER_API_CONFIG` environment variable
3. `./cluster_api.yaml`
4. `~/.config/cluster_api/config.yaml`
### Example `cluster_api.yaml`

```yaml
executor: local
poll_interval: 10
job_name_prefix: "capi"

profiles:
  janelia_lsf:
    executor: lsf
    queue: normal
    memory: "8 GB"
    walltime: "04:00"
    script_prologue:
      - "module load java/11"
  local_dev:
    executor: local
    poll_interval: 2
```
### Config Options

| Option | Default | Description |
|---|---|---|
| `executor` | `"local"` | Backend: `lsf` or `local` |
| `cpus` | `None` | Default CPU count |
| `memory` | `None` | Default memory (e.g. `"8 GB"`) |
| `walltime` | `None` | Default wall time (e.g. `"04:00"`) |
| `queue` | `None` | Default queue/partition |
| `poll_interval` | `10.0` | Seconds between status polls |
| `job_name_prefix` | `"capi"` | Prefix for all job names |
| `shebang` | `"#!/bin/bash"` | Script shebang line |
| `script_prologue` | `[]` | Lines inserted before the command |
| `script_epilogue` | `[]` | Lines inserted after the command |
| `extra_directives` | `[]` | Additional scheduler flags (directive prefix added automatically) |
| `directives_skip` | `[]` | Substrings to filter out of directives |
| `extra_args` | `[]` | Extra CLI args appended to the submit command (e.g. `bsub`) |
| `lsf_units` | `"MB"` | LSF memory units (`KB`, `MB`, `GB`) |
| `suppress_job_email` | `true` | Set `LSB_JOB_REPORT_MAIL=N` |
| `command_timeout` | `100.0` | Timeout in seconds for scheduler commands |
| `zombie_timeout_minutes` | `30.0` | Mark jobs as failed if unseen for this long |
| `completed_retention_minutes` | `10.0` | Keep finished jobs in memory for this long |
## API Reference

### `create_executor(profile=None, config_path=None, **overrides)`

Factory function that loads config and returns an `Executor` instance.
### `Executor`

Abstract base class. Key methods:

- `submit(command, name, resources=None, prologue=None, epilogue=None, env=None, metadata=None)` — submit a job, returns a `JobRecord`
- `submit_array(command, name, array_range, ...)` — submit a job array
- `cancel(job_id)` — cancel a job by ID
- `cancel_by_name(name_pattern)` — cancel by name pattern (LSF only)
- `cancel_all()` — cancel all tracked jobs
- `poll()` — query the scheduler and update job statuses
- `jobs` / `active_jobs` — properties returning tracked job dicts
### `JobRecord`

Tracks a submitted job. Fields include `job_id`, `name`, `status`, `exit_code`, `exec_host`, `max_mem`, `submit_time`, `start_time`, `finish_time`, and `metadata`.

- `on_success(callback)` — register a callback for exit code 0
- `on_failure(callback)` — register a callback for a non-zero exit
- `on_exit(callback, condition=ANY)` — register a callback for any exit condition
- `is_terminal` — whether the job has finished
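The dispatch rules (`on_success` on exit code 0, `on_failure` on a non-zero exit, `on_exit` in either case) can be illustrated with a minimal stand-in class. This sketches the documented semantics only; it is not the library's actual `JobRecord`, which also tracks scheduler state:

```python
class MiniJobRecord:
    """Stand-in illustrating the documented callback semantics."""

    def __init__(self, job_id: str):
        self.job_id = job_id
        self.exit_code: int | None = None
        self._on_success: list = []
        self._on_failure: list = []
        self._on_exit: list = []

    def on_success(self, cb): self._on_success.append(cb)
    def on_failure(self, cb): self._on_failure.append(cb)
    def on_exit(self, cb): self._on_exit.append(cb)

    @property
    def is_terminal(self) -> bool:
        return self.exit_code is not None

    def finish(self, exit_code: int) -> None:
        """Dispatch callbacks the way a monitor would on job completion."""
        self.exit_code = exit_code
        hooks = self._on_success if exit_code == 0 else self._on_failure
        for cb in hooks + self._on_exit:
            cb(self)
```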
### `JobMonitor`

Async polling loop that drives status updates and callback dispatch.

- `start()` / `stop()` — control the polling loop
- `wait_for(*records, timeout=None)` — block until the given jobs reach a terminal state
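A rough mental model of `wait_for` with a timeout, sketched in plain `asyncio`. This is not the library's implementation (the real monitor derives terminal state from scheduler polling); `wait_for_records` and the per-record `done` event are assumptions made for the sketch:

```python
import asyncio

async def wait_for_records(*records, timeout=None):
    """Wait until every record signals a terminal state, or raise on timeout.

    Each record here is assumed to carry a 'done' asyncio.Event.
    """
    await asyncio.wait_for(
        asyncio.gather(*(r.done.wait() for r in records)),
        timeout=timeout,
    )
```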
### `ResourceSpec`

Resource requirements: `cpus`, `memory`, `walltime`, `queue`, `work_dir`, `stdout_path`, `stderr_path`, `extra_directives`, `extra_args`.
## Development

See `docs/Development.md` for build instructions, testing, and the release process.
## File details

Details for the file `py_cluster_api-0.2.4.tar.gz`.

- Size: 47.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3

| Algorithm | Hash digest |
|---|---|
| SHA256 | `ba104da8b5481c4e068a42dc9d9c6fd88e95acba04b321668a744a2ee829f425` |
| MD5 | `2a972e6f939a8de8acaf0b4c6f044da9` |
| BLAKE2b-256 | `bbb021fce90c452329b90135b2dbc29bb50491492fd89e060e959f166d8ceb86` |
Details for the file `py_cluster_api-0.2.4-py3-none-any.whl`.

- Size: 24.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3

| Algorithm | Hash digest |
|---|---|
| SHA256 | `6f0554394e2b36429641a40ddb8391636a87033c67f12ad5af515a5ede1ba0df` |
| MD5 | `27b006de696e65780a5fa766e64b067e` |
| BLAKE2b-256 | `58070bf2e004d397f3beca035a53d193e5ed08f37372cac92f5017f9afac0439` |