Asynchronous jobs worker for TurboGears2

About AsyncJob

AsyncJob is a TurboGears2 extension for running background (asynchronous) jobs. It lets you return a response to the user quickly while the system performs more work in the background. This is useful for video transcoding, thumbnail generation, or other tasks where the user cannot be expected to wait for the required time before getting an answer.

To run a task in the background, simply call:

from tgext.asyncjob import asyncjob_perform
asyncjob_perform(callable, arg1, arg2, kwarg=value)

Installing

tgext.asyncjob can be installed either from PyPI or from Bitbucket:

easy_install tgext.asyncjob

This should work for most users.

Enabling AsyncJob

In your application's lib/app_globals.py, import start_async_worker:

from tgext.asyncjob import start_async_worker

Then call it inside Globals.__init__:

class Globals(object):
    def __init__(self):
        start_async_worker()

You can optionally pass the Globals object itself to start_async_worker; it will be used to store the task queue. Otherwise, asyncjob autodetects the Globals object from the call stack, using the object whose method called start_async_worker.
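The idea of a task queue stored on the Globals object and drained by a background worker can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not tgext.asyncjob's actual implementation: `Globals`, `start_worker`, `perform` and the `asyncjob_queue` attribute are hypothetical names, and the single-thread design is an assumption.

```python
import queue
import threading
import traceback


class Globals(object):
    """Hypothetical stand-in for a TurboGears app_globals object."""


def start_worker(globals_obj):
    """Attach a task queue to globals_obj and start one daemon worker thread.

    Hypothetical helper illustrating a queue kept on the Globals object;
    it is not tgext.asyncjob's actual start_async_worker.
    """
    tasks = queue.Queue()
    globals_obj.asyncjob_queue = tasks  # attribute name is an assumption

    def run():
        while True:
            func, args, kwargs = tasks.get()
            try:
                func(*args, **kwargs)
            except Exception:
                traceback.print_exc()  # keep the worker alive after a failing task
            finally:
                tasks.task_done()

    threading.Thread(target=run, daemon=True).start()
    return tasks


def perform(globals_obj, func, *args, **kwargs):
    """Hypothetical counterpart of asyncjob_perform: enqueue and return at once."""
    globals_obj.asyncjob_queue.put((func, args, kwargs))
```

With a single worker thread, tasks run one at a time in submission order; in tests, `queue.Queue.join()` can be used to wait until every queued task has finished.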

Performing background tasks

To perform a background task, use tgext.asyncjob.asyncjob_perform. When called from any context with a valid request, it runs the callable passed as its first argument in the background, passing it the remaining arguments:

from tgext.asyncjob import asyncjob_perform

def background_task(number):
    print(number * 2)

asyncjob_perform(background_task, 5)

Tracking Tasks Progress

asyncjob tracks the status of each task and lets you update it, so you can implement progress bars or other kinds of reporting for long-running operations. Each call to asyncjob_perform returns a unique id for the action you just scheduled, which can be used to retrieve the task status at any time.

You can update the progress status at any time from inside the background task itself by calling asyncjob_set_progress(value, data). The value argument is expected to be an int, while the optional data argument can be anything you might want to retrieve later.

To retrieve the progress status, call asyncjob_get_progress with the id of the task whose status you want. The returned value is a 2-item tuple: the first entry is the numeric value and the second is the data you passed to asyncjob_set_progress. If the task has completed, it returns None; if it has not started yet, you get (-1, None).
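To make this return-value contract concrete, here is a standalone, in-memory sketch of a progress registry that behaves as described: (-1, None) before the task starts, (value, data) while it runs, and None once it has finished. `ProgressRegistry` and its methods are hypothetical; marking a task as started with (0, None) and using a thread-local to remember the current task id are assumptions, not documented asyncjob behaviour.

```python
import threading
import uuid


class ProgressRegistry(object):
    """Hypothetical tracker mirroring the documented progress contract."""

    def __init__(self):
        self._lock = threading.Lock()
        self._state = {}
        # Remembers which task the current thread is running (assumption).
        self._local = threading.local()

    def schedule(self):
        """Register a new task and return its unique id; state is (-1, None)."""
        task_id = uuid.uuid4().hex
        with self._lock:
            self._state[task_id] = (-1, None)
        return task_id

    def run(self, task_id, func, *args, **kwargs):
        """Run func for task_id; the entry is dropped when the task finishes."""
        self._local.task_id = task_id
        self.set_progress(0)  # assumption: mark the task as started
        try:
            return func(*args, **kwargs)
        finally:
            self._local.task_id = None
            with self._lock:
                self._state.pop(task_id, None)

    def set_progress(self, value, data=None):
        """Called from inside the running task, like asyncjob_set_progress."""
        with self._lock:
            self._state[self._local.task_id] = (int(value), data)

    def get_progress(self, task_id):
        """Return (value, data), (-1, None) if not started, None if finished."""
        with self._lock:
            return self._state.get(task_id)
```

Note that in this sketch an unknown id also yields None, so a caller cannot tell "finished" apart from "never scheduled".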

Progress tracking example:

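import time

from tg import expose, redirect, url
from tgext.asyncjob import asyncjob_perform, asyncjob_get_progress, asyncjob_set_progress

# Both methods belong to one of your controller classes (e.g. RootController).
@expose()
def controller_method(self):
    def async_action():
        for i in range(5):
            # Report step i; an optional second argument can carry extra data.
            asyncjob_set_progress(i)
            time.sleep(1)

    # Schedule the job, then send the user to a page that reports its state.
    taskid = asyncjob_perform(async_action)
    return redirect(url('/state', uid=taskid))

@expose()
def state(self, uid):
    state = asyncjob_get_progress(uid)
    if state is None:
        return 'Job Completed'
    elif state[0] < 0:
        return 'Not yet started'
    else:
        return str(state[0])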

Accessing the database

By default, asyncjob manages SQLAlchemy sessions and transactions by itself. Each background task is wrapped in a transaction, which is rolled back if any exception is raised.

AsyncJob uses its own SQLAlchemy session, so never pass a task objects that are already bound to another session. Pass their identifiers instead and query the objects again inside the task.
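The "one transaction per task, rolled back on any exception" rule can be illustrated without SQLAlchemy. The sketch below uses the standard-library sqlite3 module as a stand-in; `run_in_transaction`, `rename_group`, `failing_task` and the `groups` table are hypothetical names chosen for illustration. Note that the tasks receive a primary key rather than an already-loaded object, following the advice above.

```python
import sqlite3


def run_in_transaction(conn, task, *args):
    """Run task(conn, *args) in one transaction (sqlite3 stands in for SQLAlchemy).

    Commits if the task returns normally, rolls back and re-raises otherwise.
    """
    try:
        task(conn, *args)
    except Exception:
        conn.rollback()
        raise
    else:
        conn.commit()


def rename_group(conn, group_id):
    # Receives a primary key, not an object bound to some other session.
    conn.execute("UPDATE groups SET display_name = ? WHERE group_id = ?",
                 ("Test", group_id))


def failing_task(conn, group_id):
    conn.execute("UPDATE groups SET display_name = ? WHERE group_id = ?",
                 ("Broken", group_id))
    raise RuntimeError("task failed")  # this update is rolled back
```

After `run_in_transaction(conn, rename_group, 1)` the row reads "Test"; a later `run_in_transaction(conn, failing_task, 1)` re-raises the error and leaves the row unchanged.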

The only thing developers need to keep in mind is that objects created just before starting the background task might not yet be available in the database when the task looks for them. To avoid this issue, asyncjob provides asyncjob_timed_query, which repeats a query until a result is found or a timeout is reached (60 seconds by default).

This can be used to fetch objects created before starting the background task, waiting for them to appear in the database:

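from tg import expose
from tgext.asyncjob import asyncjob_perform, asyncjob_timed_query
# "myproject" is a placeholder for your own package; DBSession and Group
# come from a quickstarted TurboGears model.
from myproject.model import DBSession, Group

@expose()
def controller_method(self):
    def async_query_action(group_id):
        # Look the group up again by primary key, waiting for it to appear.
        group = asyncjob_timed_query(DBSession.query(Group).filter_by(group_id=group_id)).first()
        group.display_name = 'Test'

    g = Group(group_name='test_group')
    DBSession.add(g)
    DBSession.flush()  # assigns g.group_id so it can be passed to the task
    # Pass the id, not the object: the task works with a different session.
    asyncjob_perform(async_query_action, g.group_id)
    return 'OK'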

To change the timeout, pass different retries and interval values to asyncjob_timed_query:

asyncjob_timed_query(DBSession.query(Group).filter_by(group_id=group_id),
                     retries=10, interval=6).first()
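The retry-until-found behaviour described above boils down to a polling loop. The following is a minimal, library-independent sketch: `timed_first` is a hypothetical helper that takes a zero-argument `fetch` callable (for example a lambda wrapping a query's `.first()`), and its defaults of 60 retries spaced 1 second apart are an assumption chosen only to match the documented 60-second default, not asyncjob's real parameter values.

```python
import time


def timed_first(fetch, retries=60, interval=1.0):
    """Call fetch() until it returns something other than None.

    Hypothetical helper: makes at most `retries` attempts, sleeping
    `interval` seconds between them, and returns None if nothing is found.
    The defaults are assumptions, not asyncjob's actual values.
    """
    for attempt in range(retries):
        result = fetch()
        if result is not None:
            return result
        if attempt < retries - 1:  # no need to sleep after the last attempt
            time.sleep(interval)
    return None
```

For example, `timed_first(lambda: DBSession.query(Group).filter_by(group_id=group_id).first())` would re-run the query on every attempt; the worst-case wait is roughly `retries * interval` seconds.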

Source Distribution

tgext.asyncjob-0.2.tar.gz (4.8 kB)