
Integration package for zc.async allowing asynchronous operations in Plone



Integration package for zc.async allowing asynchronous operations in Plone 3 and 4.



You will typically run in a ZEO environment, with one or more worker instances carrying out the jobs queued by your main Zope instances.

For the sake of simplicity it is assumed that you have one instance that can queue new jobs, and one worker instance that consumes them, both operating on a single database. In this case your buildout configuration will look similar to the following:

recipe = plone.recipe.zope2zeoserver
file-storage = ${buildout:directory}/var/filestorage/Data.fs

recipe = plone.recipe.zope2instance
eggs = Plone
zcml =
zcml-additional =
    <include package="" file="single_db_instance.zcml" />
environment-vars =
    ZC_ASYNC_UUID ${buildout:directory}/var/instance-uuid.txt

recipe = plone.recipe.zope2instance
eggs = ${instance:eggs}
zcml = ${instance:zcml}
zcml-additional =
    <include package="" file="single_db_worker.zcml" />
environment-vars =
    ZC_ASYNC_UUID ${buildout:directory}/var/worker-uuid.txt

There are two important stanzas here:

  • Each instance has to set the ZC_ASYNC_UUID environment variable in order to integrate properly with zc.async.

  • Each instance loads the single_db_instance.zcml configuration. The worker instance loads the single_db_worker.zcml configuration in order to set up the queue and configure itself as a dispatcher.

For more details please look at the example buildout configurations included in the package.
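
To illustrate why each instance needs its own ZC_ASYNC_UUID file, here is a minimal sketch. It assumes the variable names a file that holds one UUID identifying the process, created on first start and reused afterwards. The function name load_or_create_uuid is hypothetical and is not part of zc.async; the real handling lives in zc.async itself.

```python
import os
import tempfile
import uuid


def load_or_create_uuid(path):
    """Return the UUID stored at ``path``, creating the file on first use.

    Hypothetical helper: it only mimics the assumed behaviour of a
    per-instance UUID file, it is not the zc.async implementation.
    """
    if os.path.exists(path):
        with open(path) as fh:
            return uuid.UUID(fh.readline().strip())
    new_id = uuid.uuid4()
    with open(path, "w") as fh:
        fh.write(str(new_id) + "\n")
    return new_id


# Two processes pointing at different files get different identities,
# and each keeps its identity across restarts.
workdir = tempfile.mkdtemp()
instance_path = os.path.join(workdir, "instance-uuid.txt")
worker_path = os.path.join(workdir, "worker-uuid.txt")

instance_id = load_or_create_uuid(instance_path)
worker_id = load_or_create_uuid(worker_path)
instance_id_after_restart = load_or_create_uuid(instance_path)
```

If both instances pointed at the same file, they would share one identity, which is why the buildout above gives each part a separate path.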

Plone 3


Plone 4

Use five.intid


Code from Enfold’s plone.async.core package has been used for setting up the queues.


User Documentation

Basic use

Assuming your setup is done correctly, you can start by obtaining the AsyncService utility:

>>> from zope.component import getUtility
>>> from import IAsyncService
>>> async = getUtility(IAsyncService)
>>> async
< object at ...>
>>> folder = layer['test-folder']
>>> portal = layer['portal']

You can already get the zc.async queues:

>>> async.getQueues()
<zc.async.queue.Queues object at ...>

>>> import zc.async.dispatcher
>>> from import _dispatcher_uuid
>>> zc.async.dispatcher.get(_dispatcher_uuid)
<zc.async.dispatcher.Dispatcher object at ...>
>>> queue = async.getQueues()['']
>>> queue
<zc.async.queue.Queue object at ...>

Let’s define a simple function to be executed asynchronously. Note that the first argument must be a valid Zope object:

>>> from import *

and queue it:

>>> job = async.queueJob(addNumbers, folder, 40, 2)
>>> len(queue)
>>> job.status
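
The addNumbers function queued above is imported from the package's test helpers and its body is not shown here. As a sketch only, assuming it takes the context object first and simply adds the two remaining arguments, it might look like this:

```python
def addNumbers(context, x1, x2):
    """Assumed shape of the test helper: ``context`` is the Zope object
    passed as the first argument and is not used in the computation."""
    return x1 + x2


# Called directly, without any queue, with a placeholder context.
direct_result = addNumbers(None, 40, 2)
```

The important part is the signature: whatever is passed to queueJob after the function is handed to it as positional arguments, with the context object first.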

In real life the job would be executed by the worker. In the tests we need to commit so that the dispatcher becomes aware of the job and executes it. We also wait for the job to complete before continuing with the test:

>>> import transaction
>>> from zc.async.testing import wait_for_result
>>> transaction.commit()
>>> wait_for_result(job)

Batches of jobs

Let’s now try some jobs that create persistent objects. First define the tasks to be executed asynchronously:

>>> from Products.CMFCore.utils import getToolByName

Queue a job that creates a document and another that submits it:

>>> job = async.queueJob(createDocument, folder,
...     'foo', 'title', 'description', 'body')
>>> job2 = async.queueJob(submitObject, folder, 'foo')
>>> transaction.commit()

By default, jobs run with a quota of 1 (i.e. only one job can execute at a time), so they are executed serially, in the order in which they were submitted. Hence, waiting for the job that submits the document implies that the job that created it has already been carried out:

>>> wait_for_result(job2)
>>> wt = getToolByName(folder, 'portal_workflow')
>>> doc = folder['foo']
>>> wt.getInfoFor(doc, 'review_state')
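
The ordering guarantee can be pictured with a plain-Python analogy. This is a sketch using only the standard library, not how zc.async implements quotas: a single worker thread drains a FIFO queue, so a later job can rely on the effects of an earlier one. The names run_serially, create_document and submit_document, as well as the state values, are hypothetical.

```python
import queue
import threading


def run_serially(jobs):
    """Run callables one at a time, in submission order, on one worker thread."""
    pending = queue.Queue()
    for job in jobs:
        pending.put(job)
    finished = []

    def worker():
        while True:
            try:
                job = pending.get_nowait()
            except queue.Empty:
                return
            finished.append(job())

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return finished


state = {}


def create_document():
    state["foo"] = "private"
    return "created"


def submit_document():
    # Depends on create_document having run first.
    if "foo" not in state:
        raise KeyError("foo")
    state["foo"] = "pending"
    return "submitted"


order = run_serially([create_document, submit_document])
```

With more than one worker draining the queue at once, submit_document could run before create_document, which is the situation a quota of 1 rules out.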

You can also queue a batch of jobs to be executed serially as a single job using queueSerialJobs:

>>> from import makeJob
>>> job = async.queueSerialJobs(
...     makeJob(createDocument, folder,
...             'bar', 'title', 'description', 'body'),
...     makeJob(submitObject, folder, 'bar'))
>>> transaction.commit()
>>> res = wait_for_result(job)
>>> res[0].result
>>> res[1].status
>>> doc = folder['bar']
>>> wt.getInfoFor(doc, 'review_state')

If you want to execute jobs in parallel, you can use queueParallelJobs.

Security and user permissions

When a job is queued by some user, it is also executed by the same user, with the same roles and permissions. So for instance:

>>> job = async.queueJob(createDocument, portal,
...     'foo', 'title', 'description', 'body')
>>> transaction.commit()

will fail as the user is not allowed to create content in the Plone root:

>>> wait_for_result(job)

Handling failure and success

If you need to act on the result of a job or handle a failure you can do so by adding callbacks. For instance:

>>> from import funcs
>>> job = async.queueJob(addNumbers, folder, 40, 2)
>>> c = job.addCallback(job_success_callback)
>>> transaction.commit()
>>> r = wait_for_result(job)
>>> funcs.results
['Success: 42']

Failures can be handled in the same way:

>>> job = async.queueJob(failingJob, folder)
>>> c = job.addCallbacks(failure=job_failure_callback)
>>> transaction.commit()
>>> r = wait_for_result(job)
>>> funcs.results
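
The callback helpers used above come from the package's test module and are not shown. The following self-contained sketch assumes their rough shape, based on the 'Success: 42' output and the RuntimeError seen in these examples. run_with_callbacks is a hypothetical stand-in for the worker, and the real library may pass its own failure object rather than the bare exception.

```python
results = []


def job_success_callback(result):
    results.append("Success: %s" % result)


def job_failure_callback(failure):
    results.append("Failure: %r" % (failure,))


def addNumbers(context, x1, x2):
    return x1 + x2


def failingJob(context):
    raise RuntimeError("FooBared")


def run_with_callbacks(func, args, success=None, failure=None):
    """Hypothetical stand-in for a worker: call ``func`` and route the
    outcome to the matching callback, if one was given."""
    try:
        value = func(*args)
    except Exception as exc:
        if failure is not None:
            failure(exc)
        return exc
    if success is not None:
        success(value)
    return value


run_with_callbacks(addNumbers, (None, 40, 2), success=job_success_callback)
run_with_callbacks(failingJob, (None,), failure=job_failure_callback)
```

Keeping success and failure handling in separate callables means the job function itself stays free of reporting logic.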

It is also possible to handle all successful/failed jobs (for instance if you want to send an email upon failure) by subscribing to the respective event:

>>> from zope.component import provideHandler
>>> from import IJobSuccess, IJobFailure
>>> provideHandler(successHandler, [IJobSuccess])
>>> provideHandler(failureHandler, [IJobFailure])
>>> funcs.results = []
>>> job1 = async.queueJob(addNumbers, folder, 40, 2)
>>> job2 = async.queueJob(failingJob, folder)
>>> transaction.commit()
>>> r = wait_for_result(job2)
>>> funcs.results
[42, ...RuntimeError...FooBared...

Let’s clean up and unregister the success/failure handlers:

>>> from zope.component import getGlobalSiteManager
>>> gsm = getGlobalSiteManager()
>>> _ = gsm.unregisterHandler(successHandler, [IJobSuccess])
>>> _ = gsm.unregisterHandler(failureHandler, [IJobFailure])
>>> transaction.commit()


1.7 (unreleased)

  • Add zope.minmax dependency [vangheem]

1.4 (2012-10-09)

  • fix tests, and helpers mainly for testing layer consumers (collective.cron) [kiorky]

  • fix rst markup [kiorky]

1.3 (2012-10-05)

  • buildout infrastructure refresh [kiorky]

  • Plone 4.3 compatibility [kiorky]

  • Switch tests from collective.testcaselayer to [kiorky]

  • Merge experimental work on UI & jobs back to master [kiorky]

  • Add plone UI to view jobs [davisagli]

  • Added support for queued/deferred tasks. [kiork,davisagli,do3cc]

1.2 - 2012-04-26

  • Fix includes to work correctly with dexterity and to include simplejson. [vangheem]

  • Change ping intervals to something more sane so it doesn’t take so long for your workers to come back online after restart. [vangheem]

1.1 - 2011-07-21

  • Add [WouterVH]

  • Change zcml:condition for zope.(app.)keyreference to use the plone-4 feature. [vangheem]

  • Always use loadZCMLFile in testcase layers to not break under Zope 2.13. [stefan]

  • Avoid excessive ZODB growth by increasing the dispatcher ping intervals. [stefan]

1.0 - 2011-01-03

  • Conditionally include ZCML. [vangheem]

  • Fix for async jobs started by anonymous user. [naro]

  • Add full set of example buildouts. [stefan]

  • Make tests pass under Plone 3 and 4. Exception representations have changed for some odd reason. [stefan]

1.0a6 - 2010-10-14

  • First public release. [ggozad]

1.0a5 - 2010-10-14

  • Instead of guessing where a userid may be coming from, record the path of the userfolder and use that to reinstate the user. [mj]

1.0a4 - 2010-09-09

  • Use multi-db setup in tests to keep testcaselayer working as expected. [stefan, ggozad]

1.0a3 - 2010-09-01

  • Separate helper function from test setup so it can be used in non-test code. [witsch]

1.0a2 - 2010-08-30

  • Made separate zcml configurations for single/multi and instance/worker. [stefan, ggozad]

1.0a1 - 2010-08-25

  • zc.async integration for Plone. Initial release. [ggozad, stefan]

