asyncjobs
Asynchronous job scheduler.
Description
A job scheduler for running asynchronous (and synchronous) jobs with
dependencies using asyncio. Jobs are identified by their name and implement
an async __call__ method. Jobs may await other jobs or schedule work to be
done in a thread or subprocess. Jobs are run by a Scheduler, which controls the
execution of the jobs, as well as the number of concurrent threads and
processes doing work. The Scheduler emits events, which allow progress and
statistics (for example) to be collected and monitored. A separate module is
provided to turn Scheduler events into an interactive scheduling plot.
Jobs complete successfully by returning (with or without a return value). Any
exception propagated from a job's __call__ method is regarded as a failure.
Any job that depends on (i.e. awaits the result of) another job will be
automatically cancelled by the scheduler if that other job fails.
The Scheduler handles cancellation (e.g. Ctrl-C) by cancelling all ongoing
and remaining tasks as quickly and cleanly as possible.
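The failure and cancellation rules described above can be illustrated with plain asyncio. The following is a minimal sketch under stated assumptions, not the asyncjobs implementation: run_jobs, succeed and fail are hypothetical names invented for this illustration, and jobs are passed as a plain dict rather than as Job objects.

```python
import asyncio


async def run_jobs(jobs):
    """Run {name: (async_func, deps)} and return {name: outcome string}.

    Hypothetical helper for illustration only; not part of asyncjobs.
    """
    tasks = {}

    async def run_one(name):
        func, deps = jobs[name]
        for dep in deps:
            try:
                await tasks[dep]
            except Exception:
                # The dependency failed: cancel this job instead of running it.
                raise asyncio.CancelledError()
        return await func()

    # Create every task before any of them starts running, so that
    # each job can look up the tasks of its dependencies by name.
    for name in jobs:
        tasks[name] = asyncio.ensure_future(run_one(name))

    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    outcomes = {}
    for name, result in zip(tasks, results):
        if isinstance(result, asyncio.CancelledError):
            outcomes[name] = 'cancelled'
        elif isinstance(result, Exception):
            outcomes[name] = 'failed'
        else:
            outcomes[name] = 'ok'
    return outcomes


async def succeed():
    return 1


async def fail():
    raise ValueError('boom')


demo_jobs = {
    'a': (succeed, set()),
    'b': (fail, set()),
    'c': (succeed, {'b'}),  # depends on a failing job
    'd': (succeed, {'a'}),  # depends on a successful job
    'e': (succeed, {'c'}),  # depends on a cancelled job
}
outcomes = asyncio.run(run_jobs(demo_jobs))
print(outcomes)
```

In this sketch, job b fails, so c is cancelled, and the cancellation cascades to e, while a and d complete normally.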
Usage examples
Run three simple jobs in sequence
import asyncio
from asyncjobs import Job, Scheduler

# Helper function
def sleep():
    import time
    print(f'{time.ctime()}: Sleep for a second')
    time.sleep(1)
    print(f'{time.ctime()}: Finished sleep')

# Job #1 prints uptime
job1 = Job('#1')
job1.subprocess_argv = ['uptime']

# Job #2 waits for #1 and then sleeps in a thread
job2 = Job('#2', deps={'#1'})
job2.thread_func = sleep

# Job #3 waits for #2 and then prints uptime (again)
job3 = Job('#3', deps={'#2'})
job3.subprocess_argv = ['uptime']

# Run all jobs in the scheduler
s = Scheduler()
for job in [job1, job2, job3]:
    s.add(job)
asyncio.run(s.run())
should produce output like this:
16:35:58 up 9 days 3:29, 1 user, load average: 0.62, 0.55, 0.55
Tue Feb 25 16:35:58 2020: Sleep for a second
Tue Feb 25 16:35:59 2020: Finished sleep
16:35:59 up 9 days 3:29, 1 user, load average: 0.62, 0.55, 0.55
Fetching web content in parallel
This example fetches a random Wikipedia article, and then follows links to other articles until 10 articles have been fetched. Sample output:
fetching https://en.wikipedia.org/wiki/Special:Random...
* [Indonesia–Mongolia relations] links to 7 articles
fetching https://en.wikipedia.org/wiki/Indonesia...
fetching https://en.wikipedia.org/wiki/Mongolia...
fetching https://en.wikipedia.org/wiki/Jakarta...
fetching https://en.wikipedia.org/wiki/Mongolian_National_University,_Ulan_Bator...
* [Mongolia] links to 529 articles
fetching https://en.wikipedia.org/wiki/Sukarno...
* [Indonesia] links to 697 articles
fetching https://en.wikipedia.org/wiki/Megawati_Soekarnoputri...
* [Jakarta] links to 757 articles
fetching https://en.wikipedia.org/wiki/Susilo_Bambang_Yudhoyono...
* [Mongolian National University] links to 2 articles
fetching https://en.wikipedia.org/wiki/Mongolian_language...
* [Sukarno] links to 523 articles
fetching https://en.wikipedia.org/wiki/Mongolian_script...
* [Susilo Bambang Yudhoyono] links to 159 articles
* [Megawati Sukarnoputri] links to 88 articles
* [Mongolian language] links to 259 articles
* [Mongolian script] links to 142 articles
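The crawl pattern behind this example (fetch one page, then fetch linked pages in parallel until a limit is reached) can be sketched with plain asyncio. This is only an illustration under stated assumptions: FAKE_WIKI is a made-up in-memory link graph standing in for Wikipedia, fetch_links and crawl are hypothetical names, and no network access or asyncjobs API is involved.

```python
import asyncio

# Hypothetical in-memory link graph standing in for Wikipedia.
FAKE_WIKI = {
    'A': ['B', 'C', 'D'],
    'B': ['E', 'F'],
    'C': ['G', 'H'],
    'D': ['I', 'J'],
    'E': ['K'],
    'F': ['L'],
    'G': ['A'], 'H': ['A'], 'I': ['A'], 'J': ['A'], 'K': ['A'], 'L': ['A'],
}


async def fetch_links(title):
    """Pretend to fetch an article and return the titles it links to."""
    await asyncio.sleep(0.01)  # simulated network latency
    return FAKE_WIKI.get(title, [])


async def crawl(start, limit=10):
    """Fetch articles in parallel, following links until `limit` are fetched."""
    scheduled = {start}  # titles already fetched or currently being fetched
    link_counts = {}     # title -> number of outgoing links
    pending = {asyncio.ensure_future(fetch_links(start)): start}
    while pending:
        done, _ = await asyncio.wait(
            set(pending), return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            title = pending.pop(task)
            links = task.result()
            link_counts[title] = len(links)
            for link in links:
                # Only schedule new fetches while we are below the limit.
                if len(scheduled) < limit and link not in scheduled:
                    scheduled.add(link)
                    pending[asyncio.ensure_future(fetch_links(link))] = link
    return link_counts


link_counts = asyncio.run(crawl('A', limit=10))
for title, count in link_counts.items():
    print(f'* [{title}] links to {count} articles')
```

Several fetches are in flight at once, which is why the sample output above interleaves "fetching" lines with result lines.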
Wasting time efficiently across multiple threads
The final example (which was used to produce the schedule plot above) simulates a simple build system: It creates a number of jobs (default: 10), each job sleeps for some random time (default: <=100ms), and has some probability of depending on each preceding job (default: 0.5). After awaiting its dependencies, each job may also split portions of its work into one or more sub-jobs, and await their completion, before finishing its remaining work. Everything is scheduled across a fixed number of worker threads (default: 4).
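The core of such a simulation can be sketched with asyncio and a thread pool from the standard library. This is a simplified illustration under stated assumptions, not the example shipped with asyncjobs: simulate_build and its parameters are hypothetical names, sub-jobs are left out, and the scheduling is done directly with asyncio tasks rather than with a Scheduler.

```python
import asyncio
import random
import time
from concurrent.futures import ThreadPoolExecutor


async def simulate_build(num_jobs=10, max_sleep=0.1, dep_prob=0.5,
                         workers=4, seed=None):
    """Run randomly dependent sleeping jobs on a fixed pool of threads.

    Returns (deps, finished): the dependency map and the completion order.
    """
    rng = random.Random(seed)
    loop = asyncio.get_running_loop()
    deps = {}      # job name -> set of names it depends on
    tasks = {}     # job name -> asyncio task running that job
    finished = []  # job names in completion order

    async def job(name, sleep_time):
        # Await all dependencies before doing any work.
        for dep in deps[name]:
            await tasks[dep]
        # Do the "work" (a blocking sleep) in one of the worker threads.
        await loop.run_in_executor(pool, time.sleep, sleep_time)
        finished.append(name)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for i in range(num_jobs):
            name = f'job{i}'
            # Each preceding job becomes a dependency with probability dep_prob.
            deps[name] = {f'job{j}' for j in range(i)
                          if rng.random() < dep_prob}
            sleep_time = rng.uniform(0, max_sleep)
            tasks[name] = asyncio.ensure_future(job(name, sleep_time))
        await asyncio.gather(*tasks.values())
    return deps, finished


deps, finished = asyncio.run(
    simulate_build(num_jobs=6, max_sleep=0.01, seed=1))
print('completion order:', finished)
```

Because jobs only depend on preceding jobs, the dependency graph has no cycles, and every job appears in the completion order after all of its dependencies.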
Installation
Run the following to install:
$ pip install asyncjobs
Development
To work on asyncjobs, clone this repo, and run the following (in a virtualenv) to get everything you need to develop and run tests:
$ pip install -e .[dev]
Additionally, if you want to generate scheduling plots (as seen above), you need a couple more dependencies (plotly, see https://plotly.com/python/, and numpy):
$ pip install -e .[dev,plot]
Alternatively, if you are using Nix, use the included shell.nix to get a
development environment with everything automatically installed:
$ nix-shell
Use nox to run all tests, formatters and linters:
$ nox
This will run the test suite under all supported Python versions, format the
code with black and run the flake8 linter.
Contributing
Main development happens at https://github.com/jherland/asyncjobs/. Post issues and PRs there.