Remote processing queue for Zope3
Project description
This package provides a remote processing queue for Zope3 using the mongodb instead of ZODB.
README
This package offers a remote processor. This remote processor is implemented as a simple object using the mongodb as storage. The processor can execute pre defined jobs in another thread. It is also possible to run jobs at specific time using the different scheduler items.
The RemoteProcessor uses two different processor. One processes jobs and the other pickes items from the scheduler and is adding jobs. This separation is usefull if you implement a distributed concept. This means one or more application can schedule job items based on the given scheduler items. And another application is processing jobs and doesn’t know about how to scheduling next items.
Since we use this remote scheduler for low CPU intensive jobs, we offer multi processing. This is done by running more then one worker in the main worker thread. If you use subprocess for your job processing, you will get a real multiprocessing processor which isn’t limited to the current python process.
You can configure the amount of threads which a job worker can start in the remote processor. See jobWorkerArguments/maxThreads. By default this number uses the amount of CPU installed on your machine.
The implementation uses a mongodb as a storage for it’s component. This means jobs, job factories and scheduler items get stored in the mongodb using the ORM concept given from m01.mongo.
See p01.remote for a ZODB based remote processor implementation but take care the p01.remote implementation doesn’t provide the worker and scheduler processor separation. At least not yet.
Setup
>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing
Let’s now start by create two a remote processor. We can use our remote queue site implementation:
>>> from zope.security.proxy import removeSecurityProxy >>> from m01.remote import interfaces
Our test remote processor should be available as application root:
>>> rp = root >>> rp <TestProcessor None>
Let’s discover the available jobs:
>>> dict(root._jobs) {}
The job container is initially empty, because we have not added any job factory. Let’s now define a job factory that simply echos an input string:
>>> echoJob = testing.EchoJob({})
Now we can set the job input:
>>> echoJob.input = {'foo': u'blah'}
The only API requirement on the job is to be callable. Now we make sure that the job works. Note we call our job with the remote processor instance which is our initialized application root:
>>> echoJob(root) {'foo': u'blah'}
Let’s add the job to the available job list:
>>> rp.addJobFactory(u'echo', echoJob)
The echo job is now available in the remote processor:
>>> dict(rp._jobFactories) {u'echo': <EchoJob u'echo'>}
Since the remote processor cannot instantaneously complete a job, incoming jobs are managed by a queue. First we request the echo job to be executed:
>>> jobid1 = rp.addJob(u'echo', {'foo': 'bar'}) >>> jobid1 u'...'>>> sorted([job.status for job in rp._jobs.values()]) [u'queued']
The addJob() function schedules the job called “echo” to be executed with the specified arguments. The method returns a job id with which we can inquire about the job. The addJob() function marks a job as queued.
>>> rp.getJobStatus(jobid1) u'queued'
Since the job has not been processed, the status is set to “queued”. Further, there is no result available yet:
>>> rp.getJobResult(jobid1) is None True
As long as the job is not being processed, it can be cancelled:
>>> rp.cancelJob(jobid1) >>> rp.getJobStatus(jobid1) u'cancelled'>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled']
The worker processor isn’t being started by default:
>>> rp.isProcessing False
To get a clean logging environment let’s clear the logging stack:
>>> logger.clear()
Now we can start the remote processor by calling startProcessor:
>>> rp.startProcessor()
and voila - the remote processor is processing:
>>> rp.isProcessing True
Checking out the logging will prove the started remote processor:
>>> print logger m01.remote INFO Processor 'root-worker' started
Let’s stop the processor again:
>>> rp.stopProcessor() >>> rp.isProcessing False
Now let’s get a result from a processed job but first commit the new added job:
>>> jobid2 = rp.addJob(u'echo', {'foo': u'bar'}) >>> transaction.commit()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'queued']
Now create a worker and process the new jobs by calling our simple worker:
>>> class FakeWorker(object): ... ... def __init__(self, rp): ... self.rp = rp ... ... def __call__(self): ... try: ... result = self.rp.processNextJob() ... transaction.commit() ... except Exception, error: ... transaction.commit()>>> worker = FakeWorker(rp) >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed']
First check if the job get processed:
>>> rp.getJobStatus(jobid2) u'completed'>>> rp.getJobResult(jobid2) {u'foo': u'bar'}
Error handling
Now, let’s define a new job that causes an error:
>>> errorJob = testing.RemoteExceptionJob() >>> rp.addJobFactory(u'error', errorJob)
Now add and execute it:
>>> jobid3 = rp.addJob(u'error') >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error']
Let’s now see what happened:
>>> rp.getJobStatus(jobid3) u'error' >>> errors = rp.getJobErrors(jobid3) >>> errors [<JobError u'...'>]
Such a JobError item provides the following data:
>>> error = tuple(errors)[0] >>> data = error.dump() >>> data = m01.mongo.dictify(data) >>> pprint(data) {'_id': ObjectId('...'), '_type': u'JobError', 'created': datetime.datetime(..., ..., ..., ..., ..., ..., ..., tzinfo=<bson.tz_util.FixedOffset object at ...>), 'tb': u"<p>Traceback (most recent call last):..."}
As you can see the traceback stored as tb is the most important information:
>>> print data['tb'] <p>Traceback (most recent call last):</p> <ul> <li> Module m01.remote.processor, line 297, in _processJob<br /> job.output = job(self)</li> <li> Module m01.remote.testing, line 86, in __call__<br /> raise exceptions.RemoteException('An error occurred.')</li> </ul><p>RemoteException: An error occurred.<br /> </p>
Try at also with a not so nice error:
>>> fatalJob = testing.FatalExceptionJob() >>> rp.addJobFactory(u'fatal', fatalJob)
Now add and execute it:
>>> jobid4 = rp.addJob(u'fatal') >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'queued']>>> job4 = rp._jobs[jobid4] >>> job4.retryCounter 1 >>> job4.status == u'queued' True>>> job4.errors [<JobError u'...'>]
And process the job again but first set our retryTime to an outdated value which will simulate that time passes since our last call:
>>> import datetime >>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC) >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'queued']>>> job4 = rp._jobs[jobid4] >>> job4.retryCounter 2>>> job4.errors [<JobError u'...'>, <JobError u'...'>]
And process the job again the 3rd time. Now it does not re-raise the exception but the error message get appended to the error list.
>>> job4.retryTime = datetime.datetime(2000, 1, 1, tzinfo=UTC) >>> transaction.commit() >>> worker()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'error']
Let’s now see what happened:
>>> job4 = rp._jobs[jobid4] >>> job4.retryCounter 3>>> job4.status u'error'>>> rp.getJobStatus(jobid4) u'error'>>> job4.errors [<JobError u'...'>, <JobError u'...'>, <JobError u'...'>]>>> rp.getJobErrors(jobid4) [<JobError u'...'>, <JobError u'...'>, <JobError u'...'>]
For management purposes, the remote processor also allows you to inspect all jobs:
>>> pprint(dict(rp._jobs)) {u'...': <EchoJob u'...' ...>, u'...': <EchoJob u'...' ...>, u'...': <RemoteExceptionJob u'...' ...>, u'...': <FatalExceptionJob u'...' ...>}
To get rid of jobs not needed anymore we can use the reomveJobs method.
>>> jobid8 = rp.addJob(u'echo', {'blah': 'blah'}) >>> transaction.commit()>>> sorted([job.status for job in rp._jobs.values()]) [u'cancelled', u'completed', u'error', u'error', u'queued']>>> rp.removeJobs() {u'cancelled': 1, u'completed': 1, u'error': 2}>>> sorted([job.status for job in rp._jobs.values()]) [u'queued']
Now process the last pending job and make sure we do not get more jobs:
>>> rp.pullNextJob() <EchoJob u'...' ...>
Threading behavior
Each remote processor runs in a separate thread, allowing them to operate independently. Jobs should be designed to avoid conflict errors.
Let’s start the remote processor we have defined at this point, and see what threads are running as a result:
>>> rp.startProcessor() >>> import pprint >>> import threading >>> def show_threads(): ... threads = [t for t in threading.enumerate() ... if t.getName().startswith('root')] ... threads.sort(key=lambda t: t.getName()) ... pprint.pprint(threads) >>> show_threads() [<Thread(root-worker, started daemon ...)>]
Let’s stop the remote processor, and give the background threads a chance to get the message:
>>> rp.stopProcessor() >>> import time >>> time.sleep(2)
The threads have exited now:
>>> print [t for t in threading.enumerate() ... if t.getName().startswith('root')] []
Job Workers
The actual processing of the jobs in a queue is handled by a spearate component, known as a job worker. This component usually runs in its own thread and provides its own main loop.
>>> import time >>> import transaction
The worker module provides a job worker which executes one job at a time. Another worker is scheduling new jobs items beased on scheduler item settings. Let’s create the necessary components to test the job worker:
Create the remote processor:
>>> from m01.remote import testing >>> rp = root >>> rp.isProcessing False>>> rp.isScheduling False
Register a job that simply sleeps and writes a message:
>>> data = {'retryDelay': 1} >>> sleepJob = testing.SleepJob(data) >>> rp.addJobFactory(u'sleep', sleepJob)
SimpleJobWorker
This worker executes one job at a time. It was designed for jobs that would take a long time and use up most of the processing power of a computer.
Let’s first register a few jobs:
>>> jobid1 = rp.addJob(u'sleep', (0.04, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (0.1, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.08, 4)) >>> time.sleep(0.2) >>> transaction.commit()
Now let’s first check if we can aceess the jobs:
>>> job = rp._jobs.get(jobid1) >>> job <SleepJob u'...' ...>
And let’s try if the job is ready for processing:
>>> rp.getJobStatus(jobid1) u'queued'>>> rp.getJobStatus(jobid2) u'queued'>>> rp.getJobStatus(jobid3) u'queued'>>> rp.getJobStatus(jobid4) u'queued'
Let’s start by executing a job directly. The first argument to the simple worker constructor is the remote processor instance. All other arguments are optional and can be defined as worker rguments in the RemoteProcessor class, see jobWorkerArguments and schedulerWorkerArguments:
>>> from m01.remote.worker import SimpleJobWorker >>> worker = SimpleJobWorker(rp, waitTime=0.0)
Let’s now process the first job. We clear the log and we also have to end any existing interactions in order to process the job in this thread:
>>> logger.clear()>>> from zope.security import management >>> management.endInteraction()>>> worker.doProcessNextJob() True>>> print logger m01.remote INFO Job: 1
Let’s now use the worker from within the remote processor. Since the worker constructors also accept additional arguments, they are specified as well:
>>> rp.jobWorkerFactory = SimpleJobWorker >>> rp.jobWorkerFactory <class 'm01.remote.worker.SimpleJobWorker'>>>> rp.jobWorkerArguments {'waitTime': 0.0}
The wait time has been set to zero for testing purposes only. It is really set to 1 second by default. Let’s now start processing jobs, wait a little bit for all the jobs to complete and then stop processing again:
>>> rp.startProcessor() >>> transaction.commit()>>> time.sleep(0.5)>>> rp.stopProcessor() >>> transaction.commit()>>> time.sleep(0.5)
The log shows that all jobs have been processed. But more importantly, they were all completed in the order they were defined. Note the first job get processed before we started the remote processor. And yes this means a remote processor can process jobs if the queue is not started. Starting a remote processor only means that the job get processed as jobs without to do it manualy.
>>> print logger m01.remote INFO Job: 1 m01.remote INFO Processor 'root-worker' started m01.remote INFO Job: 2 m01.remote INFO Job: 3 m01.remote INFO Job: 4 m01.remote INFO Processor 'root-worker' stopped>>> logger.clear()
Transactions in jobs
With the SimpleJobWorker, jobs _should_ not change the transaction status, since both the administration of the jobs by the RemoteProcessor and the job itself run in the same transaction, so aborting it from inside the job could mess up the administrative part.
This is a regression test that aborting the transaction inside the job does not lead to an infinite loop (because SimpleJobWorker pulls the job inside the transaction, so if it is aborted, the job remains on the queue):
>>> testing.testCounter 0>>> counter = 0 >>> data = {'counter': counter} >>> abortJob = testing.TransactionAbortJob(data) >>> rp.addJobFactory(u'abortJob', abortJob) >>> jobid = rp.addJob(u'abortJob', (1)) >>> time.sleep(0.5) >>> jobid = rp.addJob(u'abortJob', (2)) >>> transaction.commit()>>> rp.startProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> transaction.abort() # prevent spurious conflict errors >>> testing.testCounter 2>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO Job: 1 m01.remote INFO Job: 2 m01.remote INFO Processor 'root-worker' stopped
Reset test counter
>>> testing.testCounter = 0
MultiJobProcessor
The multi-threaded job worker executes several jobs at once. It was designed for jobs that would take a long time but use very little processing power.
Let’s add a few new jobs to execute:
>>> jobid1 = rp.addJob(u'sleep', (0.04, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (1.0, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.2, 4)) >>> time.sleep(0.2) >>> transaction.commit()
Before testing the worker in the remote processor, let’s have a look at every method by itself. So we instantiate the worker:
>>> from m01.remote.worker import MultiJobWorker >>> worker = MultiJobWorker(rp, waitTime=0, maxThreads=2)
The maximum amount of threads can be set as well:
>>> worker.maxThreads 2
All working threads can be reviewed at any time:
>>> worker.threads []>>> from zope.security import management >>> management.endInteraction()
Let’s pull a new job:
>>> job = worker.doPullNextJob() >>> job <SleepJob u'...' ...>
We need to pull a job before executing it, so that the database marks the job as processing and no new thread picks up the same job. As you can see the job get marked with the processing status:
>>> job.status u'processing'
Once we pulled a particular job, we can process it:
>>> logger.clear() >>> print logger>>> worker.doProcessJob(job.__name__)>>> print logger m01.remote INFO Job: 1
Let’s now have a look at using the processor in the task service. This primarily means setting the processor factory:
>>> management.newInteraction()>>> rp.jobWorkerFactory = MultiJobWorker >>> rp.jobWorkerArguments = {'waitTime': 1.0, 'maxThreads': 2} >>> transaction.commit()>>> logger.clear()
Let’s now process the remaining jobs:
>>> rp.startProcessor() >>> transaction.commit() >>> time.sleep(1.5)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)
As you can see, this time the jobs are not completed in order anymore, because they all need different time to execute:
>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 3 m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 4 m01.remote INFO Job: 2 m01.remote INFO Processor 'root-worker' stopped
Let’s now set the thread limit to four and construct a new set of jobs that demonstrate that all jobs will run at the same time:
>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 4}>>> jobid1 = rp.addJob(u'sleep', (0.3, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (0.4, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0.1, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.5, 4)) >>> time.sleep(0.2) >>> transaction.commit()
If all tasks are processed at once, job 3 should be done first. You can also see that the job 4 get processed ASAP even before the worker logs processing:
>>> logger.clear()>>> rp.startProcessor() >>> transaction.commit()>>> time.sleep(1.0)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 3 m01.remote INFO Job: 1 m01.remote INFO Job: 2 m01.remote INFO Job: 4 m01.remote INFO Processor 'root-worker' stopped
Let’s now set the thread limit to two and construct a new set of jobs that demonstrate that not more than two threads run at the same time:
>>> rp.jobWorkerArguments = {'waitTime': 0.0, 'maxThreads': 2} >>> transaction.commit()>>> jobid1 = rp.addJob(u'sleep', (0.3, 1)) >>> time.sleep(0.2) >>> jobid2 = rp.addJob(u'sleep', (0.4, 2)) >>> time.sleep(0.2) >>> jobid3 = rp.addJob(u'sleep', (0.2, 3)) >>> time.sleep(0.2) >>> jobid4 = rp.addJob(u'sleep', (0.5, 4)) >>> time.sleep(0.2) >>> transaction.commit()
If all tasks are processed at once, job 3 should be done first, but since the job has to wait for an available thread, it will come in third. We can now run the jobs and see the result:
>>> logger.clear()>>> rp.startProcessor() >>> transaction.commit()>>> time.sleep(1.5)>>> rp.stopProcessor() >>> transaction.commit() >>> time.sleep(0.5)>>> print logger m01.remote INFO Processor 'root-worker' started m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 1 m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 2 m01.remote INFO MultiJobWorker: processing job ... m01.remote INFO Job: 3 m01.remote INFO Job: 4 m01.remote INFO Processor 'root-worker' stopped
Scheduler
The scheduler concept is implemented as an additional scheduler container which contains scheduler items.
>>> from m01.mongo import UTC >>> import m01.remote.scheduler >>> from m01.remote import interfaces >>> from m01.remote import testing
Let’s now start by get our test remote procesor which contains our scheduler container:
>>> remoteProcessor = root >>> remoteProcessor <TestProcessor None>>>> scheduler = remoteProcessor._scheduler>>> tuple(scheduler.values()) ()
Delay
We can add a scheduler item for delay a job processing. Let’s add such an item:
>>> import datetime >>> def getNextTime(dt, seconds): ... return dt + datetime.timedelta(seconds=seconds)>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC) >>> now10 = getNextTime(now, 10) >>> delay = 10 >>> data = {'jobName': u'echo 1', 'active': True, 'delay': delay, ... 'retryDelay': 5, 'nextCallTime': now10} >>> firstEcho = m01.remote.scheduler.Delay(data) >>> interfaces.IDelay.providedBy(firstEcho) True
The delay is set to 10:
>>> firstEcho.delay 10
and the retryDelay to 5
>>> firstEcho.retryDelay 5
and we set an explicit nextCallTime of now + 10:
>>> firstEcho.nextCallTime == getNextTime(now, 10) True
and our retryTime is None:
>>> firstEcho.retryTime is None True
Now we can add the delay item to the scheduler:
>>> scheduler.add(firstEcho) u'...'
As you can see the scheduler contains on item:
>>> sorted(scheduler.values()) [<Delay ... for: u'echo 1'>]
As next we’ll test some scheduler AP methods. First check if we can update the retryTime for an item in our adding cache with updateRetryTime:
>>> scheduler.updateRetryTime(firstEcho.dump(), now) False
As you can see we did not get a new retryTime. This happens because we didn’t use the correct callTime. Let’s try with the correct nextCallTime:
>>> now10 = getNextTime(now, 10) >>> now15 = getNextTime(now, 15) >>> retryTime = scheduler.updateRetryTime(firstEcho.dump(), now10) >>> retryTime == now15 True
As you can see the new retryTime is using the retryDelay of 5 second. This retryTime is used for lock an item. This means an item get not picked as long as this time get passed.
Now let’ try another internal API method hihc is able to get the next item from our adding cache:
>>> scheduler.getNextCachedItem(now)
As you can see the method didn’t return an item, let’s try with the next scheduled call time:
>>> nextCallTime = firstEcho.nextCallTime >>> scheduler.getNextCachedItem(now10) <Delay ... for: u'echo 1'>
As you can see the retryTime get set based on the nextCallTime and the retryDelay:
>>> firstEcho.retryTime == getNextTime(nextCallTime, 5) True
Now the important part. Let’s test our method which is responsible for get a next item including items from mongo. This method uses the two methods above. Of corse with the current time we will not get any item:
>>> scheduler.pullNextSchedulerItem(now) is None True
But now we need another nextCallTime because the previous call update the items nextCallTime. Let’s first check the nextCallTime:
>>> firstEcho.nextCallTime == now10 True
But as you can see, the retryTime is already set during our previous test. this means we only will get an item if we at least use a larger time if the retryTime:
>>> firstEcho.retryTime == now15 True>>> scheduler.pullNextSchedulerItem(now10)>>> scheduler.pullNextSchedulerItem(now15) <Delay ... for: u'echo 1'>
Now, let’s check our scheduled item times:
>>> now20 = getNextTime(now15, 5) >>> firstEcho.nextCallTime == now10 True
Note, our retryTime get calculated with the current call time and retryDelay. It whould not make sense if we whould use the callTime as retryTime calculation base:
>>> firstEcho.retryTime == now20 True
The method pullNextSchedulerItem returns a pending item or None since we don’t have one pending:
>>> scheduler.pullNextSchedulerItem(now) is None True
Now let’s add a second scheduler item within some scheduler time:
>>> import datetime >>> delay = 10 >>> data = {'jobName': u'echo 2', 'active': True, 'delay': delay, ... 'retryDelay': 5} >>> secondEcho = m01.remote.scheduler.Delay(data)>>> scheduler.add(secondEcho) u'...'>>> sorted(scheduler.values(), key=lambda x:(x.__name__, x.__name__)) [<Delay ... for: u'echo 1'>, <Delay ... for: u'echo 2'>]>>> scheduler.remove(firstEcho) >>> scheduler.remove(secondEcho) >>> tuple(scheduler.values()) ()
adjustCallTime
Before we test our cron item, let’s test test our method which can reset a given datetime to the smalles starting point e.g. if hours are given as a calculation base, we need to start counting within the first minute:
>>> from m01.remote.scheduler import adjustCallTime>>> now = datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC) >>> now datetime.datetime(2010, 10, 25, 16, 6, 5, 123, tzinfo=UTC)>>> item = m01.remote.scheduler.Cron({'jobName': u'bar', 'minute': [5]}) >>> adjustCallTime(item, now) datetime.datetime(2010, 10, 25, 16, 6, 0, 123, tzinfo=UTC)
Cron
A probably more interesting implementation is the cron scheduler item. This cron item can schedule jobs at a specific given time. Let’s setup such a cron item:
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5} >>> cronItem = m01.remote.scheduler.Cron(data)
The cronItem provides the ISchedulerItem and ICron interface:
>>> interfaces.ISchedulerItem.providedBy(cronItem) True>>> interfaces.ICron.providedBy(cronItem) True
As you can see the cron item also provides a retryDelay:
>>> cronItem.retryDelay 5
Let’s first explain how this works. The cron scheduler provides a next call time stamp. If the calculated next call time is smaller then the last call time, the cron scheduler item will calculate the new next call time and store them as nextCallTime and at the same time the previous nextCallTime get returnd. This will makes sure that we have a minimum of time calculation calls because each time a cron scheduler item get asked about the next call time the stored nextCallTime is used. The cron schdeuler item only calculates the next call time if the existing next call time is smaller then the given call time.
Now let’s test a cron as a scheduler item. Setup a simple corn item with a 5 minute period.
>>> now = datetime.datetime(2010, 10, 1, 0, 0, 0, tzinfo=UTC) >>> now datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)>>> data = {'jobName': u'echo cron', 'active': True, 'retryDelay': 5, ... 'minute': [5], 'nextCallTime': now} >>> cronEcho = m01.remote.scheduler.Cron(data)
Now add the item to the schdeuler:
>>> scheduler.add(cronEcho) u'...'
As you can see, our cron item get scheduled based on the given nextCallTime:
>>> cronEcho.nextCallTime datetime.datetime(2010, 10, 1, 0, 0, tzinfo=UTC)
the retrytime is empty
>>> cronEcho.retryTime is None True
and the minute list contains our 5 minute:
>>> cronEcho.minute [5]>>> cronEcho.hour []>>> cronEcho.dayOfMonth []>>> cronEcho.month []>>> cronEcho.dayOfWeek []
And the scheduler contains one cron item:
>>> tuple(scheduler.values()) (<Cron ... for: u'echo cron'>,)
Now we can get the job based on the jobName echo defined by our cron scheduler item if we call pullNextSchedulerItem.
>>> scheduler.pullNextSchedulerItem(now) <Cron ... for: u'echo cron'>
During this call the retryTime get set based on the retryDelay:
>>> cronEcho.retryTime datetime.datetime(2010, 10, 1, 0, 0, 5, tzinfo=UTC)
Now let’s test the the different cron settings. Note that we provide a list of values for minutes, hours, month, dayOfWeek and dayOfMonth. This means you can schedule a job for every 15 minutes if you will set the minutes to (0, 15, 30, 45) or if you like to set a job only each 15 minutes after an hour you can set minutes to (15,). If you will set more then one argument e.g. minute, hours or days etc. all arguments must fit the given time.
Let’s start with a cron scheduler for every first and second minute per hour. Normaly the corn scheduler item will set now int(time.time()) as nextCallTime value. For test our cron scheduler items, we use a explicit startTime value of 0 (zero):
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [0, 1]} >>> cronItem = m01.remote.scheduler.Cron(data)
The next call time is set based on the given startTime value. This means the first call will be at 0 (zero) minute:
>>> cronItem.nextCallTime is None True
Now let’s call getNextCallTime, as you can see we will get None as nextCallTime because we ddn’t set a nextCallTime during cron initialization and the nextCallTime is set to the next minute:
>>> cronItem.getNextCallTime(now) is None True>>> cronItem.nextCallTime datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
Now let’s call getNextCallTime again, as you can see we will get the nextCallTime we calculated during object initialization which is the given call time and the nextCallTime is set to the next minute:
If we use a call time + 5 seconds, we still will get the cached next call time of 1 minute and we will not generate a new next call time since this time is already in the future:
>>> cronItem.getNextCallTime(getNextTime(now, 5)) datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)>>> cronItem.nextCallTime datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)
If we call the cron scheduler item with a call time equal or larger then our 1 minute delay from the cached next call time, we will get the cached call time as value as we whould get similar to a smaller call time (see sample above).
>>> cronItem.getNextCallTime(getNextTime(now, 65)) datetime.datetime(2010, 10, 1, 0, 1, tzinfo=UTC)>>> cronItem.nextCallTime datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)
All future calls with a smaller time then the nextCallTime will return the current nextCallTime and not calculate any new time.
>>> cronItem.getNextCallTime(getNextTime(now, 125)) datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)>>> cronItem.getNextCallTime(getNextTime(now, 1*60*60)) datetime.datetime(2010, 10, 1, 1, 0, tzinfo=UTC)
Remember, getNextCallTime returns the previous calculated nextCallTime and the new calculated nextCallTime get stored as nextCallTime. For a simpler test output we define a test method which shows the time calculation:
Minutes
Let’s start testing the time tables.
>>> def getNextCallTime(cron, dt, seconds=None): ... """Return stored and new calculated nextCallTime""" ... if seconds is None: ... callTime = dt ... else: ... callTime = getNextTime(dt, seconds) ... nextCallTime = cron.getNextCallTime(callTime) ... return '%s --> %s' % (nextCallTime, cron.nextCallTime)>>> now = datetime.datetime(1970, 1, 1, 0, 3, 0, tzinfo=UTC) >>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [0, 10], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)>>> str(now) '1970-01-01 00:03:00+00:00'>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 1) '1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 2*60) '1970-01-01 00:10:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 51*60) '1970-01-01 00:10:00+00:00 --> 1970-01-01 01:00:00+00:00'>>> getNextCallTime(item, now, 55*60) '1970-01-01 01:00:00+00:00 --> 1970-01-01 01:00:00+00:00'
Hour
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'hour': [2, 13], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 02:00:00+00:00'>>> getNextCallTime(item, now, 2*60*60) '1970-01-01 02:00:00+00:00 --> 1970-01-01 13:00:00+00:00'>>> getNextCallTime(item, now, 4*60*60) '1970-01-01 13:00:00+00:00 --> 1970-01-01 13:00:00+00:00'>>> getNextCallTime(item, now, 13*60*60) '1970-01-01 13:00:00+00:00 --> 1970-01-02 02:00:00+00:00'>>> getNextCallTime(item, now, 15*60*60) '1970-01-02 02:00:00+00:00 --> 1970-01-02 02:00:00+00:00'
Month
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'month': [1, 2, 5, 12], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-02-01 00:03:00+00:00'>>> getNextCallTime(item, now, 90*24*60*60) '1970-02-01 00:03:00+00:00 --> 1970-05-01 00:03:00+00:00'>>> getNextCallTime(item, now, 120*24*60*60) '1970-05-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'>>> getNextCallTime(item, now, 130*24*60*60) '1970-12-01 00:03:00+00:00 --> 1970-12-01 00:03:00+00:00'>>> getNextCallTime(item, now, 360*24*60*60) '1970-12-01 00:03:00+00:00 --> 1971-01-01 00:03:00+00:00'
dayOfWeek [0..6]
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'dayOfWeek': [0, 2, 4, 5], 'nextCallTime':now} >>> item = m01.remote.scheduler.Cron(data)
The current weekday of now is:
>>> now.weekday() 3
this means our nextCallTime should get changed using day 4 as our nextCallTime if we call them with now:
>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-02 00:03:00+00:00'
with a day more, we will get the weekday 4 (skip):
>>> getNextCallTime(item, now, 24*60*60) '1970-01-02 00:03:00+00:00 --> 1970-01-03 00:03:00+00:00'
with another day more, we will get the weekday 5 (incr):
>>> getNextCallTime(item, now, 2*24*60*60) '1970-01-03 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'
with another day more, we will get the weekday 6 (skip):
>>> getNextCallTime(item, now, 3*24*60*60) '1970-01-05 00:03:00+00:00 --> 1970-01-05 00:03:00+00:00'
with another day more, we will get the weekday 0 (inc):
>>> getNextCallTime(item, now, 4*24*60*60) '1970-01-05 00:03:00+00:00 --> 1970-01-07 00:03:00+00:00'
dayOfMonth [1..31]
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'dayOfMonth': [2, 12, 21, 30], 'nextCallTime': now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-02 00:00:00+00:00'>>> getNextCallTime(item, now, 12*24*60*60) '1970-01-02 00:00:00+00:00 --> 1970-01-21 00:00:00+00:00'>>> getNextCallTime(item, now, 31*24*60*60) '1970-01-21 00:00:00+00:00 --> 1970-02-02 00:00:00+00:00'
Combined
combine some attributes:
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [10], 'dayOfMonth': [1, 10, 20, 30], ... 'nextCallTime': now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 00:10:00+00:00'>>> getNextCallTime(item, now, 10*60) '1970-01-01 00:10:00+00:00 --> 1970-01-01 01:10:00+00:00'>>> getNextCallTime(item, now, 10*24*60*60) '1970-01-01 01:10:00+00:00 --> 1970-01-20 00:10:00+00:00'>>> getNextCallTime(item, now, 20*24*60*60) '1970-01-20 00:10:00+00:00 --> 1970-01-30 00:10:00+00:00'
another sample:
>>> data = {'jobName': u'bar', 'active': True, 'retryDelay': 5, ... 'minute': [10], 'hour': [4], 'dayOfMonth': [1, 12, 21, 30], ... 'nextCallTime': now} >>> item = m01.remote.scheduler.Cron(data)>>> getNextCallTime(item, now) '1970-01-01 00:03:00+00:00 --> 1970-01-01 04:10:00+00:00'>>> getNextCallTime(item, now, 10*60) '1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'>>> getNextCallTime(item, now, 4*60*60) '1970-01-01 04:10:00+00:00 --> 1970-01-01 04:10:00+00:00'>>> getNextCallTime(item, now, 5*60*60) '1970-01-01 04:10:00+00:00 --> 1970-01-12 04:10:00+00:00'
CHANGES
3.0.0 (2015-11-10)
support pymongo >= 3.0.0 and use 3.0.0 as package version and reflect pymongo >= 3.0.0 compatibility
0.6.0 (2013-06-28)
feature: implemented JobError as Job sub item. And rename previous JobError to RemoteException. This changes requires that you delete all previous JobError jobs in the job list before update. Also raise RemoteException instead of JobError in your code. The new JobError sub item provides a better error traceback message and a created date.
feature: implement better error handling, save formatted traceback string
0.5.1 (2012-11-18)
added MANIFEST.in files
remove p01.i18n package dependency
allow to remove jobs with all stati
split scheduler and container and move scheduler part into mixin class
switch to bson import
reflect changes in getBatchData signature
fix dateime compare, round milliseconds
adjust different schema description, user the same message id as used in title
removed unused id
0.5.0 (2011-08-19)
initial release
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.