JobMaster allows a user to quickly and simply deploy and manage a collection of tasks.
JobMaster
JobMaster is a simple job scheduling library for Python. It works with any PostgreSQL database, and is designed to be simple to use and easy to integrate into your existing codebase.
Installation
Not yet published on pip.
Usage
Any function in your project can become a JobMaster Task by using the @task decorator.
For example, you may have a function like this:
# jmtests/awesome_things/things1.py
from jobmaster import task

@task
def foo(file_path: str, number: [1, 2, 3]):
    """
    Write a number to a file.

    :param file_path: the file to write to
    :param number: the number to write
    """
    with open(file_path, 'w') as f:
        f.write(str(number))
Think of "tasks" as functions and "jobs" as instances of those functions with specific arguments.
Once you have properly configured JobMaster, this task will be registered with:
- type_key='things1' (the name of the module)
- task_key='foo' (the name of the function)
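As a rough illustration of how those two keys relate to the function, the sketch below derives them from a function's module path and name. The helper default_keys is hypothetical and is not part of JobMaster; it assumes the type key is the last component of the dotted module path.

```python
def default_keys(func):
    """Return (type_key, task_key) guessed from a function's module and name."""
    # Assumption: the type key is the last component of the dotted module path.
    type_key = func.__module__.rsplit(".", 1)[-1]
    task_key = func.__name__
    return type_key, task_key


def foo(file_path, number):
    pass


# Pretend foo was defined in jmtests/awesome_things/things1.py.
foo.__module__ = "jmtests.awesome_things.things1"

print(default_keys(foo))  # ('things1', 'foo')
```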
You can add a job to the queue by calling the procedure
call jobmaster.insert_job('things1', 'foo', 10, '{"file_path": "/tmp/sum.txt", "number": 2}'::json)
from wherever you have a connection to your database: a different Python script, a web application, etc.
jobmaster.insert_job takes 4 arguments:
- The type key of the task
- The task key of the task
- The priority of the job
- The arguments to pass to the task, json formatted
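From Python, the four arguments could be assembled along the following lines. This is only a sketch: build_insert_job_call is a hypothetical helper, the :name placeholders follow the style used by SQLAlchemy's text() construct, and the default schema name jobmaster is assumed.

```python
import json


def build_insert_job_call(type_key, task_key, priority, arguments):
    """Return (sql, params) for a call to the jobmaster.insert_job procedure."""
    # Assumption: the default schema name "jobmaster" is in use.
    sql = (
        "call jobmaster.insert_job("
        ":type_key, :task_key, :priority, cast(:arguments as json))"
    )
    params = {
        "type_key": type_key,
        "task_key": task_key,
        "priority": priority,
        # The task arguments are sent as a JSON-formatted string.
        "arguments": json.dumps(arguments),
    }
    return sql, params


sql, params = build_insert_job_call(
    "things1", "foo", 10, {"file_path": "/tmp/sum.txt", "number": 2}
)
print(sql)
print(params["arguments"])
```

With SQLAlchemy, the result could then be executed as connection.execute(sqlalchemy.text(sql), params) inside a transaction.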
Somewhere in your Python project, you will have a script that pops jobs from the queue and executes them:
# jmtests/__init__.py
import sqlalchemy
db_engine = sqlalchemy.create_engine("postgresql+pg8000://", ...)

# jmtests/utils/jmutils.py
from jobmaster import JobMaster
from .. import db_engine
# Import every module that defines tasks so that they are registered, e.g.:
from ..awesome_things.things1 import *

jobmaster = JobMaster(db_engine=db_engine, _validate_dependencies=True)

# Run jobs from the queue until the queue is empty
if __name__ == '__main__':
    jobs = jobmaster.run()
This could be run in a loop, in a cron job, in a web server, etc.
Tasks can depend on other tasks, and JobMaster will automatically run them in the correct order.
Simple example
In one file you might have:
# module nice_tasks.py
from jobmaster import task, Dependency, same

@task
def foo(file_path: str, number: int):
    with open(file_path, 'w') as f:
        f.write(str(number))

@task(dependencies=Dependency(foo, 6, file_path=same))
def bar(file_path: str, number: int, letter: ['A', 'B', 'C'] = 'A'):
    with open(file_path, 'r') as f:
        _n = int(f.read())
    with open(file_path, 'w') as f:
        f.write(f"{number + _n}{letter}")

@task(
    process_limit=3,
    dependencies=[
        Dependency(foo, 2, number=10, file_path=same),
        Dependency(bar, 2, file_path=same)
    ]
)
def baz(file_path: str):
    with open(file_path, 'r') as f:
        s = f.read()
And in another file you might have:
# module main.py
import sqlalchemy
from jobmaster import JobMaster
# Create a SQLAlchemy engine for your PostgreSQL database
my_database_engine = sqlalchemy.create_engine("postgresql+pg8000://", ...)
# Create a JobMaster instance
jobmaster = JobMaster(db_engine=my_database_engine)
Setup
The first time you run your code, you'll need to create the necessary tables in your database. You can do this by using the JobMaster.deploy() method:
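# module deploy_jobmaster.py
# A plain import is used because the script is run directly, not as part of a package
from main import jobmaster

# Deploy the necessary tables
jobmaster.deploy()

Then run the script:
python3 deploy_jobmaster.py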
This will create a schema in your database called jobmaster and create the necessary tables and functions for JobMaster to work.
If you already have a schema called jobmaster in your database, you can specify a different schema name when creating the JobMaster instance:
jobmaster = JobMaster(db_engine=my_database_engine, schema="job_mistress")
If you change any of the task definitions, you must update the database by running jobmaster.deploy() again. Running jobmaster.deploy(_reset=True) will drop the schema and all tables, then recreate them from scratch, losing any jobs you had in your queue.
Tasks
We have already introduced the @task decorator.
This can be used without arguments, as in:
# module my_tasks.py
from jobmaster import task

@task
def foo(a: int, b: int):
    pass  # do something
This will register the function foo as a task with type key 'my_tasks' and task key 'foo'.
JobMaster will register the parameters a and b, and their types (both int) are inferred from the function signature. It is therefore important to use Python type hints.
Optional parameters
If a parameter is optional, you can specify a default value for it:
@task
def foo(a: int = 1, b: int = 2):
    pass  # do something
"select-from" parameters
If a parameter has a number of possible options, this should be specified in the type hint:
@task
def foo(a: int, b: [1, 2, 3]):
    pass  # do something
Here, the argument values 1, 2, and 3 are the only valid values for b.
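To see how such hints can be read back from a function, the sketch below uses the standard inspect module to list each parameter's type, allowed options, and whether it is optional. The helpers describe_parameters and validate are hypothetical and only illustrate the idea; JobMaster's own registration logic may differ.

```python
import inspect


def foo(a: int, b: [1, 2, 3], c: str = "x"):
    pass


def describe_parameters(func):
    """Map each parameter name to its type, allowed options, and optionality."""
    described = {}
    for name, param in inspect.signature(func).parameters.items():
        hint = param.annotation
        is_select_from = isinstance(hint, (list, tuple))
        described[name] = {
            # For a select-from parameter, take the type of the first option.
            "type": type(hint[0]) if is_select_from else hint,
            "options": list(hint) if is_select_from else None,
            "optional": param.default is not inspect.Parameter.empty,
        }
    return described


def validate(func, arguments):
    """Raise ValueError if a select-from argument is not one of its options."""
    for name, info in describe_parameters(func).items():
        if name in arguments and info["options"] is not None:
            if arguments[name] not in info["options"]:
                raise ValueError(f"{name}={arguments[name]!r} is not in {info['options']}")


validate(foo, {"a": 5, "b": 2})  # passes silently; b=7 would raise ValueError
```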
"write-all" parameters
For select-from parameters, it may be desirable to insert a single job that performs the task for every possible value of the parameter.
This can be achieved by listing the relevant parameters in the write_all argument of the @task decorator:
@task(write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    pass  # do something
Then call
call jobmaster.insert_job('my_tasks', 'foo', 10, '{"a": 1, "b": "ALL"}'::json)
with 'ALL' in upper case. When this job is executed, instead of actually executing the task foo, JobMaster will insert a new job into the queue for each possible value of b.
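The expansion of an 'ALL' argument can be pictured with a short sketch. The function expand_write_all is hypothetical and runs purely in memory; it assumes that when several write-all parameters are set to 'ALL', one job is produced per combination of their options.

```python
from itertools import product


def expand_write_all(arguments, options, write_all):
    """Return one argument dict per combination of 'ALL' parameters."""
    # Only parameters listed in write_all and passed as "ALL" are expanded.
    to_expand = [name for name in write_all if arguments.get(name) == "ALL"]
    if not to_expand:
        return [dict(arguments)]
    expanded = []
    for combo in product(*(options[name] for name in to_expand)):
        job_arguments = dict(arguments)
        job_arguments.update(zip(to_expand, combo))
        expanded.append(job_arguments)
    return expanded


jobs = expand_write_all({"a": 1, "b": "ALL"}, {"b": [1, 2, 3]}, ["b"])
print(jobs)  # [{'a': 1, 'b': 1}, {'a': 1, 'b': 2}, {'a': 1, 'b': 3}]
```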
Custom type keys
The type key is the name of the module by default, but you can specify a different type key by passing it as an argument to the decorator:
@task(type_key='cool_tasks', write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    pass  # do something
Dependencies
Tasks can depend on other tasks. This is specified by passing a Dependency object (or a list of Dependency objects) to the dependencies argument of the @task decorator:
from jobmaster import task, Dependency, same

@task(type_key='cool_tasks', write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    pass  # do something

@task(
    type_key='cool_tasks',
    write_all=['b'],
    dependencies=Dependency(foo, 2, a=1, b=same)
)
def bar(a: int, b: [1, 2, 3], c: str):
    pass  # do something
The Dependency object takes the task function, a time (in hours), and the arguments to pass to the task.
In this example, the task bar depends on the task foo with a=1 and b the same as the b of the job for bar.
When a job for the task bar is popped from the queue, JobMaster first checks whether a job for the task foo, with a=1 and b the same as the b of the job for bar, has been completed in the past 2 hours.
If there isn't one, it inserts such a job for foo into the queue with a higher priority than the job for bar, then re-inserts the job for bar into the queue.
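The check described above can be sketched in memory as follows. Everything here is illustrative: SAME is a hypothetical stand-in for the same marker, completed jobs are modelled as (arguments, completion time) pairs rather than database rows, and the 2-hour window matches the example.

```python
from datetime import datetime, timedelta

SAME = object()  # hypothetical stand-in for jobmaster's `same` marker


def dependency_arguments(job_arguments, dependency_spec):
    """Resolve SAME markers by copying values from the dependent job."""
    return {
        name: job_arguments[name] if value is SAME else value
        for name, value in dependency_spec.items()
    }


def is_satisfied(required_arguments, completed_jobs, max_age_hours, now):
    """True if a matching job completed within the last max_age_hours."""
    cutoff = now - timedelta(hours=max_age_hours)
    return any(
        arguments == required_arguments and completed_at >= cutoff
        for arguments, completed_at in completed_jobs
    )


now = datetime(2024, 1, 1, 12, 0)
bar_job = {"a": 7, "b": 3, "c": "hello"}
needed = dependency_arguments(bar_job, {"a": 1, "b": SAME})

# One matching foo job finished an hour ago, the other three hours ago.
fresh = [({"a": 1, "b": 3}, now - timedelta(hours=1))]
stale = [({"a": 1, "b": 3}, now - timedelta(hours=3))]

print(needed)                               # {'a': 1, 'b': 3}
print(is_satisfied(needed, fresh, 2, now))  # True
print(is_satisfied(needed, stale, 2, now))  # False
```

In the stale case, the foo job would be inserted ahead of bar, and bar would be put back in the queue.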
Process units
JobMaster can limit jobs using process units.
When a JobMaster object is initialised, the number of process units available on the system is passed as an argument:
jobmaster = JobMaster(db_engine=my_database_engine, system_process_units=1_000)
If this argument is not specified, JobMaster will check the environment variable JOBMASTER_SYSTEM_PROCESS_UNITS for the number of process units available. If this is not set, or is not formatted like an integer, it will default to 10,000.
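That fallback order (explicit argument, then environment variable, then default) might look like the sketch below. The function resolve_system_process_units is hypothetical, and "formatted like an integer" is interpreted here as anything Python's int() accepts.

```python
import os

DEFAULT_SYSTEM_PROCESS_UNITS = 10_000


def resolve_system_process_units(explicit=None, environ=None):
    """Pick the system process units: argument, then env var, then default."""
    environ = os.environ if environ is None else environ
    if explicit is not None:
        return explicit
    raw = environ.get("JOBMASTER_SYSTEM_PROCESS_UNITS")
    try:
        # int(None) raises TypeError; int("abc") raises ValueError.
        return int(raw)
    except (TypeError, ValueError):
        return DEFAULT_SYSTEM_PROCESS_UNITS


print(resolve_system_process_units(1_000, {}))
print(resolve_system_process_units(None, {"JOBMASTER_SYSTEM_PROCESS_UNITS": "500"}))
print(resolve_system_process_units(None, {"JOBMASTER_SYSTEM_PROCESS_UNITS": "lots"}))
```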
You can then specify the number of process units a task requires by passing an integer to the process_units argument of the @task decorator. If this argument is not specified, the task will require 1 process unit by default.
One way to think of process units is as megabytes of RAM: to restrict JobMaster to roughly 1 GB, set system_process_units=1_000 and set process_units for each task to the amount of RAM, in MB, that you expect it to use.
When JobMaster attempts to pop a job from the queue, it checks the queue for all "running" jobs on the same system and sums their process units. This is subtracted from the system process units to get the available process units.
Then, when it selects a "waiting" job to pop, the query includes a WHERE process_units <= available_process_units clause, so only jobs that fit within the available units are considered.
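The arithmetic can be illustrated with an in-memory queue. This is a sketch rather than JobMaster's actual query: the dictionary layout and the function names are hypothetical, and every job is assumed to be on the same system.

```python
def available_process_units(system_units, running_jobs):
    """Subtract the units of all running jobs from the system total."""
    return system_units - sum(job["process_units"] for job in running_jobs)


def poppable_jobs(system_units, queue):
    """Return the waiting jobs that fit within the available process units."""
    running = [job for job in queue if job["status"] == "running"]
    available = available_process_units(system_units, running)
    return [
        job for job in queue
        if job["status"] == "waiting" and job["process_units"] <= available
    ]


queue = [
    {"task": "foo", "status": "running", "process_units": 10},
    {"task": "bar", "status": "running", "process_units": 20},
    {"task": "baz", "status": "waiting", "process_units": 60},
    {"task": "qux", "status": "waiting", "process_units": 80},
]

# 100 system units minus 30 running units leaves 70 available.
print([job["task"] for job in poppable_jobs(100, queue)])  # ['baz']
```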
Example:
from jobmaster import task, Dependency, same

@task(process_units=10, write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    pass  # do something

@task(
    type_key='cool_tasks',
    write_all=['b'],
    dependencies=Dependency(foo, 2, a=1, b=same),
    process_units=20
)
def bar(a: int, b: [1, 2, 3], c: str):
    pass  # do something