
asyncjobs

Asynchronous job scheduler.

Description

A job scheduler for running asynchronous (and synchronous) jobs with dependencies using asyncio. Jobs are identified by their name and implement an async __call__ method. Jobs may await other jobs or schedule work to be done in a thread or subprocess. Jobs are run by a Scheduler, which controls the execution of the jobs, as well as the number of concurrent threads and processes doing work. The Scheduler emits events, which allow progress and statistics to be easily collected and monitored. A separate module is provided to turn Scheduler events into an interactive scheduling plot:

[Example schedule plot]
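
Events can also be consumed directly, e.g. to collect simple progress statistics. The sketch below is a hypothetical illustration only: the event_handler keyword and the exact shape of each event are assumptions here, so verify them against the actual Scheduler API before relying on this.

import asyncio
from asyncjobs import Job, Scheduler

events = []

def on_event(event):
    # Assumption: each event describes what happened and which job
    # it concerns; the exact structure is not documented above.
    events.append(event)

job = Job('hello')
job.subprocess_argv = ['echo', 'hello']

# Assumption: Scheduler accepts an event_handler callback; the actual
# parameter name may differ between versions.
s = Scheduler(event_handler=on_event)
s.add(job)
asyncio.run(s.run())
print(f'collected {len(events)} events')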

Jobs complete successfully by returning (with or without a return value). Any exception propagated from a job's __call__ method is regarded as a failure. Any job that depends on (i.e. awaits the result of) another job will be automatically cancelled by the scheduler if that other job fails. The Scheduler handles cancellation (e.g. Ctrl-C) by cancelling all ongoing and remaining tasks as quickly and cleanly as possible.
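
As a minimal sketch of this failure propagation, built on the same Job/Scheduler API as the example below, and assuming (per the description above) that an exception raised in a job's thread function counts as a job failure:

import asyncio
from asyncjobs import Job, Scheduler

def fail():
    raise RuntimeError('simulated failure')

# 'bad' raises, which is regarded as a failure...
bad = Job('bad')
bad.thread_func = fail

# ...so 'dependent', which awaits its result, is cancelled.
dependent = Job('dependent', deps={'bad'})
dependent.subprocess_argv = ['echo', 'never runs']

s = Scheduler()
s.add(bad)
s.add(dependent)
asyncio.run(s.run())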

Usage examples

Run three simple jobs in sequence

(code also available here):

import asyncio
from asyncjobs import Job, Scheduler

# Helper function
def sleep():
    import time
    print(f'{time.ctime()}: Sleep for a second')
    time.sleep(1)
    print(f'{time.ctime()}: Finished sleep')

# Job #1 prints uptime
job1 = Job('#1')
job1.subprocess_argv = ['uptime']

# Job #2 waits for #1 and then sleeps in a thread
job2 = Job('#2', deps={'#1'})
job2.thread_func = sleep

# Job #3 waits for #2 and then prints uptime (again)
job3 = Job('#3', deps={'#2'})
job3.subprocess_argv = ['uptime']

# Run all jobs in the scheduler
s = Scheduler()
for job in [job1, job2, job3]:
    s.add(job)
asyncio.run(s.run())

should produce output like this:

 16:35:58  up 9 days  3:29,  1 user,  load average: 0.62, 0.55, 0.55
Tue Feb 25 16:35:58 2020: Sleep for a second
Tue Feb 25 16:35:59 2020: Finished sleep
 16:35:59  up 9 days  3:29,  1 user,  load average: 0.62, 0.55, 0.55

Fetching web content in parallel

This example fetches a random Wikipedia article, and then follows links to other articles until 10 articles have been fetched. Sample output:

    fetching https://en.wikipedia.org/wiki/Special:Random...
  * [Indonesia–Mongolia relations] links to 7 articles
      fetching https://en.wikipedia.org/wiki/Indonesia...
      fetching https://en.wikipedia.org/wiki/Mongolia...
      fetching https://en.wikipedia.org/wiki/Jakarta...
      fetching https://en.wikipedia.org/wiki/Mongolian_National_University,_Ulan_Bator...
    * [Mongolia] links to 529 articles
      fetching https://en.wikipedia.org/wiki/Sukarno...
    * [Indonesia] links to 697 articles
      fetching https://en.wikipedia.org/wiki/Megawati_Soekarnoputri...
    * [Jakarta] links to 757 articles
      fetching https://en.wikipedia.org/wiki/Susilo_Bambang_Yudhoyono...
    * [Mongolian National University] links to 2 articles
        fetching https://en.wikipedia.org/wiki/Mongolian_language...
    * [Sukarno] links to 523 articles
        fetching https://en.wikipedia.org/wiki/Mongolian_script...
    * [Susilo Bambang Yudhoyono] links to 159 articles
    * [Megawati Sukarnoputri] links to 88 articles
      * [Mongolian language] links to 259 articles
      * [Mongolian script] links to 142 articles
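
The actual example follows links dynamically with an HTTP client; as a simpler sketch of the same pattern, the jobs below fetch a fixed list of URLs concurrently by pushing the blocking I/O into the scheduler's worker threads (urllib stands in here for whatever HTTP client the real example uses):

import asyncio
import urllib.request
from asyncjobs import Job, Scheduler

urls = [
    'https://en.wikipedia.org/wiki/Indonesia',
    'https://en.wikipedia.org/wiki/Mongolia',
    'https://en.wikipedia.org/wiki/Jakarta',
]

def fetcher(url):
    def fetch():
        # Blocking I/O, so run it in one of the scheduler's threads
        print(f'fetching {url}...')
        with urllib.request.urlopen(url) as f:
            body = f.read()
        print(f'fetched {len(body)} bytes from {url}')
    return fetch

s = Scheduler()
for url in urls:
    job = Job(url)  # jobs are identified by name, so URLs double as names
    job.thread_func = fetcher(url)
    s.add(job)
asyncio.run(s.run())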

Wasting time efficiently across multiple threads

The final example (which was used to produce the schedule plot above) simulates a simple build system: it creates a number of jobs (default: 10), each of which sleeps for some random time (default: <=100ms) and depends on each preceding job with some probability (default: 0.5). After awaiting its dependencies, each job may also split portions of its work into one or more sub-jobs and await their completion before finishing its remaining work. Everything is scheduled across a fixed number of worker threads (default: 4).
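
The random dependency setup can be sketched as follows. This is an illustration, not the actual example code; the sub-job splitting and the worker-thread count are omitted, since their APIs are not shown above:

import asyncio
import random
import time
from asyncjobs import Job, Scheduler

def worker(duration):
    def work():
        time.sleep(duration)  # stand-in for real work
    return work

s = Scheduler()
names = []
for i in range(10):
    name = f'job-{i}'
    # Depend on each preceding job with probability 0.5
    deps = {n for n in names if random.random() < 0.5}
    job = Job(name, deps=deps)
    job.thread_func = worker(random.uniform(0, 0.1))
    s.add(job)
    names.append(name)
asyncio.run(s.run())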

Installation

Run the following to install:

$ pip install asyncjobs

Development

To work on asyncjobs, clone this repo, and run the following (in a virtualenv) to get everything you need to develop and run tests:

$ pip install -e .[dev]

Additionally, if you want to generate scheduling plots (as seen above), you need a couple more dependencies, plotly (https://plotly.com/python/) and numpy:

$ pip install -e .[dev,plot]

Alternatively, if you are using Nix, use the included shell.nix to get a development environment with everything automatically installed:

$ nix-shell

Use nox to run all tests, formatters and linters:

$ nox

This will run the test suite under all supported Python versions, format the code with black and run the flake8 linter.

Contributing

Main development happens at https://github.com/jherland/asyncjobs/. Post issues and PRs there.
