An easy-to-use, celery-like jobs framework for creating, distributing, and managing workloads

A jobs framework for managing and distributing async / non-async tasks

Quick Start

$ virtualenv -p python3.7 easy-jobs-env

$ source easy-jobs-env/bin/activate

(easy-jobs-env)$ pip install easyjobs

Supported Brokers - Pull Jobs

  • rabbitmq
  • easyjobs
  • TODO - Amazon SQS

Supported Producers

  • rabbitmq - Send jobs to rabbitmq first - consume later
  • EasyJobs API - Create jobs directly via EasyJobsManager API

Basic Usage - With Broker

# Manager - Jobs Runner
# job_manager.py

import asyncio
from easyjobs.manager import EasyJobsManager
from fastapi import FastAPI

server = FastAPI()

@server.on_event('startup')
async def startup():

    job_manager = await EasyJobsManager.create(
        server,
        '/ws/jobs',
        server_secret='abcd1234',
        broker_type='rabbitmq',
        broker_path='amqp://guest:guest@127.0.0.1/'
    )

    @job_manager.task()
    async def basic_job(arg1, arg2, arg3, *args):
        print(f"basic_job: {arg1} {arg2} {arg3} - args {args}")
        await asyncio.sleep(2)
        return arg1, arg2, arg3

Basic Usage - No Broker

# Manager - Jobs Runner
# job_manager.py

import asyncio
from easyjobs.manager import EasyJobsManager
from fastapi import FastAPI

server = FastAPI()

@server.on_event('startup')
async def startup():

    job_manager = await EasyJobsManager.create(
        server,
        '/ws/jobs',
        server_secret='abcd1234'
    )

    @job_manager.task()
    async def basic_job(arg1, arg2, arg3, *args):
        print(f"basic_job: {arg1} {arg2} {arg3} - args {args}")
        await asyncio.sleep(2)
        return arg1, arg2, arg3

Start Job Manager

$ uvicorn --host <host_address> --port <tcp_port> job_manager:server

Connect Worker

# job_worker.py

import asyncio
from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker

server = FastAPI()

@server.on_event('startup')
async def setup():
    worker = await EasyJobsWorker.create(
        server,
        '/ws/jobs',
        server_secret='abcd1234',
        manager_host='192.168.1.18',
        manager_port=8220,
        manager_secret='abcd1234',
        manager_path='/ws/jobs',
        jobs_queue='DEFAULT',
        max_tasks_per_worker=3
    )

    @worker.task()
    async def work_a(a, b, c):
        await asyncio.sleep(5)
        return {'result': [a, b, c]}

Start Worker - With 5 Workers

$ uvicorn --host <host_addr> --port <port> job_worker:server --workers=5

To test it out, visit the job manager's /docs URI, for example: http://0.0.0.0:8220/docs

Register Tasks

Tasks can be registered on a Manager or Worker by using the .task decorator / function.

Task register arguments:

  • namespace - Defaults to 'DEFAULT'. Determines which queue the task is registered within; methods can be registered within multiple namespaces.
  • on_failure - Unspecified by default. If a task run fails, a job is created with on_failure=<task_name>.
  • retry_policy - Defaults to retry_policy='retry_once'; other possible values are 'retry_always' and 'never'.
  • run_after - Unspecified by default. Creates a job with run_after=<task_name>, using the results of the current task as the argument for the run_after task.
  • subprocess - Defaults to False. Defines whether a task should be created via a subprocess.

@worker.task(namespace='finance')
async def finance_work(employee_id: str, employee_data: dict):
    """
    do finance work
    """
    return finance_results

@manager.task()
async def general_work(general_data: dict):
    """
    do general work
    """
    return general_results
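
To make the namespace argument concrete, here is a toy registry written with the standard library only. It is a sketch of how a .task-style decorator can file one function under several queues, not the easyjobs implementation; TaskRegistry and its attributes are hypothetical names.

```python
from collections import defaultdict


class TaskRegistry:
    """Toy stand-in for a manager / worker; NOT the easyjobs implementation."""

    def __init__(self):
        # namespace (queue) name -> {task name -> function}
        self.namespaces = defaultdict(dict)

    def task(self, namespace="DEFAULT"):
        def register(func):
            self.namespaces[namespace][func.__name__] = func
            return func  # return the function unchanged so decorators can stack
        return register


registry = TaskRegistry()


@registry.task(namespace="finance")
@registry.task()  # also registered in the 'DEFAULT' namespace
async def finance_work(employee_id: str, employee_data: dict):
    return {"employee_id": employee_id, "fields": sorted(employee_data)}
```

Stacking the decorator is one way to express "registered within multiple namespaces"; whether easyjobs expects stacking or some other form is not stated above.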

Note: Considerations with non-async Tasks

Worker tasks that are not I/O bound (network, web requests, database queries) and run longer than 10 seconds should be registered as subprocess tasks. This lets the current worker thread continue servicing other concurrent tasks.

Tasks created with subprocess=True create a new process (using a separate, uncontended Python GIL), run until completed or failed, and then report the results back to EasyJobsManager. EasyJobsManager provides the results to the worker, releasing a task reservation (allowing more work to complete using the results).
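
As a rough illustration of why a separate process helps, the sketch below uses only the standard library (not easyjobs itself): a blocking script runs in its own Python process while the event loop keeps servicing another coroutine. BLOCKING_SCRIPT, run_in_subprocess and heartbeat are hypothetical names, and the way easyjobs launches its templates may differ.

```python
import asyncio
import json
import sys

# Hypothetical blocking task, passed to a fresh interpreter via `python -c`.
BLOCKING_SCRIPT = """
import json, sys, time
a, b, c = json.loads(sys.argv[1])
time.sleep(0.2)  # blocks, but only inside the child process
print(json.dumps({'result': [a, b, c]}))
"""


async def run_in_subprocess(*args):
    """Run the blocking script in a new process and return its JSON result."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", BLOCKING_SCRIPT, json.dumps(args),
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"subprocess exited with code {proc.returncode}")
    return json.loads(stdout)


async def heartbeat(ticks):
    """Stand-in for other concurrent tasks served by the same event loop."""
    count = 0
    for _ in range(ticks):
        await asyncio.sleep(0.05)
        count += 1
    return count


async def main():
    # Both run concurrently: the heartbeat is not starved by the blocking sleep.
    return await asyncio.gather(run_in_subprocess(1, 2, 3), heartbeat(3))
```

Calling asyncio.run(main()) returns the subprocess result together with the heartbeat count. Had time.sleep been called directly inside a coroutine, the heartbeat would have stalled until it finished.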

Subprocess usage with Blocking Code

# worker
import os

# required env vars
os.environ['WORKER_TASK_DIR'] = '/home/codemation/blocking_funcs/'

@worker.task(subprocess=True)
async def basic_blocking(a, b, c):
    pass

$ ls /home/codemation/blocking_funcs
advanced_blocking.py basic_blocking.py

# manager
import os

# required env vars
os.environ['MANAGER_HOST'] = '0.0.0.0'
os.environ['MANAGER_PORT'] = '8220'
os.environ['WORKER_TASK_DIR'] = '/home/codemation/blocking_funcs/'

@manager.task(subprocess=True)
async def advanced_blocking(a, b, c):
    pass

  • Methods registered with 'subprocess=True' do not contain logic; the logic lives in the matching template file.
  • Arguments improve readability, but do not affect functionality (except in the template).

Subprocess Template - Example

# /home/codemation/blocking_funcs/basic_blocking.py
import time
from easyjobs.workers.task import subprocess

@subprocess
def work(a, b, c):
    """
    insert blocking / non-blocking work here
    """
    time.sleep(5) # Blocking
    return {'result': f'I slept for 5 seconds - blocking with {a} {b} {c}'}

if __name__ == '__main__':
    work()

Creating Jobs

Jobs can be created in one of two ways:

EasyJobsManager API

  • Methods registered locally or by connected workers are visible from OpenAPI and can be invoked via a REST method.
  • Invoking a job via the API returns a request_id, indicating the request was added to the persistent queue.
  • A job is created as soon as a worker is able to respond to the job creation request.
  • Created jobs are then added to a secondary queue with the associated job parameters (run_after, retry_policy, etc.).
  • Workers with free task reservations start and run the job as soon as possible.
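
The idea of task reservations can be pictured with a semaphore: a worker holds a fixed number of slots, and a job starts only when a slot is free. The sketch below is a standard-library analogy, assuming that a reservation behaves like a semaphore slot; run_jobs is a hypothetical helper, and max_tasks_per_worker simply mirrors the worker argument shown earlier.

```python
import asyncio


async def run_jobs(job_ids, max_tasks_per_worker=3):
    """Run all jobs, never more than max_tasks_per_worker at the same time."""
    reservations = asyncio.Semaphore(max_tasks_per_worker)
    running = 0
    peak = 0

    async def run_one(job_id):
        nonlocal running, peak
        async with reservations:      # wait for a free reservation
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for the real work
            running -= 1
            return job_id              # reservation is released on exit

    results = await asyncio.gather(*(run_one(job_id) for job_id in job_ids))
    return results, peak
```

With ten queued jobs and three reservations, every job completes, but the observed peak concurrency stays at three.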

The jobs visible in the OpenAPI are added dynamically, even after startup. Newly added worker namespaces or registered functions become visible by simply refreshing the /docs page.

An important feature of EasyJobs is signature cloning of registered functions locally & for remote workers. This allows for immediate argument verification before the request is queued.
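
Argument verification against a function signature can be sketched with the standard inspect module. This is only an illustration of the general technique, not how easyjobs clones signatures; verify_arguments is a hypothetical helper, and basic_job is a copy of the task from Basic Usage.

```python
import inspect


async def basic_job(arg1, arg2, arg3, *args):
    return arg1, arg2, arg3


def verify_arguments(func, args, kwargs):
    """Return (ok, message) without ever calling func."""
    try:
        # bind() applies the same matching rules as a real call.
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError as error:
        return False, str(error)
    return True, ""
```

A request such as verify_arguments(basic_job, [1], {}) is rejected because arg2 and arg3 are missing, so a malformed request never has to reach the queue.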

See Also - Adding Jobs to a Message Queue

See Producers - to review how to create jobs.

Job Life Cycle

  1. A Job is created when it is pulled from a Message Queue or added via the EasyJobsManager API.
  2. The Job is added to the jobs database and queued for worker consumption.
  3. A Job is selected by a worker and invoked with the provided args / kwargs parameters (if any).
  4. Job failures trigger a retry, followed by any on_failure tasks (if any), and are then reported as failed to EasyJobsManager's results database / queue.
  5. Job successes create any run_after tasks using the results of the last job, then report the results to EasyJobsManager's results database / queue.
  6. Results are stored within the EasyJobsManager database.
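
The life cycle above can be walked through with a small in-memory model. Everything here is a toy under stated assumptions: MAX_ATTEMPTS, run_with_policy and process are hypothetical names, a plain deque stands in for the queue and database, and passing the original arguments to the on_failure task is a guess rather than documented behavior.

```python
import asyncio
from collections import deque

# Attempts allowed per policy in this toy model; None means no limit.
MAX_ATTEMPTS = {"never": 1, "retry_once": 2, "retry_always": None}


async def run_with_policy(func, args, retry_policy="retry_once"):
    """Return (succeeded, result_or_error_text) for a single job."""
    limit = MAX_ATTEMPTS[retry_policy]
    attempt = 0
    while True:  # note: 'retry_always' loops until the task succeeds
        attempt += 1
        try:
            return True, await func(*args)
        except Exception as error:
            if limit is not None and attempt >= limit:
                return False, str(error)


async def process(tasks, jobs):
    """Run queued jobs in order, queueing run_after / on_failure follow-ups."""
    queue, results = deque(jobs), []
    while queue:
        job = queue.popleft()
        args = job.get("args", [])
        policy = job.get("retry_policy", "retry_once")
        ok, outcome = await run_with_policy(tasks[job["name"]], args, policy)
        if ok and job.get("run_after"):
            # The result of this job becomes the argument of the next one.
            queue.append({"name": job["run_after"], "args": [outcome]})
        elif not ok and job.get("on_failure"):
            # Assumption: the failure handler receives the original arguments.
            queue.append({"name": job["on_failure"], "args": args})
        results.append((job["name"], "success" if ok else "failed", outcome))
    return results


async def double(x):
    return x * 2


async def add_one(x):
    return x + 1


async def always_fails(x):
    raise ValueError("boom")


async def cleanup(x):
    return f"cleaned up after {x}"


TASKS = {f.__name__: f for f in (double, add_one, always_fails, cleanup)}
JOBS = [
    {"name": "double", "args": [4], "run_after": "add_one"},
    {"name": "always_fails", "args": [7], "retry_policy": "never",
     "on_failure": "cleanup"},
]
```

Running asyncio.run(process(TASKS, JOBS)) reports double and always_fails first, then the follow-up jobs add_one and cleanup that they queued.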

Consuming Results

When a job request is created via the API, a request_id is returned right away.

A result can be consumed / queried via the Pull Job Result API. Pulling a result removes it, so use Pull Job Result only when removal is intended; otherwise use View Job Result.

Pull Job Result will wait up to 5 seconds for a job to complete, returning the consumed result.
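
The difference between pulling and viewing can be modeled in a few lines. The class below is a hypothetical in-memory stand-in, not the EasyJobsManager API: view leaves the result in place, while pull waits up to a timeout (5 seconds by default, matching the text above) and then removes what it returns.

```python
import asyncio


class ResultStore:
    """Toy in-memory result store; names and behavior are assumptions."""

    def __init__(self):
        self._results = {}
        self._ready = {}

    def _event(self, request_id):
        return self._ready.setdefault(request_id, asyncio.Event())

    def store(self, request_id, result):
        self._results[request_id] = result
        self._event(request_id).set()

    def view(self, request_id):
        # Non-destructive read: the result stays available.
        return self._results.get(request_id)

    async def pull(self, request_id, timeout=5.0):
        # Wait for completion, then remove and return the result.
        try:
            await asyncio.wait_for(self._event(request_id).wait(), timeout)
        except asyncio.TimeoutError:
            return None
        self._ready.pop(request_id, None)
        return self._results.pop(request_id, None)


async def demo():
    store = ResultStore()

    async def finish_later():
        await asyncio.sleep(0.1)
        store.store("req-1", {"result": [1, 2, 3]})

    task = asyncio.create_task(finish_later())
    pulled = await store.pull("req-1", timeout=1.0)    # waits for the job
    await task
    viewed_after_pull = store.view("req-1")            # already removed
    missing = await store.pull("req-2", timeout=0.1)   # never completes
    return pulled, viewed_after_pull, missing
```

In demo, the first pull waits for the job to finish, the later view finds nothing because the pull consumed the result, and the pull for an unknown request gives up after its timeout.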

Creating Jobs via Message Queues

Jobs should be created in the following format, using json serializable data. If you can run json.dumps(data) on the data, you can use it in a job.

# Job Format 

job = {
    'namespace': 'name', # also known as queue
    'name': 'name',
    'args': [args],
    'kwargs': {'kwarg': 'val'}
}

Tip: Think about how you would invoke the job if it were local, then create the syntax using a Producer.

When a Job is added (either pulled from a broker or pushed via a producer), the job is first added to a persistent database, then added to a global queue to be run by workers monitoring the queue.
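
A job in the format above can be built and checked before it is handed to a producer. This is a minimal sketch using only the json module; make_job and encode_job are hypothetical helpers, and publishing the encoded bytes to a broker is left out.

```python
import json


def make_job(name, args=None, kwargs=None, namespace="DEFAULT"):
    """Build a job dict in the documented format and check it serializes."""
    job = {
        "namespace": namespace,  # also known as queue
        "name": name,
        "args": list(args or []),
        "kwargs": dict(kwargs or {}),
    }
    json.dumps(job)  # raises TypeError if any value is not JSON serializable
    return job


def encode_job(job):
    """Encode a job as UTF-8 JSON bytes, ready to use as a message body."""
    return json.dumps(job).encode("utf-8")
```

For example, make_job('basic_job', args=[1, 2, 3]) mirrors the local call basic_job(1, 2, 3), while passing a set in args fails immediately with a TypeError instead of failing later on a worker.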

Terminology

EasyJobsManager

  • responsible for pulling jobs from a broker
  • adds jobs to persistent database & global queue
  • provides workers access to global queue for pulling jobs
  • provides workers the ability to store results in a persistent database, from which they can be pulled or pushed to a specified message queue.
  • can act as a worker if task is defined locally within namespace
  • Should NOT be forked

Note: Work performed on a Manager should be as non-blocking as possible. Because the main thread cannot be forked, long-running or blocking code on a Manager will have adverse effects. When in doubt, put it on a separate worker.

EasyJobsWorker

  • Connects to a running EasyJobsManager and pulls jobs to run within a specified queue
  • Runs Jobs and pushes results back to EasyJobsManager
  • Process can be forked
