Simplify multitasking with a process or thread pool executor.

Multitasking Library for Python

Introduction

A simple Python library that simplifies multitasking programming with ProcessPoolExecutor or ThreadPoolExecutor.

Installation

pip install py-multitasking

Usage

Basic Usage

Let's use with_process_pool_executor() to run a function in parallel.

import math
import time

from py_multitasking import with_process_pool_executor, Scopes


def is_prime(n: int) -> bool:
    if n <= 1:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True


def find_primes(num_range: range):
    primes = []
    for num in num_range:
        if is_prime(num):
            primes.append(num)
    return primes


def find_primes_0():
    start_time = time.time_ns()
    primes = []
    for num in range(1, 5000000):
        if is_prime(num):
            primes.append(num)
    print(f"Found: {len(primes)} primes in total")
    print(f"Execution time: {(time.time_ns() - start_time) / 1e9} seconds")


def find_primes_1():
    start_time = time.time_ns()
    with with_process_pool_executor(max_workers=5) as manager:
        scopes = Scopes()
        session = (
            manager.session()
            .submit("task1", find_primes, scopes, range(1, 1000000))
            .submit("task2", find_primes, scopes, range(1000000, 2000000))
            .submit("task3", find_primes, scopes, range(2000000, 3000000))
            .submit("task4", find_primes, scopes, range(3000000, 4000000))
            .submit("task5", find_primes, scopes, range(4000000, 5000000))
        )
        found = 0
        session.wait_for_all()
        for task_name, result in session.results().items():
            if not result.successful:
                print(f"{task_name}: failed with {result.exception}")
                continue
            found += len(result.value)
            print(f"{task_name}: found {len(result.value)} primes")

        print(f"Found: {found} primes in total")
        print(f"Execution time: {(time.time_ns() - start_time) / 1e9} seconds")
        session.destroy()


def find_primes_2():
    start_time = time.time_ns()
    with with_process_pool_executor(max_workers=5) as manager:
        session = manager.map(
            "task-",
            find_primes,
            None,
            [
                range(1, 1000000),
                range(1000000, 2000000),
                range(2000000, 3000000),
                range(3000000, 4000000),
                range(4000000, 5000000),
            ],
        )
        session.wait_for_all()
        total_primes = 0
        for task_name, result in session.results().items():
            if not result.successful:
                print(f"{task_name}: failed with {result.exception}")
                continue
            total_primes += len(result.value)
            print(f"{task_name}: found {len(result.value)} primes")
        print(f"Found: {total_primes} primes in total")
        print(f"Execution time: {(time.time_ns() - start_time) / 1e9} seconds")
        session.destroy()


if __name__ == "__main__":
    find_primes_0()
    print("=" * 80)
    find_primes_1()
    print("=" * 80)
    find_primes_2()
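
The five ranges in the example above are written out by hand. As a minimal sketch, a small helper can generate such chunks for any worker count. The name split_range is hypothetical and not part of py_multitasking, and the chunk boundaries it produces differ slightly from the hand-written ones while covering the same numbers.

```python
from typing import List


def split_range(start: int, stop: int, parts: int) -> List[range]:
    """Split range(start, stop) into `parts` contiguous, near-equal chunks."""
    if parts <= 0:
        raise ValueError("parts must be positive")
    total = max(stop - start, 0)
    base, extra = divmod(total, parts)
    chunks = []
    lower = start
    for index in range(parts):
        # The first `extra` chunks take one additional element each.
        upper = lower + base + (1 if index < extra else 0)
        chunks.append(range(lower, upper))
        lower = upper
    return chunks


if __name__ == "__main__":
    for chunk in split_range(1, 5_000_000, 5):
        print(chunk)
```

The resulting list could be used in place of the hand-written list of ranges passed to manager.map() in find_primes_2().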

with_thread_pool_executor() is also available, but it is not recommended for CPU-bound functions.
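
In CPython, threads take turns executing Python bytecode, so a thread pool mainly helps when tasks spend their time waiting (network, disk, sleep). The sketch below uses only the standard library's ThreadPoolExecutor, not py_multitasking, to show the kind of I/O-bound workload where a thread pool is a reasonable choice. fake_download and run_io_tasks are hypothetical names, and time.sleep stands in for a blocking I/O call.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def fake_download(delay: float) -> float:
    """Stand-in for an I/O-bound call; sleeping does not hold the GIL."""
    time.sleep(delay)
    return delay


def run_io_tasks(delays: List[float], max_workers: int = 4) -> Tuple[List[float], float]:
    """Run the simulated I/O calls on a thread pool and time the whole batch."""
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map() returns results in the same order as the inputs
        results = list(pool.map(fake_download, delays))
    return results, time.perf_counter() - started


if __name__ == "__main__":
    results, elapsed = run_io_tasks([0.2, 0.2, 0.2, 0.2])
    print(f"results={results}, elapsed={elapsed:.2f}s")
```

Because the four waits overlap, the batch typically takes close to one delay rather than the sum of all four. A CPU-bound function such as find_primes would not see the same benefit from threads.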

with_process_pool_executor() and with_thread_pool_executor() will create a TaskManagerBase object, which uses the TaskSession object to manage the tasks.

To create a TaskSession, we can use the session() method or the map() method of the TaskManagerBase object.

from py_multitasking import with_process_pool_executor


def my_task(a, b, c):
    pass


with with_process_pool_executor(max_workers=5) as manager:
    session = manager.session()
    # or
    session = manager.map("task-", my_task, None, [(1,2,3), (4,5,6)])

Here are the signatures of TaskManagerBase.session() and TaskManagerBase.map():

class TaskManagerBase(object):
    def session(
        self,
        session_input_queue: bool = True,
        session_output_queue: bool = True,
        session_cancel_event: bool = True,
        session_input_queue_lock: bool = True,
        session_output_queue_lock: bool = True,
        input_queue_size: Optional[int] = None,
        output_queue_size: Optional[int] = None,
    ) -> TaskSession:
        ...

    def map(
        self,
        task_name_prefix: str,
        func: TaskFuncTypes,
        scopes: Optional[Scopes] = None,
        *iterables,
    ) -> TaskSession:
        ...

The TaskSession object has the following methods or properties which are very useful:

  • submit(task_name: str, func: TaskFuncTypes, scopes: Optional[Scopes] = None, *args, **kwargs) -> "TaskSession"

This method submits a task function to the TaskSession. It creates a Task object and adds it to the task list of the TaskSession.

  • any_done

This property returns True if any task in the task list of the TaskSession is done, otherwise it returns False.

  • all_done

This property returns True if all tasks in the task list of the TaskSession are done, otherwise it returns False.

We can use this property to wait for all tasks to finish in our main process(thread):

with with_process_pool_executor() as manager:
    session = manager.session()
    # the third positional argument is `scopes`; pass None to use the default
    session.submit("task1", my_task, None, 1, 2, 3)
    session.submit("task2", my_task, None, 4, 5, 6)
    ...

    while not session.all_done:
        # do something in the main process(thread) until all tasks are done
        ...

  • done_tasks

This property returns a list of Task objects which are done.

  • wait_for_all(timeout: Optional[float] = None, return_when: str = "ALL_COMPLETED") -> Tuple[List[Task], List[Task]]

This method waits for the tasks in the task list of the TaskSession to finish. It returns a tuple of two lists: the first contains the tasks that completed (finished or cancelled) before the wait ended, and the second contains the tasks that did not complete.

  • results(timeout: Optional[float] = None) -> Dict[str, TaskResult]

This method returns a dictionary of TaskResult objects, the key is the task name, the value is the result of the task.

The result is not the direct return value of the task function, it's a TaskResult object which is a wrapper of the return value of the task function or the exception raised by the task function.

We can access the actual return value of the task function by doing:

...
results = session.results()
# assuming you have submitted a task named "task_name"
result = results["task_name"]

if result.successful:
    ret_value = result.value
else:
    task_exception = result.exception
...

  • done_results() -> Dict[str, TaskResult]

This method returns a dictionary of TaskResult objects, the key is the task name, the value is the result of the task. Only the tasks that are done at the moment this method is called will be included in the returned dictionary.

  • is_name_available(task_name: str) -> bool

The name of a task should be unique in a TaskSession. This method is used to check if a name is available or not before submitting a task to a TaskSession.

This method returns True if the name is available, otherwise it returns False.

  • cancel_all(with_cancel_event_set: bool = False) -> None

This method is used to cancel all tasks (by calling the cancel() method of the underlying Future object of a task) in the task list of the TaskSession. If with_cancel_event_set is True, it will try to set the cancel_event of all tasks to True.

  • context

This property returns the CombinedContext object which is used to communicate between tasks and main process(thread) with session-wide input_queue, output_queue and cancel_event.
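
The wait_for_all() and results() methods described above resemble a pattern that can be built on the standard library: wait on a set of futures, then wrap each outcome so that a failure is reported as data instead of being raised. The sketch below shows that pattern with concurrent.futures only. SketchResult, wrap and run_named_tasks are hypothetical names; this is not how py_multitasking is implemented, only an illustration of the idea behind TaskResult.

```python
from concurrent.futures import ALL_COMPLETED, Future, ThreadPoolExecutor, wait
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for TaskResult: holds a value or an exception."""

    value: Any = None
    exception: Optional[BaseException] = None

    @property
    def successful(self) -> bool:
        return self.exception is None


def wrap(future: Future) -> SketchResult:
    """Convert a finished (not cancelled) future into a SketchResult without raising."""
    error = future.exception()
    if error is not None:
        return SketchResult(exception=error)
    return SketchResult(value=future.result())


def divide(a: int, b: int) -> float:
    return a / b


def run_named_tasks() -> Dict[str, SketchResult]:
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = {
            "ok": pool.submit(divide, 6, 3),
            "broken": pool.submit(divide, 1, 0),  # raises ZeroDivisionError
        }
        # wait() returns two sets: futures that are done and futures that are not
        done, not_done = wait(list(futures.values()), return_when=ALL_COMPLETED)
        return {name: wrap(f) for name, f in futures.items() if f in done}


if __name__ == "__main__":
    for name, result in run_named_tasks().items():
        if result.successful:
            print(f"{name}: value={result.value}")
        else:
            print(f"{name}: failed with {result.exception!r}")
```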

Any task function submitted to a TaskSession will be represented by a Task object, which has the following properties:

  • name

The property holds the name of the task. The name of a task should be unique in a TaskSession.

  • future

The property holds the Future object of the task.

  • submitted

This property indicates whether the task has been submitted to a TaskSession or not.

  • done

This property indicates whether the task is done or not.

  • cancelled

This property indicates whether the task is cancelled or not. cancelled means a CancelledError has been raised in the task function.

  • running

This property indicates whether the task is running or in a pending state.

  • result(timeout: Optional[float] = None) -> Optional[TaskResult]

This method is used to get the result of the task. It returns a TaskResult object which is a wrapper of the return value of the task function or the exception raised by the task function.

This method will wait for the task until it is done or wait for the specified time if timeout is not None.

  • result_or_none() -> Optional[TaskResult]

This method is similar to result() but it never waits for the task to finish, it returns None if the task is not done yet.

  • cancel(with_cancel_event_set: bool = True) -> bool

This method is used to cancel the task. It returns True if the task is cancelled successfully, otherwise it returns False. This method will try to call the cancel() method of the underlying Future object of the task.

If with_cancel_event_set is True, it will also try to set the cancel_event of the task to True.

  • context

This property returns the MainContext object which is used to communicate between tasks and main process(thread) with the input_queue, output_queue and cancel_event of the Task object.
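
Since Task.cancel() calls the cancel() method of the underlying Future, it helps to know what that call can and cannot do. In concurrent.futures, Future.cancel() only succeeds while the call is still pending; once it is running, cancel() returns False, which is why a separate cancel_event is useful for stopping running tasks. The sketch below demonstrates this with the standard library only. The helper result_or_none is a hypothetical, simplified analogue of the method of the same name and returns the raw value instead of a TaskResult.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, Optional


def result_or_none(future: Future) -> Optional[Any]:
    """Return the result if the future finished normally, else None; never blocks."""
    if not future.done() or future.cancelled():
        return None
    return future.result()


def demo() -> Dict[str, Any]:
    release = threading.Event()
    started = threading.Event()

    def blocker() -> str:
        started.set()
        release.wait(timeout=5)
        return "finished"

    # A single worker guarantees the second submission stays pending.
    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocker)
        queued = pool.submit(lambda: "never runs")
        started.wait(timeout=5)  # make sure the first task is really running
        outcome = {
            "running_cancel": running.cancel(),  # False: already running
            "queued_cancel": queued.cancel(),  # True: still pending
            "running_peek": result_or_none(running),  # None: not done yet
        }
        release.set()  # let the running task finish
    # Leaving the with-block waits for the worker, so the task is done now.
    outcome["running_result"] = result_or_none(running)
    return outcome


if __name__ == "__main__":
    print(demo())
```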

Communicate between tasks and main process(thread)

import time
from random import randint

from py_multitasking import TaskContext, with_process_pool_executor, Scopes


def process_order(ctx: TaskContext, factor: int) -> None:
    print(f"order processor started, factor: {factor}")
    # quit if cancel event is set
    while not ctx.is_cancel_event_set():
        # receive a new order from main process
        new_order, is_empty = ctx.read_input()
        # if is_empty is True, it means the input queue is empty (no new order in the input queue)
        if is_empty:
            time.sleep(0.1)
            continue
        # process the new order, simulate processing time
        time.sleep(randint(1, 10))
        # send the processed order to output queue
        # it will be received by main process later
        success = ctx.write_output(new_order * factor)
        # if the output queue is full, ctx.write_output() may fail with a return value of False
        if not success:
            # wait for a while and try again
            time.sleep(0.1)
            success = ctx.write_output(new_order * factor)
            if not success:
                # if the output queue is still full, drop the order
                print(f"Dropping order {new_order} because output queue is full")


def produce_orders() -> None:
    with with_process_pool_executor(max_workers=5) as manager:
        session = manager.session()
        # start 5 processes to process orders
        for i in range(5):
            session.submit(f"task-{i}", process_order, Scopes.Session(), factor=i * 2)
        # wait for all processes to start
        time.sleep(1)

        while True:
            # get new order from user input
            user_input = input(
                "Enter new order count (q to quit; r to receive processed orders): "
            )
            if user_input.lower() == "q":
                # set cancel event to stop all processes
                session.cancel_all(with_cancel_event_set=True)
                session.wait_for_all()
                break
            elif user_input.lower() == "r":
                print("Waiting for new orders...")
                time.sleep(1)
                # get all processed orders from output queue
                processed_orders = session.context.read_output_until_empty()
                if processed_orders:
                    print(f"Received {len(processed_orders)} processed orders")
                    for order in processed_orders:
                        print(f"Processed order: {order}")
                else:
                    print("No processed orders yet")
                    continue
            else:
                try:
                    user_input = int(user_input)
                except ValueError:
                    print("Invalid input, please enter a number or 'q' to quit")
                    continue
                else:
                    # add new orders to input queue
                    for i in range(user_input):
                        session.context.write_input(i, block=False)

        session.wait_for_all()
        print("All processes have stopped")
        session.destroy()


if __name__ == "__main__":
    produce_orders()

When the task function has the following signatures:

from py_multitasking import TaskContext


def foo1(ctx: TaskContext, *args, **kwargs) -> None:
    ...


def foo2(ctx, *args, **kwargs) -> None:
    ...


def foo3(context, *args, **kwargs) -> None:
    ...


def foo4(task_context, *args, **kwargs) -> None:
    ...


def foo5(task_ctx, *args, **kwargs) -> None:
    ...

In each of these cases, a TaskContext object is created and passed to the task function as its first argument automatically.
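
One way such automatic injection could be decided is by inspecting the function's first parameter. The sketch below is an assumption about the mechanism, not the library's actual code: wants_context is a hypothetical helper, the local TaskContext class is a placeholder so the example runs without py_multitasking installed, and the rule "matching name or matching annotation" is a guess based on the five signatures above.

```python
import inspect
from typing import Callable

# Parameter names taken from the signatures listed above.
CONTEXT_PARAM_NAMES = {"ctx", "context", "task_context", "task_ctx"}


class TaskContext:
    """Placeholder so the sketch runs without py_multitasking installed."""


def wants_context(func: Callable) -> bool:
    """Guess whether `func` expects a context as its first positional parameter."""
    params = list(inspect.signature(func).parameters.values())
    if not params:
        return False
    first = params[0]
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    if first.kind not in positional_kinds:
        return False
    # Assumed rule: either the annotation or the parameter name marks a context.
    return first.annotation is TaskContext or first.name in CONTEXT_PARAM_NAMES


def with_ctx(ctx, a, b):
    return a + b


def without_ctx(a, b):
    return a + b


if __name__ == "__main__":
    print(wants_context(with_ctx), wants_context(without_ctx))
```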

The library uses two queues and one event to communicate between tasks and the main process(thread). The two queues are:

  • input_queue: used to send msg to tasks from main process(thread).
  • output_queue: used to send msg from tasks to main process(thread).

The event is called cancel_event and it's used to notify the running tasks that the main process(thread) wants to stop them now.
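
The two-queue, one-event pattern can be sketched with the standard library. The example below uses a thread with queue.Queue and threading.Event for simplicity; a process-based version would need multiprocessing-compatible queues and events instead. The helpers read_input and write_output are hypothetical free functions that mimic the return shapes used by the TaskContext methods in the example above, (value, is_empty) and a success flag, and are not the library's implementation.

```python
import queue
import threading
from typing import Any, List, Tuple


def read_input(q: queue.Queue) -> Tuple[Any, bool]:
    """Non-blocking read: returns (value, is_empty)."""
    try:
        return q.get_nowait(), False
    except queue.Empty:
        return None, True


def write_output(q: queue.Queue, value: Any) -> bool:
    """Non-blocking write: returns False if the queue is full."""
    try:
        q.put_nowait(value)
        return True
    except queue.Full:
        return False


def worker(inputs: queue.Queue, outputs: queue.Queue,
           cancel_event: threading.Event, factor: int) -> None:
    # Keep working until the main thread sets the cancel event.
    while not cancel_event.is_set():
        value, is_empty = read_input(inputs)
        if is_empty:
            cancel_event.wait(0.01)  # brief pause instead of busy-waiting
            continue
        write_output(outputs, value * factor)


def run_demo() -> Tuple[List[int], bool]:
    inputs: queue.Queue = queue.Queue()
    outputs: queue.Queue = queue.Queue(maxsize=10)
    cancel_event = threading.Event()
    thread = threading.Thread(target=worker, args=(inputs, outputs, cancel_event, 3))
    thread.start()
    for n in (1, 2, 3):
        inputs.put(n)  # main thread -> task
    results = [outputs.get(timeout=5) for _ in range(3)]  # task -> main thread
    cancel_event.set()  # ask the worker to stop
    thread.join(timeout=5)
    return results, thread.is_alive()


if __name__ == "__main__":
    print(run_demo())
```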

These two queues and the event can be accessed from different places, depending on the Scopes. The possible values of a Scope are:

  • Scope.Null: Don't create the queue or the event.
  • Scope.Global: Use the global queue or event, which is shared by all tasks in the same TaskManagerBase object.
  • Scope.Session: Use the queue or event that belongs to a session; all tasks in the same session share it.
  • Scope.PerTask: Use the queue or event that belongs to a task; each task has its own.

We can use Scopes to specify the scope of the input_queue, output_queue and cancel_event:

from py_multitasking import Scopes, TaskContext, with_process_pool_executor, Scope


def foo(ctx: TaskContext, a: int, b: int) -> None:
    pass


if __name__ == "__main__":
    with with_process_pool_executor(max_workers=5) as manager:
        session = manager.session()
        # create a task with a per-task input_queue, no output_queue and a session-wide cancel_event
        session.submit("task1", foo,
                       Scopes(input_queue=Scope.PerTask, output_queue=Scope.Null, cancel_event=Scope.Session), 1, 2)
        # create a task with a session-wide input_queue, no output_queue and a session-wide cancel_event
        session.submit("task2", foo,
                       Scopes(input_queue=Scope.Session, output_queue=Scope.Null, cancel_event=Scope.Session), 3, 4)

Note: To use global input_queue, output_queue and cancel_event, we should turn on the related options, like:

with_process_pool_executor(global_input_queue=True, global_output_queue=True, global_cancel_event=True)

Otherwise, we will get a NoSuchObjectException.
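
To make the four scopes concrete, the sketch below models how a queue might be looked up for a given scope, including the failure when the global queue was never enabled. Everything here is hypothetical and self-contained: the local Scope enum only mirrors the names described above, QueueRegistry is an invented class, and NoSuchObjectError is a stand-in for the library's NoSuchObjectException. The real library may resolve scopes quite differently.

```python
import queue
from enum import Enum
from typing import Dict, Optional, Tuple


class Scope(Enum):
    """Local stand-in mirroring the four scope names described above."""

    Null = "null"
    Global = "global"
    Session = "session"
    PerTask = "per_task"


class NoSuchObjectError(LookupError):
    """Hypothetical stand-in for the library's NoSuchObjectException."""


class QueueRegistry:
    """Hands out one queue per manager, per session, or per task."""

    def __init__(self, global_enabled: bool = False) -> None:
        self._global: Optional[queue.Queue] = queue.Queue() if global_enabled else None
        self._per_session: Dict[str, queue.Queue] = {}
        self._per_task: Dict[Tuple[str, str], queue.Queue] = {}

    def resolve(self, scope: Scope, session_id: str, task_name: str) -> Optional[queue.Queue]:
        if scope is Scope.Null:
            return None  # no queue at all
        if scope is Scope.Global:
            if self._global is None:
                raise NoSuchObjectError("global queue was not enabled")
            return self._global
        if scope is Scope.Session:
            # all tasks of one session share the same queue
            if session_id not in self._per_session:
                self._per_session[session_id] = queue.Queue()
            return self._per_session[session_id]
        # Scope.PerTask: each (session, task) pair gets its own queue
        key = (session_id, task_name)
        if key not in self._per_task:
            self._per_task[key] = queue.Queue()
        return self._per_task[key]


if __name__ == "__main__":
    registry = QueueRegistry(global_enabled=False)
    shared = registry.resolve(Scope.Session, "s1", "task1")
    print(shared is registry.resolve(Scope.Session, "s1", "task2"))
```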

How to access the input_queue, output_queue and cancel_event? Besides the Scopes, it also depends on where we want to access them.

  1. In the task function, we can access the input_queue, output_queue and cancel_event using the TaskContext object:

from py_multitasking import TaskContext


def foo(ctx: TaskContext, a: int, b: int) -> None:
    while True:
        # check if the cancel_event is set
        if ctx.is_cancel_event_set():
            # stop the task
            break
        # read a value from the input_queue
        value, is_empty = ctx.read_input()
        if not is_empty:
            # write a value to the output_queue
            success = ctx.write_output(value * a * b)
            if not success:
                # if the output_queue is full, drop the value
                print("Output queue is full, dropping value")
        else:
            # is_empty is True, it means the input_queue is empty (no value in the input_queue)
            print("Input queue is empty, waiting for new values")

  2. In the main process(thread):

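For the global input_queue, output_queue and cancel_event, we can access them using TaskManagerBase.context:

import time

from py_multitasking import Scopes, with_process_pool_executor

# foo is the task function defined in the previous snippet

if __name__ == "__main__":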
    with with_process_pool_executor(max_workers=5, global_input_queue=True, global_output_queue=True,
                                    global_cancel_event=True) as manager:
        session = manager.session()
        session.submit("task1", foo, Scopes.Global(), 1, 2)
        
        global_ctx = manager.context
        # write a value to the global input_queue
        global_ctx.write_input(100)
        time.sleep(0.1)
        # read a value from the global output_queue
        value, is_empty = global_ctx.read_output()
        if not is_empty:
            print(f"Received value: {value}")
        else:
            print("No value in output queue")
        # set the global cancel_event to stop all tasks
        global_ctx.set_cancel_event()
        session.wait_for_all()
        session.destroy()

For session-wide input_queue, output_queue and cancel_event, we can access them using TaskSession.context

if __name__ == "__main__":
    with with_process_pool_executor(max_workers=5) as manager:
        session = manager.session()
        session.submit("task1", foo, Scopes.Session(), 1, 2)
        
        session_ctx = session.context
        # write a value to the session input_queue
        session_ctx.write_input(100)
        time.sleep(0.1)
        # read a value from the session output_queue
        value, is_empty = session_ctx.read_output()
        if not is_empty:
            print(f"Received value: {value}")
        else:
            print("No value in output queue")
        # set the session cancel_event to stop all tasks in the session
        session_ctx.set_cancel_event()
        session.wait_for_all()
        session.destroy()

For per-task input_queue, output_queue and cancel_event, we can access them using Task.context

if __name__ == "__main__":
    with with_process_pool_executor(max_workers=5) as manager:
        session = manager.session()
        session.submit("task1", foo, Scopes.PerTask(), 1, 2)
        
        task = session.get_task("task1")
        
        task_ctx = task.context
        # write a value to the task input_queue
        task_ctx.write_input(100)
        time.sleep(0.1)
        # read a value from the task output_queue
        value, is_empty = task_ctx.read_output()
        if not is_empty:
            print(f"Received value: {value}")
        else:
            print("No value in output queue")
        # set the task cancel_event to stop the task
        task_ctx.set_cancel_event()
        session.wait_for_all()
        session.destroy()

License

This project is licensed under the MIT License - see the LICENSE file for details.
