Remote processing queue for Zope3

This package provides a remote processing queue for Zope3 that uses MongoDB instead of the ZODB.

README

This package offers a remote processor. The remote processor is implemented as a simple object that uses MongoDB as its storage. The processor can execute predefined jobs in another thread. It is also possible to run jobs at specific times using the different scheduler items.

The RemoteProcessor uses two different processors. One processes jobs; the other picks items from the scheduler and adds jobs. This separation is useful if you implement a distributed setup: one or more applications can schedule jobs based on the given scheduler items, while another application processes the jobs without knowing how the next items are scheduled.

Since we use this remote scheduler for jobs with a low CPU load, we offer concurrent processing. This is done by letting the main worker thread run more than one job at a time. If your jobs use subprocesses, you get real multiprocessing that isn’t limited to the current Python process.

You can configure the number of threads a job worker can start in the remote processor; see jobWorkerArguments/maxThreads. By default, this is the number of CPUs installed on your machine.
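A minimal sketch of how a CPU-based default for maxThreads could be computed, assuming the standard library's multiprocessing.cpu_count(). The helper name default_max_threads is hypothetical and not part of m01.remote; the resulting dict only has the same shape as the jobWorkerArguments used in the examples below.

```python
import multiprocessing


def default_max_threads():
    """Hypothetical helper: one worker thread per CPU, at least one."""
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        # cpu_count() raises this when the count cannot be determined
        return 1


# same shape as the jobWorkerArguments dict used later in this document
job_worker_arguments = {'waitTime': 1.0, 'maxThreads': default_max_threads()}
```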

The implementation uses MongoDB as the storage for its components. This means jobs, job factories and scheduler items are stored in MongoDB using the ORM concept provided by m01.mongo.

See p01.remote for a ZODB-based remote processor implementation, but note that p01.remote doesn’t provide the separation between the worker and scheduler processors. At least not yet.

Usage

>>> import transaction
>>> from pprint import pprint
>>> from m01.mongo import UTC
>>> import zope.component
>>> import m01.remote.job
>>> from m01.remote import testing

First register our error message adapter:

>>> testing.setUpJobErrorMessageAdapter()

Let’s now start by creating a remote processor. We can use our remote queue site implementation:

>>> from zope.security.proxy import removeSecurityProxy
>>> from m01.remote import interfaces

Our test remote processor should be available as application root:

>>> rp = root
>>> rp
<TestProcessor None>

Let’s discover the available jobs:

>>> dict(root._jobs)
{}

The job container is initially empty, because we have not added any job factory yet. Let’s now define a job factory that simply echoes an input string:

>>> echoJob = testing.EchoJob({})

Now we can set the job input:

>>> echoJob.input = {'foo': u'blah'}

The only API requirement on the job is to be callable. Now we make sure that the job works. Note we call our job with the remote processor instance which is our initialized application root:

>>> echoJob(root)
{'foo': u'blah'}
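Because a job only needs to be callable, a job factory can be a plain class with a __call__ method. The following is a hypothetical stand-in, not the code of testing.EchoJob; it assumes the job receives the remote processor as its only argument, as in the call above.

```python
class EchoLikeJob(object):
    """Hypothetical job factory: being callable is the only requirement."""

    def __init__(self, data):
        self.data = dict(data)
        self.input = None

    def __call__(self, remoteProcessor):
        # a real job could use remoteProcessor; this one just echoes its input
        return self.input


echo = EchoLikeJob({})
echo.input = {'foo': u'blah'}
result = echo(None)  # None stands in for the remote processor here
```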

Let’s add the job to the available job list:

>>> rp.addJobFactory(u'echo', echoJob)

The echo job is now available in the remote processor:

>>> dict(rp._jobFactories)
{u'echo': <EchoJob u'echo'>}

Since the remote processor cannot instantaneously complete a job, incoming jobs are managed by a queue. First we request the echo job to be executed:

>>> jobid1 = rp.addJob(u'echo', {'foo': 'bar'})
>>> jobid1
u'...'
>>> sorted([job.status for job in rp._jobs.values()])
[u'queued']

The addJob() function schedules the job called “echo” to be executed with the specified arguments. The method returns a job id with which we can inquire about the job. The addJob() function marks a job as queued.

>>> rp.getJobStatus(jobid1)
u'queued'

Since the job has not been processed, the status is set to “queued”. Further, there is no result available yet:

>>> rp.getJobResult(jobid1) is None
True

As long as the job is not being processed, it can be cancelled:

>>> rp.cancelJob(jobid1)
>>> rp.getJobStatus(jobid1)
u'cancelled'
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled']
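The status transitions shown so far (a job starts as queued and can be cancelled while it is still queued) can be modeled with a small in-memory tracker. This is a hypothetical sketch for illustration only: m01.remote stores jobs in MongoDB, and ignoring a cancel request for a job that is already processing is an assumption of this sketch.

```python
import uuid


class JobStatusTracker(object):
    """Hypothetical in-memory model of the job states shown above."""

    def __init__(self):
        self._jobs = {}

    def add_job(self, name, job_input=None):
        job_id = uuid.uuid4().hex
        self._jobs[job_id] = {'name': name, 'input': job_input,
                              'status': 'queued', 'result': None}
        return job_id

    def get_status(self, job_id):
        return self._jobs[job_id]['status']

    def get_result(self, job_id):
        # no result exists until a worker has completed the job
        return self._jobs[job_id]['result']

    def cancel(self, job_id):
        job = self._jobs[job_id]
        # assumption: only jobs that are still queued can be cancelled
        if job['status'] == 'queued':
            job['status'] = 'cancelled'


tracker = JobStatusTracker()
job_id = tracker.add_job('echo', {'foo': 'bar'})
status_before = tracker.get_status(job_id)
tracker.cancel(job_id)
status_after = tracker.get_status(job_id)
```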

The worker processor isn’t started by default:

>>> rp.isProcessing
False

To get a clean logging environment let’s clear the logging stack:

>>> log_info.clear()

Now we can start the remote processor by calling startProcessor:

>>> rp.startProcessor()

and voila - the remote processor is processing:

>>> rp.isProcessing
True

The log confirms that the remote processor has started:

>>> print log_info
m01.remote INFO
  Processor 'root-worker' started

Let’s stop the processor again:

>>> rp.stopProcessor()
>>> rp.isProcessing
False

Now let’s get the result of a processed job. First, add a new job and commit it:

>>> jobid2 = rp.addJob(u'echo', {'foo': u'bar'})
>>> transaction.commit()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'queued']

Now create a worker and process the new jobs by calling our simple worker:

>>> class FakeWorker(object):
...
...     def __init__(self, rp):
...         self.rp = rp
...
...     def __call__(self):
...         try:
...             self.rp.processNextJob()
...             transaction.commit()
...         except Exception:
...             # also commit after a failure so the job's state is stored
...             transaction.commit()
>>> worker = FakeWorker(rp)
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed']

First, check that the job was processed:

>>> rp.getJobStatus(jobid2)
u'completed'
>>> rp.getJobResult(jobid2)
{u'foo': u'bar'}

Now, let’s define a new job that causes an error:

>>> errorJob = testing.ErrorJob()
>>> rp.addJobFactory(u'error', errorJob)

Now add and execute it:

>>> jobid3 = rp.addJob(u'error')
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error']

Let’s now see what happened:

>>> rp.getJobStatus(jobid3)
u'error'
>>> rp.getJobErrors(jobid3)
[u'[...-...-...T...:...:...+...:...] An error occurred.']

Let’s also try a more severe error:

>>> fatalJob = testing.FatalErrorJob()
>>> rp.addJobFactory(u'fatal', fatalJob)

Now add and execute it:

>>> jobid4 = rp.addJob(u'fatal')
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'queued']
>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
1
>>> job4.status == u'queued'
True
>>> job4.errors
[u'[...-...-...T...:...:...+...:...] An error occurred.']

Now process the job again, but first set its retryTime to a date in the past to simulate that enough time has passed since the last call:

>>> import datetime
>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'queued']
>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
2
>>> job4.errors
[u'[...-...-...T...:...:...+...:...] An error occurred.',
 u'[...-...-...T...:...:...+...:...] An error occurred.']

Now process the job a third time. This time it does not re-raise the exception; instead, the error message is appended to the error list and the job is marked as error:

>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC)
>>> transaction.commit()
>>> worker()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'error']

Let’s now see what happened:

>>> job4 = rp._jobs[jobid4]
>>> job4.retryCounter
3
>>> job4.status
u'error'
>>> rp.getJobStatus(jobid4)
u'error'
>>> job4.errors
[u'[...-...-...T...:...:...+...:...] An error occurred.',
 u'[...-...-...T...:...:...+...:...] An error occurred.',
 u'[...-...-...T...:...:...+...:...] An error occurred.']
>>> rp.getJobErrors(jobid4)
[u'[...-...-...T...:...:...+...:...] An error occurred.',
 u'[...-...-...T...:...:...+...:...] An error occurred.',
 u'[...-...-...T...:...:...+...:...] An error occurred.']
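The retry behavior of the fatal job above amounts to bookkeeping on the job: each failure increments retryCounter and appends a time-stamped message, then the job is either requeued with a future retryTime or, after the third failure, marked as error. This is a hypothetical sketch, not the package's code: the limit of three attempts is inferred from this doctest, and the 5-second retry delay is an arbitrary example value.

```python
import datetime

# assumption: inferred from the doctest above, where the 3rd failure is final
MAX_ATTEMPTS = 3


def record_failure(job, now, message, retry_delay=datetime.timedelta(seconds=5)):
    """Hypothetical retry bookkeeping for a job whose call raised an error."""
    job['retryCounter'] += 1
    job['errors'].append('[%s] %s' % (now.isoformat(), message))
    if job['retryCounter'] < MAX_ATTEMPTS:
        # requeue; a worker should skip the job until retryTime has passed
        job['status'] = 'queued'
        job['retryTime'] = now + retry_delay
    else:
        job['status'] = 'error'


job = {'status': 'processing', 'retryCounter': 0, 'errors': [], 'retryTime': None}
history = []
start = datetime.datetime(2000, 1, 1)
for attempt in range(3):
    record_failure(job, start + datetime.timedelta(minutes=attempt),
                   'An error occurred.')
    history.append(job['status'])
```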

For management purposes, the remote processor also allows you to inspect all jobs:

>>> pprint(dict(rp._jobs))
{u'...': <EchoJob u'...' ...>,
 u'...': <EchoJob u'...' ...>,
 u'...': <ErrorJob u'...' ...>,
 u'...': <FatalErrorJob u'...' ...>}

To get rid of jobs that are no longer needed, we can use the removeJobs method:

>>> jobid8 = rp.addJob(u'echo', {'blah': 'blah'})
>>> transaction.commit()
>>> sorted([job.status for job in rp._jobs.values()])
[u'cancelled', u'completed', u'error', u'error', u'queued']
>>> rp.removeJobs()
{u'cancelled': 1, u'completed': 1, u'error': 2}
>>> sorted([job.status for job in rp._jobs.values()])
[u'queued']
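The per-status counts returned by removeJobs() can be produced in a single pass over the jobs. In this hypothetical sketch, the default set of removable statuses is chosen to match the output above; according to the change log, the package also allows removing jobs with any status.

```python
import collections


def remove_jobs(jobs, statuses=('cancelled', 'completed', 'error')):
    """Hypothetical removeJobs: delete matching jobs, return counts per status."""
    counts = collections.Counter()
    for job_id in list(jobs):  # copy the keys, since we delete while looping
        status = jobs[job_id]['status']
        if status in statuses:
            counts[status] += 1
            del jobs[job_id]
    return dict(counts)


jobs = {
    'a': {'status': 'cancelled'},
    'b': {'status': 'completed'},
    'c': {'status': 'error'},
    'd': {'status': 'error'},
    'e': {'status': 'queued'},
}
removed = remove_jobs(jobs)
```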

Now process the last pending job and make sure we do not get more jobs:

>>> rp.pullNextJob()
<EchoJob u'...' ...>

Threading behavior

Each remote processor runs in a separate thread, allowing them to operate independently. Jobs should be designed to avoid conflict errors.

Let’s start the remote processor we have defined at this point, and see what threads are running as a result:

>>> rp.startProcessor()

>>> import pprint
>>> import threading

>>> def show_threads():
...     threads = [t for t in threading.enumerate()
...                if t.getName().startswith('root')]
...     threads.sort(key=lambda t: t.getName())
...     pprint.pprint(threads)

>>> show_threads()
[<Thread(root-worker, started daemon ...)>]

Let’s stop the remote processor, and give the background threads a chance to get the message:

>>> rp.stopProcessor()

>>> import time
>>> time.sleep(2)

The threads have exited now:

>>> print [t for t in threading.enumerate()
...        if t.getName().startswith('root')]
[]
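The start/stop cycle above follows a common pattern: a named daemon thread runs a loop until a threading.Event is set, and stopping joins the thread. The class below is a hypothetical sketch of that pattern that reuses the '<name>-worker' thread naming seen in the log; it is not m01.remote's implementation.

```python
import threading


class WorkerThreadController(object):
    """Hypothetical start/stop wrapper around a background worker loop."""

    def __init__(self, name, loop_body, wait_time=1.0):
        self.name = name
        self.loop_body = loop_body
        self.wait_time = wait_time
        self._stop_event = threading.Event()
        self._thread = None

    @property
    def is_processing(self):
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run,
                                        name='%s-worker' % self.name)
        self._thread.daemon = True  # do not block interpreter shutdown
        self._thread.start()

    def _run(self):
        while not self._stop_event.is_set():
            self.loop_body()
            # sleep between iterations, but wake up at once when stopped
            self._stop_event.wait(self.wait_time)

    def stop(self, timeout=2.0):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout)


controller = WorkerThreadController('root', lambda: None, wait_time=0.01)
controller.start()
running = controller.is_processing
names_while_running = [t.name for t in threading.enumerate()]
controller.stop()
```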

Job Workers

The actual processing of the jobs in a queue is handled by a separate component, known as a job worker. This component usually runs in its own thread and provides its own main loop.

>>> import time
>>> import transaction

The worker module provides a job worker that executes one job at a time. Another worker schedules new jobs based on the scheduler item settings. Let’s create the necessary components to test the job worker:

  1. Create the remote processor:

>>> from m01.remote import testing
>>> rp = root
>>> rp.isProcessing
False
>>> rp.isScheduling
False

  2. Register a job that simply sleeps and writes a message:

>>> data = {'retryDelay': 1}
>>> sleepJob = testing.SleepJob(data)
>>> rp.addJobFactory(u'sleep', sleepJob)

SimpleJobWorker

This worker executes one job at a time. It was designed for jobs that would take a long time and use up most of the processing power of a computer.
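The main loop of a one-job-at-a-time worker can be sketched as follows. This is a hypothetical, in-memory illustration: a collections.deque stands in for the MongoDB-backed queue, and wait_time plays the role of the waitTime argument used below.

```python
import collections
import threading


def simple_worker_loop(job_queue, process, stop, wait_time=1.0):
    """Hypothetical SimpleJobWorker-style loop: one job at a time, FIFO.

    job_queue: collections.deque of jobs; process: callable run per job;
    stop: threading.Event that ends the loop.
    """
    while not stop.is_set():
        try:
            job = job_queue.popleft()
        except IndexError:
            # nothing queued: wait up to wait_time, waking early if stopped
            stop.wait(wait_time)
            continue
        process(job)  # the next job is only pulled after this one returns


done = []
stop = threading.Event()


def process(job):
    done.append(job)
    if job == 4:
        stop.set()


simple_worker_loop(collections.deque([1, 2, 3, 4]), process, stop, wait_time=0.0)
```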

Let’s first register a few jobs:

>>> jobid1 = rp.addJob(u'sleep', (0.04, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.1,  2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0,    3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.08, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

Now let’s first check that we can access the jobs:

>>> job = rp._jobs.get(jobid1)
>>> job
<SleepJob u'...' ...>

And let’s check that the jobs are ready for processing:

>>> rp.getJobStatus(jobid1)
u'queued'
>>> rp.getJobStatus(jobid2)
u'queued'
>>> rp.getJobStatus(jobid3)
u'queued'
>>> rp.getJobStatus(jobid4)
u'queued'

Let’s start by executing a job directly. The first argument to the simple worker constructor is the remote processor instance. All other arguments are optional and can be defined as worker arguments in the RemoteProcessor class; see jobWorkerArguments and schedulerWorkerArguments:

>>> from m01.remote.worker import SimpleJobWorker
>>> worker = SimpleJobWorker(rp, waitTime=0.0)

Let’s now process the first job. We clear the log and we also have to end any existing interactions in order to process the job in this thread:

>>> log_info.clear()
>>> from zope.security import management
>>> management.endInteraction()
>>> worker.doProcessNextJob()
True
>>> print log_info
m01.remote INFO
  Job: 1

Let’s now use the worker from within the remote processor. Since the worker constructors also accept additional arguments, they are specified as well:

>>> rp.jobWorkerFactory = SimpleJobWorker
>>> rp.jobWorkerFactory
<class 'm01.remote.worker.SimpleJobWorker'>
>>> rp.jobWorkerArguments
{'waitTime': 0.0}

The wait time has been set to zero for testing purposes only; the default is 1 second. Let’s now start processing jobs, wait a little for all the jobs to complete, and then stop processing again:

>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)

The log shows that all jobs have been processed. More importantly, they were all completed in the order they were defined. Note that the first job was processed before we started the remote processor. So yes, a remote processor can process jobs even if it is not started. Starting the remote processor only means that jobs get processed automatically instead of manually.

>>> print log_info
m01.remote INFO
  Job: 1
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  Job: 2
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped
>>> log_info.clear()

Transactions in jobs

With the SimpleJobWorker, jobs _should_ not change the transaction status, since both the administration of the jobs by the RemoteProcessor and the job itself run in the same transaction, so aborting it from inside the job could mess up the administrative part.

This is a regression test that aborting the transaction inside the job does not lead to an infinite loop (because SimpleJobWorker pulls the job inside the transaction, so if it is aborted, the job remains on the queue):

>>> testing.testCounter
0
>>> counter = 0
>>> data = {'counter': counter}
>>> abortJob = testing.TransactionAbortJob(data)
>>> rp.addJobFactory(u'abortJob', abortJob)
>>> jobid = rp.addJob(u'abortJob', (1))
>>> time.sleep(0.5)
>>> jobid = rp.addJob(u'abortJob', (2))
>>> transaction.commit()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> transaction.abort() # prevent spurious conflict errors
>>> testing.testCounter
2
>>> print log_info
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  Job: 1
m01.remote INFO
  Job: 2
m01.remote INFO
  Processor 'root-worker' stopped

Reset the test counter:

>>> testing.testCounter = 0

MultiJobWorker

The multi-threaded job worker executes several jobs at once. It was designed for jobs that would take a long time but use very little processing power.
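Limiting how many jobs run at once is commonly done with a semaphore. The hypothetical sketch below starts one thread per job but acquires a threading.BoundedSemaphore before each start, so at most max_threads jobs sleep concurrently. It only illustrates the maxThreads idea and is not the MultiJobWorker code.

```python
import threading
import time


def run_with_thread_limit(durations, max_threads):
    """Run one sleeping 'job' per duration, at most max_threads at a time.

    Returns (finish_order, peak_concurrency); jobs are numbered from 1.
    """
    slots = threading.BoundedSemaphore(max_threads)
    lock = threading.Lock()
    finish_order = []
    state = {'running': 0, 'peak': 0}

    def job(number, duration):
        try:
            with lock:
                state['running'] += 1
                state['peak'] = max(state['peak'], state['running'])
            time.sleep(duration)
            with lock:
                state['running'] -= 1
                finish_order.append(number)
        finally:
            slots.release()  # free the slot for the next waiting job

    threads = []
    for number, duration in enumerate(durations, start=1):
        slots.acquire()  # block until a thread slot is free
        t = threading.Thread(target=job, args=(number, duration))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return finish_order, state['peak']


order2, peak2 = run_with_thread_limit([0.03, 0.04, 0.01, 0.05], 2)
order4, peak4 = run_with_thread_limit([0.03, 0.04, 0.01, 0.05], 4)
```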

Let’s add a few new jobs to execute:

>>> jobid1 = rp.addJob(u'sleep', (0.04, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (1.0,  2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0,    3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.2,  4))
>>> time.sleep(0.2)
>>> transaction.commit()

Before testing the worker in the remote processor, let’s have a look at every method by itself. So we instantiate the worker:

>>> from m01.remote.worker import MultiJobWorker
>>> worker = MultiJobWorker(rp, waitTime=0, maxThreads=2)

The maximum number of threads we passed in is available as well:

>>> worker.maxThreads
2

All working threads can be reviewed at any time:

>>> worker.threads
[]
>>> from zope.security import management
>>> management.endInteraction()

Let’s pull a new job:

>>> job = worker.doPullNextJob()
>>> job
<SleepJob u'...' ...>

We need to pull a job before executing it, so that the database marks the job as processing and no other thread picks up the same job. As you can see, the job is marked with the processing status:

>>> job.status
u'processing'
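The important property of pulling is that finding a queued job and marking it as processing happen as one atomic step. The hypothetical in-memory sketch below gets that guarantee from a threading.Lock; with MongoDB, the analogous step would be a single atomic find-and-modify (for example PyMongo's find_one_and_update), though how m01.remote does it is not shown here.

```python
import threading


class InMemoryJobQueue(object):
    """Hypothetical queue in which claiming a job is a single atomic step."""

    def __init__(self):
        self._lock = threading.Lock()
        self._jobs = []  # job dicts in insertion (FIFO) order

    def add(self, name):
        with self._lock:
            self._jobs.append({'name': name, 'status': 'queued'})

    def pull_next(self):
        # find and update under one lock, so two threads never claim one job
        with self._lock:
            for job in self._jobs:
                if job['status'] == 'queued':
                    job['status'] = 'processing'
                    return job
        return None


job_queue = InMemoryJobQueue()
for i in range(20):
    job_queue.add('job%d' % i)

claimed = []
claimed_lock = threading.Lock()


def drain():
    while True:
        job = job_queue.pull_next()
        if job is None:
            return
        with claimed_lock:
            claimed.append(job['name'])


threads = [threading.Thread(target=drain) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```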

Once we pulled a particular job, we can process it:

>>> log_info.clear()
>>> print log_info
>>> worker.doProcessJob(job.__name__)
>>> print log_info
m01.remote INFO
  Job: 1

Let’s now have a look at using the worker in the remote processor. This primarily means setting the job worker factory:

>>> management.newInteraction()
>>> rp.jobWorkerFactory = MultiJobWorker
>>> rp.jobWorkerArguments = {'waitTime': 1.0, 'maxThreads': 2}
>>> transaction.commit()
>>> log_info.clear()

Let’s now process the remaining jobs:

>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)

As you can see, this time the jobs are no longer completed in order, because they take different amounts of time to execute:

>>> print log_info
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 4
m01.remote INFO
  Job: 2
m01.remote INFO
  Processor 'root-worker' stopped

Let’s now set the thread limit to four and construct a new set of jobs that demonstrate that all jobs will run at the same time:

>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 4}
>>> jobid1 = rp.addJob(u'sleep', (0.3, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.4, 2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0.1, 3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.5, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

If all jobs are processed at once, job 3 should finish first. You can also see that job 4 gets processed right away, even before the worker logs that it is processing:

>>> log_info.clear()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.0)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> print log_info
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 1
m01.remote INFO
  Job: 2
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped

Let’s now set the thread limit to two and construct a new set of jobs that demonstrate that not more than two threads run at the same time:

>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 2}
>>> transaction.commit()
>>> jobid1 = rp.addJob(u'sleep', (0.3, 1))
>>> time.sleep(0.2)
>>> jobid2 = rp.addJob(u'sleep', (0.4, 2))
>>> time.sleep(0.2)
>>> jobid3 = rp.addJob(u'sleep', (0.2, 3))
>>> time.sleep(0.2)
>>> jobid4 = rp.addJob(u'sleep', (0.5, 4))
>>> time.sleep(0.2)
>>> transaction.commit()

If all tasks are processed at once, job 3 should be done first, but since the job has to wait for an available thread, it will come in third. We can now run the jobs and see the result:

>>> log_info.clear()
>>> rp.startProcessor()
>>> transaction.commit()
>>> time.sleep(1.5)
>>> rp.stopProcessor()
>>> transaction.commit()
>>> time.sleep(0.5)
>>> print log_info
m01.remote INFO
  Processor 'root-worker' started
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 1
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 2
m01.remote INFO
  MultiJobWorker: processing job ...
m01.remote INFO
  Job: 3
m01.remote INFO
  Job: 4
m01.remote INFO
  Processor 'root-worker' stopped
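The completion orders in the logs above can be reproduced with a small discrete simulation: jobs start in queue order whenever one of max_threads slots is free, and each finishes after its sleep duration. This hypothetical helper ignores waitTime and thread start-up overhead, so it only approximates the real worker.

```python
import heapq


def completion_order(durations, max_threads):
    """Return 1-based job numbers in the order the jobs finish."""
    running = []   # heap of (finish_time, job_number)
    finished = []
    clock = 0.0
    for number, duration in enumerate(durations, start=1):
        if len(running) == max_threads:
            # all slots busy: advance to the earliest finish to free one
            clock, done = heapq.heappop(running)
            finished.append(done)
        heapq.heappush(running, (clock + duration, number))
    while running:
        _, done = heapq.heappop(running)
        finished.append(done)
    return finished
```

For example, completion_order([0.3, 0.4, 0.1, 0.5], 4) gives [3, 1, 2, 4], matching the four-thread run, and completion_order([0.3, 0.4, 0.2, 0.5], 2) gives [1, 2, 3, 4], matching the two-thread run.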

Scheduler

The scheduler concept is implemented as an additional scheduler container which contains scheduler items.

>>> from m01.mongo import UTC
>>> import m01.remote.scheduler
>>> from m01.remote import interfaces
>>> from m01.remote import testing

Usage

Let’s start by getting our test remote processor, which contains our scheduler container:

>>> remoteProcessor = root
>>> remoteProcessor
<TestProcessor None>
>>> scheduler = remoteProcessor._scheduler
>>> tuple(scheduler.values())
()

Delay

We can add a scheduler item that delays job processing. Let’s add such an item:

>>> import datetime
>>> def getNextTime(dt, seconds):
...     return dt + datetime.timedelta(seconds=seconds)
>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)
>>> now10 = getNextTime(now, 10)
>>> delay = 10
>>> data = {'jobName': u'echo 1', 'active': True, 'delay': delay,
...         'retryDelay': 5, 'nextCallTime': now10}
>>> firstEcho = m01.remote.scheduler.Delay(data)
>>> interfaces.IDelay.providedBy(firstEcho)
True

The delay is set to 10:

>>> firstEcho.delay
10

and the retryDelay to 5:

>>> firstEcho.retryDelay
5

and we set an explicit nextCallTime of now + 10:

>>> firstEcho.nextCallTime == getNextTime(now, 10)
True

and our retryTime is None:

>>> firstEcho.retryTime is None
True

Now we can add the delay item to the scheduler:

>>> scheduler.add(firstEcho)
u'...'

As you can see, the scheduler contains one item:

>>> sorted(scheduler.values())
[<Delay ... for: u'echo 1'>]

Next, we’ll test some scheduler API methods. First, check whether we can update the retryTime for an item in our adding cache with updateRetryTime:

>>> scheduler.updateRetryTime(firstEcho.dump(), now)
False

As you can see we did not get a new retryTime. This happens because we didn’t use the correct callTime. Let’s try with the correct nextCallTime:

>>> now10 = getNextTime(now, 10)
>>> now15 = getNextTime(now, 15)
>>> retryTime = scheduler.updateRetryTime(firstEcho.dump(), now10)
>>> retryTime == now15
True

As you can see, the new retryTime uses the retryDelay of 5 seconds. This retryTime is used to lock an item: the item is not picked again until this time has passed.

Now let’s try another internal API method, which gets the next item from our adding cache:

>>> scheduler.getNextCachedItem(now)

As you can see the method didn’t return an item, let’s try with the next scheduled call time:

>>> nextCallTime = firstEcho.nextCallTime
>>> scheduler.getNextCachedItem(now10)
<Delay ... for: u'echo 1'>

As you can see, the retryTime is set based on the nextCallTime and the retryDelay:

>>> firstEcho.retryTime == getNextTime(nextCallTime, 5)
True

Now the important part. Let’s test the method responsible for getting the next item, including items from MongoDB. This method uses the two methods above. Of course, with the current time we will not get any item:

>>> scheduler.pullNextSchedulerItem(now) is None
True

Let’s first check whether the previous call changed the item’s nextCallTime:

>>> firstEcho.nextCallTime == now10
True

But as you can see, the retryTime was already set during our previous test. This means we only get an item if the call time is not earlier than the retryTime:

>>> firstEcho.retryTime == now15
True
>>> scheduler.pullNextSchedulerItem(now10)
>>> scheduler.pullNextSchedulerItem(now15)
<Delay ... for: u'echo 1'>

Now, let’s check our scheduled item times:

>>> now20 = getNextTime(now15, 5)
>>> firstEcho.nextCallTime == now10
True

Note that the retryTime is calculated from the current call time and the retryDelay. It would not make sense to use the scheduled nextCallTime as the base for the retryTime calculation:

>>> firstEcho.retryTime == now20
True

The method pullNextSchedulerItem returns a pending item, or None if there is none. Here we don’t have one pending:

>>> scheduler.pullNextSchedulerItem(now) is None
True
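How nextCallTime and retryTime gate the pickup of a delay item in the examples above can be summarized in a small model: an item is due once its nextCallTime has been reached and any pending retryTime has passed, and picking it sets retryTime to the call time plus the retryDelay. This is a hypothetical sketch inferred from the doctests, not m01.remote's scheduler code, and it uses naive datetimes for brevity.

```python
import datetime


class DelayLikeItem(object):
    """Hypothetical model of how nextCallTime and retryTime gate pickup."""

    def __init__(self, next_call_time, retry_delay):
        self.next_call_time = next_call_time
        self.retry_delay = datetime.timedelta(seconds=retry_delay)
        self.retry_time = None

    def is_due(self, call_time):
        if self.next_call_time is None or self.next_call_time > call_time:
            return False
        # a pending retryTime acts as a lock until it has passed
        return self.retry_time is None or self.retry_time <= call_time

    def pull(self, call_time):
        if not self.is_due(call_time):
            return None
        # lock the item relative to the current call time
        self.retry_time = call_time + self.retry_delay
        return self
```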

Now let’s add a second scheduler item, list both items and remove them again:

>>> import datetime
>>> delay = 10
>>> data = {'jobName': u'echo 2', 'active': True, 'delay': delay,
...         'retryDelay': 5}
>>> secondEcho = m01.remote.scheduler.Delay(data)
>>> scheduler.add(secondEcho)
u'...'
>>> sorted(scheduler.values(), key=lambda x:(x.__name__, x.__name__))
[<Delay ... for: u'echo 1'>, <Delay ... for: u'echo 2'>]
>>> scheduler.remove(firstEcho)
>>> scheduler.remove(secondEcho)
>>> tuple(scheduler.values())
()

adjustCallTime

Before we test our cron item, let’s test our method that resets a given datetime to the smallest starting point; e.g. if hours are given as the calculation base, we need to start counting at the first minute:

>>> from m01.remote.scheduler import adjustCallTime
>>> now = datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)
>>> now
datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)
>>> item = m01.remote.scheduler.Cron({'jobName': u'bar', 'minute': [5]})
>>> adjustCallTime(item, now)
datetime.datetime(2010, 10, 25, 16, 6, 0, 123, tzinfo=UTC)

Cron

A probably more interesting implementation is the cron scheduler item. This cron item can schedule jobs at a specific given time. Let’s setup such a cron item:

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5}
>>> cronItem = m01.remote.scheduler.Cron(data)

The cronItem provides the ISchedulerItem and ICron interfaces:

>>> interfaces.ISchedulerItem.providedBy(cronItem)
True
>>> interfaces.ICron.providedBy(cronItem)
True

As you can see the cron item also provides a retryDelay:

>>> cronItem.retryDelay
5

Let’s first explain how this works. The cron scheduler item stores a next call time stamp as nextCallTime. Each time the item is asked for its next call time, it returns the stored nextCallTime. Only if the stored nextCallTime is not later than the given call time does the item calculate a new next call time and store it as nextCallTime; the previous nextCallTime is still what gets returned. This keeps the number of time calculations to a minimum.

Now let’s test a cron item as a scheduler item. Set up a simple cron item that runs at minute 5 of every hour:

>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC)
>>> now
datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)
>>> data = {'jobName': u'echo cron', 'active': True, 'retryDelay': 5,
...         'minute': [5], 'nextCallTime': now}
>>> cronEcho = m01.remote.scheduler.Cron(data)

Now add the item to the scheduler:

>>> scheduler.add(cronEcho)
u'...'

As you can see, our cron item is scheduled based on the given nextCallTime:

>>> cronEcho.nextCallTime
datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)

the retryTime is None:

>>> cronEcho.retryTime is None
True

and the minute list contains our minute 5, while all other lists are empty:

>>> cronEcho.minute
[5]
>>> cronEcho.hour
[]
>>> cronEcho.dayOfMonth
[]
>>> cronEcho.month
[]
>>> cronEcho.dayOfWeek
[]

And the scheduler contains one cron item:

>>> tuple(scheduler.values())
(<Cron ... for: u'echo cron'>,)

Now, calling pullNextSchedulerItem returns our cron scheduler item for the job named echo cron:

>>> scheduler.pullNextSchedulerItem(now)
<Cron ... for: u'echo cron'>

During this call, the retryTime is set based on the retryDelay:

>>> cronEcho.retryTime
datetime.datetime(2010, 10, 1, 0, 0, 5, tzinfo=UTC)

Now let’s test the different cron settings. Note that we provide a list of values for minute, hour, month, dayOfWeek and dayOfMonth. This means you can schedule a job every 15 minutes by setting minute to (0, 15, 30, 45), or only at 15 minutes past each hour by setting minute to (15,). If you set more than one field, e.g. minute, hour or days, the given time must match all of them.
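The matching rule described above, where an empty list acts as a wildcard and every non-empty list must contain the corresponding part of the time, can be sketched as a predicate. The function cron_matches is hypothetical: it only checks whether a given time fits and does not reproduce how m01.remote calculates the next call time. Its keyword names mirror the cron item fields, and dayOfWeek uses Python's weekday() numbering (Monday is 0), as in the dayOfWeek examples in this section.

```python
import datetime


def cron_matches(dt, minute=(), hour=(), dayOfMonth=(), month=(), dayOfWeek=()):
    """Return True if dt fits every non-empty field list (empty = any)."""
    checks = [
        (minute, dt.minute),
        (hour, dt.hour),
        (dayOfMonth, dt.day),
        (month, dt.month),
        (dayOfWeek, dt.weekday()),
    ]
    return all(value in allowed for allowed, value in checks if allowed)
```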

Let’s start with a cron scheduler item for the first and second minute of every hour. Normally the cron scheduler item sets int(time.time()) as its nextCallTime value. To test our cron scheduler items, we use an explicit startTime value of 0 (zero):

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [0, 1]}
>>> cronItem = m01.remote.scheduler.Cron(data)

Since we did not pass a nextCallTime, none is set yet:

>>> cronItem.nextCallTime is None
True

Now let’s call getNextCallTime. As you can see, we get None because we didn’t set a nextCallTime during cron initialization, and the nextCallTime is now set to the next minute:

>>> cronItem.getNextCallTime(now) is None
True
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)

If we call getNextCallTime again with a call time of now + 5 seconds, we still get the cached next call time of minute 1, and no new next call time is generated, since this time is still in the future:

>>> cronItem.getNextCallTime(getNextTime(now, 5))
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)

If we call the cron scheduler item with a call time equal to or later than the cached next call time (minute 1), we again get the cached next call time, just as with an earlier call time (see the sample above). This time, however, a new next call time is calculated and stored:

>>> cronItem.getNextCallTime(getNextTime(now, 65))
datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
>>> cronItem.nextCallTime
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)

All further calls with a time earlier than the nextCallTime return the current nextCallTime and do not calculate a new time.

>>> cronItem.getNextCallTime(getNextTime(now, 125))
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)
>>> cronItem.getNextCallTime(getNextTime(now, 1*60*60))
datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)

Remember, getNextCallTime returns the previously calculated nextCallTime, and the newly calculated time is stored as nextCallTime. For simpler test output, we define a test helper that shows both values:

Minutes

Let’s start testing the time tables.

>>> def getNextCallTime(cron, dt, seconds=None):
...     """Return stored and new calculated nextCallTime"""
...     if seconds is None:
...         callTime = dt
...     else:
...         callTime = getNextTime(dt, seconds)
...     nextCallTime = cron.getNextCallTime(callTime)
...     return '%s --> %s' % (nextCallTime, cron.nextCallTime)
>>> now = datetime.datetime(1970, 1, 1, 0, 3, 0, tzinfo=UTC)
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [0, 10], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> str(now)
'1970-01-01 00:03:00+00:00'
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 1)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 2*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 51*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 01:00:00+00:00'
>>> getNextCallTime(item, now, 55*60)
'1970-01-01 01:00:00+00:00 --> 1970-01-01 01:00:00+00:00'
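The caching rule used by getNextCallTime (return the stored value, and recalculate only once the stored time is not later than the call time) can be modeled independently of the field logic. This hypothetical sketch supports only the minute field, which is enough to reproduce the Minutes table above; it is not m01.remote's implementation and uses naive datetimes instead of UTC-aware ones for brevity.

```python
import datetime


class MinuteCronSketch(object):
    """Hypothetical cron item that supports only the minute field."""

    def __init__(self, minutes, next_call_time=None):
        self.minutes = set(minutes)
        if not self.minutes or not self.minutes.issubset(range(60)):
            raise ValueError('minutes must be values in 0..59')
        self.next_call_time = next_call_time

    def _compute_after(self, call_time):
        # first whole minute strictly after call_time with an allowed minute
        candidate = call_time.replace(second=0, microsecond=0)
        while True:
            candidate += datetime.timedelta(minutes=1)
            if candidate.minute in self.minutes:
                return candidate

    def get_next_call_time(self, call_time):
        previous = self.next_call_time
        # recalculate only if nothing is stored or the stored time is due
        if previous is None or previous <= call_time:
            self.next_call_time = self._compute_after(call_time)
        return previous
```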

Hour

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'hour': [2, 13], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 02:00:00+00:00'
>>> getNextCallTime(item, now, 2*60*60)
'1970-01-01 02:00:00+00:00 --> 1970-01-01 13:00:00+00:00'
>>> getNextCallTime(item, now, 4*60*60)
'1970-01-01 13:00:00+00:00 --> 1970-01-01 13:00:00+00:00'
>>> getNextCallTime(item, now, 13*60*60)
'1970-01-01 13:00:00+00:00 --> 1970-01-02 02:00:00+00:00'
>>> getNextCallTime(item, now, 15*60*60)
'1970-01-02 02:00:00+00:00 --> 1970-01-02 02:00:00+00:00'

Month

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'month': [1, 2, 5, 12], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-02-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 90*24*60*60)
'1970-02-01 00:03:00+00:00 --> 1970-05-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 120*24*60*60)
'1970-05-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 130*24*60*60)
'1970-12-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'
>>> getNextCallTime(item, now, 360*24*60*60)
'1970-12-01 00:03:00+00:00 --> 1971-01-01 00:03:00+00:00'

dayOfWeek [0..6]

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'dayOfWeek': [0, 2, 4, 5], 'nextCallTime':now}
>>> item = m01.remote.scheduler.Cron(data)

The current weekday of now is:

>>> now.weekday()
3

This means that calling it with now should move our nextCallTime to the next day, weekday 4:

>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-02 00:03:00+00:00'

With one more day, we get weekday 4 (skip):

>>> getNextCallTime(item, now, 24*60*60)
'1970-01-02 00:03:00+00:00 --> 1970-01-03 00:03:00+00:00'

With another day, we get weekday 5 (incr):

>>> getNextCallTime(item, now, 2*24*60*60)
'1970-01-03 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

With another day, we get weekday 6 (skip):

>>> getNextCallTime(item, now, 3*24*60*60)
'1970-01-05 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'

With another day, we get weekday 0 (inc):

>>> getNextCallTime(item, now, 4*24*60*60)
'1970-01-05 00:03:00+00:00 --> 1970-01-07 00:03:00+00:00'

dayOfMonth [1..31]

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'dayOfMonth': [2, 12, 21, 30], 'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-02 00:00:00+00:00'
>>> getNextCallTime(item, now, 12*24*60*60)
'1970-01-02 00:00:00+00:00 --> 1970-01-21 00:00:00+00:00'
>>> getNextCallTime(item, now, 31*24*60*60)
'1970-01-21 00:00:00+00:00 --> 1970-02-02 00:00:00+00:00'

Combined

Combine several attributes:

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [10], 'dayOfMonth': [1, 10, 20, 30],
...         'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'
>>> getNextCallTime(item, now, 10*60)
'1970-01-01 00:10:00+00:00 --> 1970-01-01 01:10:00+00:00'
>>> getNextCallTime(item, now, 10*24*60*60)
'1970-01-01 01:10:00+00:00 --> 1970-01-20 00:10:00+00:00'
>>> getNextCallTime(item, now, 20*24*60*60)
'1970-01-20 00:10:00+00:00 --> 1970-01-30 00:10:00+00:00'

Another example:

>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5,
...         'minute': [10], 'hour': [4], 'dayOfMonth': [1, 12, 21, 30],
...         'nextCallTime': now}
>>> item = m01.remote.scheduler.Cron(data)
>>> getNextCallTime(item, now)
'1970-01-01 00:03:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 10*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 4*60*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'
>>> getNextCallTime(item, now, 5*60*60)
'1970-01-01 04:10:00+00:00 --> 1970-01-12 04:10:00+00:00'

CHANGES

0.5.1 (2012-11-18)

  • added MANIFEST.in files

  • remove p01.i18n package dependency

  • allow removing jobs with any status

  • split scheduler and container and move scheduler part into mixin class

  • switch to bson import

  • reflect changes in getBatchData signature

  • fix datetime compare, round milliseconds

  • adjust different schema descriptions, use the same message id as used in the title

  • removed unused id

0.5.0 (2011-08-19)

  • initial release
