Skip to main content

Wrapper of threading module providing Actor interface.

Project description

Wrapper of threading module providing Actor interface.

This module provides decorator to make function and method run in background thread, and thread pool class to pool worker threads. The caller thread can retrieve the return value or the unhandled exception in the background thread using the future object. The finished threads are joined by garbage collection.

Requirements

  • Python 2.6 or 2.7 or 3.2 or later.

  • Unix or Linux platforms which support python thread module.

Tested

  • 2.6.9

  • 2.7.9

  • 3.2.6

  • 3.3.6

  • 3.4.2

Setup

  • Install from pip

    $ sudo pip install thread_utils
  • Install from git.

    $ git clone https://github.com/wbcchsyn/python-thread_utils.git
    $ cd python-thread_utils
    $ sudo python setup.py install

Usage

This module defines the following functions and classes.

thread_utils.actor(daemon=True)

Decorator to create a worker thread and to invoke the callable there.

The decorated callable returns a Future object immediately and invoked callable starts to run in worker thread. If argument `daemon' is True, the worker thread will be daemonic; otherwise not. Python program exits when only daemon threads are left.

In the following example, function sleep_sort print positive numbers in asending order. The main thread will terminate soon, however workers display numbers after that.

"""
Print numbers in asending order using non daemonic workers.
The main thread will terminate soon and after that workers do each task.
"""

import thread_utils
import time

@thread_utils.actor(daemon=False)
def _sleep_print(n):
    time.sleep(n)
    print n

def sleep_sort(un_sorted_list):
    """
    Print positive numbers in asending order.
    """

    for i in un_sorted_list:
        _sleep_print(i)

sleep_sort([3,1,4,2]) # Numbers are displayed in asending this order.

The decorated callable returns a Future object immediately; it monitors invoked callable progress and stores the result. The foreground thread can retrieve the result of invoked callable through the future object like as follows.

import thread_utils
import time

@thread_utils.actor(daemon=True)
def add(m, n):
    time.sleep(m)
    return m + n

future = add(3, 5)
print "Task started"
print future.receive() # Blocks for 3 seconds and display "8".

See Future Objects for more information abaout it.

This function decorates only function and method. In case of classmethod or staticmethod, decorating with this method before make classmethod or staticmethod.

import thread_utils

class Foo(object):
    @classmethod
    @thread_utils.actor(daemon=False)
    def foo(cls):
        pass

This decorator doesn’t affect to thread safty, so it depends only on the invoked callable whether the decorated will be thread safe or not.

thread_utils.async(daemon=True)

Alias to thread_utils.actor

thread_utils.synchronized

Decorator to restrict from simultaneous access from 2 or more than 2 threads.

Decorated callable can be accessible from only one thread. If 2 or more than 2 threads try calling at the same time, only the 1st thread starts to run and the others are blocked. It is after the 1st thread finishes when 2nd threads starts to run.

import thread_utils
import time

@thread_utils.synchronized
def foo():
    time.sleep(1)

@thread_utils.async(daemon=False)
def create_worker():
    print "Worker is started."
    foo()
    print "Worker is finished."


# Text "Worker is started." will be printed 10 times at once.
# On the other hand "Worker is finished." will be printed every second.
for i in xrange(10):
    create_worker()

This function decorates only functino or method. In case of classmethod or staticmethod, decorating with this method before make classmethod or staticmethod.

class Foo(object):
    @staticmethod
    @thread_utils.synchronized
    def foo():
        pass

Future Objects

This class monitors associated callable progress and stores its return value or unhandled exception. Future.is_finished() returns whether the invoked callable is finished or not. Future.receive(timeout=None) blocks until timeout or invoked callable is finished and returns what the callable returns or raises its unhandled exception.

If the future object is generated by thread_utils.Pool.send method, and if the Pool instance is killed forcedly before the invoked task is started, this method raises DeadPoolError.

The instance will be created by thread_utils.Pool.send method or callable decorated by thread_utils.async.

Future.is_finished()

Return True if invoked callable is finished. Otherwise, return False.

Future.receive(timeout=None)

Block until timeout or invoked callable is finished and returns what the callable returned or raises its unhandled exception.

When argument `timeout' is present and is not None, it shoule be int or floating number. This method raises TimeoutError if task won’t be finished before timeout.

