xqute

A job management system for Python, designed to simplify job scheduling and execution with support for multiple schedulers and plugins.

Features

  • Asynchronous (async/await) design for high performance
  • Plugin system for extensibility
  • Scheduler adaptors for various backends
  • Job retrying and pipeline halting on failure
  • Support for cloud-based working directories
  • Built-in support for Google Batch Jobs, Slurm, SGE, SSH, and container schedulers

Installation

pip install xqute

A Toy Example

import asyncio
from xqute import Xqute

async def main():
    # Initialize Xqute with 3 jobs allowed to run concurrently
    xqute = Xqute(forks=3)
    for _ in range(10):
        await xqute.feed(['sleep', '1'])
    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())
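
To make the effect of forks concrete, here is a minimal stand-alone sketch of bounded concurrency built on asyncio.Semaphore. It does not use xqute and is not a description of how xqute is implemented; run_limited and its parameters are hypothetical names chosen for illustration.

```python
import asyncio


async def run_limited(num_jobs: int, forks: int) -> int:
    """Run num_jobs dummy jobs, at most `forks` at a time.

    Returns the highest number of jobs observed running at once.
    """
    semaphore = asyncio.Semaphore(forks)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:  # blocks while `forks` jobs are already running
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real work
            running -= 1

    await asyncio.gather(*(job() for _ in range(num_jobs)))
    return peak


if __name__ == "__main__":
    # Ten jobs, three slots: the peak never exceeds the number of slots.
    print(asyncio.run(run_limited(num_jobs=10, forks=3)))
```

With ten jobs and three slots, the remaining seven jobs wait until a slot is released, which is the behavior the forks argument is described as controlling.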

Daemon Mode (Keep Feeding)

You can also run Xqute in daemon mode, where jobs can be added continuously after starting:

import asyncio
from xqute import Xqute

async def main():
    xqute = Xqute(forks=3)

    # Add initial job
    await xqute.feed(['echo', 'Job 1'])

    # Start in keep_feeding mode (returns immediately)
    await xqute.run_until_complete(keep_feeding=True)

    # Continue adding jobs dynamically
    for _ in range(9):
        await xqute.feed(['sleep', '1'])
        await asyncio.sleep(0.1)  # Jobs can be added over time

    # Signal completion and wait for all jobs to finish
    await xqute.stop_feeding()

if __name__ == '__main__':
    asyncio.run(main())

Tip: Use xqute.is_feeding() to check if you need to call stop_feeding().

API Documentation

Full API documentation is available at: https://pwwang.github.io/xqute/

Usage

Xqute Object

An Xqute object is initialized as follows:

xqute = Xqute(...)

Available arguments are:

  • scheduler: The scheduler class or name (default: local)
  • plugins: Plugins to enable/disable for this session
  • workdir: Directory for job metadata (default: ./.xqute/)
  • forks: Number of jobs allowed to run concurrently
  • error_strategy: Strategy for handling errors (e.g., halt, retry)
  • num_retries: Maximum number of retries when error_strategy is set to retry
  • submission_batch: Number of jobs to submit in a batch
  • scheduler_opts: Additional keyword arguments for the scheduler
  • jobname_prefix: Prefix for job names
  • recheck_interval: Multiplier for the job status recheck interval. The actual interval is recheck_interval * xqute.defaults.SLEEP_INTERVAL_POLLING_JOBS

Note: The Xqute object (the job producer) must be initialized within an event loop.
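
The recheck_interval arithmetic from the argument list can be sketched as a plain multiplication. The helper name effective_recheck_seconds and the value of ASSUMED_SLEEP_INTERVAL are assumptions made for this example; the real constant lives in xqute.defaults and may differ.

```python
def effective_recheck_seconds(recheck_interval: int, sleep_interval: float) -> float:
    """Return recheck_interval * sleep_interval, the product described in the list."""
    return recheck_interval * sleep_interval


# Assumed value for illustration only; check xqute.defaults for the real constant.
ASSUMED_SLEEP_INTERVAL = 1.0

print(effective_recheck_seconds(600, ASSUMED_SLEEP_INTERVAL))
```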

To add a job to the queue:

await xqute.feed(['echo', 'Hello, World!'])

To run until all jobs complete:

# Traditional mode - wait for all jobs to complete
await xqute.run_until_complete()

# Or daemon mode - add jobs continuously
await xqute.run_until_complete(keep_feeding=True)
# ... add more jobs ...
await xqute.stop_feeding()  # Signal completion and wait

Using SGE Scheduler

xqute = Xqute(
    scheduler='sge',
    forks=100,
    scheduler_opts={
        'qsub': '/path/to/qsub',
        'qdel': '/path/to/qdel',
        'qstat': '/path/to/qstat',
        'q': '1-day',  # or qsub_q='1-day'
    }
)

Keyword arguments starting with sge_ are interpreted as qsub options. List values are expanded into one directive per item. For example:

'l': ['h_vmem=2G', 'gpu=1']

will be expanded in the job script as:

#$ -l h_vmem=2G
#$ -l gpu=1

Using Slurm Scheduler

xqute = Xqute(
    scheduler='slurm',
    forks=100,
    scheduler_opts={
        'sbatch': '/path/to/sbatch',
        'scancel': '/path/to/scancel',
        'squeue': '/path/to/squeue',
        'partition': '1-day',
        'time': '01:00:00',
    }
)

Using SSH Scheduler

xqute = Xqute(
    scheduler='ssh',
    forks=100,
    scheduler_opts={
        'ssh': '/path/to/ssh',
        'servers': {
            'server1': {
                'user': 'username',
                'port': 22,
                'keyfile': '/path/to/keyfile',
                'ctrl_persist': 600,
                'ctrl_dir': '/tmp',
            }
        }
    }
)

Note: SSH servers must share the same filesystem and use keyfile authentication.

Using Google Batch Jobs Scheduler

xqute = Xqute(
    scheduler='gbatch',
    forks=100,
    scheduler_opts={
        'project': 'your-gcp-project-id',
        'location': 'us-central1',
        'gcloud': '/path/to/gcloud',
        'taskGroups': [ ... ],
    }
)

Using Container Scheduler

xqute = Xqute(
    scheduler='container',
    forks=100,
    scheduler_opts={
        'image': 'docker://bash:latest',
        'entrypoint': '/usr/local/bin/bash',
        'bin': 'docker',
        'volumes': '/host/path:/container/path',
        'envs': {'MY_ENV_VAR': 'value'},
        'remove': True,
        'bin_args': ['--hostname', 'xqute-container'],
    }
)

Plugins

To create a plugin for xqute, implement the following hooks:

  • def on_init(scheduler): Called after the scheduler is initialized
  • def on_shutdown(scheduler, sig): Called when the scheduler shuts down
  • async def on_job_init(scheduler, job): Called when a job is initialized
  • async def on_job_queued(scheduler, job): Called when a job is queued
  • async def on_job_submitted(scheduler, job): Called when a job is submitted
  • async def on_job_started(scheduler, job): Called when a job starts running
  • async def on_job_polling(scheduler, job, counter): Called during job status polling
  • async def on_job_killing(scheduler, job): Called when a job is being killed
  • async def on_job_killed(scheduler, job): Called when a job is killed
  • async def on_job_failed(scheduler, job): Called when a job fails
  • async def on_job_succeeded(scheduler, job): Called when a job succeeds
  • def on_jobcmd_init(scheduler, job) -> str: Called during job command initialization
  • def on_jobcmd_prep(scheduler, job) -> str: Called before the job command runs
  • def on_jobcmd_end(scheduler, job) -> str: Called after the job command completes

To implement a hook, use the simplug plugin manager:

from xqute import simplug as pm

@pm.impl
def on_init(scheduler):
    ...

Implementing a Scheduler

To create a custom scheduler, subclass the Scheduler abstract class and implement the following methods:

from xqute import Scheduler

class MyScheduler(Scheduler):
    name = 'mysched'

    async def submit_job(self, job):
        """Submit a job and return its unique ID."""

    async def kill_job(self, job):
        """Kill a job."""

    async def job_is_running(self, job):
        """Check if a job is running."""

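To show the shape of the three methods, here is a stand-alone sketch that runs commands as local subprocesses. It deliberately does not subclass xqute's Scheduler and takes a plain command list and a process ID instead of a job object, because the attributes of xqute's job objects are not described here. The class name LocalProcessSketch and the demo function are hypothetical.

```python
import asyncio
import sys


class LocalProcessSketch:
    """Illustrates submit/kill/is-running; not an xqute Scheduler subclass."""

    def __init__(self) -> None:
        self._procs = {}  # maps process ID -> asyncio subprocess handle

    async def submit_job(self, cmd):
        """Start the command and return its process ID as the unique job ID."""
        proc = await asyncio.create_subprocess_exec(*cmd)
        self._procs[proc.pid] = proc
        return proc.pid

    async def kill_job(self, job_id):
        """Kill the process if it is still alive, then wait for it to exit."""
        proc = self._procs[job_id]
        if proc.returncode is None:
            proc.kill()
        await proc.wait()

    async def job_is_running(self, job_id):
        """A process with no return code yet is treated as running."""
        return self._procs[job_id].returncode is None


async def demo():
    sched = LocalProcessSketch()
    cmd = [sys.executable, "-c", "import time; time.sleep(30)"]
    job_id = await sched.submit_job(cmd)
    was_running = await sched.job_is_running(job_id)
    await sched.kill_job(job_id)
    still_running = await sched.job_is_running(job_id)
    return was_running, still_running


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A real scheduler would implement the same three operations against its backend (for example, a submit command, a cancel command, and a status query) inside a Scheduler subclass.
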