A job management system for python
Project description
xqute
xqute schedules, submits, monitors, and manages batch jobs across local, HPC, cloud, and container backends — all through a single async Python API. It's built for bioinformatics pipelines, ML hyperparameter sweeps, batch data processing, and any workload that needs to fan out across heterogeneous compute.
✨ Features
- Blazingly fast — built on
asynciowithuvloop; thousands of jobs, minimal overhead - Six scheduler backends — local, SGE, Slurm, SSH, Google Cloud Batch, Docker/Podman/Apptainer
- Plugin system — 14 lifecycle hooks let you add logging, notifications, or custom logic without touching core code
- Error strategies — automatic retry with configurable limits, or halt-the-world on first failure
- File-based status tracking — jobs self-report via status files; survives network failures and scheduler quirks
- Daemon mode —
keep_feedinglets you add jobs dynamically at any point - Cloud storage — workdirs on GCS (
gs://), Azure (az://), or S3 (s3://) - Path translation — seamless SpecPath / MountedPath duality for cross-machine execution
- Timeouts — per-job timeout enforcement via
coreutils timeout
📦 Installation
pip install xqute
With optional extras:
pip install 'xqute[gs]' # Google Cloud Storage support
pip install 'xqute[cloudsh]' # Cloud shell support
🚀 Quick start
Default (local scheduler)
import asyncio
from xqute import Xqute
async def main():
xqute = Xqute(forks=3)
for _ in range(10):
await xqute.feed(["sleep", "1"])
await xqute.run_until_complete()
asyncio.run(main())
Daemon mode — add jobs while running
xqute = Xqute(forks=3)
# Start — returns immediately
await xqute.run_until_complete(keep_feeding=True)
# Feed jobs dynamically
for i in range(100):
await xqute.feed(["python", "train.py", str(i)])
await asyncio.sleep(0.1)
# Signal done and wait for everything to finish
await xqute.stop_feeding()
🎯 Scheduler backends
xqute ships with six schedulers. Swap the scheduler argument to switch.
Slurm
xqute = Xqute(
scheduler="slurm",
forks=100,
scheduler_opts={
"partition": "gpu",
"time": "24:00:00",
"mem": "8G",
"gres": "gpu:1",
},
)
SGE (Sun Grid Engine)
xqute = Xqute(
scheduler="sge",
forks=100,
scheduler_opts={
"q": "1-day",
"l": ["h_vmem=4G", "gpu=1"],
},
)
SSH (multi-server)
xqute = Xqute(
scheduler="ssh",
forks=100,
scheduler_opts={
"servers": {
"node1": {"user": "alice", "host": "node1.example.com", "keyfile": "/home/alice/.ssh/id_rsa"},
"node2": {"user": "alice", "host": "node2.example.com", "keyfile": "/home/alice/.ssh/id_rsa"},
}
},
)
Note: SSH servers must share the same filesystem and use key-based auth.
Google Cloud Batch
xqute = Xqute(
scheduler="gbatch",
forks=100,
scheduler_opts={
"project": "my-gcp-project",
"location": "us-central1",
"taskGroups": [{
"taskSpec": {
"runnables": [{
"container": {"imageUri": "ubuntu", "entrypoint": "bash", "commands": ["-c", "..."]}
}]
},
"taskCount": 500,
"parallelism": 100,
}],
},
)
Container (Docker / Podman / Apptainer)
xqute = Xqute(
scheduler="container",
forks=10,
scheduler_opts={
"image": "docker://python:3.12",
"entrypoint": "/bin/bash",
"bin": "docker",
"volumes": ["/data:/data"],
"envs": {"TF_CPP_MIN_LOG_LEVEL": "2"},
},
)
🔌 Plugins
14 lifecycle hooks via simplug. Example — send Slack notifications on failures:
from xqute import simplug as pm
@pm.impl
async def on_job_failed(scheduler, job):
import requests
requests.post(WEBHOOK, json={"text": f"Job {job.index} failed"})
See the Plugins page for the full list of hooks and more examples.
📖 Documentation
Full documentation is at pwwang.github.io/xqute:
- Quick Start — get running in minutes
- User Guide — initialization, error handling, monitoring
- Schedulers — all six backends with config reference
- Plugins — lifecycle hooks and plugin authoring
- Advanced — custom schedulers, Dask/Airflow integration, perf tuning
- API Reference — auto-generated from source
🛠️ Custom scheduler
Implement three async methods to add your own backend:
from xqute import Scheduler
class MyScheduler(Scheduler):
name = "mycluster"
async def submit_job(self, job):
"""Submit and return a unique job ID."""
async def kill_job(self, job):
"""Kill the job given its JID."""
async def job_is_running(self, job):
"""Return True if the job is still running."""
Then pass it directly: Xqute(scheduler=MyScheduler, ...).
📊 Architecture
Jobs are wrapped in a bash template with an EXIT trap that writes status files (job.status, job.rc, job.stdout, job.stderr) into a per-job metadir. The polling loop reads these files — no scheduler API calls for status. This design makes xqute resilient to network hiccups and scheduler oddities.
INIT → QUEUED → SUBMITTED → RUNNING → FINISHED
↓ ↓
KILLING → FAILED
🤝 Contributing
Issues and PRs welcome on GitHub. See AGENTS.md for dev setup and conventions.
📝 License
MIT — see LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file xqute-2.1.0.tar.gz.
File metadata
- Download URL: xqute-2.1.0.tar.gz
- Upload date:
- Size: 451.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e939b142c2f6e0a5afb04c623eb4717e3b494622a4fc8628e5c286b7a90ede02
|
|
| MD5 |
38daf7c6c6ce6a302a4a1e29c96bf6a5
|
|
| BLAKE2b-256 |
56c9669173b67dc5c369be83afa0f3631ea86bc74b983d1a6840de6388ec30e6
|
File details
Details for the file xqute-2.1.0-py3-none-any.whl.
File metadata
- Download URL: xqute-2.1.0-py3-none-any.whl
- Upload date:
- Size: 44.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
715360f149ff3f3e9f64b0d8d2aba51554b05cbdb11d962251cdf9e3684dc54f
|
|
| MD5 |
09d05bfc113fe471c7cd83bb884a3ca0
|
|
| BLAKE2b-256 |
5ce343ee3eb27e7b6ca29d022aa39fe8dedc8c65358de16ffd50c7622d8209f9
|