xqute

A job management system for Python

Features

  • Written in async
  • Plugin system
  • Scheduler adaptor
  • Job retrying or pipeline halting on failure
  • Support for cloud working directories
  • Support for the Google Batch Jobs scheduler

Installation

pip install xqute

A toy example

import asyncio
from xqute import Xqute

async def main():
    # 3 jobs allowed to run at the same time
    xqute = Xqute(forks=3)
    for _ in range(10):
        await xqute.put('sleep 1')
    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())

API

https://pwwang.github.io/xqute/

Usage

Xqute object

An Xqute object is initialized with:

xqute = Xqute(...)

Available arguments are:

  • scheduler: The scheduler class or name
  • plugins: The plugins to enable/disable for this session
  • workdir: The job meta directory (Default: ./.xqute/)
  • forks: The number of jobs allowed to run at the same time
  • error_strategy: The strategy to apply when a job fails
  • num_retries: Max number of retries when error_strategy is retry
  • submission_batch: The number of consumers that submit jobs
  • scheduler_opts: Additional keyword arguments for scheduler
  • jobname_prefix: The prefix of the job name
  • recheck_interval: The interval to recheck the job status

Note that the producer must be initialized in an event loop.
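
To make forks and num_retries more concrete, here is a minimal standard-library sketch of the behavior they describe: at most forks commands run at the same time, and a failed command is retried. It is not xqute's implementation. The names run_one and run_all are hypothetical, and the sketch assumes that num_retries counts the attempts allowed after the first one.

```python
import asyncio
import sys


async def run_one(cmd, sem, num_retries):
    """Run one command, retrying on a non-zero exit code.

    Returns the number of attempts used, or None if every attempt failed.
    """
    for attempt in range(1, num_retries + 2):  # first try plus retries
        async with sem:  # at most `forks` commands hold the semaphore
            proc = await asyncio.create_subprocess_exec(*cmd)
            rc = await proc.wait()
        if rc == 0:
            return attempt
    return None


async def run_all(cmds, forks=3, num_retries=1):
    """Run all commands with a concurrency cap (illustration only)."""
    sem = asyncio.Semaphore(forks)
    return await asyncio.gather(*(run_one(c, sem, num_retries) for c in cmds))


# Hypothetical commands: one that succeeds and one that always fails
ok = [sys.executable, "-c", "pass"]
bad = [sys.executable, "-c", "raise SystemExit(1)"]
results = asyncio.run(run_all([ok, bad, ok], forks=2, num_retries=1))
```

In this sketch the failing command uses both of its attempts and ends up as None, while the two successful commands finish on their first attempt.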

To push a job into the queue:

await xqute.put(['echo', 1])

Using SGE scheduler

xqute = Xqute(
    'sge',
    forks=100,
    scheduler_opts=dict(
        qsub='path to qsub',
        qdel='path to qdel',
        qstat='path to qstat',
        q='1-day',  # or qsub_q='1-day'
    ),
    ...
)

Keyword arguments whose names start with sge_ are interpreted as qsub options. List or tuple option values are expanded into one directive per item. For example, l=['h_vmem=2G', 'gpu=1'] is expanded in the wrapper script like this:

# ...

#$ -l h_vmem=2G
#$ -l gpu=1

# ...
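
The expansion described above can be sketched in a few lines of Python. This is an illustration of the rule only, not the code xqute uses. The function name expand_options is hypothetical, and the sketch assumes the option names have already had any prefix stripped.

```python
def expand_options(options, prefix="#$"):
    """Render scheduler options as directive lines for a wrapper script."""
    lines = []
    for name, value in options.items():
        # A list or tuple yields one directive per item
        values = value if isinstance(value, (list, tuple)) else [value]
        for item in values:
            lines.append(f"{prefix} -{name} {item}")
    return lines


directives = expand_options({"q": "1-day", "l": ["h_vmem=2G", "gpu=1"]})
print("\n".join(directives))
```

The sketch does not handle flags that take no value; how xqute treats those is not covered here.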

Using Slurm scheduler

xqute = Xqute(
    'slurm',
    forks=100,
    scheduler_opts={
        "sbatch": 'path to sbatch',
        "scancel": 'path to scancel',
        "squeue": 'path to squeue',
        "partition": '1-day',  # or partition='1-day'
        "time": '01:00:00',
        ...
    },
)

Using ssh scheduler

xqute = Xqute(
    'ssh',
    forks=100,
    scheduler_opts={
        "ssh": 'path to ssh',
        "servers": {
            "server1": {
                "user": ...,
                "port": 22,
                "keyfile": ...,
                # How long to keep the ssh connection alive
                "ctrl_persist": 600,
                # Where to store the control socket
                "ctrl_dir": "/tmp",
            },
            ...
        }
    },
    ...
)

SSH servers must share the same filesystem and use keyfile authentication.
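
The ctrl_persist and ctrl_dir settings correspond in spirit to OpenSSH's connection sharing options (ControlMaster, ControlPath, ControlPersist). The sketch below shows how a server entry like the one above could be turned into an ssh command line. It is an assumption about the general technique, not a description of the command xqute actually builds. The function ssh_argv, the user name, the key path and the socket file name pattern are all hypothetical.

```python
def ssh_argv(host, conf, command, ssh="ssh"):
    """Build an ssh command line that reuses one control connection."""
    # %r, %h and %p are OpenSSH tokens for remote user, host and port
    ctrl_path = f"{conf.get('ctrl_dir', '/tmp')}/ssh-%r@%h:%p"
    argv = [
        ssh,
        "-p", str(conf.get("port", 22)),
        "-o", "ControlMaster=auto",
        "-o", f"ControlPath={ctrl_path}",
        "-o", f"ControlPersist={conf.get('ctrl_persist', 600)}",
    ]
    if conf.get("keyfile"):
        argv += ["-i", conf["keyfile"]]
    user = conf.get("user")
    argv.append(f"{user}@{host}" if user else host)
    argv.append(command)
    return argv


# Hypothetical server entry, mirroring the keys used in scheduler_opts
argv = ssh_argv(
    "server1",
    {
        "user": "alice",
        "port": 22,
        "keyfile": "/home/alice/.ssh/id_rsa",
        "ctrl_persist": 600,
        "ctrl_dir": "/tmp",
    },
    "echo hello",
)
```

With a persistent control socket, repeated submissions to the same server can reuse an established connection rather than authenticating each time.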

Using Google Batch Jobs scheduler

xqute = Xqute(
    'gbatch',
    forks=100,
    scheduler_opts={
        "project": "your-gcp-project-id",
        "location": "us-central1",
        "gcloud": "path to gcloud",  # must be authenticated
        # see https://cloud.google.com/batch/docs/create-run-example-job#create-job
        "taskGroups": [ ... ],
    }
)

Using Container scheduler

xqute = Xqute(
    'container',
    forks=100,
    scheduler_opts={
        "image": "docker://bash:latest",  # or path to sif file for apptainer
        "entrypoint": "/usr/local/bin/bash",
        "bin": "docker",  # or "podman" or "apptainer"
        "volumes": "/path/on/host:/path/in/container",  # extra volume mapping
        "envs": {"MY_ENV_VAR": "value"},  # environment variables to set
        "remove": True,  # remove container after execution (Docker/Podman only)
        # additional arguments to pass to the container runtime
        "bin_args": ["--hostname", "xqute-container"],
    }
)

Plugins

To write a plugin for xqute, you will need to implement the following hooks:

  • def on_init(scheduler): Right after scheduler object is initialized
  • def on_shutdown(scheduler, sig): When scheduler is shutting down
  • async def on_job_init(scheduler, job): When the job is initialized
  • async def on_job_queued(scheduler, job): When the job is queued
  • async def on_job_submitted(scheduler, job): When the job is submitted
  • async def on_job_started(scheduler, job): When the job is started (when status changed to running)
  • async def on_job_polling(scheduler, job, counter): When job status is being polled
  • async def on_job_killing(scheduler, job): When the job is being killed
  • async def on_job_killed(scheduler, job): When the job is killed
  • async def on_job_failed(scheduler, job): When the job has failed
  • async def on_job_succeeded(scheduler, job): When the job has succeeded
  • def on_jobcmd_init(scheduler, job) -> str: When the job command wrapper script is initialized before the prescript is run. This will replace the placeholder {jobcmd_init} in the wrapper script.
  • def on_jobcmd_prep(scheduler, job) -> str: When the job command is right about to run in the wrapper script. This will replace the placeholder {jobcmd_prep} in the wrapper script.
  • def on_jobcmd_end(scheduler, job) -> str: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder {jobcmd_end} in the wrapper script.

Note that all hooks are coroutines except on_init, on_shutdown and on_jobcmd_*, which means you should also implement them as coroutines (sync implementations are allowed but will trigger a warning).
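
As a rough illustration of why a sync implementation of an async hook can still work, the sketch below shows a generic caller that awaits the result only when it is awaitable and warns otherwise. This is a common pattern and an assumption on our part, not the actual logic of simplug or xqute. The helper call_hook is hypothetical.

```python
import asyncio
import inspect
import warnings


async def call_hook(impl, *args):
    """Call a hook implementation and await it if it returns an awaitable."""
    result = impl(*args)
    if inspect.isawaitable(result):
        return await result
    warnings.warn(f"{impl.__name__} is a sync implementation of an async hook")
    return result


async def on_job_started(scheduler, job):  # preferred: a coroutine
    return f"{job} started"


def on_job_failed(scheduler, job):  # tolerated, but triggers a warning
    return f"{job} failed"


async def main():
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        started = await call_hook(on_job_started, None, "job-0")
        failed = await call_hook(on_job_failed, None, "job-0")
    return started, failed, len(caught)


started, failed, n_warnings = asyncio.run(main())
```

Both hooks return their value, but only the sync one produces a warning.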

You may also check where the hooks are called in the following diagram:

[Figure: xqute-design]

To implement a hook, you have to fetch the plugin manager:

from simplug import Simplug
pm = Simplug('xqute')

# or
from xqute import simplug as pm

and then use the decorator pm.impl:

@pm.impl
def on_init(scheduler):
    ...

Implementing a scheduler

The built-in schedulers currently are local, sge, slurm, ssh, gbatch and container.

One can implement a scheduler by subclassing the Scheduler abstract class. There are three abstract methods that have to be implemented in the subclass:

from xqute import Scheduler


class MyScheduler(Scheduler):
    name = 'mysched'

    async def submit_job(self, job):
        """How to submit a job, return a unique id in the scheduler system
        (the pid for local scheduler for example)
        """

    async def kill_job(self, job):
        """How to kill a job"""

    async def job_is_running(self, job):
        """Check if a job is running"""

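To show what the three methods might do in practice, here is a self-contained sketch of a local, process-based scheduler. It deliberately does not import xqute so that it runs on its own. DemoJob and DemoLocalScheduler are hypothetical stand-ins, and the cmd and jid attributes are assumptions about what a job object carries. In real code the class would subclass Scheduler as shown above.

```python
import asyncio
import sys
from dataclasses import dataclass


@dataclass
class DemoJob:  # hypothetical stand-in for xqute's job object
    cmd: list
    jid: object = None


class DemoLocalScheduler:  # would subclass xqute's Scheduler in real code
    name = "demolocal"

    def __init__(self):
        self._procs = {}  # pid -> subprocess started by this scheduler

    async def submit_job(self, job):
        """Start the command and return its pid as the unique job id."""
        proc = await asyncio.create_subprocess_exec(*job.cmd)
        self._procs[proc.pid] = proc
        return proc.pid

    async def kill_job(self, job):
        """Terminate the process and wait for it to exit."""
        proc = self._procs.pop(job.jid, None)
        if proc is not None and proc.returncode is None:
            proc.terminate()
            await proc.wait()

    async def job_is_running(self, job):
        """A job is running while its process has no return code."""
        proc = self._procs.get(job.jid)
        return proc is not None and proc.returncode is None


async def demo():
    sched = DemoLocalScheduler()
    job = DemoJob([sys.executable, "-c", "import time; time.sleep(30)"])
    job.jid = await sched.submit_job(job)
    before = await sched.job_is_running(job)
    await sched.kill_job(job)
    after = await sched.job_is_running(job)
    return before, after


before, after = asyncio.run(demo())
```

The job is reported as running right after submission and as not running once it has been killed.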