A job management system for python
Project description
xqute
A job management system for python
Features
- Written in async
- Plugin system
- Scheduler adaptor
- Job retrying/pipeline halting when failed
Installation
pip install xqute
A toy example
import asyncio
from xqute import Xqute
async def main():
# 3 jobs allowed to run at the same time
xqute = Xqute(scheduler_forks=3)
for _ in range(10):
await xqute.put('sleep 1')
await xqute.run_until_complete()
if __name__ == '__main__':
asyncio.run(main())
API
https://pwwang.github.io/xqute/
Usage
Xqute object
An xqute is initialized by:
xqute = Xqute(...)
Available arguments are:
- scheduler: The scheduler class or name
- plugins: The plugins to enable/disable for this session
- job_metadir: The job meta directory (Default:
./.xqute/
) - job_error_strategy: The strategy when there is error happened
- job_num_retries: Max number of retries when job_error_strategy is retry
- job_submission_batch: The number of consumers to submit jobs
- scheduler_forks: Max number of job forks
- **scheduler_opts: Additional keyword arguments for scheduler
Note that the producer must be initialized in an event loop.
To push a job into the queue:
await xqute.put(['echo', 1])
Using SGE scheduler
xqute = Xqute(
'sge',
scheduler_forks=100,
qsub='path to qsub',
qdel='path to qdel',
qstat='path to qstat',
sge_q='1-day', # or qsub_q='1-day'
...
)
Keyword-arguments with names starting with sge_
will be interpreted as qsub
options. list
or tuple
option values will be expanded. For example:
sge_l=['h_vmem=2G', 'gpu=1']
will be expanded in wrapped script like this:
# ...
#$ -l h_vmem=2G
#$ -l gpu=1
# ...
Using Slurm scheduler
xqute = Xqute(
'slurm',
scheduler_forks=100,
sbatch='path to sbatch',
scancel='path to scancel',
squeue='path to squeue',
sbatch_partition='1-day', # or slurm_partition='1-day'
sbatch_time='01:00:00',
...
)
Using ssh scheduler
xqute = Xqute(
'ssh',
scheduler_forks=100,
ssh='path to ssh',
ssh_servers={
"server1": {
"user": ...,
"port": 22,
"keyfile": ...,
# How long to keep the ssh connection alive
"ctrl_persist": 600,
# Where to store the control socket
"ctrl_dir": "/tmp",
},
...
}
...
)
SSH servers must share the same filesystem and using keyfile authentication.
Plugins
To write a plugin for xqute
, you will need to implement the following hooks:
def on_init(scheduler)
: Right after scheduler object is initializeddef on_shutdown(scheduler, sig)
: When scheduler is shutting downasync def on_job_init(scheduler, job)
: When the job is initializedasync def on_job_queued(scheduler, job)
: When the job is queuedasync def on_job_submitted(scheduler, job)
: When the job is submittedasync def on_job_started(scheduler, job)
: When the job is started (when status changed to running)async def on_job_polling(scheduler, job)
: When job status is being polledasync def on_job_killing(scheduler, job)
: When the job is being killedasync def on_job_killed(scheduler, job)
: When the job is killedasync def on_job_failed(scheduler, job)
: When the job is failedasync def on_job_succeeded(scheduler, job)
: When the job is succeededdef on_jobcmd_init(scheduler, job) -> str
: When the job command wrapper script is initialized before the prescript is run. This will replace the placeholder#![jobcmd_init]
in the wrapper script.def on_jobcmd_prep(scheduler, job) -> str
: When the job command is right about to run in the wrapper script. This will replace the placeholder#![jobcmd_prep]
in the wrapper script.def on_jobcmd_end(scheduler, job) -> str
: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder#![jobcmd_end]
in the wrapper script.
Note that all hooks are corotines except on_init
and on_shutdown
, that means you should also implement them as corotines (sync implementations are allowed but will be warned).
To implement a hook, you have to fetch the plugin manager:
from simplug import Simplug
pm = Simplug('xqute')
# or
from xqute import simplug as pm
and then use the decorator pm.impl
:
@pm.impl
def on_init(scheduler):
...
Implementing a scheduler
Currently there are only 2 builtin schedulers: local
and sge
.
One can implement a scheduler by subclassing the Scheduler
abstract class. There are three abstract methods that have to be implemented in the subclass:
from xqute import Scheduer
class MyScheduler(Scheduler):
name = 'my'
job_class: MyJob
async def submit_job(self, job):
"""How to submit a job, return a unique id in the scheduler system
(the pid for local scheduler for example)
"""
async def kill_job(self, job):
"""How to kill a job"""
async def job_is_running(self, job):
"""Check if a job is running"""
As you may see, we may also need to implement a job class before MyScheduler
. The only abstract method to be implemented is wrap_cmd
:
from xqute import Job
class MyJob(Job):
def shebang(self, scheduler: Scheduler) -> str:
"""The shebang and the options for the job script"""
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file xqute-0.5.3.tar.gz
.
File metadata
- Download URL: xqute-0.5.3.tar.gz
- Upload date:
- Size: 20.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c7189a089a6f1c89755c7505cead669597b89ff2451cb3f2bf2b53decfa9f152 |
|
MD5 | 7d20aac1735ce250773c4383cfa29dc2 |
|
BLAKE2b-256 | d8037fad1535dbe42f1bbdfbb1343ca49bbb1328425fadbbcfc93514ac72e816 |
File details
Details for the file xqute-0.5.3-py3-none-any.whl
.
File metadata
- Download URL: xqute-0.5.3-py3-none-any.whl
- Upload date:
- Size: 24.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0172bf092132ea6a83db64c7c502a49fa7a8e2e07f56c5d2cce83e56ddf5395b |
|
MD5 | c05d3134ba3243ac94a9beb3a202a2c5 |
|
BLAKE2b-256 | b2d61f9bdabc6078afd4c3adfd2c8f4a7de5f8ef20ab0fa1c5b974fdf8c7bbed |