A remotetask client utility for Zope 3

Remote Task Execution

This package provides an implementation of a remote task execution Web service that allows you to execute pre-defined tasks on another server. It is also possible to run cron jobs at specific times. These services are useful in two ways:

  1. They enable us to complete tasks that are not natively available on a particular machine. For example, it is not possible to convert an AVI file to a Flash(R) movie using Linux, the operating system our Web server might run on.

  2. They also allow us to move expensive operations to other servers. This is valuable, for example, when converting videos on high-traffic sites.

Installation

Define the remotetasks that should be started on startup in zope.conf like this:

<product-config lovely.remotetask>
  autostart site1@TestTaskService1, site2@TestTaskService2, @RootTaskService
</product-config>

Note that services registered directly in the root folder can be referred to by just prefixing them with the @ symbol. The site name can be omitted. An example of this is RootTaskService referenced above.

This causes the listed remote task services to be started when Zope starts.
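The directive format above can be sketched in a few lines. This is only an illustration: `parse_autostart` is a hypothetical helper, not the package's own parser, and representing a root-level service by an empty site name is an assumption made for the sketch.

```python
def parse_autostart(value):
    """Split an autostart value into (site, service) pairs.

    Hypothetical helper for illustration; an empty site name stands
    for a service registered directly in the root folder.
    """
    pairs = []
    for directive in value.split(','):
        directive = directive.strip()
        if not directive:
            continue
        site, sep, service = directive.partition('@')
        if not sep:
            raise ValueError('missing "@" in directive: %r' % directive)
        pairs.append((site, service))
    return pairs


pairs = parse_autostart(
    'site1@TestTaskService1, site2@TestTaskService2, @RootTaskService')
```

Here `pairs` holds one tuple per directive, with `('', 'RootTaskService')` for the root-level entry.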

Usage

>>> STOP_SLEEP_TIME = 0.02

Let’s now start by creating a single service:

>>> from lovely import remotetask
>>> service = remotetask.TaskService()

The object should be located, so it gets a name:

>>> from zope.app.folder import Folder
>>> site1 = Folder()
>>> root['site1'] = site1
>>> from zope.app.component.site import LocalSiteManager
>>> from zope.security.proxy import removeSecurityProxy
>>> sm = LocalSiteManager(removeSecurityProxy(site1))
>>> site1.setSiteManager(sm)
>>> sm['default']['testTaskService1'] = service
>>> service = sm['default']['testTaskService1'] # caution! proxy
>>> service.__name__
u'testTaskService1'
>>> service.__parent__ is sm['default']
True

Let’s register it under the name TestTaskService1:

>>> from zope import component
>>> from lovely.remotetask import interfaces
>>> sm = site1.getSiteManager()
>>> sm.registerUtility(service, interfaces.ITaskService,
...                          name='TestTaskService1')

We can discover the available tasks:

>>> service.getAvailableTasks()
{}

This mapping is initially empty, because we have not registered any tasks. Let’s now define a task that simply echoes an input string:

>>> def echo(input):
...     return input
>>> import lovely.remotetask.task
>>> echoTask = remotetask.task.SimpleTask(echo)

The only API requirement on the task function is that it be callable. Now we make sure that the task works:

>>> echoTask(service, 1, input={'foo': 'blah'})
{'foo': 'blah'}
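To make the calling convention concrete, here is a minimal stand-alone sketch of a wrapper that is called with a service, a job id and an input, and passes only the input on to the wrapped function. `SimpleTaskSketch` is a hypothetical stand-in; the package's real `SimpleTask` may do more.

```python
class SimpleTaskSketch(object):
    """Wrap a plain function so it can be called like a task.

    Hypothetical stand-in for illustration only.
    """

    def __init__(self, func):
        self.func = func

    def __call__(self, service, jobid, input):
        # The service and job id are accepted but not used by this wrapper.
        return self.func(input)


def echo(input):
    return input


echo_task = SimpleTaskSketch(echo)
result = echo_task(None, 1, input={'foo': 'blah'})
```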

Let’s now register the task as a utility:

>>> import zope.component
>>> zope.component.provideUtility(echoTask, name='echo')

The echo task is now available in the service:

>>> service.getAvailableTasks()
{u'echo': <SimpleTask <function echo ...>>}

Since the service cannot instantaneously complete a task, incoming jobs are managed by a queue. First we request the echo task to be executed:

>>> jobid = service.add(u'echo', {'foo': 'bar'})
>>> jobid
1392637175

The add() function schedules the task called “echo” to be executed with the specified arguments. The method returns a job id with which we can inquire about the job. By default, add() adds the job and starts it as soon as possible. Sometimes we need a job id without starting the job yet; see startlater.txt for how to do this.

>>> service.getStatus(jobid)
'queued'

Since the job has not been processed, the status is set to “queued”. Further, there is no result available yet:

>>> service.getResult(jobid) is None
True

As long as the job is not being processed, it can be cancelled:

>>> service.cancel(jobid)
>>> service.getStatus(jobid)
'cancelled'
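The queued/cancelled life cycle shown above can be modelled with a small in-memory sketch. `QueueSketch` and its method names are hypothetical, it uses sequential ids rather than the ids the real service hands out, and it has no persistence.

```python
import collections


class QueueSketch(object):
    """In-memory stand-in for a task service queue (illustration only)."""

    def __init__(self):
        self.jobs = {}                    # job id -> job record
        self.queue = collections.deque()  # ids waiting to be processed
        self._next_id = 1

    def add(self, task, input=None):
        jobid = self._next_id
        self._next_id += 1
        self.jobs[jobid] = {'task': task, 'input': input,
                            'status': 'queued', 'result': None}
        self.queue.append(jobid)
        return jobid

    def get_status(self, jobid):
        return self.jobs[jobid]['status']

    def get_result(self, jobid):
        return self.jobs[jobid]['result']

    def cancel(self, jobid):
        # Only a job that is still waiting can be cancelled.
        if self.jobs[jobid]['status'] == 'queued':
            self.queue.remove(jobid)
            self.jobs[jobid]['status'] = 'cancelled'


sketch = QueueSketch()
jobid = sketch.add('echo', {'foo': 'bar'})
status_before = sketch.get_status(jobid)
sketch.cancel(jobid)
status_after = sketch.get_status(jobid)
```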

The service is not started by default:

>>> service.isProcessing()
False

The TaskService is started automatically, if specified in zope.conf, as soon as the IDatabaseOpenedEvent is fired. Let’s emulate the zope.conf settings:

>>> class Config(object):
...     mapping = {}
...     def getSectionName(self):
...         return 'lovely.remotetask'
>>> config = Config()
>>> config.mapping['autostart'] = (
...     'site1@TestTaskService1, site2@TestTaskService2,@RootTaskService')
>>> from zope.app.appsetup.product import setProductConfigurations
>>> setProductConfigurations([config])
>>> from lovely.remotetask.service import getAutostartServiceNames
>>> getAutostartServiceNames()
['site1@TestTaskService1', 'site2@TestTaskService2', '@RootTaskService']

Note that RootTaskService is for a use-case where the service is directly registered at the root. We test this use-case in a separate footnote so that the flow of this document is not broken. [3]

To get a clean logging environment let’s clear the logging stack:

>>> log_info.clear()

On Zope startup the IDatabaseOpenedEvent is fired, and will call the bootStrap() method:

>>> from ZODB.tests import util
>>> import transaction
>>> db = util.DB()
>>> from zope.app.publication.zopepublication import ZopePublication
>>> conn = db.open()
>>> conn.root()[ZopePublication.root_name] = root
>>> transaction.commit()

Fire the event:

>>> from zope.app.appsetup.interfaces import DatabaseOpenedWithRoot
>>> from lovely.remotetask.service import bootStrapSubscriber
>>> event = DatabaseOpenedWithRoot(db)
>>> bootStrapSubscriber(event)

and voila - the service is processing:

>>> service.isProcessing()
True

The log confirms that the service was started:

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service TestTaskService1 on site site1 started
lovely.remotetask ERROR
  site site2 not found
lovely.remotetask INFO
  service RootTaskService on site root started

The verification for the jobs in the root-level service is done in another footnote [4]

When one site contains many services, an asterisk (*) can be used as a wildcard. The directive site@* starts all services in that site.

But first stop all processing services:

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> root_service.stopProcessing()
>>> root_service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

And reset the logger:

>>> log_info.clear()

Reset the product configuration with the asterisked service names:

>>> config.mapping['autostart'] = 'site1@*'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['site1@*']

Firing the event again will start all services in the configured site:

>>> bootStrapSubscriber(event)
>>> service.isProcessing()
True
>>> root_service.isProcessing()
False

Let’s check the log:

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service TestTaskService1 on site site1 started

When many sites contain many services, asterisks can be used on both sides of the directive. The directive *@* starts all services on all sites:

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

Reset the product configuration with the asterisked service names:

>>> config.mapping['autostart'] = '*@*'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['*@*']

…and reset the logger:

>>> log_info.clear()

And fire the event again. All services should be started now:

>>> bootStrapSubscriber(event)
>>> service.isProcessing()
True
>>> root_service.isProcessing()
True

Let’s check the logging:

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service RootTaskService on site root started
lovely.remotetask INFO
  service TestTaskService1 on site site1 started

To start one specific service on many sites, use an asterisk for the site name. The directive *@service starts the service named service on all sites:

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> root_service.stopProcessing()
>>> root_service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

Reset the product configuration with the asterisked service names:

>>> config.mapping['autostart'] = '*@TestTaskService1'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['*@TestTaskService1']

…and reset the logger:

>>> log_info.clear()

And fire the event again. Only the services named TestTaskService1 should be started now:

>>> bootStrapSubscriber(event)
>>> service.isProcessing()
True
>>> root_service.isProcessing()
False

Let’s check the log:

>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask INFO
  service TestTaskService1 on site site1 started

If a directive does not match any service on any site, a warning is logged:

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)
>>> config.mapping['autostart'] = '*@Foo'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
['*@Foo']
>>> log_info.clear()
>>> bootStrapSubscriber(event)
>>> service.isProcessing()
False
>>> root_service.isProcessing()
False
>>> print log_info
lovely.remotetask INFO
  handling event IStartRemoteTasksEvent
lovely.remotetask WARNING
  no services started by directive *@Foo
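The wildcard behaviour demonstrated above can be summarised in a short sketch. `match_directive` is a hypothetical function, the registry is a plain list of (site, service) pairs, and treating the root as an empty site name is an assumption of the sketch rather than the package's implementation.

```python
def match_directive(directive, registered):
    """Return the (site, service) pairs selected by one directive.

    A '*' on either side of '@' matches anything on that side.
    Hypothetical helper for illustration.
    """
    site_pattern, _, service_pattern = directive.partition('@')
    selected = []
    for site, service in registered:
        site_ok = site_pattern == '*' or site_pattern == site
        service_ok = service_pattern == '*' or service_pattern == service
        if site_ok and service_ok:
            selected.append((site, service))
    return selected


# The root-level service is stored with an empty site name (assumption).
registered = [('', 'RootTaskService'), ('site1', 'TestTaskService1')]

one_site = match_directive('site1@*', registered)
everything = match_directive('*@*', registered)
by_name = match_directive('*@TestTaskService1', registered)
nothing = match_directive('*@Foo', registered)
```

An empty result, as for `*@Foo`, corresponds to the case in which the service logs a warning.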

Finally stop processing and kill the thread. We’ll call service.process() manually as we don’t have the right environment in the tests.

>>> service.stopProcessing()
>>> service.isProcessing()
False
>>> root_service.stopProcessing()
>>> root_service.isProcessing()
False
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

Let’s now process a job:

>>> jobid = service.add(u'echo', {'foo': 'bar'})
>>> service.process()
>>> service.getStatus(jobid)
'completed'
>>> service.getResult(jobid)
{'foo': 'bar'}

Now, let’s define a new task that causes an error:

>>> def error(input):
...     raise remotetask.task.TaskError('An error occurred.')
>>> zope.component.provideUtility(
...     remotetask.task.SimpleTask(error), name='error')

Now add and execute it:

>>> jobid = service.add(u'error')
>>> service.process()

Let’s now see what happened:

>>> service.getStatus(jobid)
'error'
>>> service.getError(jobid)
'An error occurred.'
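How a processed job ends up as either completed or error can be sketched as follows. The `TaskError` class and `run_job` function here are hypothetical stand-ins defined for the example; they only mirror the statuses shown above.

```python
class TaskError(Exception):
    """Hypothetical stand-in for remotetask.task.TaskError."""


def run_job(job, tasks):
    """Run one job record and store its result or its error message.

    `job` is a dict with 'task' and 'input' keys; `tasks` maps task
    names to callables. Illustration only.
    """
    try:
        job['result'] = tasks[job['task']](job['input'])
    except TaskError as exc:
        job['status'] = 'error'
        job['error'] = str(exc)
    else:
        job['status'] = 'completed'
    return job


def echo(input):
    return input


def error(input):
    raise TaskError('An error occurred.')


tasks = {'echo': echo, 'error': error}
good = run_job({'task': 'echo', 'input': {'foo': 'bar'}}, tasks)
bad = run_job({'task': 'error', 'input': None}, tasks)
```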

For management purposes, the service also allows you to inspect all jobs:

>>> dict(service.jobs)
{1392637176: <Job 1392637176>, 1392637177: <Job 1392637177>, 1392637175: <Job 1392637175>}

To get rid of jobs that are no longer needed, use the clean method.

>>> jobid = service.add(u'echo', {'blah': 'blah'})
>>> sorted([job.status for job in service.jobs.values()])
['cancelled', 'completed', 'error', 'queued']
>>> service.clean()
>>> sorted([job.status for job in service.jobs.values()])
['queued']
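The effect of cleaning can be pictured as filtering a mapping by status. This sketch assumes a plain dict of job id to status string and a hypothetical `clean` function; which statuses the real method removes by default is inferred from the example above.

```python
# Statuses treated as finished in this sketch (inferred from the example).
FINISHED = ('cancelled', 'completed', 'error')


def clean(jobs, status=FINISHED):
    """Remove every job whose status is listed in `status`."""
    # Collect the ids first so the dict is not changed while iterating.
    for jobid in [j for j, s in jobs.items() if s in status]:
        del jobs[jobid]


jobs = {1: 'cancelled', 2: 'completed', 3: 'error', 4: 'queued'}
clean(jobs)
```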

Cron jobs

Cron jobs execute at specific times.

>>> import time
>>> from lovely.remotetask.job import CronJob
>>> now = 0
>>> time.gmtime(now)
(1970, 1, 1, 0, 0, 0, 3, 1, 0)

We set up a job to be executed once an hour at the current minute. The next call time is one hour from now.

Minutes

>>> cronJob = CronJob(-1, u'echo', (), minute=(0, 10))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 0, 10, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(10*60))
(1970, 1, 1, 1, 0, 0, 3, 1, 0)

Hour

>>> cronJob = CronJob(-1, u'echo', (), hour=(2, 13))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 2, 0, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(2*60*60))
(1970, 1, 1, 13, 0, 0, 3, 1, 0)

Month

>>> cronJob = CronJob(-1, u'echo', (), month=(1, 5, 12))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 5, 1, 0, 0, 0, 4, 121, 0)
>>> time.gmtime(cronJob.timeOfNextCall(cronJob.timeOfNextCall(0)))
(1970, 12, 1, 0, 0, 0, 1, 335, 0)

Day of week [0..6]; January 1, 1970 is a Thursday (weekday 3, counting Monday as 0).

>>> cronJob = CronJob(-1, u'echo', (), dayOfWeek=(0, 2, 4, 5))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 2, 0, 0, 0, 4, 2, 0)
>>> time.gmtime(cronJob.timeOfNextCall(60*60*24))
(1970, 1, 3, 0, 0, 0, 5, 3, 0)
>>> time.gmtime(cronJob.timeOfNextCall(2*60*60*24))
(1970, 1, 5, 0, 0, 0, 0, 5, 0)
>>> time.gmtime(cronJob.timeOfNextCall(4*60*60*24))
(1970, 1, 7, 0, 0, 0, 2, 7, 0)

DayOfMonth [1..31]

>>> cronJob = CronJob(-1, u'echo', (), dayOfMonth=(1, 12, 21, 30))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 12, 0, 0, 0, 0, 12, 0)
>>> time.gmtime(cronJob.timeOfNextCall(12*24*60*60))
(1970, 1, 21, 0, 0, 0, 2, 21, 0)

Combined

>>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
...                                 dayOfMonth=(1, 12, 21, 30))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 0, 10, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(10*60))
(1970, 1, 1, 1, 10, 0, 3, 1, 0)
>>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
...                                 hour=(4,),
...                                 dayOfMonth=(1, 12, 21, 30))
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 4, 10, 0, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(10*60))
(1970, 1, 1, 4, 10, 0, 3, 1, 0)
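The minute and hour examples above can be reproduced with a small search over candidate times. This is a sketch under assumptions: `next_call` is a hypothetical function, it covers only the minute and hour fields, and its rule that the step size follows the finest field given is inferred from the examples rather than taken from the package's code.

```python
import time


def next_call(now, minute=(), hour=()):
    """Return the first matching time strictly after `now`.

    `now` and the result are seconds since the epoch (UTC). An empty
    tuple means "any value" for that field. Illustration only.
    """
    # Step in minutes if `minute` is given, otherwise in whole hours.
    step = 60 if minute else 3600
    # Truncate `now` to the start of the current step.
    candidate = now - now % step
    limit = now + 366 * 24 * 3600
    while candidate <= limit:
        candidate += step
        fields = time.gmtime(candidate)
        if minute and fields.tm_min not in minute:
            continue
        if hour and fields.tm_hour not in hour:
            continue
        return candidate
    return None


ten_past = next_call(0, minute=(0, 10))
top_of_hour = next_call(10 * 60, minute=(0, 10))
two_oclock = next_call(0, hour=(2, 13))
combined = next_call(0, minute=(10,), hour=(4,))
```

The search is bounded to roughly one year ahead so that a field combination that never matches returns None instead of looping forever.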

A cron job can also be used to delay the execution of a job.

>>> cronJob = CronJob(-1, u'echo', (), delay=10,)
>>> time.gmtime(cronJob.timeOfNextCall(0))
(1970, 1, 1, 0, 0, 10, 3, 1, 0)
>>> time.gmtime(cronJob.timeOfNextCall(1))
(1970, 1, 1, 0, 0, 11, 3, 1, 0)

Creating Delayed Jobs

A delayed job is executed once after the given delay time in seconds.

>>> count = 0
>>> def counting(input):
...     global count
...     count += 1
...     return count
>>> countingTask = remotetask.task.SimpleTask(counting)
>>> zope.component.provideUtility(countingTask, name='counter')
>>> jobid = service.addCronJob(u'counter',
...                            {'foo': 'bar'},
...                            delay = 10,
...                           )
>>> service.getStatus(jobid)
'delayed'
>>> service.process(0)
>>> service.getStatus(jobid)
'delayed'
>>> service.process(9)
>>> service.getStatus(jobid)
'delayed'

At 10 seconds the job is executed and completed.

>>> service.process(10)
>>> service.getStatus(jobid)
'completed'
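The delayed-job behaviour can be sketched with an explicit clock value, which is also what makes it easy to test. `DelayedJobSketch` is hypothetical, and it assumes the delay is counted from a creation time of 0.

```python
class DelayedJobSketch(object):
    """Run a function once, no earlier than `created + delay` seconds."""

    def __init__(self, func, created, delay):
        self.func = func
        self.due = created + delay
        self.status = 'delayed'
        self.result = None

    def process(self, now):
        # Nothing happens until the due time has been reached.
        if self.status == 'delayed' and now >= self.due:
            self.result = self.func()
            self.status = 'completed'


job = DelayedJobSketch(lambda: 'done', created=0, delay=10)
job.process(0)
job.process(9)
status_at_9 = job.status
job.process(10)
status_at_10 = job.status
```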

Creating Cron Jobs

Here we create a cron job which runs 10 minutes and 13 minutes past the hour.

>>> count = 0
>>> jobid = service.addCronJob(u'counter',
...                            {'foo': 'bar'},
...                            minute = (10, 13),
...                           )
>>> service.getStatus(jobid)
'cronjob'

We process the remote task but our cron job is not executed because we are too early in time.

>>> service.process(0)
>>> service.getStatus(jobid)
'cronjob'
>>> service.getResult(jobid) is None
True

Now we run the remote task 10 minutes later and get a result.

>>> service.process(10*60)
>>> service.getStatus(jobid)
'cronjob'
>>> service.getResult(jobid)
1

And 1 minute later it is not called.

>>> service.process(11*60)
>>> service.getResult(jobid)
1

But 3 minutes later it is called again.

>>> service.process(13*60)
>>> service.getResult(jobid)
2

A job can be rescheduled.

>>> job = service.jobs[jobid]
>>> job.update(minute = (11, 13))

After the update the job must be rescheduled in the service.

>>> service.reschedule(jobid)

Now the job is not executed at the old registration minute which was 10.

>>> service.process(10*60+60*60)
>>> service.getResult(jobid)
2

But it executes at the new minute which is set to 11.

>>> service.process(11*60+60*60)
>>> service.getResult(jobid)
3
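The run, skip and reschedule sequence above can be replayed with a stand-alone sketch. `CronSketch` is a hypothetical class that handles only the minute field; its `update` and `reschedule` methods are named after the calls shown above but are not the package's implementation.

```python
import time


class CronSketch(object):
    """Recurring job keyed on minutes past the hour (illustration only)."""

    def __init__(self, func, minute, now=0):
        self.func = func
        self.result = None
        self.update(minute)
        self.reschedule(now)

    def update(self, minute):
        minute = tuple(minute)
        if not minute or not all(0 <= m <= 59 for m in minute):
            raise ValueError('minutes must be in the range 0..59')
        self.minute = minute

    def reschedule(self, now):
        # First whole minute strictly after `now` whose minute matches.
        candidate = now - now % 60 + 60
        while time.gmtime(candidate).tm_min not in self.minute:
            candidate += 60
        self.next_due = candidate

    def process(self, now):
        if now >= self.next_due:
            self.result = self.func()
            self.reschedule(now)


calls = []


def counting():
    calls.append(1)
    return len(calls)


job = CronSketch(counting, minute=(10, 13))
job.process(0)           # too early, nothing runs
job.process(10 * 60)     # first run
first = job.result
job.process(11 * 60)     # not a scheduled minute
job.process(13 * 60)     # second run
second = job.result
job.update((11, 13))
job.reschedule(13 * 60)  # recompute the due time after the update
job.process(70 * 60)     # old minute 10 of the next hour: skipped
skipped = job.result
job.process(71 * 60)     # new minute 11: third run
third = job.result
```

Without the explicit `reschedule` call, the sketch would still fire at the previously computed due time, which is why the update and the rescheduling are two separate steps.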

Threading behavior

Each task service runs in a separate thread, allowing them to operate independently. Tasks should be designed to avoid conflict errors in the database.

Let’s start the task services we have defined at this point, and see what threads are running as a result:

>>> service.startProcessing()
>>> root_service.startProcessing()
>>> import pprint
>>> import threading
>>> def show_threads():
...     threads = [t for t in threading.enumerate()
...                if t.getName().startswith('remotetasks.')]
...     threads.sort(key=lambda t: t.getName())
...     pprint.pprint(threads)
>>> show_threads()
[<Thread(remotetasks.rootTaskService, started daemon)>,
 <Thread(remotetasks.site1.++etc++site.default.testTaskService1, started daemon)>]

Let’s add a second site containing a task service with the same name as the service in the first site:

>>> site2 = Folder()
>>> service2 = remotetask.TaskService()
>>> root['site2'] = site2
>>> sm = LocalSiteManager(removeSecurityProxy(site2))
>>> site2.setSiteManager(sm)
>>> sm['default']['testTaskService1'] = service2
>>> service2 = sm['default']['testTaskService1'] # caution! proxy

Let’s register it under the name TestTaskService1:

>>> sm = site2.getSiteManager()
>>> sm.registerUtility(
...     service2, interfaces.ITaskService, name='TestTaskService1')

The service must be committed to the database before it can be used:

>>> transaction.commit()

The new service isn’t currently processing:

>>> service2.isProcessing()
False

If we start the new service, we can see that there are now three background threads:

>>> service2.startProcessing()
>>> show_threads()
[<Thread(remotetasks.rootTaskService, started daemon)>,
 <Thread(remotetasks.site1.++etc++site.default.testTaskService1, started daemon)>,
 <Thread(remotetasks.site2.++etc++site.default.testTaskService1, started daemon)>]

Let’s stop the services, and give the background threads a chance to get the message:

>>> service.stopProcessing()
>>> service2.stopProcessing()
>>> root_service.stopProcessing()
>>> import time
>>> time.sleep(STOP_SLEEP_TIME)

The threads have exited now:

>>> print [t for t in threading.enumerate()
...        if t.getName().startswith('remotetasks.')]
[]
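The thread-per-service pattern can be illustrated with the standard threading module alone. `WorkerSketch` is a hypothetical class; the thread name prefix follows the output above, and checking for a running thread by name follows the description of isProcessing() in the changelog, but the loop body is a placeholder.

```python
import threading


class WorkerSketch(object):
    """One named daemon thread per service (illustration only)."""

    def __init__(self, name, sleep_time=0.01):
        self.name = 'remotetasks.' + name
        self.sleep_time = sleep_time
        self._stop_event = threading.Event()
        self._thread = None

    def _run(self):
        while not self._stop_event.is_set():
            # A real processor would pull and run one job here.
            self._stop_event.wait(self.sleep_time)

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, name=self.name)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        if self._thread is not None:
            # Wait until the loop has noticed the stop request.
            self._thread.join()

    def is_processing(self):
        return any(t.name == self.name for t in threading.enumerate())


first = WorkerSketch('site1.testTaskService1')
second = WorkerSketch('site2.testTaskService1')
first.start()
second.start()
running = sorted(t.name for t in threading.enumerate()
                 if t.name.startswith('remotetasks.'))
first.stop()
second.stop()
remaining = [t.name for t in threading.enumerate()
             if t.name.startswith('remotetasks.')]
```

Joining the thread in `stop` removes the need for a fixed sleep after stopping, at the cost of blocking the caller until the loop exits.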

Footnotes

Check Interfaces and stuff

>>> from zope.interface.verify import verifyClass, verifyObject
>>> verifyClass(interfaces.ITaskService, remotetask.TaskService)
True
>>> verifyObject(interfaces.ITaskService, service)
True
>>> interfaces.ITaskService.providedBy(service)
True
>>> from lovely.remotetask.job import Job
>>> fakejob = Job(1, u'echo', {})
>>> verifyClass(interfaces.IJob, Job)
True
>>> verifyObject(interfaces.IJob, fakejob)
True
>>> interfaces.IJob.providedBy(fakejob)
True
>>> fakecronjob = CronJob(1, u'echo', {})
>>> verifyClass(interfaces.ICronJob, CronJob)
True
>>> verifyObject(interfaces.ICronJob, fakecronjob)
True
>>> interfaces.IJob.providedBy(fakecronjob)
True

Changes for lovely.remotetask

0.5.2 (2010-04-30)

  • Removed unnecessary version requirement for dependency zope.publisher.

0.5.1 (2010-04-14)

  • Convert logged exceptions to str because log messages should be strings.

0.5 (2009-09-10)

  • Fixed a bug with SimpleProcessor: if the job aborted the transaction, it would never be removed from the queue, but re-tried over and over again.

2009/05/20 (0.4):

  • Randomized the generation of new job ids like intid does it: Try to allocate sequential ids so they fall into the same BTree bucket, and randomize if stumble upon a used one.

2009/04/05 (0.3):

  • Use dropdown widget with available tasks in the cron job adding form, instead of text input.

  • Remove dependency on zope.app.zapi by using its wrapped api directly.

  • Use ISite from zope.location instead of zope.app.component

  • Use zc.queue.Queue instead of zc.queue.PersistentQueue because PersistentQueue is only to be used by the CompositeQueue.

  • Changed URL to pypi.

  • Using the correct plural form of status (which is status) in ITaskService.clean

2008/11/07 0.2.15a1:

  • running could cause an AttributeError. added handling for it

2008/02/08 0.2.14:

  • committing after every 100 jobs during ‘clearAll’ to avoid browser timeouts while canceling a huge number of jobs

2008/01/28 (new):

  • Some bugs smashed, improved tests.

  • Added startLater to TaskService.add. See startlater.txt for more info. This makes it possible to add a job at one time and start it at another. (Not cron-like)

2007/12/?? (new):

  • Switched index to Zope 3.4 KGS, so that we agree on used package versions.

  • Made the sleep time of the processor variable; this is needed for testing, so that the testing framework is not faster than the processor shutting down.

  • Added a small optimization to isProcessing() to stop looking through the threads once one with the correct name has been found.

2007/11/12 0.2.13:

  • added “cancel all” button

  • fixed bug in associating threads with task service instances

2007/10/28 0.2.12:

  • make the startup more robust. If an already registered task service is removed via the ZMI, its registration is not removed. If this happened, Zope could no longer be restarted when autostart was used.

2007/10/28 0.2.11:

  • allow ‘*’ to select all possible times in the cron job add/edit forms

  • allow to cancel a delayed job

2007/10/24 0.2.10:

  • avoided deprecation warnings

2007/10/08 0.2.9:

  • don’t push a cron job back into the queue if its status is ERROR

2007/10/08 0.2.8:

  • enhanced logging during startup

2007/10/02 0.2.7:

  • added index to buildout.cfg

  • enhanced autostart behaviour: Services can be started like: site@*, *@service and *@*

2007/08/07 0.2.6:

  • fix bug in sorting that causes column headers to never be clickable

2007/08/07 0.2.5:

  • no longer require session support for “Jobs” ZMI view

2007/08/06 0.2.4:

  • fix bug that caused processing thread to keep the process alive unnecessarily

2007/07/26 0.2.3:

  • Now handles the use-case where a task service is registered directly at the root. References to such services in the product configuration must begin with @ instead of the <sitename>@.

2007/07/02 0.2.2:

  • ZMI menu to add cron jobs to a task service

  • named detail views can be registered for jobs specific to the task

  • edit view for cron jobs

  • improved ZMI views

  • catch exception if a job was added for which there is no task registered

  • fixed tests to work in all timezones

2007/06/12 0.2.1:

  • Do not raise IndexError because of performance problems with tracebacks when using eggs.

2007/06/12 0.2.0:

  • added namespace declaration in lovely/__init__.py

  • allow to delay a job
