producer/consumer with exception handling
Proconex is a module that simplifies the implementation of the producer/consumer idiom. Going beyond simple implementations based on Python’s Queue.Queue, proconex also takes care of exceptions raised while producing or consuming items and ensures that all work shuts down cleanly without leaving zombie threads.
In order to use proconex, we need a few preparations.
First, set up Python’s logging:
>>> import logging
>>> logging.basicConfig(level=logging.INFO)
In case you want to use the with statement to clean up and still use Python 2.5, you need to import it:
>>> from __future__ import with_statement
And finally, we of course need to import proconex itself:
>>> import proconex
Here is a simple producer that reads lines from a file:
>>> class LineProducer(proconex.Producer):
...     def __init__(self, fileToReadPath):
...         super(LineProducer, self).__init__()
...         self._fileToReadPath = fileToReadPath
...     def items(self):
...         with open(self._fileToReadPath, 'rb') as fileToRead:
...             for lineNumber, line in enumerate(fileToRead, start=1):
...                 yield (lineNumber, line.rstrip('\n\r'))
The constructor can take any parameters you need to set up the producer. In this case, all we need is the path to the file to read, fileToReadPath. The constructor simply stores the value in an attribute for later reference.
The function items() is typically implemented as a generator and yields the produced items one after another until there are no more items to produce. In this case, we just yield the file line by line as a tuple of line number and line contents without trailing newlines.
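The generator pattern behind items() can be tried out without proconex. The following is a minimal sketch in Python 3; the function name numbered_lines and the sample text are made up for illustration and are not part of proconex.

```python
import io


def numbered_lines(text_stream):
    """Yield (line_number, line) tuples with trailing newlines removed."""
    for line_number, line in enumerate(text_stream, start=1):
        yield (line_number, line.rstrip("\n\r"))


# Hypothetical sample input standing in for a file on disk.
sample = io.StringIO("first\nsecond\r\nthird")
result = list(numbered_lines(sample))
print(result)
```

Because the function is a generator, each line is read only when the caller asks for the next item, so the whole file never has to fit in memory.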
Next, we need a consumer. Here is a simple one that processes the lines read by the producer above and prints the number and text of each line containing "self":
>>> class LineConsumer(proconex.Consumer):
...     def consume(self, item):
...         lineNumber, line = item
...         if "self" in line:
...             print u"line %d: %s" % (lineNumber, line)
With classes for producer and consumer defined, we can create a producer and a list of consumers:
>>> producer = LineProducer(__file__)
>>> consumers = [LineConsumer("consumer#%d" % consumerId)
...         for consumerId in xrange(3)]
To actually start the production process, we need a worker to control the producer and consumers:
>>> with proconex.Worker(producer, consumers) as lineWorker:
...     lineWorker.work() # doctest: +ELLIPSIS
line ...
The with statement makes sure that all threads are terminated once the worker has finished or failed. Alternatively, you can use try ... except ... finally to handle errors and cleanup:
>>> producer = LineProducer(__file__)
>>> consumers = [LineConsumer("consumer#%d" % consumerId)
...         for consumerId in xrange(3)]
>>> lineWorker = proconex.Worker(producer, consumers)
>>> try:
...     lineWorker.work()
... except Exception, error:
...     print error
... finally:
...     lineWorker.close() # doctest: +ELLIPSIS
line ...
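To illustrate why this cleanup matters, here is a minimal sketch of how an exception raised in a consumer thread can be handed back to the caller while the thread is still joined. It uses only the Python 3 standard library and is not proconex's actual implementation; the names run_consumer and work, and the rule that negative items are invalid, are assumptions made for the example.

```python
import queue
import sys
import threading


def run_consumer(items, errors):
    """Consume items until a None sentinel arrives; record any exception."""
    try:
        while True:
            item = items.get()
            if item is None:
                break
            if item < 0:
                raise ValueError("cannot process negative item: %d" % item)
    except Exception:
        # Keep the full exc_info so the caller still sees the traceback.
        errors.put(sys.exc_info())


def work(values):
    """Feed values to one consumer thread and re-raise its first error."""
    items = queue.Queue()
    errors = queue.Queue()
    consumer = threading.Thread(target=run_consumer, args=(items, errors))
    consumer.start()
    try:
        for value in values:
            items.put(value)
    finally:
        items.put(None)  # always send the sentinel, even after a failure
        consumer.join()  # ensures no thread is left behind
    if not errors.empty():
        _, error, traceback = errors.get()
        raise error.with_traceback(traceback)


work([1, 2, 3])  # completes without an error
try:
    work([1, -2, 3])
    caught = None
except ValueError as error:
    caught = str(error)
print(caught)
```

Without the error queue, the ValueError would only end the consumer thread and the caller would never learn that the work failed.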
In addition to Worker, there is also a Converter, which not only produces and consumes items but also yields converted items. While Converters use the same Producers as Workers, they require different consumers based on ConvertingConsumer. Such a consumer has an addItem() method, which consume() should call to add the converted item.
Here is an example for a consumer that converts consumed integer numbers to their square value:
>>> class SquareConvertingIntegerConsumer(proconex.ConvertingConsumer):
...     def consume(self, item):
...         self.addItem(item * item)
A fitting producer for integer numbers between 0 and 4 is:
>>> class IntegerProducer(proconex.Producer):
...     def items(self):
...         for item in xrange(5):
...             yield item
Combining these in a converter, we get:
>>> with proconex.Converter(IntegerProducer("producer"),
...         SquareConvertingIntegerConsumer("consumer")) as converter:
...     for item in converter.items():
...         print item
0
1
4
9
16
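The data flow of such a converter can be sketched with two queues and two threads. This is only an illustration in Python 3 using the standard library, not the way proconex implements Converter; the names produce, convert and converted_items are hypothetical, and error handling is left out for brevity.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of a stream


def produce(work_queue):
    """Put the integers 0 to 4 on the work queue, then the sentinel."""
    for item in range(5):
        work_queue.put(item)
    work_queue.put(_DONE)


def convert(work_queue, result_queue):
    """Square each item from the work queue and pass it on."""
    while True:
        item = work_queue.get()
        if item is _DONE:
            result_queue.put(_DONE)
            break
        result_queue.put(item * item)


def converted_items():
    """Yield converted items to the caller as they become available."""
    work_queue = queue.Queue(maxsize=2)  # bounded, so the producer cannot run far ahead
    result_queue = queue.Queue()
    threads = [
        threading.Thread(target=produce, args=(work_queue,)),
        threading.Thread(target=convert, args=(work_queue, result_queue)),
    ]
    for thread in threads:
        thread.start()
    try:
        while True:
            result = result_queue.get()
            if result is _DONE:
                break
            yield result
    finally:
        for thread in threads:
            thread.join()


squares = list(converted_items())
print(squares)
```

With a single converting thread the results arrive in production order; with several consumers the order would depend on thread scheduling.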
When using proconex, there are a few things you should be aware of:
- Due to Python’s Global Interpreter Lock (GIL), at least one of producer and consumer should be I/O bound in order to allow thread switches.
- The code contains a few polling loops because Queue does not support canceling put(). The polling does not drain the CPU because it uses a timeout when waiting for events to happen. Still, there is room for improvement and contributions are welcome.
- The only way to recover from errors during production is to restart the whole process from the beginning.
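The polling approach mentioned in the list above can be sketched as follows. This is a minimal Python 3 illustration of a put() that can be abandoned, assuming a threading.Event as the cancel signal; the function name put_unless_cancelled and the timeout values are made up for the example and are not taken from the proconex source.

```python
import queue
import threading


def put_unless_cancelled(target_queue, item, cancelled, timeout=0.05):
    """Try to put item until it fits or cancelled is set.

    Returns True if the item was queued, False if the put was abandoned.
    """
    while not cancelled.is_set():
        try:
            # Block for a short time only, then check for cancellation again.
            target_queue.put(item, timeout=timeout)
            return True
        except queue.Full:
            pass
    return False


full_queue = queue.Queue(maxsize=1)
cancelled = threading.Event()
first = put_unless_cancelled(full_queue, "a", cancelled)

# Nobody consumes from the queue, so the second put can only end by cancel.
timer = threading.Timer(0.2, cancelled.set)
timer.start()
second = put_unless_cancelled(full_queue, "b", cancelled)
timer.join()
print(first, second)
```

The thread sleeps inside put() for most of each polling interval, which is why such a loop costs little CPU time even though it never blocks indefinitely.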
If you need more flexibility and control than proconex offers, try celery.
Proconex is distributed under the GNU Lesser General Public License, version 3 or later.
The source code is available from <https://github.com/roskakori/proconex>.
Version 0.4, 2012-04-14
- Fixed occasional premature termination of Converter which could lead to the consumers ignoring the last few items put on the queue by the producer.
Version 0.3, 2012-01-06
- Added Converter class, which is similar to Worker but expects consumers to yield results the caller can process.
- Changed exceptions raised by producer and consumer to preserve their stack trace when passed to the Worker or Converter.
Version 0.2, 2012-01-04
- Added support for multiple producers.
- Added limit for queue size. By default it is twice the number of consumers.
Version 0.1, 2012-01-03
- Initial public release.