Mixing Twisted and ZODB
Twist: Talking to the ZODB in Twisted Reactor Calls
The twist package contains a few functions and classes, but primarily a helper for having a deferred call on a callable persistent object, or on a method on a persistent object. This lets you have a Twisted reactor call or a Twisted deferred callback affect the ZODB. Everything can be done within the main thread, so it can be full-bore Twisted usage, without threads. There are a few important “gotchas”: see the Gotchas section below for details.
The main API is Partial. You can pass it a callable persistent object, a method of a persistent object, or a normal non-persistent callable, and any arguments or keyword arguments of the same sort. DO NOT use non-persistent data structures (such as lists) of persistent objects with a database connection as arguments. This is your responsibility.
If nothing is persistent, the partial will not bother to get a connection, and will behave normally.
>>> from zc.twist import Partial >>> def demo(): ... return 42 ... >>> Partial(demo)() 42
Now let’s imagine a demo object that is persistent and part of a database connection. It has a count attribute that starts at 0, a __call__ method that increments count by an amount that defaults to 1, and an decrement method that reduces count by an amount that defaults to 1 . Everything returns the current value of count.
>>> demo.count 0 >>> demo() 1 >>> demo(2) 3 >>> demo.decrement() 2 >>> demo.decrement(2) 0 >>> import transaction >>> transaction.commit()
Now we can make some deferred calls with these examples. We will use transaction.begin() to sync our connection with what happened in the deferred call. Note that we need to have some adapters set up for this to work. The twist module includes implementations of them that we will also assume have been installed .
>>> call = Partial(demo) >>> demo.count # hasn't been called yet 0 >>> deferred = call() >>> demo.count # we haven't synced yet 0 >>> t = transaction.begin() # sync the connection >>> demo.count # ah-ha! 1
We can use the deferred returned from the call to do something with the return value. In this case, the deferred is already completed, so adding a callback gets instant execution.
>>> def show_value(res): ... print res ... >>> ignore = deferred.addCallback(show_value) 1
We can also pass the method.
>>> call = Partial(demo.decrement) >>> deferred = call() >>> demo.count 1 >>> t = transaction.begin() >>> demo.count 0
This also works for slot methods.
>>> import BTrees >>> tree = root['tree'] = BTrees.family32.OO.BTree() >>> transaction.commit() >>> call = Partial(tree.__setitem__, 'foo', 'bar') >>> deferred = call() >>> len(tree) 0 >>> t = transaction.begin() >>> tree['foo'] 'bar'
Arguments are passed through.
>>> call = Partial(demo) >>> deferred = call(2) >>> t = transaction.begin() >>> demo.count 2 >>> call = Partial(demo.decrement) >>> deferred = call(amount=2) >>> t = transaction.begin() >>> demo.count 0
They can also be set during instantiation.
>>> call = Partial(demo, 3) >>> deferred = call() >>> t = transaction.begin() >>> demo.count 3 >>> call = Partial(demo.decrement, amount=3) >>> deferred = call() >>> t = transaction.begin() >>> demo.count 0
Arguments themselves can be persistent objects. Let’s assume a new demo2 object as well.
>>> demo2.count 0 >>> def mass_increment(d1, d2, value=1): ... d1(value) ... d2(value) ... >>> call = Partial(mass_increment, demo, demo2, value=4) >>> deferred = call() >>> t = transaction.begin() >>> demo.count 4 >>> demo2.count 4 >>> demo.count = demo2.count = 0 # cleanup >>> transaction.commit()
ConflictErrors make it retry.
In order to have a chance to simulate a ConflictError, this time imagine we have a runner that can switch execution from the call to our code using pause, retry and resume (this is just for tests–remember, calls used in non-threaded Twisted should be non-blocking!) .
>>> import sys >>> demo.count 0 >>> call = Partial(demo) >>> runner = Runner(call) # it starts paused in the middle of an attempt >>> call.attempt_count 1 >>> demo.count = 5 # now we will make a conflicting transaction... >>> transaction.commit() >>> runner.retry() >>> call.attempt_count # so it has to retry 2 >>> t = transaction.begin() >>> demo.count # our value hasn't changed... 5 >>> runner.resume() # but now call will be successful on the second attempt >>> call.attempt_count 2 >>> t = transaction.begin() >>> demo.count 6
By default, after five ConflictError retries, the partial fails, raising the last ConflictError. This is returned to the deferred. The failure put on the deferred will have a sanitized traceback. Here, imagine we have a deferred (named deferred) created from such a an event .
>>> res = None >>> def get_result(r): ... global res ... res = r # we return None to quiet Twisted down on the command line ... >>> d = deferred.addErrback(get_result) >>> print res.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE Traceback (most recent call last): ... ZODB.POSException.ConflictError: database conflict error...
You can control how many ConflictError (and other transaction error) retries should be performed by setting the max_transaction_errors attribute .
Other errors are returned to the deferred, like a transaction error that has exceeded its available retries, as sanitized failures.
>>> call = Partial(demo) >>> d = call('I do not add well with integers') >>> d = d.addErrback(get_result) >>> print res.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE Traceback (most recent call last): ... ...TypeError: unsupported operand type(s) for +=: 'int' and 'str'
The failure is sanitized in that the traceback is gone and the frame values are turned in to reprs. If you pickle the failure then it truncates the reprs to a maximum of 20 characters plus “[…]” to indicate the truncation .
The call tries to be a good connection citizen, waiting for a connection if the pool is at its maximum size. This code relies on the twisted reactor; we’ll use a time_flies function, which takes seconds to move ahead, to simulate time passing in the reactor.
We use powers of 2 for the floating-points numbers (e.g. 0.125) to avoid a floating-point additive accumulation error that happened in the tests when values such as 0.1 were used.
>>> db.setPoolSize(1) >>> db.getPoolSize() 1 >>> demo.count = 0 >>> transaction.commit() >>> call = Partial(demo) >>> res = None >>> deferred = call() >>> d = deferred.addCallback(get_result) >>> call.attempt_count 0 >>> time_flies(.125) >= 1 # returns number of connection attempts True >>> call.attempt_count 0 >>> res # None >>> db.setPoolSize(2) >>> db.getPoolSize() 2 >>> time_flies(.25) >= 1 True >>> call.attempt_count > 0 True >>> res 1 >>> t = transaction.begin() >>> demo.count 1
If it takes more than a second or two, it will eventually just decide to grab one. This behavior may change.
>>> db.setPoolSize(1) >>> db.getPoolSize() 1 >>> call = Partial(demo) >>> res = None >>> deferred = call() >>> d = deferred.addCallback(get_result) >>> call.attempt_count 0 >>> time_flies(.125) >= 1 True >>> call.attempt_count 0 >>> res # None >>> time_flies(2) >= 2 # for a total of at least 3 True >>> res 2 >>> t = transaction.begin() >>> demo.count 2
Without a running reactor, this functionality will not work .
You can also specify a reactor for the partial using setReactor, if you don’t want to use the standard one installed by twisted in twisted.internet.reactor. 
For a certain class of jobs, you won’t have to think much about using the twist Partial. For instance, if you are putting a result gathered by work done by deferreds into the ZODB, and that’s it, everything should be pretty simple. However, unfortunately, you have to think a bit harder for other common use cases.
As already mentioned, do not use arguments that are non-persistent collections (or even persistent objects without a connection) that hold any persistent objects with connections.
Using persistent objects with connections but that have not been committed to the database will cause problems when used (as callable or argument), perhaps intermittently (if a commit happens before the partial is called, it will work). Don’t do this.
Do not return values that are persistent objects tied to a connection.
If you plan on firing off another reactor call on the basis of your work in the callable, realize that the work hasn’t really “happened” until you commit the transaction. The partial typically handles commits for you, committing if you return any result and aborting if you raise an error. But if you want to send off a reactor call on the basis of a successful transaction, you’ll want to (a) do the work, then (b) commit, then (c) send off the reactor call. If the commit fails, you’ll get the standard abort and retry.
If you want to handle your own transactions, do not use the thread transaction manager that you get from importing transaction. This will cause intermittent, hard-to-debug, unexpected problems. Instead, adapt any persistent object you get to transaction.interfaces.ITransactionManager, and use that manager for commits and aborts.
If it takes more than a second or two, it will eventually just decide to grab one. This behavior may change.
>>> db.setPoolSize(1) >>> db.getPoolSize() 1 >>> call = Partial(demo).setReactor(faux) >>> res = None >>> deferred = call() >>> d = deferred.addCallback(get_result) >>> call.attempt_count 0 >>> time_flies(.125) >= 1 True >>> call.attempt_count 0 >>> res # None >>> time_flies(2) >= 2 # for a total of at least 3 True >>> res 2 >>> t = transaction.begin() >>> demo.count 2
Added missing import of twisted.python.failure.
Use db.pool with ZODB >= 3.9 and db._pools with ZODB < 3.9. The tests now pass with ZODB 3.9.
Tests pass in Python 2.6.
Handle ZEO.Exceptions.ClientDisconnected errors: retry forever, with a backoff, defaulting to a max backoff of 60 seconds.
Make number of times that ConflictErrors are retried configurable.
New subclass of twisted.python.failure.Failure begins with only reprs, and it pickles to exclude the stack, exclude the global vars in the frames, and truncate the reprs of the local vars in the frames. The goal is to keep the pickle size of Failures down to a manageable size. sanitize now uses this class.
Now depends on twisted 8.0.1 or higher, which is newly setuptools compatible. The twisted build is a little frightening, at least with Py2.4, with multiple warnings and errors, but works. The dependency change is the reason for the bump to 1.1; this release has no new features.
setup.py uses os.path
C extension uses older comment style and has less cruft.
Bugfix: if you passed a slot method like a BTree.__setitem__, bad things would happen.
Add ability to specify an alternate reactor
Use bootstrap external
use zc.twisted, not twisted in setup.py, until twisted is setuptools-friendly
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.