Pool Objects

This class pools worker threads and do tasks parallel using them.

The worker threads are reused many times for the performance.

`send' method queues specified callable with the arguments and returns a Future object immediately. The returned future object monitors the invoked callable progress and stores the result.

class thread_utils.Pool(worker_size=1, loop_count=sys.maxint, daemon=True)

All arguments are optional. Argument `worker_size' specifies the number of the worker thread. The object can do this number of tasks at the same time parallel. Each worker will invoke callable `loop_count' times. After that, the worker kill itself and a new worker is created.

If the argument `daemon' is True, the worker threads will be daemonic, or not. Python program exits when only daemon threads are left.

This constructor is thread safe.

Pool.send(func, *args, **kwargs)

Queue specified callable with the arguments and returns a Future object.

Argument `func ' is a callable object invoked by workers, and *args and **kwargs are arguments to be passed to the callable.

The returned Future object monitors the progress of invoked callable and stores the result.

See Future Objects for more detail abaout the return value.

This method raises DeadPoolError if called after kill method is called.

This method is thread safe.

Pool.kill(force=False, block=False)

Set internal flag and make worker threads stop.

If the argument `force' is True, the workers will stop after their current task is finished. In this case, some tasks could be left undone, and DeadPoolError will be raised if receive method of the future object is called. On the other hand, if the argument `force' is False, the workers will do all queued tasks and finish after that. The default value is False.

If the argument `block' is True, it blocks until all workers finished their tasks. Otherwise, it returns immediately. The default is False.

If this class is used in `with' statement, this method is called when block exited with default arguments, i.e. force=False and block=False. Otherwise, this method must be called after finished using the object, or the worker threads will not end till the program ends. (Or, if the workers are daemonic, dead lock occurs and program will never ends.)

This method is thread safe. If this method is called twice or more than twice, sets the flag only the first time and do noghing after that.

For example, the following program creates pool with worker_size = 3. so display 3 messages every seconds. The Pool will be killed soon, but the worker do all tasks to be sent.

import thread_utils
import time

def message(msg):
    time.sleep(1)
    return msg

pool = thread_utils.Pool(worker_size=3)
futures = []
for i in xrange(7):
    futures.append(pool.send(message, "Message %d." % i))
pool.kill()

# First, sleep one second and "Message 0", "Message 1", "Message 2"
# will be displayed.
# After one second, Message 3 - 5 will be displayed.
# Finally, "Message 6" will be displayed and program will exit.
for f in futures:
    print f.receive()

It is not necessary to call kill method if using with statement.

import thread_utils
import time

def message(msg):
    time.sleep(1)
    return msg

pool = thread_utils.Pool(worker_size=3)
futures = []
with thread_utils.Pool(worker_size=3) as pool:
    for i in xrange(7):
        futures.append(pool.send(message, "Message %d." % i))

for f in futures:
    print f.receive()

Development

Install requirements to developing and set pre-commit hook.

$ git clone https://github.com/wbcchsyn/python-thread_utils.git
$ cd python-thread_utils
$ pip install -r dev_utils/requirements.txt
$ ln -s ../../dev_utils/pre-commit .git/hooks/pre-commit

CHANGELOG

0.1.3 (2015/03/15)

  • Bug Fix: Pool.kill method could raise Queue.Empty error.

  • Performance tuning.

0.1.2 (2014/09/14)

  • Create actor alias to async decorator.

  • Add optional arguments ‘force’ and ‘block’ to Pool.kill method.

  • Future.receive method raise DeadPoolError if the Pool is killed before task is done.

  • Update documents.

0.1.1 (2014/06/13)

  • Delete unused files.

0.1.0 (2014/06/12)

  • First release.

Project details


Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page