Asynchronous jobs worker for TurboGears2
AsyncJob is a TurboGears2 extension for handling background/asynchronous jobs. It permits quickly returning a response to the user while the system performs more work in the background; this can be useful for video transcoding, thumbnail generation, or other tasks where the user cannot be expected to wait the required time before getting an answer.
To perform a task in the background, simply run:
    from tgext.asyncjob import asyncjob_perform

    asyncjob_perform(callable, arg1, arg2, kwarg=value)
tgext.asyncjob can be installed from both PyPI and Bitbucket:

    pip install tgext.asyncjob

This should just work for most users.
In your application's config/app_cfg.py, append the following lines:
    import tgext.asyncjob

    tgext.asyncjob.plugme(base_config)
Or use the pluggable application interface if tgext.pluggable is available:
    from tgext.pluggable import plug

    plug(base_config, 'tgext.asyncjob')
You can pass the Globals object itself to the plug function:
    plug(base_config, 'tgext.asyncjob', app_globals=app_globals)
This object will be used to store the task queue; otherwise, asyncjob will autodetect the Globals object by inspecting the call stack frames of the place where it has been called.
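Conceptually, the stack-frame autodetection can be sketched like this (a simplified illustration of the idea, not the extension's actual code; `find_app_globals` is a hypothetical name):

```python
import inspect

def find_app_globals():
    # Walk up the caller's stack frames looking for a local variable
    # named 'app_globals' (simplified sketch of the autodetection idea).
    frame = inspect.currentframe().f_back
    while frame is not None:
        candidate = frame.f_locals.get('app_globals')
        if candidate is not None:
            return candidate
        frame = frame.f_back
    return None
```

Passing app_globals explicitly avoids this stack walk and is the more robust option.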
Performing background tasks
To perform a background task you can simply use tgext.asyncjob.asyncjob_perform. When called from any context where there is a valid request, it will run the callable passed as the first argument in the background, with the parameters provided:
    from tgext.asyncjob import asyncjob_perform

    def background_task(number):
        print(number * 2)

    asyncjob_perform(background_task, 5)
Tracking Tasks Progress
asyncjob tracks task status and permits updating it, making it possible to implement progress bars or other kinds of user-facing reporting for long-running operations. Each time you call asyncjob_perform it returns a unique id for the action you just scheduled, which can be used to retrieve the task status at any time.
You can update the progress status at any time from inside the background task itself by calling asyncjob_set_progress(value, data). The value argument is expected to be an int, while the second, optional data argument can be anything you might want to get back later.
To retrieve the progress status, call asyncjob_get_progress, passing it the id of the task whose status you want to fetch. The returned value is a 2-item tuple: the first entry is the numeric value and the second is the data you passed to asyncjob_set_progress. If the task has completed it returns None; if it has not yet started you will get (-1, None).
Progress tracking example:
    import time
    from tg import expose, redirect, url
    from tgext.asyncjob import asyncjob_perform, asyncjob_get_progress, asyncjob_set_progress

    @expose()
    def controller_method(self):
        def async_action():
            for i in range(5):
                asyncjob_set_progress(i)
                time.sleep(1)

        taskid = asyncjob_perform(async_action)
        return redirect(url('/state', uid=taskid))

    @expose()
    def state(self, uid):
        state = asyncjob_get_progress(uid)
        if not state:
            return 'Job Completed'
        elif state[0] < 0:
            return 'Not yet started'
        else:
            return str(state)
Progress Tracking with Multiple Processes
When using multiple processes to serve your web application, the default task tracking system won't be able to track task state across different processes.
To solve this issue you must rely on an alternative progress tracker that stores the state in a shared storage. By default the tgext.asyncjob.tracker.redisdb.RedisProgressTracker is provided, which stores state in a Redis database.
Using it is as simple as:
    from tgext.asyncjob import start_async_worker
    from tgext.asyncjob.tracker.redisdb import RedisProgressTracker

    class Globals(object):
        def __init__(self):
            start_async_worker(progress_tracker=RedisProgressTracker(host='localhost'))
The RedisProgressTracker class accepts host, port, db and timeout arguments which set the database connection options and the timeout of progress tracking keys (to avoid leaving stray keys behind). By default db 15 with a timeout of 1 hour is used.
Custom Progress Trackers
Writing a custom progress tracker is as simple as providing a class with the following methods:
    class MyProgressTracker(object):
        def track(self, entryid):
            pass

        def remove(self, entryid):
            pass

        def set_progress(self, entryid, value, message):
            pass

        def get_progress(self, entryid):
            pass
Your custom progress tracker can then be passed as an argument to start_async_worker to store status in an SQL database, MongoDB, or any other system.
Refer to tgext.asyncjob.tracker.redisdb.RedisProgressTracker or tgext.asyncjob.tracker.memory.MemoryProgressTracker for implementation examples.
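As a starting point, a minimal in-memory tracker implementing this interface might look like the following sketch (illustrative only; the shipped MemoryProgressTracker may differ in details):

```python
class DictProgressTracker(object):
    """Toy progress tracker storing state in a plain dict.

    A sketch of the four-method interface, not the library's
    actual MemoryProgressTracker implementation.
    """

    NOT_STARTED = (-1, None)

    def __init__(self):
        self._entries = {}

    def track(self, entryid):
        # Register a scheduled task before it starts running.
        self._entries[entryid] = self.NOT_STARTED

    def remove(self, entryid):
        # Forget a finished task so get_progress returns None for it.
        self._entries.pop(entryid, None)

    def set_progress(self, entryid, value, message):
        self._entries[entryid] = (value, message)

    def get_progress(self, entryid):
        return self._entries.get(entryid)
```

The return conventions mirror the ones documented above: (-1, None) for a task that has not started and None for a task that has been removed after completion.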
Accessing the database
By default asyncjob manages SQLAlchemy sessions and transactions by itself. Each background task is encapsulated in a transaction, which is rolled back in case of any exception.
AsyncJob uses its own SQLAlchemy session, so never pass it objects already bound to another session; query for them again inside the task.
The only issue developers have to keep in mind is that objects created just before starting the background task might not yet be available in the database when the task runs. To avoid this problem, asyncjob provides asyncjob_timed_query, which performs a query repeatedly until a result is found or a timeout is reached (60 seconds by default).
This can be used to fetch back objects created before starting the background task waiting for them to appear on the database:
    from tgext.asyncjob import asyncjob_perform, asyncjob_timed_query

    @expose()
    def controller_method(self):
        def async_query_action(group_id):
            group = asyncjob_timed_query(
                DBSession.query(Group).filter_by(group_id=group_id)
            ).first()
            group.display_name = 'Prova'

        g = Group(group_name='test_group')
        DBSession.add(g)
        DBSession.flush()

        asyncjob_perform(async_query_action, g.group_id)
        return 'OK'
To change the timeout, simply pass different retries and interval parameters to asyncjob_timed_query:
    asyncjob_timed_query(DBSession.query(Group).filter_by(group_id=group_id),
                         retries=10, interval=6).first()
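The retry loop behind asyncjob_timed_query can be pictured roughly like this (an illustrative approximation, not the library's actual code; only the retries and interval parameter names come from the documentation above):

```python
import time

class TimedQuery(object):
    """Rough sketch of a query wrapper that re-runs a query until a
    row appears or the retries are exhausted (illustrative only, not
    the real asyncjob_timed_query implementation)."""

    def __init__(self, query, retries=60, interval=1):
        self.query = query
        self.retries = retries
        self.interval = interval

    def first(self):
        for attempt in range(self.retries):
            result = self.query.first()
            if result is not None:
                return result
            # Row not visible yet; wait before trying again.
            time.sleep(self.interval)
        return None
```

With this picture in mind, the effective timeout is roughly retries * interval, so retries=10, interval=6 keeps the default 60-second window while polling less often.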