
An easy-to-use, Celery-like jobs framework for creating, distributing, and managing workloads

Project description

A jobs framework for managing and distributing async / non-async tasks


Quick Start

$ virtualenv -p python3.7 easy-jobs-env

$ source easy-jobs-env/bin/activate

(easy-jobs-env)$ pip install easyjobs

Documentation: easyjobs.readthedocs.io

Usage - Jobs Manager

import asyncio, os
from easyjobs.manager import EasyJobsManager
from fastapi import FastAPI

server = FastAPI()

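# DB_PATH points the manager at the directory used to persist its jobs database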
os.environ['DB_PATH'] = '/mnt/jobs_database/'

@server.on_event('startup')
async def startup():

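    # create the manager on startup; workers connect using this secret as their manager_secret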
    job_manager = await EasyJobsManager.create(
        server,
        server_secret='abcd1234'
    )

Start the Jobs Manager (assuming the code above is saved as job_manager.py):

$ uvicorn --host 0.0.0.0 --port 8220 job_manager:server

Basic Usage - Worker

import asyncio, os
from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker

server = FastAPI()

@server.on_event('startup')
async def setup():
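    # connect this worker to the manager above; manager_secret must match the
    # manager's server_secret, and jobs_queue names the queue this worker consumes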
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8220,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5
    )

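    # '* * * * *' is standard cron syntax (run every minute); default_args supplies
    # the arguments passed to the scheduled task each time it fires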
    every_minute = '* * * * *'
    default_args = {'args': ['http://stats']}

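    # placeholder helpers standing in for real extract / persist / notify logic
    # ('db' is assumed to be defined elsewhere)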
    async def get_data(url):
        return {'a': 1, 'b': 2, 'c': 3}
    async def load_db(data: dict):
        await db.tables['transformed'].insert(**data)
        return f"data {data} loaded to db"
    async def send_email(address: str, message: str):
        return f"email sent to {address}"

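    # run_after chains tasks: when extract completes, its result is handed to transform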
    @worker.task(run_after=['transform'], schedule=every_minute, default_args=default_args)
    async def extract(url: str):
        print(f"extract started")
        data = await get_data(url)
        return {'data': data}

    @worker.task(run_after=['load'])
    async def transform(data: dict):
        print(f"transform started")
        for k in data.copy():
            data[k] = int(data[k]) + 2
        return {'data': data}

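    # on_failure names a registered task to invoke if this task raises an exception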
    @worker.task(on_failure='failure_notify', run_after=['compute'])
    async def load(data):
        print(f"load started")
        await load_db(data)
        return {'data': data}

    @worker.task()
    async def failure_notify(job_failed):
        await send_email('admin@company.io', job_failed)
        return job_failed

    @worker.task()
    async def deploy_environment():
        print(f"deploy_environment - started")
        await asyncio.sleep(5)
        print(f"deploy_environment - completed")
        return f"deploy_environment - completed"

    @worker.task()
    async def prepare_db():
        print(f"prepare_db - started")
        await asyncio.sleep(5)
        print(f"prepare_db - completed")
        return f"deploy_environment - completed"

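    # run_before lists tasks that must complete before this task starts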
    @worker.task(run_before=['deploy_environment', 'prepare_db'])
    async def configure_environment():
        print(f"pre_compute - starting")
        await asyncio.sleep(5)
        print(f"pre_compute - finished")
        return f"pre_compute - finished"

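    # WORKER_TASK_DIR: directory where the worker looks for subprocess task scripts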
    os.environ['WORKER_TASK_DIR'] = '/home/codemation/subprocesses'

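    # subprocess=True delegates this task to the matching script (compute.py below)
    # in WORKER_TASK_DIR; the function body here is just a stub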
    @worker.task(subprocess=True, run_before=['configure_environment'])
    async def compute(data: dict):
        pass

# /home/codemation/subprocesses/compute.py
import time
from easyjobs.workers.task import subprocess

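# @subprocess handles the command-line invocation the worker issues (manager host,
# port, secret, job id, and JSON-encoded arguments - see the sample output below)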
@subprocess
def work(data: dict):
    print(f"starting heavy (blocking) computation on {data}")
    """
    insert work here
    """
    time.sleep(5) # Blocking
    return {'result': f'I slept for 5 seconds - blocking - with data', **data}

if __name__ == '__main__':
    work()

Start Worker - With 5 Workers

$ uvicorn --host 0.0.0.0 --port 8221 job_worker:server --workers=5

Sample output:

extract started
transform started
load started
deploy_environment - started
prepare_db - started
deploy_environment - completed
prepare_db - completed
pre_compute - starting
pre_compute - finished
task subprocess called with ['/home/josh/Documents/python/easyjobs/compute.py', '0.0.0.0', '8220', 'abcd1234', '90226595-8779-11eb-a5cb-3bbcbb9b64eb', '{"args": [], "kwargs": {"data": {"a": 3, "b": 4, "c": 5}}}']
starting heavy computation on {'a': 3, 'b': 4, 'c': 5}

Try it out"

Visit the Job Manager URI:
http://0.0.0.0:8220/docs

