A job management system for Python

xqute

An async-first job management and scheduling framework for Python.


xqute schedules, submits, monitors, and manages batch jobs across local, HPC, cloud, and container backends — all through a single async Python API. It's built for bioinformatics pipelines, ML hyperparameter sweeps, batch data processing, and any workload that needs to fan out across heterogeneous compute.

✨ Features

  • Blazingly fast — built on asyncio with uvloop; thousands of jobs, minimal overhead
  • Six scheduler backends — local, SGE, Slurm, SSH, Google Cloud Batch, Docker/Podman/Apptainer
  • Plugin system — 14 lifecycle hooks let you add logging, notifications, or custom logic without touching core code
  • Error strategies — automatic retry with configurable limits, or halt-the-world on first failure
  • File-based status tracking — jobs self-report via status files; survives network failures and scheduler quirks
  • Daemon mode: keep_feeding lets you add jobs dynamically at any point
  • Cloud storage — workdirs on GCS (gs://), Azure (az://), or S3 (s3://)
  • Path translation — seamless SpecPath / MountedPath duality for cross-machine execution
  • Timeouts — per-job timeout enforcement via coreutils timeout
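
The two error strategies in the list above can be pictured with a small asyncio sketch. It does not use xqute's API: `ErrorStrategy`, `Halted`, `run_with_strategy`, and `make_flaky` are hypothetical names chosen for illustration, and xqute's own retry logic may differ in detail.

```python
import asyncio
from enum import Enum


class ErrorStrategy(Enum):
    """Hypothetical strategy names, not xqute's own constants."""
    RETRY = "retry"
    HALT = "halt"


class Halted(Exception):
    """Raised when a job fails under the HALT strategy."""


async def run_with_strategy(job, strategy, num_retries=3):
    """Run `job` (a zero-argument coroutine function returning a return code).

    Returns (rc, attempts). Under RETRY the job is re-run up to
    `num_retries` times after the first failure; under HALT the first
    non-zero return code raises Halted.
    """
    attempts = 0
    while True:
        attempts += 1
        rc = await job()
        if rc == 0:
            return rc, attempts
        if strategy is ErrorStrategy.HALT:
            raise Halted(f"job failed with rc={rc}")
        if attempts > num_retries:
            # First try plus all retries are used up: give up
            return rc, attempts


def make_flaky(failures):
    """Build a fake job that fails `failures` times, then succeeds."""
    state = {"left": failures}

    async def job():
        if state["left"] > 0:
            state["left"] -= 1
            return 1
        return 0

    return job
```

For example, `asyncio.run(run_with_strategy(make_flaky(2), ErrorStrategy.RETRY))` succeeds on the third attempt, while the same job under `ErrorStrategy.HALT` raises `Halted` on the first failure.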

📦 Installation

pip install xqute

With optional extras:

pip install 'xqute[gs]'      # Google Cloud Storage support
pip install 'xqute[cloudsh]'  # Cloud shell support

🚀 Quick start

Default (local scheduler)

import asyncio
from xqute import Xqute

async def main():
    xqute = Xqute(forks=3)
    for _ in range(10):
        await xqute.feed(["sleep", "1"])
    await xqute.run_until_complete()

asyncio.run(main())

Daemon mode — add jobs while running

import asyncio
from xqute import Xqute

async def main():
    xqute = Xqute(forks=3)

    # Start; returns immediately
    await xqute.run_until_complete(keep_feeding=True)

    # Feed jobs dynamically
    for i in range(100):
        await xqute.feed(["python", "train.py", str(i)])
        await asyncio.sleep(0.1)

    # Signal done and wait for everything to finish
    await xqute.stop_feeding()

asyncio.run(main())

🎯 Scheduler backends

xqute ships with six schedulers. Swap the scheduler argument to switch.

Slurm

xqute = Xqute(
    scheduler="slurm",
    forks=100,
    scheduler_opts={
        "partition": "gpu",
        "time": "24:00:00",
        "mem": "8G",
        "gres": "gpu:1",
    },
)

SGE (Sun Grid Engine)

xqute = Xqute(
    scheduler="sge",
    forks=100,
    scheduler_opts={
        "q": "1-day",
        "l": ["h_vmem=4G", "gpu=1"],
    },
)

SSH (multi-server)

xqute = Xqute(
    scheduler="ssh",
    forks=100,
    scheduler_opts={
        "servers": {
            "node1": {"user": "alice", "host": "node1.example.com", "keyfile": "/home/alice/.ssh/id_rsa"},
            "node2": {"user": "alice", "host": "node2.example.com", "keyfile": "/home/alice/.ssh/id_rsa"},
        }
    },
)

Note: SSH servers must share the same filesystem and use key-based auth.

Google Cloud Batch

xqute = Xqute(
    scheduler="gbatch",
    forks=100,
    scheduler_opts={
        "project": "my-gcp-project",
        "location": "us-central1",
        "taskGroups": [{
            "taskSpec": {
                "runnables": [{
                    "container": {"imageUri": "ubuntu", "entrypoint": "bash", "commands": ["-c", "..."]}
                }]
            },
            "taskCount": 500,
            "parallelism": 100,
        }],
    },
)

Container (Docker / Podman / Apptainer)

xqute = Xqute(
    scheduler="container",
    forks=10,
    scheduler_opts={
        "image": "docker://python:3.12",
        "entrypoint": "/bin/bash",
        "bin": "docker",
        "volumes": ["/data:/data"],
        "envs": {"TF_CPP_MIN_LOG_LEVEL": "2"},
    },
)

🔌 Plugins

xqute exposes 14 lifecycle hooks through simplug. For example, to send a Slack notification when a job fails:

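import asyncio
import requests
from xqute import simplug as pm

WEBHOOK = "..."  # placeholder: your Slack incoming-webhook URL

@pm.impl
async def on_job_failed(scheduler, job):
    # requests is blocking, so run it in a thread to keep the event loop free
    await asyncio.to_thread(
        requests.post, WEBHOOK, json={"text": f"Job {job.index} failed"}
    )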

See the Plugins page for the full list of hooks and more examples.

📖 Documentation

Full documentation is at pwwang.github.io/xqute:

  • Quick Start — get running in minutes
  • User Guide — initialization, error handling, monitoring
  • Schedulers — all six backends with config reference
  • Plugins — lifecycle hooks and plugin authoring
  • Advanced — custom schedulers, Dask/Airflow integration, perf tuning
  • API Reference — auto-generated from source

🛠️ Custom scheduler

Implement three async methods to add your own backend:

from xqute import Scheduler

class MyScheduler(Scheduler):
    name = "mycluster"

    async def submit_job(self, job):
        """Submit and return a unique job ID."""

    async def kill_job(self, job):
        """Kill the job given its JID."""

    async def job_is_running(self, job):
        """Return True if the job is still running."""

Then pass it directly: Xqute(scheduler=MyScheduler, ...).
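
To make the three-method contract concrete, here is a minimal sketch that runs each job as a local subprocess. So that it runs without xqute installed, `Scheduler` and `Job` below are simplified stand-ins, not the library's classes. The `cmd` and `jid` attributes, and the caller storing the returned ID on the job, are assumptions made for illustration.

```python
import asyncio
import sys
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Job:
    """Hypothetical stand-in for a job object (not xqute's class)."""
    cmd: List[str]
    jid: Optional[str] = None


class Scheduler:
    """Hypothetical stand-in base class exposing the three-method contract."""
    name = "base"

    async def submit_job(self, job):
        raise NotImplementedError

    async def kill_job(self, job):
        raise NotImplementedError

    async def job_is_running(self, job):
        raise NotImplementedError


class SubprocessScheduler(Scheduler):
    name = "subprocess"

    def __init__(self):
        self._procs = {}  # maps job ID (the PID as a string) to its process

    async def submit_job(self, job):
        """Start the command and return its PID as the unique job ID."""
        proc = await asyncio.create_subprocess_exec(*job.cmd)
        self._procs[str(proc.pid)] = proc
        return str(proc.pid)

    async def kill_job(self, job):
        """Kill the process behind job.jid if it is still alive."""
        proc = self._procs.get(job.jid)
        if proc is not None and proc.returncode is None:
            proc.kill()
            await proc.wait()

    async def job_is_running(self, job):
        """A process with no return code yet is still running."""
        proc = self._procs.get(job.jid)
        return proc is not None and proc.returncode is None


async def demo():
    sched = SubprocessScheduler()
    job = Job([sys.executable, "-c", "import time; time.sleep(30)"])
    job.jid = await sched.submit_job(job)  # here the caller records the ID
    before = await sched.job_is_running(job)
    await sched.kill_job(job)
    after = await sched.job_is_running(job)
    return before, after
```

Running `asyncio.run(demo())` submits a long-running command, checks that it is running, kills it, and checks again.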

📊 Architecture

Jobs are wrapped in a bash template with an EXIT trap that writes status files (job.status, job.rc, job.stdout, job.stderr) into a per-job metadir. The polling loop reads these files — no scheduler API calls for status. This design makes xqute resilient to network hiccups and scheduler oddities.
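
The idea can be sketched in a few lines. This is an illustration of the pattern rather than xqute's actual wrapper template: the file names follow the paragraph above, but the status strings written to `job.status`, the `WRAPPER` text, and the helpers `run_wrapped` and `poll_status` are assumptions.

```python
import shlex
import subprocess
import time
from pathlib import Path

# Hypothetical wrapper: the EXIT trap fires however the script ends,
# so the return code and final status are always recorded.
WRAPPER = """\
metadir={metadir}
on_exit() {{
    rc=$?
    echo "$rc" > "$metadir/job.rc"
    if [ "$rc" -eq 0 ]; then echo FINISHED; else echo FAILED; fi > "$metadir/job.status"
}}
trap on_exit EXIT
echo RUNNING > "$metadir/job.status"
{cmd} > "$metadir/job.stdout" 2> "$metadir/job.stderr"
"""


def poll_status(metadir, interval=0.05, timeout=10.0):
    """Read job.status until it holds a terminal state; no scheduler calls."""
    deadline = time.monotonic() + timeout
    status_file = metadir / "job.status"
    while time.monotonic() < deadline:
        if status_file.exists():
            status = status_file.read_text().strip()
            if status in ("FINISHED", "FAILED"):
                # job.rc is written before the terminal status, so it exists
                return status, int((metadir / "job.rc").read_text())
        time.sleep(interval)
    raise TimeoutError(f"no terminal status in {metadir}")


def run_wrapped(cmd, metadir):
    """Write the wrapper script, launch it with bash, and poll for the result."""
    metadir = Path(metadir)
    script = metadir / "job.wrapped.sh"
    script.write_text(
        WRAPPER.format(metadir=shlex.quote(str(metadir)), cmd=shlex.join(cmd))
    )
    proc = subprocess.Popen(["bash", str(script)])
    try:
        return poll_status(metadir)
    finally:
        proc.wait()  # reap the child once its status has been read
```

Because the poller only reads files, the same loop works whether the wrapper ran locally or on a remote node that shares the metadir.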

INIT → QUEUED → SUBMITTED → RUNNING → FINISHED
                              ↓           ↓
                          KILLING →   FAILED

🤝 Contributing

Issues and PRs welcome on GitHub. See AGENTS.md for dev setup and conventions.

📝 License

MIT — see LICENSE.
