
A job management system for python


xqute


Features

  • Asynchronous, built on asyncio
  • Plugin system
  • Scheduler adaptors
  • Job retrying or pipeline halting on failure

Installation

pip install xqute

A toy example

import asyncio
from xqute import Xqute

async def main():
    # 3 jobs allowed to run at the same time
    xqute = Xqute(scheduler_forks=3)
    for _ in range(10):
        await xqute.put('sleep 1')
    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())
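The scheduler_forks limit in the toy example can be pictured with plain asyncio. The sketch below is not xqute's implementation: it assumes a hypothetical run_jobs helper that starts each command as a shell subprocess and uses an asyncio.Semaphore so that at most `forks` commands run at the same time.

```python
import asyncio
from typing import List


async def run_jobs(cmds: List[str], forks: int) -> List[int]:
    """Hypothetical helper: run shell commands with at most `forks` at a time.

    Returns the exit codes in the same order as `cmds`.
    """
    sem = asyncio.Semaphore(forks)

    async def run_one(cmd: str) -> int:
        # Wait for a free slot before starting the subprocess
        async with sem:
            proc = await asyncio.create_subprocess_shell(cmd)
            return await proc.wait()

    # gather() returns the results in the order of the input commands
    return await asyncio.gather(*(run_one(c) for c in cmds))
```

For example, asyncio.run(run_jobs(['sleep 1'] * 10, forks=3)) mirrors the toy example and finishes in about four seconds rather than ten.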


API

https://pwwang.github.io/xqute/

Usage

Xqute object

An Xqute object is initialized with:

xqute = Xqute(...)

Available arguments are:

  • scheduler: The scheduler class or name
  • plugins: The plugins to enable/disable for this session
  • job_metadir: The job meta directory (Default: ./.xqute/)
  • job_error_strategy: The strategy to apply when a job fails (e.g. retry)
  • job_num_retries: Max number of retries when job_error_strategy is retry
  • job_submission_batch: The number of consumers used to submit jobs
  • scheduler_forks: Max number of job forks
  • **scheduler_opts: Additional keyword arguments for scheduler

Note that the producer (the Xqute object) must be initialized inside an event loop, for example within an async function as in the toy example.
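With job_error_strategy set to retry, a failed job is re-run at most job_num_retries times. The sketch below only illustrates that rule; run_with_retries is a hypothetical helper, and it assumes a job "fails" when its exit code is non-zero and that job_num_retries counts re-runs after the first attempt.

```python
import asyncio
from typing import Awaitable, Callable


async def run_with_retries(
    run: Callable[[], Awaitable[int]], num_retries: int
) -> int:
    """Hypothetical helper: re-run a job while it fails.

    `run` starts the job once and returns its exit code. The job is tried
    once, then retried at most `num_retries` more times.
    """
    rc = await run()
    retries = 0
    while rc != 0 and retries < num_retries:
        retries += 1
        rc = await run()
    # The last exit code decides whether the job finally failed
    return rc
```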

To push a job into the queue:

await xqute.put(['echo', 1])
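A command can be given as a string or as a list whose items need not be strings (note the integer 1 above). One plausible way to turn the list form into a shell command line is sketched below; to_command_line is hypothetical, and converting items with str() and shell-quoting them is an assumption, not confirmed xqute behavior.

```python
import shlex
from typing import Sequence, Union


def to_command_line(cmd: Union[str, Sequence[object]]) -> str:
    """Hypothetical helper: turn a job command into one shell command line.

    Strings are used as-is; list items are converted with str() and
    shell-quoted, so ['echo', 1] becomes 'echo 1'.
    """
    if isinstance(cmd, str):
        return cmd
    return shlex.join(str(part) for part in cmd)
```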

Using the SGE scheduler

xqute = Xqute(
    'sge',
    scheduler_forks=100,
    qsub='path to qsub',
    qdel='path to qdel',
    qstat='path to qstat',
    sge_q='1-day',  # or qsub_q='1-day'
    ...
)

Keyword arguments whose names start with sge_ (or qsub_) are interpreted as qsub options. List or tuple values are expanded into one option per item. For example, sge_l=['h_vmem=2G', 'gpu=1'] is expanded in the wrapped script as:

# ...

#$ -l h_vmem=2G
#$ -l gpu=1

# ...
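The expansion rule above can be sketched as a small function. sge_directives is a hypothetical name and this is not xqute's code; it assumes the option name is simply whatever follows the sge_ or qsub_ prefix, and that other keyword arguments (such as qsub itself) are not turned into directives.

```python
from typing import Any, Dict, List


def sge_directives(opts: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: turn sge_/qsub_ keyword arguments into #$ lines.

    The prefix is stripped and the rest becomes the qsub option name;
    list or tuple values produce one line per item.
    """
    lines = []
    for key, value in opts.items():
        for prefix in ("sge_", "qsub_"):
            if key.startswith(prefix):
                name = key[len(prefix):]
                break
        else:
            continue  # not a qsub option (e.g. qsub='path to qsub')
        values = value if isinstance(value, (list, tuple)) else [value]
        for val in values:
            lines.append(f"#$ -{name} {val}")
    return lines
```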

Using the Slurm scheduler

xqute = Xqute(
    'slurm',
    scheduler_forks=100,
    sbatch='path to sbatch',
    scancel='path to scancel',
    squeue='path to squeue',
    sbatch_partition='1-day',  # or slurm_partition='1-day'
    sbatch_time='01:00:00',
    ...
)
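By analogy with the SGE options, the sbatch_ (or slurm_) keyword arguments presumably end up as #SBATCH lines in the wrapped script. The sketch below assumes that mapping; slurm_directives is hypothetical, and the choice of "--name=value" for long options and "-x value" for one-letter options follows sbatch's own command-line syntax rather than anything confirmed for xqute.

```python
from typing import Any, Dict, List


def slurm_directives(opts: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: turn sbatch_/slurm_ keyword arguments into #SBATCH lines."""
    lines = []
    for key, value in opts.items():
        for prefix in ("sbatch_", "slurm_"):
            if key.startswith(prefix):
                name = key[len(prefix):]
                break
        else:
            continue  # e.g. sbatch='path to sbatch' is not a directive
        values = value if isinstance(value, (list, tuple)) else [value]
        for val in values:
            if len(name) == 1:
                lines.append(f"#SBATCH -{name} {val}")
            else:
                # sbatch long options use dashes, e.g. mem_per_cpu -> --mem-per-cpu
                lines.append(f"#SBATCH --{name.replace('_', '-')}={val}")
    return lines
```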

Using the SSH scheduler

xqute = Xqute(
    'ssh',
    scheduler_forks=100,
    ssh='path to ssh',
    ssh_servers={
        "server1": {
            "user": ...,
            "port": 22,
            "keyfile": ...,
            # How long to keep the ssh connection alive
            "ctrl_persist": 600,
            # Where to store the control socket
            "ctrl_dir": "/tmp",
        },
        ...
    },
    ...
)

The SSH servers must share the same filesystem and use keyfile authentication.
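The ctrl_persist and ctrl_dir settings correspond naturally to OpenSSH connection sharing (ControlMaster, ControlPersist and ControlPath). The sketch below builds an ssh argument list from one ssh_servers entry under that assumption; ssh_command is hypothetical, the socket file name pattern is an illustrative choice, and adding BatchMode=yes (so ssh fails instead of prompting for a password) is also an assumption.

```python
import os
from typing import Any, Dict, List


def ssh_command(host: str, conf: Dict[str, Any], ssh: str = "ssh") -> List[str]:
    """Hypothetical helper: build an ssh argument list from one ssh_servers entry.

    ctrl_persist and ctrl_dir are mapped onto OpenSSH connection sharing so
    that repeated commands to the same server reuse one connection.
    """
    args = [ssh, "-o", "BatchMode=yes"]  # key-based auth only, never prompt
    if conf.get("user"):
        args += ["-l", str(conf["user"])]
    if conf.get("port"):
        args += ["-p", str(conf["port"])]
    if conf.get("keyfile"):
        args += ["-i", str(conf["keyfile"])]
    if "ctrl_persist" in conf:
        ctrl_dir = conf.get("ctrl_dir", "/tmp")
        args += [
            "-o", "ControlMaster=auto",
            "-o", f"ControlPersist={conf['ctrl_persist']}",
            # %r, %h and %p expand to the remote user, host and port
            "-o", "ControlPath=" + os.path.join(ctrl_dir, "ssh-%r@%h:%p"),
        ]
    return args + [host]
```

A remote command would then be appended to the list, e.g. subprocess.run(ssh_command("server1", conf) + ["hostname"]).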

Plugins

To write a plugin for xqute, you will need to implement the following hooks:

  • on_init(scheduler): Right after the scheduler object is initialized
  • on_shutdown(scheduler, sig): When the scheduler is shutting down
  • on_job_init(scheduler, job): When the job is initialized
  • on_job_queued(scheduler, job): When the job is queued
  • on_job_submitted(scheduler, job): When the job is submitted
  • on_job_started(scheduler, job): When the job is started (i.e. when its status changes to running)
  • on_job_polling(scheduler, job): When the job status is being polled
  • on_job_killing(scheduler, job): When the job is being killed
  • on_job_killed(scheduler, job): When the job has been killed
  • on_job_failed(scheduler, job): When the job has failed
  • on_job_succeeded(scheduler, job): When the job has succeeded

Note that all hooks except on_init and on_shutdown are coroutines, so you should implement them as coroutines as well (synchronous implementations are allowed but trigger a warning).
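The rule that coroutine hooks are awaited while synchronous ones still run but with a warning can be illustrated with a small dispatcher. This is a stdlib sketch of the described behavior, not simplug's or xqute's actual code; call_hook is hypothetical, and the scheduler and job arguments are placeholders.

```python
import asyncio
import inspect
import warnings
from typing import Any, Callable


async def call_hook(hook: Callable[..., Any], *args: Any) -> Any:
    """Hypothetical dispatcher: await coroutine hooks, run sync ones with a warning."""
    if inspect.iscoroutinefunction(hook):
        return await hook(*args)
    warnings.warn(
        f"Hook {hook.__name__} is not a coroutine; running it synchronously.",
        RuntimeWarning,
    )
    return hook(*args)


async def on_job_succeeded(scheduler, job):
    # Preferred form: an async implementation of a job-level hook
    print(f"{job} succeeded")
```

For example, asyncio.run(call_hook(on_job_succeeded, None, "job-1")) prints "job-1 succeeded" without any warning.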

To implement a hook, you have to fetch the plugin manager:

from simplug import Simplug
pm = Simplug('xqute')

# or
from xqute import simplug as pm

and then use the decorator pm.impl:

@pm.impl
def on_init(scheduler):
    ...

Implementing a scheduler

The builtin schedulers documented here are local, sge, slurm and ssh.

One can implement a scheduler by subclassing the Scheduler abstract class. There are three abstract methods that have to be implemented in the subclass:

from xqute import Scheduler

class MyScheduler(Scheduler):
    name = 'my'
    job_class = MyJob  # MyJob must be defined before MyScheduler

    async def submit_job(self, job):
        """How to submit a job, return a unique id in the scheduler system
        (the pid for local scheduler for example)
        """

    async def kill_job(self, job):
        """How to kill a job"""

    async def job_is_running(self, job):
        """Check if a job is running"""

As the job_class attribute shows, you also need to implement a job class (MyJob) before MyScheduler. The only abstract method to implement is wrap_cmd:

from xqute import Job

class MyJob(Job):

    async def wrap_cmd(self, scheduler):
        ...

The wrapped script must use the trap command to update the job status and return code, and to clear the job id file.
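A minimal sketch of such a wrapped script follows. The helper name wrap_with_trap, the separate rc, status and job id files, and the status words are all assumptions for illustration, not xqute's actual wrapper. The key idea is that the EXIT trap fires however the command ends, and `$?` inside the single-quoted trap string is expanded only when the trap runs, so it holds the command's exit status.

```python
import shlex


def wrap_with_trap(cmd: str, rc_file: str, status_file: str, jid_file: str) -> str:
    """Hypothetical wrap_cmd body: a bash script whose EXIT trap records the outcome."""
    rc, status, jid = (shlex.quote(p) for p in (rc_file, status_file, jid_file))
    return f"""#!/usr/bin/env bash
on_exit() {{
    # $1 is the exit status captured when the trap fired
    echo "$1" > {rc}
    if [ "$1" -eq 0 ]; then echo succeeded > {status}; else echo failed > {status}; fi
    rm -f {jid}
}}
trap 'on_exit $?' EXIT
echo running > {status}
{cmd}
"""
```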

Download files


Source Distribution

xqute-0.4.0.tar.gz (19.3 kB)

Built Distribution

xqute-0.4.0-py3-none-any.whl (23.5 kB, Python 3)
