Perform durable tasks asynchronously

Introduction

Goals

The zc.async package provides a way to schedule jobs to be performed out-of-band from your current thread. The job might be done in another thread or another process, possibly on another machine. Here are some example core use cases.

  • You want to let users do something that requires a lot of system resources from your application, such as creating a large PDF. Naively done, six or seven simultaneous PDF requests will consume your application thread pool and could make your application unresponsive to any other users.

  • You want to let users spider a web site; communicate with a credit card company; query a large, slow LDAP database on another machine; or do some other action that generates network requests from the server. System resources might not be a problem, but, again, if something goes wrong, several requests could make your application unresponsive.

  • Perhaps because of resource contention, you want to serialize work that can be done asynchronously, such as updating a single data structure like a catalog index.

  • You want to decompose and parallelize a single job across many machines so it can be finished faster.

  • You have an application job that you discover is taking longer than users can handle, even after you optimize it. You want a quick fix to move the work out-of-band.

Many of these core use cases involve end-users being able to start potentially expensive processes, on demand. Basic scheduled tasks are also provided by this package, though recurrence must be something you arrange.

History

This is a second-generation design. The first generation was zasync, a mission-critical and successful Zope 2 product in use for a number of high-volume Zope 2 installations. [1] It’s worthwhile noting that zc.async has absolutely no backwards compatibility with zasync, and zc.async does not require Zope (although it can be used in conjunction with it; details below).

Design Overview

Overview: Usage

Looking at the design from the perspective of regular usage, your code obtains a queue, which is a place to register jobs to be performed asynchronously.

Your application calls put on the queue to register a job. The job must be a pickleable, callable object; a global function, a callable persistent object, a method of a persistent object, or a special zc.async.job.Job object (discussed later) are all examples of suitable objects. The job by default is registered to be performed as soon as possible, but can be registered to be called at a certain time.

The put call will return a zc.async.job.Job object. This object represents both the callable and its deferred result. It has information about the job requested, the current state of the job, and the result of performing the job.

An example spelling for registering a job might be self.pending_result = queue.put(self.performSpider). The returned object can be stored and polled to see when the job is complete; or the job can be configured to do additional work when it completes (such as storing the result in a data structure).
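
For instance, a minimal sketch of that pattern might look like the following (Spiderer, performSpider, and handleResult are hypothetical application code; queue.put and addCallback are demonstrated in the Usage section below):

import persistent
import zc.async.job

class Spiderer(persistent.Persistent):
    # hypothetical application class; methods of persistent objects
    # are picklable, so they may be registered as jobs
    def request(self, queue):
        # register the job and keep the pending result around
        self.pending_result = queue.put(self.performSpider)
        # arrange additional work for when the job completes
        self.pending_result.addCallback(
            zc.async.job.Job(self.handleResult))
    def performSpider(self):
        pass # imagine this spiders a site and returns the results
    def handleResult(self, result):
        pass # imagine this stores the result in a data structure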

Overview: Mechanism

Multiple processes, typically spread across multiple machines, can connect to the queue and claim and perform work. As with other collections of processes that share pickled objects, these processes generally should share the same software (though some variations on this constraint should be possible).

A process that should claim and perform work, in addition to a database connection and the necessary software, needs a dispatcher with a reactor to provide a heartbeat. The dispatcher will rely on one or more persistent agents in the queue (in the database) to determine which jobs it should perform.

A dispatcher is in charge of dispatching queued work for a given process to worker threads. It works with one or more queues and a single reactor. It has a universally unique identifier (UUID), which is usually an identifier of the application instance in which it is running. The dispatcher starts jobs in dedicated threads.

A reactor is something that can provide an eternal loop, or heartbeat, to power the dispatcher. It can be the main twisted reactor (in the main thread); another instance of a twisted reactor (in a child thread); or any object that implements a small subset of the twisted reactor interface (see discussion in dispatcher.txt, and example testing reactor in testing.py, used below).

An agent is a persistent object in a queue that is associated with a dispatcher and is responsible for picking jobs and keeping track of them. Zero or more agents within a queue can be associated with a dispatcher. Each agent for a given dispatcher in a given queue is identified uniquely with a name [2].

Generally, these work together as follows. The reactor calls the dispatcher. The dispatcher tries to find the mapping of queues in the database root under a key of zc.async (see constant zc.async.interfaces.KEY). If it finds the mapping, it iterates over the queues (the mapping’s values) and asks each queue for the agents associated with the dispatcher’s UUID. The dispatcher then is responsible for seeing what jobs its agents want to do from the queue, and providing threads and connections for the work to be done. The dispatcher then asks the reactor to call itself again in a few seconds.
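
In rough pseudocode, one heartbeat of that cycle looks something like the sketch below. This is a schematic restatement of the paragraph above, not the actual implementation; heartbeat, claimJob, and performInThread are illustrative names.

import zc.async.interfaces

def heartbeat(dispatcher, conn, reactor):
    queues = conn.root().get(zc.async.interfaces.KEY)
    if queues is not None:
        for queue in queues.values():
            agents = queue.dispatchers.get(dispatcher.UUID)
            if agents is None:
                continue # no agents: do no work for this queue
            for agent in agents.values():
                # the agent picks jobs; the dispatcher provides a
                # thread and a connection for each job it performs
                job = agent.claimJob()
                while job is not None:
                    dispatcher.performInThread(job)
                    job = agent.claimJob()
    # ask the reactor to call us again in a few seconds
    reactor.callLater(dispatcher.poll_interval, heartbeat,
                      dispatcher, conn, reactor)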

Reading More

This document continues on with three other main sections: Usage, Configuration without Zope 3, and Configuration with Zope 3.

Other documents in the package are primarily geared as maintainer documentation, though the author has tried to make them readable and understandable.

Usage

Overview and Basics

The basic usage of zc.async does not depend on a particular configuration of the back-end mechanism for getting the jobs done. Moreover, on some teams, one person or group will be responsible for configuring zc.async, while the service remains available to the code of all team members. Therefore, we begin our detailed discussion with regular usage, assuming configuration has already happened. Subsequent sections discuss configuring zc.async with and without Zope 3.

So, let’s assume we have a queue with dispatchers, reactors and agents all waiting to fulfill jobs placed into the queue. We start with a connection object, conn, and some convenience functions introduced along the way that help us simulate time passing and work being done [3].

Obtaining the queue

First, how do we get the queue? Your installation may have some conveniences. For instance, the Zope 3 configuration described below makes it possible to get the primary queue with an adaptation call like zc.async.interfaces.IQueue(a_persistent_object_with_db_connection).

But failing that, queues are always expected to be in a zc.async.queue.Queues mapping found off the ZODB root in a key defined by the constant zc.async.interfaces.KEY.

>>> import zc.async.interfaces
>>> zc.async.interfaces.KEY
'zc.async'
>>> root = conn.root()
>>> queues = root[zc.async.interfaces.KEY]
>>> import zc.async.queue
>>> isinstance(queues, zc.async.queue.Queues)
True

As the name implies, queues is a collection of queues. As discussed later, it’s possible to have multiple queues, as a tool to distribute and control work. We will assume a convention of a queue being available under the ‘’ (empty string) key.

>>> queues.keys()
['']
>>> queue = queues['']

queue.put

Now we want to actually get some work done. The simplest case is simple to perform: pass a persistable callable to the queue’s put method and commit the transaction.

>>> def send_message():
...     print "imagine this sent a message to another machine"
>>> job = queue.put(send_message)
>>> import transaction
>>> transaction.commit()

Note that this won’t really work in an interactive session: the callable needs to be picklable, as discussed above, so send_message would need to be a module global, for instance.

The put returned a job. Now we need to wait for the job to be performed. We would normally do this by really waiting. For our examples, we will use a helper method on the testing reactor to wait_for the job to be completed.

>>> reactor.wait_for(job)
imagine this sent a message to another machine

We also could have used the method of a persistent object. Here’s another quick example.

First we define a simple persistent.Persistent subclass and put an instance of it in the database [4].

>>> import persistent
>>> class Demo(persistent.Persistent):
...     counter = 0
...     def increase(self, value=1):
...         self.counter += value
...
>>> root['demo'] = Demo()
>>> transaction.commit()

Now we can put the demo.increase method in the queue.

>>> root['demo'].counter
0
>>> job = queue.put(root['demo'].increase)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> root['demo'].counter
1

The method was called, and the persistent object modified!

To reiterate, only pickleable callables such as global functions and the methods of persistent objects can be used. This rules out, for instance, lambdas and other functions created dynamically. As we’ll see below, the job instance can help us out there somewhat by offering closure-like features.

queue.pull

If you put a job into a queue and it hasn’t been claimed yet and you want to cancel the job, pull it from the queue.

>>> len(queue)
0
>>> job = queue.put(send_message)
>>> len(queue)
1
>>> job is queue.pull()
True
>>> len(queue)
0

Scheduled Calls

When using put, you can also pass a datetime.datetime to schedule a call. A datetime without a timezone is considered to be in the UTC timezone.

>>> t = transaction.begin()
>>> import datetime
>>> import pytz
>>> datetime.datetime.now(pytz.UTC)
datetime.datetime(2006, 8, 10, 15, 44, 33, 211, tzinfo=<UTC>)
>>> job = queue.put(
...     send_message, begin_after=datetime.datetime(
...         2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
>>> job.begin_after
datetime.datetime(2006, 8, 10, 15, 56, tzinfo=<UTC>)
>>> transaction.commit()
>>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
TIME OUT
>>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
TIME OUT
>>> datetime.datetime.now(pytz.UTC)
datetime.datetime(2006, 8, 10, 15, 44, 43, 211, tzinfo=<UTC>)
>>> zc.async.testing.set_now(datetime.datetime(
...     2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
>>> reactor.wait_for(job)
imagine this sent a message to another machine
>>> datetime.datetime.now(pytz.UTC) >= job.begin_after
True

If you set a time that has already passed, the job will be run as if it had been scheduled to run as soon as possible [5], unless the job has already timed out, in which case the job fails with an abort [6].

The queue’s put method is the essential API. pull is used rarely. Other methods are used to introspect, but are not needed for basic usage.

But what is that result of the put call in the examples above? A job? What do you do with that?

Jobs

Overview

A call to put returns an IJob. The job represents the pending result. This object has a lot of functionality that’s explored in other documents in this package, and demonstrated a bit below, but here’s a summary.

  • You can introspect, and even modify, the call and its arguments.

  • You can specify that the job should be run serially with others of a given identifier.

  • You can specify other calls that should be made on the basis of the result of this call.

  • You can persist a reference to it, and periodically (after syncing your connection with the database, which happens whenever you begin or commit a transaction) check its state to see if it is equal to zc.async.interfaces.COMPLETED. When it is, the call has run to completion, either to success or an exception. A minimal polling sketch follows this list.

  • You can look at the result of the call (once COMPLETED). It might be the result you expect, or a zc.twist.Failure, a subclass of twisted.python.failure.Failure, which is a way to safely communicate exceptions across connections, machines, and processes.
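
For instance, a minimal polling sketch along the lines of the fourth bullet might look like this (poll_job is a hypothetical helper; a real application would usually check from its normal request cycle rather than a sleep loop):

import time
import transaction
import zc.async.interfaces

def poll_job(job, tries=10, delay=1):
    for i in range(tries):
        transaction.begin() # sync our view of the database
        if job.status == zc.async.interfaces.COMPLETED:
            return job.result # a normal result or a zc.twist.Failure
        time.sleep(delay)
    return None # still pending or active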

Results

So here’s a simple story. What if you want to get a result back from a call? Look at the job.result after the call is COMPLETED.

>>> def imaginaryNetworkCall():
...     # let's imagine this makes a network call...
...     return "200 OK"
...
>>> job = queue.put(imaginaryNetworkCall)
>>> print job.result
None
>>> job.status == zc.async.interfaces.PENDING
True
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> job.result
'200 OK'
>>> job.status == zc.async.interfaces.COMPLETED
True

Closures

What’s more, you can pass a Job to the put call. This means that you aren’t constrained to having simple no-argument calls performed asynchronously: you can pass a job with a call, arguments, and keyword arguments, effectively a kind of closure. Here’s a quick example. We’ll use the demo object, and its increase method, that we introduced above, but this time we’ll include some arguments [7].

With positional arguments:

>>> t = transaction.begin()
>>> job = queue.put(
...     zc.async.job.Job(root['demo'].increase, 5))
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> root['demo'].counter
6

With keyword arguments (value):

>>> job = queue.put(
...     zc.async.job.Job(root['demo'].increase, value=10))
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> root['demo'].counter
16

Note that arguments to these jobs can be any persistable object.

Failures

What happens if a call raises an exception? The return value is a Failure.

>>> def I_am_a_bad_bad_function():
...     return foo + bar
...
>>> job = queue.put(I_am_a_bad_bad_function)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> job.result
<zc.twist.Failure exceptions.NameError>

Failures can provide useful information such as tracebacks.

>>> print job.result.getTraceback()
... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
exceptions.NameError: global name 'foo' is not defined
<BLANKLINE>

Callbacks

You can register callbacks to handle the result of a job, whether a Failure or another result.

Note that, unlike callbacks on a Twisted deferred, these callbacks do not change the result of the original job. Since callbacks are jobs, you can chain results, but generally callbacks for the same job all get the same result as input.

Note too that, during execution of a callback, there is no guarantee that the callback will be processed on the same machine as the main call. Consequently, some of the local functions, discussed below, may not work as desired.

Here’s a simple example of reacting to a success.

>>> def I_scribble_on_strings(string):
...     return string + ": SCRIBBLED"
...
>>> job = queue.put(imaginaryNetworkCall)
>>> callback = job.addCallback(I_scribble_on_strings)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
'200 OK'
>>> callback.result
'200 OK: SCRIBBLED'

Here’s a more complex example of handling a Failure, and then chaining a subsequent callback.

>>> def I_handle_NameErrors(failure):
...     failure.trap(NameError) # see twisted.python.failure.Failure docs
...     return 'I handled a name error'
...
>>> job = queue.put(I_am_a_bad_bad_function)
>>> callback1 = job.addCallbacks(failure=I_handle_NameErrors)
>>> callback2 = callback1.addCallback(I_scribble_on_strings)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
<zc.twist.Failure exceptions.NameError>
>>> callback1.result
'I handled a name error'
>>> callback2.result
'I handled a name error: SCRIBBLED'

Advanced Techniques and Tools

Important

The job and its functionality described above are the core zc.async tools.

The following are advanced techniques and tools of various complexities. You can use zc.async very productively without ever understanding or using them. If the following do not make sense to you now, please just move on for now.

zc.async.local

Jobs always run their callables in a thread, within the context of a connection to the ZODB. The callables have access to six special thread-local functions if they need them for special uses. These are available off of zc.async.local.

zc.async.local.getJob()

The getJob function can be used to examine the job, to get a connection off of _p_jar, to get the queue into which the job was put, or other uses.

zc.async.local.getQueue()

The getQueue function can be used to examine the queue, to put another task into the queue, or other uses. It is sugar for zc.async.local.getJob().queue.

zc.async.local.setLiveAnnotation(name, value, job=None)

The setLiveAnnotation tells the agent to set an annotation on a job, by default the current job, in another connection. This makes it possible to send messages about progress or for coordination while in the middle of other work.

As a simple rule, only send immutable objects like strings or numbers as values [8].

zc.async.local.getLiveAnnotation(name, default=None, timeout=0, poll=1, job=None)

The getLiveAnnotation tells the agent to get an annotation for a job, by default the current job, from another connection. This makes it possible to send messages about progress or for coordination while in the middle of other work.

As a simple rule, only ask for annotation values that will be immutable objects like strings or numbers [9].

If the timeout argument is set to a positive float or int, the function will wait at least that number of seconds for an annotation of the given name to become available. Otherwise, it will return the default if the name is not present in the annotations. The poll argument specifies approximately how often to poll for the annotation, in seconds (to be more precise, a subsequent poll will be min(poll, remaining seconds until timeout) seconds away).
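
For example, inside a job’s callable, waiting up to ten seconds for a coordination flag, checking about twice a second, might be spelled like this (‘myapp.flag’ is a hypothetical annotation name):

flag = zc.async.local.getLiveAnnotation(
    'myapp.flag', default=False, timeout=10, poll=0.5)
# flag is the annotation value if one arrived in time, else False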

zc.async.local.getReactor()

The getReactor function returns the job’s dispatcher’s reactor. The getLiveAnnotation and setLiveAnnotation functions use this, along with the zc.twist package, to work their magic; if you are feeling adventurous, you can do the same.

zc.async.local.getDispatcher()

The getDispatcher function returns the job’s dispatcher. This might be used to analyze its non-persistent poll data structure, for instance (described later in configuration discussions).

Let’s give three of those a whirl. We will write a function that examines the job’s state while it is being called, and sets the state in an annotation, then waits for our flag to finish.

>>> def annotateStatus():
...     zc.async.local.setLiveAnnotation(
...         'zc.async.test.status',
...         zc.async.local.getJob().status)
...     zc.async.local.getLiveAnnotation(
...         'zc.async.test.flag', timeout=5)
...     return 42
...
>>> job = queue.put(annotateStatus)
>>> transaction.commit()
>>> import time
>>> def wait_for_annotation(job, key):
...     reactor.time_flies(dispatcher.poll_interval) # starts thread
...     for i in range(10):
...         while reactor.time_passes():
...             pass
...         transaction.begin()
...         if key in job.annotations:
...             break
...         time.sleep(0.1)
...     else:
...         print 'Timed out' + repr(dict(job.annotations))
...
>>> wait_for_annotation(job, 'zc.async.test.status')
>>> job.annotations['zc.async.test.status'] == (
...     zc.async.interfaces.ACTIVE)
True
>>> job.status == zc.async.interfaces.ACTIVE
True

[10]

>>> job.annotations['zc.async.test.flag'] = True
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
42

[11] getReactor and getDispatcher are for advanced use cases and are not explored further here.

Job Quotas

One class of asynchronous jobs are ideally serialized. For instance, you may want to reduce or eliminate the chance of conflict errors when updating a text index. One way to do this kind of serialization is to use the quota_names attribute of the job.

For example, let’s first show two non-serialized jobs running at the same time, and then two serialized jobs created at the same time. The first part of the example does not use quota_names, to show a contrast.

For our parallel jobs, we’ll do something that would create a deadlock if they were serial. Notice that we are mutating the job arguments after creation to accomplish this, which is supported.

>>> def waitForParallel(other):
...     zc.async.local.setLiveAnnotation(
...         'zc.async.test.flag', True)
...     zc.async.local.getLiveAnnotation(
...         'zc.async.test.flag', job=other, timeout=0.4, poll=0)
...
>>> job1 = queue.put(waitForParallel)
>>> job2 = queue.put(waitForParallel)
>>> job1.args.append(job2)
>>> job2.args.append(job1)
>>> transaction.commit()
>>> reactor.wait_for(job1, job2)
>>> job1.status == zc.async.interfaces.COMPLETED
True
>>> job2.status == zc.async.interfaces.COMPLETED
True
>>> job1.result is job2.result is None
True

On the other hand, for our serial jobs, we’ll do something that would fail if it were parallel. We’ll rely on quota_names.

Quotas verge on configuration, which is not what this section is about, because they must be configured on the queue. However, they also affect usage, so we show them here.

>>> def pause(other):
...     zc.async.local.setLiveAnnotation(
...         'zc.async.test.flag', True)
...     res = zc.async.local.getLiveAnnotation(
...         'zc.async.test.flag', timeout=0.4, poll=0.1, job=other)
...
>>> job1 = queue.put(pause)
>>> job2 = queue.put(imaginaryNetworkCall)

You can’t put a name in quota_names unless the quota has been created in the queue.

>>> job1.quota_names = ('test',)
Traceback (most recent call last):
...
ValueError: ('unknown quota name', 'test')
>>> queue.quotas.create('test')
>>> job1.quota_names = ('test',)
>>> job2.quota_names = ('test',)

Now we can see the two jobs being performed serially.

>>> job1.args.append(job2)
>>> transaction.commit()
>>> reactor.time_flies(dispatcher.poll_interval)
1
>>> for i in range(10):
...     t = transaction.begin()
...     if job1.status == zc.async.interfaces.ACTIVE:
...         break
...     time.sleep(0.1)
... else:
...     print 'TIME OUT'
...
>>> job2.status == zc.async.interfaces.PENDING
True
>>> job2.annotations['zc.async.test.flag'] = False
>>> transaction.commit()
>>> reactor.wait_for(job1)
>>> reactor.wait_for(job2)
>>> print job1.result
None
>>> print job2.result
200 OK

Quotas can be configured for limits greater than one at a time, if desired. This may be valuable when a needed resource is only available in limited numbers at a time.

Note that, while quotas are valuable tools for doing serialized work such as updating a text index, other optimization features sometimes useful for this sort of task, such as collapsing similar jobs, are not provided directly by this package. This functionality could be trivially built on top of zc.async, however [12].

Returning Jobs

Our examples so far have done work directly. What if the job wants to orchestrate other work? One way this can be done is to return another job. The result of the inner job will be the result of the first job once the inner job is finished. This approach can be used to break up the work of long running processes; to be more cooperative to other jobs; and to make parts of a job that can be parallelized available to more workers.

Serialized Work

First, consider a serialized example. This simple pattern is one approach.

>>> def second_job(value):
...     # imagine a lot of work goes on...
...     return value * 2
...
>>> def first_job():
...     # imagine a lot of work goes on...
...     intermediate_value = 21
...     queue = zc.async.local.getJob().queue
...     return queue.put(zc.async.job.Job(
...         second_job, intermediate_value))
...
>>> job = queue.put(first_job)
>>> transaction.commit()
>>> reactor.wait_for(job, attempts=3)
TIME OUT
>>> reactor.wait_for(job, attempts=3)
>>> job.result
42

The second_job could also have returned a job, allowing for additional legs. Once the last job returns a real result, it will cascade through the past jobs back up to the original one.

A different approach could have used callbacks. Using callbacks can be somewhat more complicated to follow, but can allow for a cleaner separation of code: dividing code that does work from code that orchestrates the jobs. We’ll see an example of the idea below.

Parallelized Work

Now how can we set up parallel jobs? There are other good ways, but we can describe one way that avoids potential problems with the current-as-of-this-writing (ZODB 3.8 and trunk) default optimistic MVCC serialization behavior in the ZODB. The solution uses callbacks, which also allows us to cleanly divide the “work” code from the synchronization code, as described in the previous paragraph.

First, we’ll define the jobs that do work. job_A, job_B, and job_C will be jobs that can be done in parallel, and post_process will be a function that assembles the job results for a final result.

>>> def job_A():
...     # imaginary work...
...     return 7
...
>>> def job_B():
...     # imaginary work...
...     return 14
...
>>> def job_C():
...     # imaginary work...
...     return 21
...
>>> def post_process(*args):
...     # this callable represents one that needs to wait for the
...     # parallel jobs to be done before it can process them and return
...     # the final result
...     return sum(args)
...

Now this code works with jobs to get everything done. Note, in the callback function, that mutating the same object we are checking (job.args) is the way we are enforcing necessary serializability with MVCC turned on.

>>> def callback(job, result):
...     job.args.append(result)
...     if len(job.args) == 3: # all results are in
...         zc.async.local.getJob().queue.put(job)
...
>>> def main_job():
...     job = zc.async.job.Job(post_process)
...     queue = zc.async.local.getJob().queue
...     for j in (job_A, job_B, job_C):
...         queue.put(j).addCallback(
...             zc.async.job.Job(callback, job))
...     return job
...

That may be a bit mind-blowing at first. The trick to catch here is that, because the main_job returns a job, the result of that job will become the result of the main_job once the returned (post_process) job is done.

Now we’ll put this in and let it cook.

>>> job = queue.put(main_job)
>>> transaction.commit()
>>> for i in range(10):
...     reactor.wait_for(job, attempts=3)
...     if job.status == zc.async.interfaces.COMPLETED:
...         break
... else:
...     assert False, 'never completed'
... # doctest: +ELLIPSIS
TIME OUT...
>>> job.result
42

Ta-da!

For real-world usage, you’d also probably want to deal with the possibility of one or more of the jobs generating a Failure, among other edge cases.
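
For instance, a sketch of a more careful variant of the callback above might guard against a Failure result before recording it (this reuses the spelling shown in the Failures section; the retry-or-fail behavior is left as application policy):

import zc.twist

def careful_callback(job, result):
    if isinstance(result, zc.twist.Failure):
        # a real application might retry, or fail the main job here
        print result.getTraceback()
        return
    job.args.append(result)
    if len(job.args) == 3: # all results are in
        zc.async.local.getJob().queue.put(job)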

Returning Deferreds

What if you want to do work that doesn’t require a ZODB connection? You can also return a Twisted deferred (twisted.internet.defer.Deferred). When you then callback the deferred with the eventual result, the agent will be responsible for setting that value on the original deferred and calling its callbacks. This can be a useful trick for making network calls using Twisted or zc.ngi, for instance.

>>> def imaginaryNetworkCall2(deferred):
...     # make a network call...
...     deferred.callback('200 OK')
...
>>> import twisted.internet.defer
>>> import threading
>>> def delegator():
...     deferred = twisted.internet.defer.Deferred()
...     t = threading.Thread(
...         target=imaginaryNetworkCall2, args=(deferred,))
...     t.run() # runs the call synchronously; a real use might use t.start()
...     return deferred
...
>>> job = queue.put(delegator)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
'200 OK'

Conclusion

This concludes our discussion of zc.async usage. The next section shows how to configure zc.async without Zope 3 [13].

Configuration without Zope 3

This section discusses setting up zc.async without Zope 3. Since Zope 3 is ill-defined, we will be more specific: this describes setting up zc.async without ZCML, without any zope.app packages, and with as few dependencies as possible. A casual way of describing the dependencies is “ZODB, Twisted and zope.component,” though we directly depend on some smaller packages and indirectly on others [14].

You may have one or two kinds of configurations for your software using zc.async. The simplest approach is to have all processes able both to put items in queues, and to perform them with a dispatcher. You can then use on-the-fly ZODB configuration to determine what jobs, if any, each process’ dispatcher performs. If a dispatcher has no agents in a given queue, as we’ll discuss below, the dispatcher will not perform any job for that queue.

However, if you want to create some processes that can only put items in a queue, and do not have a dispatcher at all, that is easy to do. We’ll call this a “client” process, and the full configuration a “client/server process”. As you might expect, the configuration of a client process is a subset of the configuration of the client/server process.

We will first describe setting up a client, non-dispatcher process, in which you only can put items in a zc.async queue; and then describe setting up a dispatcher client/server process that can be used both to request and to perform jobs.

Configuring a Client Process

Generally, zc.async configuration has four basic parts: component registrations, ZODB setup, ZODB configuration, and process configuration. For a client process, we’ll discuss required component registrations; ZODB setup; minimal ZODB configuration; process configuration; and then circle back around for some optional component registrations.

Required Component Registrations

The required registrations can be installed for you by the zc.async.configure.base function. Most other examples in this package, such as those in the Usage section, use this in their test setup.

Again, for a quick start, you might just want to use the helper zc.async.configure.base function, and move on to the Required ZODB Set Up section below.
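
In that spelling, the whole set of required registrations is a single call (a sketch assuming base takes no arguments, which is how the test setup in this package uses it):

import zc.async.configure
zc.async.configure.base()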

Here, though, we will go over each required registration to briefly explain what they are.

You must have three adapter registrations: IConnection to ITransactionManager, IPersistent to IConnection, and IPersistent to ITransactionManager.

The zc.twist package provides all of these adapters. However, zope.app.keyreference also provides a version of the connection adapter that is identical or very similar, and that should work fine if you are already using that package in your application.

>>> import zc.twist
>>> import zope.component
>>> zope.component.provideAdapter(zc.twist.transactionManager)
>>> zope.component.provideAdapter(zc.twist.connection)
>>> import ZODB.interfaces
>>> zope.component.provideAdapter(
...     zc.twist.transactionManager, adapts=(ZODB.interfaces.IConnection,))

We also need to be able to adapt functions and methods to jobs. The zc.async.job.Job class is the expected implementation.

>>> import types
>>> import zc.async.interfaces
>>> import zc.async.job
>>> zope.component.provideAdapter(
...     zc.async.job.Job,
...     adapts=(types.FunctionType,),
...     provides=zc.async.interfaces.IJob)
>>> zope.component.provideAdapter(
...     zc.async.job.Job,
...     adapts=(types.MethodType,),
...     provides=zc.async.interfaces.IJob)
>>> zope.component.provideAdapter( # optional, rarely used
...     zc.async.job.Job,
...     adapts=(zc.twist.METHOD_WRAPPER_TYPE,),
...     provides=zc.async.interfaces.IJob)

The queue looks for the UUID utility to set the assignerUUID job attribute, and may want to use it to optionally filter jobs during claim in the future. Also, the dispatcher will look for a UUID utility if a UUID is not specifically provided to its constructor.

>>> from zc.async.instanceuuid import UUID
>>> zope.component.provideUtility(
...     UUID, zc.async.interfaces.IUUID, '')

The UUID we register here is a UUID of the instance, which is expected to uniquely identify the process when in production. It is stored in the file specified by the ZC_ASYNC_UUID environment variable (or in os.path.join(os.getcwd(), 'uuid.txt') if this is not specified, for easy initial experimentation with the package).

>>> import uuid
>>> import os
>>> f = open(os.environ["ZC_ASYNC_UUID"])
>>> uuid_hex = f.readline().strip()
>>> f.close()
>>> uuid = uuid.UUID(uuid_hex)
>>> UUID == uuid
True

The uuid.txt file is intended to stay in the instance home as a persistent identifier.

Again, all of the required registrations above can be accomplished quickly with zc.async.configure.base.

Required ZODB Set Up

On a basic level, zc.async needs a setup that supports good conflict resolution. Most or all production ZODB storages now have the necessary APIs to support MVCC.

Of course, if you want to run multiple processes, you need ZEO. You should also then make sure that your ZEO server installation has all the code that includes conflict resolution, such as zc.queue, because, as of this writing, conflict resolution happens in the ZEO server, not in clients.

A more subtle decision is whether to use multiple databases. The zc.async dispatcher can generate a lot of database churn. It may be wise to put the queue in a separate database from your content database(s).

The downsides to this option include the fact that you must be careful to specify to which database objects belong; and that broken cross-database references are not handled gracefully in the ZODB as of this writing.

We will use multiple databases for our example here, because we are trying to demonstrate production-quality examples. We will show this with a pure-Python approach, rather than the ZConfig approach usually used by Zope. If you know ZConfig, that will be a reasonable approach as well; see zope.app.appsetup for how Zope uses ZConfig to set up multidatabases.

In our example, we create two file storages. In production, you would more likely use ZEO; hooking ClientStorage up instead of FileStorage should be straightforward.

>>> databases = {}
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
...     'main.fs', create=True)
>>> async_storage = ZODB.FileStorage.FileStorage(
...     'async.fs', create=True)
>>> from ZODB.DB import DB
>>> databases[''] = db = DB(storage)
>>> databases['async'] = async_db = DB(async_storage)
>>> async_db.databases = db.databases = databases
>>> db.database_name = ''
>>> async_db.database_name = 'async'
>>> conn = db.open()
>>> root = conn.root()

ZODB Configuration

A Queue

All we must have for a client to be able to put jobs in a queue is…a queue.

For a quick start, the zc.async.subscribers module provides a subscriber to a DatabaseOpened event that does the right dance. See multidb_queue_installer and queue_installer in that module, and you can see that in use in Configuration with Zope 3. For now, though, we’re taking things step by step and explaining what’s going on.

Dispatchers look for queues in a mapping off the root of the database in a key defined as a constant: zc.async.interfaces.KEY. This mapping should generally be a zc.async.queue.Queues object.

If we were not using a multi-database for our example, we could simply install the queues mapping with this line: root[zc.async.interfaces.KEY] = zc.async.queue.Queues(). We will need something a bit more baroque. We will add the queues mapping to the ‘async’ database, and then make it available in the main database (‘’) with the proper key.

>>> conn2 = conn.get_connection('async')
>>> import zc.async.queue
>>> queues = conn2.root()['mounted_queues'] = zc.async.queue.Queues()

Note that the ‘mounted_queues’ key in the async database is arbitrary: what we care about is the key in the database that the dispatcher will see.

Now we add the object explicitly to conn2, so that the ZODB will know the “real” database in which the object lives, even though it will be also accessible from the main database.

>>> conn2.add(queues)
>>> root[zc.async.interfaces.KEY] = queues
>>> import transaction
>>> transaction.commit()

Now we need to put a queue in the queues collection. We can have more than one, as discussed below, but we suggest a convention of the primary queue being available in a key of ‘’ (empty string).

>>> queue = queues[''] = zc.async.queue.Queue()
>>> transaction.commit()

Quotas

We touched on quotas in the usage section. Some jobs will need to access resources that are shared across processes. A central data structure such as an index in the ZODB is a prime example, but other examples might include a network service that only allows a certain number of concurrent connections. These scenarios can be helped by quotas.

Quotas are demonstrated in the usage section. For configuration, you should know these characteristics (a short sketch follows the list):

  • you cannot add a job with a quota name that is not defined in the queue [15];

  • you cannot add a quota name to a job in a queue if the quota name is not defined in the queue [16];

  • you can create and remove quotas on the queue [17];

  • you can remove quotas if pending jobs have their quota names–the quota name is then ignored [18];

  • quotas default to a size of 1 [19];

  • this can be changed at creation or later [20]; and

  • decreasing the size of a quota while the old quota size is filled will not affect the currently running jobs [21].
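
Putting a few of those characteristics together, a configuration sketch might look like this (it assumes create accepts a size argument, per “changed at creation or later” above; ‘network’ is an arbitrary quota name):

queue.quotas.create('network', size=2) # allow two jobs at a time
queue.quotas['network'].size = 3       # raise the limit later
transaction.commit()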

Multiple Queues

Since queues are kept in a mapping, we can also create multiple queues. This can make some scenarios more convenient and simpler to reason about. For instance, while you might have agents filtering jobs as we describe above, it might be simpler to say that you have a queue for one kind of job (say, processing a video file or an audio file) and a queue for other kinds of jobs. Then it is easy and obvious to set up simple FIFO agents as desired for different dispatchers. The same kind of logic could be accomplished with agents, but it is easier to picture the multiple queues.

Another use case for multiple queues might be specialized queues, like ones that broadcast jobs. You could write a queue subclass that broadcasts copies of the jobs it receives to all dispatchers, aggregating results. This could be used to send “events” to all processes, or to gather statistics on certain processes, and so on.

Generally, any time the application wants to be able to assert a kind of job rather than letting the agents decide what to do, having separate queues is a reasonable tool.
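
Creating an additional queue is just another entry in the queues mapping, exactly as with the ‘’ queue installed earlier (the ‘video’ key is arbitrary):

import zc.async.queue
queues['video'] = zc.async.queue.Queue()
transaction.commit()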

Process Configuration

Daemonization

You often want to daemonize your software, so that you can restart it if there’s a problem, keep track of it, monitor it, and so on. ZDaemon (http://pypi.python.org/pypi/zdaemon) and Supervisor (http://supervisord.org/) are two fairly simple-to-use ways of doing this for both client and client/server processes. If your main application can be packaged as a setuptools distribution (egg or source release or even development egg), then you can have your main application run as a zc.async client, and your dispatchers run a separate zc.async-only main loop that simply includes your main application as a dependency, so the necessary software is available. You may have to do a bit more configuration on the client/server side to mirror global registries, such as zope.component registrations, between the client and the client/servers, but this shouldn’t be too bad.

UUID File Location

As discussed above, the instanceuuid module will look for an environmental variable ZC_ASYNC_UUID to find the file name to use, and failing that will use os.path.join(os.getcwd(), 'uuid.txt'). It’s worth noting that daemonization tools such as ZDaemon and Supervisor (3 or greater) make setting environment values for child processes an easy (and repeatable) configuration file setting.

Optional Component Registrations for a Client Process

The only optional component registration potentially valuable for client instances that only put jobs in the queue is an adapter from persistent objects to a queue. The zc.async.queue.getDefaultQueue adapter provides this for the queue named ‘’ (empty string). Since that’s what we have from the ZODB Configuration section above, we’ll register it. Writing your own adapter is trivial, as you can see if you look at the implementation of this function.

>>> zope.component.provideAdapter(zc.async.queue.getDefaultQueue)
>>> zc.async.interfaces.IQueue(root) is queue
True
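
A sketch of an adapter to a differently named queue might look like this (the ‘video’ queue name is hypothetical; the connection adaptation relies on the zc.twist adapters registered earlier):

import persistent.interfaces
import zope.component
import zope.interface
import ZODB.interfaces
import zc.async.interfaces

@zope.component.adapter(persistent.interfaces.IPersistent)
@zope.interface.implementer(zc.async.interfaces.IQueue)
def getVideoQueue(obj):
    conn = ZODB.interfaces.IConnection(obj)
    return conn.root()[zc.async.interfaces.KEY]['video']

# registered the same way: zope.component.provideAdapter(getVideoQueue)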

Configuring a Client/Server Process

Configuring a client/server process–something that includes a running dispatcher–means doing everything described above, plus a bit more. You need to set up and start a reactor and dispatcher; configure agents as desired to get the dispatcher to do some work; and optionally configure logging.

For a quick start, the zc.async.subscribers module has some conveniences to start a threaded reactor and dispatcher, and to install agents. You might want to look at those to get started. They are also used in the Zope 3 configuration (README_3). Meanwhile, this document continues to go step-by-step instead, to try and explain the components and configuration.

Even though it seems reasonable to first start a dispatcher and then set up its agents, we’ll first define a subscriber to create an agent. As we’ll see below, the dispatcher fires an event when it registers with a queue, and another when it activates the queue. These events give you the opportunity to register subscribers to add one or more agents to a queue, to tell the dispatcher what jobs to perform. zc.async.agent.addMainAgentActivationHandler is a reasonable starter: it adds a single agent named ‘main’ if one does not exist. The agent has a simple indiscriminate FIFO policy for the queue. If you want to write your own subscriber, look at this, or at the more generic subscriber in the zc.async.subscribers module.

Agents are an important part of the ZODB configuration, and so are described more in depth below.

>>> import zc.async.agent
>>> zope.component.provideHandler(
...     zc.async.agent.addMainAgentActivationHandler)

This subscriber is registered for the IDispatcherActivated event; another approach might use the IDispatcherRegistered event.

Starting the Dispatcher

Now we can start the reactor, and start the dispatcher. In some applications this may be done with an event subscriber to DatabaseOpened, as is done in zc.async.subscribers. Here, we will do it inline.

Any object that conforms to the specification of zc.async.interfaces.IReactor will be usable by the dispatcher. For our example, we will use our own instance of the Twisted select-based reactor running in a separate thread. This is separate from the Twisted reactor installed in twisted.internet.reactor, and so this approach can be used with an application that does not otherwise use Twisted (for instance, a Zope application using the “classic” zope publisher).

The testing module also has a reactor on which the Usage section relies, if you would like to see a minimal contract.

Configuring the basics is fairly simple, as we’ll see in a moment. The trickiest part is to handle signals cleanly. It is also optional! The dispatcher will eventually figure out that there was not a clean shut down before and take care of it. Here, though, essentially as an optimization, we install signal handlers in the main thread using reactor._handleSignals. reactor._handleSignals may work in some real-world applications, but if your application already needs to handle signals you may need a more careful approach. Again, see zc.async.subscribers for some options you can explore.

>>> import twisted.internet.selectreactor
>>> reactor = twisted.internet.selectreactor.SelectReactor()
>>> reactor._handleSignals()

Now we are ready to instantiate our dispatcher.

>>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)

Notice it has the uuid defined in instanceuuid.

>>> dispatcher.UUID == UUID
True

Now we can start the reactor and the dispatcher in a thread.

>>> import threading
>>> def start():
...     dispatcher.activate()
...     reactor.run(installSignalHandlers=0)
...
>>> thread = threading.Thread(target=start)
>>> thread.setDaemon(True)
>>> thread.start()

The dispatcher should be starting up now. Let’s wait for it to activate. We’re using a test convenience, get_poll, defined in the testing module.

>>> from zc.async.testing import get_poll
>>> poll = get_poll(dispatcher, 0)

We’re off! The events have been fired for registering and activating the dispatcher. Therefore, our subscriber to add our agent has fired.

We need to begin our transaction to synchronize our view of the database.

>>> t = transaction.begin()

We get the collection of dispatcher agents from the queue, using the UUID.

>>> dispatcher_agents = queue.dispatchers[UUID]

It has one agent–the one placed by our subscriber.

>>> dispatcher_agents.keys()
['main']
>>> agent = dispatcher_agents['main']

Now we have our agent! But…what is it [22]?

Agents

Agents are the way you control what a dispatcher’s worker threads do. They pick the jobs and assign them to their dispatcher when the dispatcher asks.

If a dispatcher does not have any agents in a given queue, it will not perform any tasks for that queue.

We currently have an agent that simply asks for the next available FIFO job. We are using an agent implementation that allows you to specify a callable to choose the job. That callable is now zc.async.agent.chooseFirst.

>>> agent.chooser is zc.async.agent.chooseFirst
True

Here’s the entire implementation of that function:

def chooseFirst(agent):
    return agent.queue.claim()

What would another agent do? Well, it might pass a filter function to claim. This function takes a job and returns a value evaluated as a boolean. For instance, let’s say we always wanted a certain number of threads available for working on a particular call; for the purpose of example, we’ll use operator.mul, though a more real-world example might be a network call or a particular call in your application.

>>> import operator
>>> def chooseMul(agent):
...     return agent.queue.claim(lambda job: job.callable is operator.mul)
...

Another variant would prefer operator.mul jobs but, if none is in the queue, would take any job.

>>> def preferMul(agent):
...     res = agent.queue.claim(lambda job: job.callable is operator.mul)
...     if res is None:
...         res = agent.queue.claim()
...     return res
...

Other approaches might look at the current jobs in the agent, or the agent’s dispatcher, and decide what jobs to prefer on that basis. The agent implementation is designed to support many such policies.
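
For instance, here is a sketch of a chooser that looks at the agent’s current jobs (it assumes iterating an agent yields the jobs it is currently working on, as described at the end of this section):

import operator

def mulOnlyWhenIdle(agent):
    # only claim an operator.mul job if this agent is not already
    # working on one; otherwise take any other kind of job
    for job in agent:
        if job.callable is operator.mul:
            return agent.queue.claim(
                lambda job: job.callable is not operator.mul)
    return agent.queue.claim()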

Let’s set up another agent, in addition to the chooseFirst one, that has the preferMul policy.

>>> agent2 = dispatcher_agents['mul'] = zc.async.agent.Agent(preferMul)

Another characteristic of agents is that they specify how many jobs they should pick at a time. The dispatcher actually adjusts the size of the ZODB connection pool to accommodate its agents’ size. The default is 3.

>>> agent.size
3
>>> agent2.size
3

We can change that at creation or later.
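
For example (a sketch assuming the Agent constructor accepts a size argument, as “at creation or later” suggests):

agent3 = zc.async.agent.Agent(chooseMul, size=1) # at creation...
agent2.size = 5                                  # ...or later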

Finally, it’s worth noting that agents contain the jobs currently being worked on by the dispatcher on their behalf; and have a completed collection of recently completed jobs, beginning with the most recently completed job.

Logging and Monitoring

Logs are sent to the zc.async.events log for big events, like startup and shutdown, and errors. Poll and job logs are sent to zc.async.trace. Configure the standard Python logging module as usual to send these logs where you need. Be sure to auto-rotate the trace logs.
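
A minimal stdlib configuration along those lines might look like this sketch (paths and sizes are placeholders):

import logging
import logging.handlers

# big events and errors
logging.getLogger('zc.async.events').addHandler(
    logging.FileHandler('async.log'))
# verbose poll and job traces: auto-rotate these
logging.getLogger('zc.async.trace').addHandler(
    logging.handlers.RotatingFileHandler(
        'async_trace.log', maxBytes=10000000, backupCount=5))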

The package supports monitoring using zc.z3monitor, but using this package includes more Zope 3 dependencies, so it is not included here. If you would like to use it, see monitor.txt in the package and our next section: Configuration with Zope 3. Otherwise, if you want to roll your own monitoring, glance at monitor.py–you’ll see that most of the heavy lifting for the monitor support is done in the dispatcher, so it should be pretty easy to hook up the basic data another way.

>>> reactor.stop()

Configuration with Zope 3

Our last main section can be the shortest yet, both because we’ve already introduced all of the main concepts, and because we will be leveraging conveniences to automate much of the configuration shown in the section discussing configuration without Zope 3.

Client Set Up

If you want to set up a client alone, without a dispatcher, include the egg in your setup.py, include the configure.zcml in your application’s ZCML, make sure you share the database in which the queues will be held, and make sure that either the zope.app.keyreference.persistent.connectionOfPersistent adapter is registered, or zc.twist.connection.

That should be it.

Client/Server Set Up

For a client/server combination, use zcml that is something like the basic_dispatcher_policy.zcml, make sure you have access to the database with the queues, configure logging and monitoring as desired, configure the ZC_ASYNC_UUID environmental variable in zdaemon.conf if you are in production, and start up! Getting started is really pretty easy. You can even start a dispatcher-only version by not starting any servers in zcml.

In comparison to the non-Zope 3 usage, an important difference in your setup.py is that, if you want the full set up described below, including zc.z3monitor, you’ll need to specify “zc.async [z3]” as the desired package in your install_requires, as opposed to just “zc.async” [23].

We’ll look at this by making a zope.conf-alike and a site.zcml-alike. We’ll need a place to put some files, so we’ll use a temporary directory. This, and the comments in the files that we set up, are the primary differences between our examples and a real set up.

We’ll do this in two versions. The first version uses a single database, as you might do to get started quickly, or for a small site. The second version has one database for the main application, and one database for the async data, as will be more appropriate for typical production usage.

Shared Single Database Set Up

As described above, using a shared single database will probably be the quickest way to get started. Large-scale production usage will probably prefer to use the Two Database Set Up described later.

So, without further ado, here is the text of our zope.conf-alike, and of our site.zcml-alike [24].

>>> zope_conf = """
... site-definition %(site_zcml_file)s
...
... <zodb main>
...   <filestorage>
...     create true
...     path %(main_storage_path)s
...   </filestorage>
... </zodb>
...
... <product-config zc.z3monitor>
...   port %(monitor_port)s
... </product-config>
...
... <logger>
...   level debug
...   name zc.async
...   propagate no
...
...   <logfile>
...     path %(async_event_log)s
...   </logfile>
... </logger>
...
... <logger>
...   level debug
...   name zc.async.trace
...   propagate no
...
...   <logfile>
...     path %(async_trace_log)s
...   </logfile>
... </logger>
...
... <eventlog>
...   <logfile>
...     formatter zope.exceptions.log.Formatter
...     path STDOUT
...   </logfile>
...   <logfile>
...     formatter zope.exceptions.log.Formatter
...     path %(event_log)s
...   </logfile>
... </eventlog>
... """ % {'site_zcml_file': site_zcml_file,
...        'main_storage_path': os.path.join(dir, 'main.fs'),
...        'async_storage_path': os.path.join(dir, 'async.fs'),
...        'monitor_port': monitor_port,
...        'event_log': os.path.join(dir, 'z3.log'),
...        'async_event_log': os.path.join(dir, 'async.log'),
...        'async_trace_log': os.path.join(dir, 'async_trace.log'),}
...

In a non-trivial production system, you will also probably want to replace the file storage with a <zeoclient> stanza.

Also note that an open monitor port should be behind a firewall, of course.

We’ll assume that zdaemon.conf has been set up to put ZC_ASYNC_UUID in the proper place too. It would have looked something like this in the zdaemon.conf:

<environment>
  ZC_ASYNC_UUID /path/to/uuid.txt
</environment>

(Other tools, such as supervisor, also can work, of course; their spellings are different and are “left as an exercise to the reader” at the moment.)

We’ll do that by hand:

>>> os.environ['ZC_ASYNC_UUID'] = os.path.join(dir, 'uuid.txt')

Now let’s define our site-zcml-alike.

>>> site_zcml = """
... <configure xmlns='http://namespaces.zope.org/zope'
...            xmlns:meta="http://namespaces.zope.org/meta"
...            >
... <include package="zope.component" file="meta.zcml" />
... <include package="zope.component" />
... <include package="zc.z3monitor" />
... <include package="zc.async" file="basic_dispatcher_policy.zcml" />
...
... <!-- this is usually handled in Zope applications by the
...      zope.app.keyreference.persistent.connectionOfPersistent adapter -->
... <adapter factory="zc.twist.connection" />
... </configure>
... """

Now we’re done.

If we process these files, and wait for a poll, we’ve got a working set up [25].

>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import pprint
>>> pprint.pprint(get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> bool(dispatcher.activated)
True

We can ask for a job to be performed, and get the result.

>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.interfaces
>>> queue = zc.async.interfaces.IQueue(root)
>>> import operator
>>> import zc.async.job
>>> job = queue.put(zc.async.job.Job(operator.mul, 21, 2))
>>> import transaction
>>> transaction.commit()
>>> wait_for_result(job)
42

We can connect to the monitor server with telnet.

>>> import telnetlib
>>> tn = telnetlib.Telnet('127.0.0.1', monitor_port)
>>> tn.write('async status\n') # immediately disconnects
>>> print tn.read_all() # doctest: +ELLIPSIS
{
    "poll interval": {
        "seconds": ...
    },
    "status": "RUNNING",
    "time since last poll": {
        "seconds": ...
    },
    "uptime": {
        "seconds": ...
    },
    "uuid": "..."
}
<BLANKLINE>

Now we’ll “shut down” with a CTRL-C, or SIGINT, and clean up.

>>> import signal
>>> if getattr(os, 'getpid', None) is not None: # UNIXEN, not Windows
...     pid = os.getpid()
...     try:
...         os.kill(pid, signal.SIGINT)
...     except KeyboardInterrupt:
...         if dispatcher.activated:
...             assert False, 'dispatcher did not deactivate'
...     else:
...         print "failed to send SIGINT, or something"
... else:
...     dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
...     for i in range(30):
...         if not dispatcher.activated:
...             break
...         time.sleep(0.1)
...     else:
...         assert False, 'dispatcher did not deactivate'
...
>>> import transaction
>>> t = transaction.begin() # sync
>>> import zope.component
>>> import zc.async.interfaces
>>> uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
>>> da = queue.dispatchers[uuid]
>>> bool(da.activated)
False
>>> db.close()
>>> import shutil
>>> shutil.rmtree(dir)

These instructions are very similar to the Two Database Set Up.

Two Database Set Up

Even though it is a bit more trouble to set up, large-scale production usage will probably prefer to use this approach, over the shared single database described above.

For our zope.conf, we only need one additional stanza to the one seen above:

<zodb async>
  <filestorage>
    create true
    path REPLACE_THIS_WITH_PATH_TO_STORAGE
  </filestorage>
</zodb>

(You would replace “REPLACE_THIS_WITH_PATH_TO_STORAGE” with the path to the storage file.)

As before, you will probably prefer to use ZEO rather than FileStorage in production.

The zdaemon.conf instructions are the same: set the ZC_ASYNC_UUID environment variable properly in the zdaemon.conf file.

For our site.zcml, the only difference is that we use the multidb_dispatcher_policy.zcml file rather than the basic_dispatcher_policy.zcml file.

If you want to change policy, change “multidb_dispatcher_policy.zcml” to “dispatcher.zcml” in the example above and register your own replacements for the policy bits found in “multidb_dispatcher_policy.zcml”. You’ll see that most of that comes from code in subscribers.py, which can be adjusted easily.

If we process the files described above, and wait for a poll, we’ve got a working set up [26].

>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import pprint
>>> pprint.pprint(get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> bool(dispatcher.activated)
True

As before, we can ask for a job to be performed, and get the result.

>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.interfaces
>>> queue = zc.async.interfaces.IQueue(root)
>>> import operator
>>> import zc.async.job
>>> job = queue.put(zc.async.job.Job(operator.mul, 21, 2))
>>> import transaction
>>> transaction.commit()
>>> wait_for_result(job)
42

Hopefully zc.async will be an easy-to-configure, easy-to-use, and useful tool for you! Good luck! [27]

Changes

1.2 (2008-06-20)

  • Made the log for finding an activated agent report the pertinent queue’s oid as an unpacked integer, rather than the packed string blob. Use ZODB.utils.p64 to convert back to an oid that the ZODB will recognize.

  • Bugfix: in failing a job, the job thought it was in its old agent, and the fail call failed. This is now tested by the first example in new doctest catastrophes.txt.

  • Jobs no longer default to a begin_by value of one hour after begin_after. The default now is no limit.

  • Made dispatcher much more robust to transaction errors and ZEO ClientDisconnected errors.

  • Jobs now use an IRetryPolicy to decide what to do on failure within a job, within the commit of the result, and if the job is interrupted. This allows support of transactional jobs, transactional jobs that critically must be run to completion, and non-transactional jobs such as communicating with an external service.

  • The default retry policy supports retries for ClientDisconnected errors, transaction errors, and interruptions.

  • job.txt has been expanded significantly to show error handling and the use of retry policies. New file catastrophes.txt shows handling of other catastrophes, such as interruptions to polling.

  • Job errors now go in the main zc.async.events log rather than in the zc.async.trace log. Successes continue to go in the trace log.

  • Callback failures go to the main log as CRITICAL errors, by default.

  • handleInterrupt is the new protocol on jobs to inform them that they were active in a dispatcher that is now dead. They either fail or reschedule, depending on the associated IRetryPolicy for the job. If they reschedule, this should either be a datetime or timedelta. The job calls the agent’s reschedule method. If the timedelta is empty or negative, or the datetime is earlier than now, the job is put back in the queue with a new putBack method on the queue. This is intended to be the opposite of claim. Jobs put in the queue with putBack will be pulled out before any others.

  • Converted to using zope.minmax rather than a locally defined Atom.

  • Fix (and simplify) last_ping code so as to reduce unnecessarily writing the state of the parent DispatcherAgents collection to the database whenever the atom changed.

  • Depends on new release of zc.twist (1.3)

  • Switched the dispatcher’s in-memory storage of job and poll information to be per job or per poll, respectively, rather than per time period, so as to make memory usage more predictable (for instance, regardless of whether a dispatcher is whipping through lots of jobs quickly or doing work more slowly).

1.1.1 (2008-05-14)

  • More README tweaks.

  • Converted all reports from the dispatcher, including the monitor output, to use “unpacked” integer oids. This addresses a problem that simplejson was having in trying to interpret the packed string blobs as unicode, and then making zc.ngi fall over. To get the object, then, you’ll need to use ZODB.utils.p64, like this: connection.get(ZODB.utils.p64(INTEGER_OID)), where INTEGER_OID indicates the integer oid of the object you want to examine.

  • Added several more tests for the monitor code.

  • Made the async jobs monitor command be “up to the minute”. Before, it included all of the new and active jobs from the previous poll; now, it also filters out those that have since completed.

  • The async job command was broken, as revealed by a new monitor test. Fixed, which also means we need a new version of zope.bforest (1.2) for a new feature there.

1.1 (2008-04-24)

  • Fired events when the IQueues and IQueue objects are installed by the QueueInstaller (thanks to Fred Drake).

  • Dispatchers make agent threads keep their connections, so each connection’s object cache use is optimized if the agent regularly requests jobs with the same objects.

  • README improved (thanks to Benji York and Sebastian Ware).

  • Callbacks are logged at start in the trace log.

  • All job results (including callbacks) are logged, including verbose tracebacks if the callback generated a failure.

  • Had the ThreadedDispatcherInstaller subscriber stash the thread on the dispatcher, so you can shut down tests like this:

    >>> import zc.async.dispatcher
    >>> dispatcher = zc.async.dispatcher.get()
    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
    >>> dispatcher.thread.join(3)
    
  • Added getQueue to zc.async.local as a convenience (it does what you could already do: zc.async.local.getJob().queue).

  • Clarified that IQueue.pull is the approved way of removing scheduled jobs from a queue in interfaces and README.

  • Reports in the logs of a job’s success or failure now come before callbacks are started.

  • Added a section showing how the basic_dispatcher_policy.zcml worked, which then pushed the former README_3 examples into README_3b.

  • Put ZPL everywhere I was supposed to.

  • Moved a number of helpful testing functions out of footnotes and into zc.async.testing, both so that zc.async tests don’t have to redefine them and client packages can reuse them.

1.0 (2008-04-09)

Initial release.
