Skip to main content

A Python framework integrates running multiple tasks simultaneously with different strategy.

Project description

multirunnable

License Release PyPI version Supported Versions

A Python framework integrates building program which could run multiple tasks with different running strategy.

Overview | Quickly Start | Usage | Code Example


Overview

Python is a high level program language, but it's free to let anyone choose which running strategy you want to use (Parallel, Concurrent or Coroutine).
Below are some example for each strategy:

  • Parallel - with 'multiprocessing':
from multiprocessing import Process

Process_Number = 5

def function():
    print("This is test process function.")


if __name__ == '__main__':

    __process_list = [Process(target=function)for _ in range(Process_Number)]
    for __p in __process_list:
        __p.start()

    for __p in __process_list:
        __p.join()
  • Concurrent - with 'threading':
import threading

Thread_Number = 5

def function():
    print("This is function content ...")


if __name__ == '__main__':
    
    threads_list = [threading.Thread(target=function) for _ in range(Thread_Number)]
    for __thread in threads_list:
        __thread.start()
    
    for __thread in threads_list:
        __thread.join()
  • Coroutine - with 'gevent' (Green Thread):
from gevent import Greenlet

Green_Thread_Number = 5

def function():
    print("This is function content ...")


if __name__ == '__main__':

    greenlets_list = [Greenlet(function) for _ in range(Green_Thread_Number)]
    for __greenlet in greenlets_list:
        __greenlet.start()

    for __greenlet in greenlets_list:
        __greenlet.join()
  • Coroutine - with 'asyncio' (Asynchronous):
import asyncio

async def function():
    print("This is function content ...")

async def running_function():
    task = asyncio.create_task(function())
    await task


if __name__ == '__main__':

    asyncio.run(running_function())

No matter which way you choose to implement, it's Python, it's easy.

However, you may change the way to do something for testing, for efficiency, for resource concern or something else. You need to change to use parallel or coroutine but business logic has been done. The only way is refactoring code. It's not a problem if it has full-fledged testing code (TDD); if not, it must be an ordeal.

Package 'multirunnable' is a framework which could build a program with different running strategy by mode option. Currently, it has 4 options could use: Parallel, Concurrent, GreenThread and Asynchronous.

Here's an example to do the same thing with it:

from multirunnable import SimpleExecutor, RunningMode
import random
import time

Workers_Number = 5

def function(index):
    print(f"This is function with index {index}")
    time.sleep(3)


if __name__ == '__main__':
  
    executor = SimpleExecutor(mode=RunningMode.Concurrent, executors=Workers_Number)
    executor.run(function=function, args={"index": f"test_{random.randrange(1, 10)}"})

How about Parallel? I want to let it be more fast. Only one thing you need to do: change the mode.

... # Any code is the same

executor = SimpleExecutor(mode=RunningMode.Parallel, executors=Workers_Number)

... # Any code is the same

Program still could run without any refactoring and doesn't need to modify anything.
Want change to use other way to run? Change the Running Mode, that's all.

⚠️ Parallel, Concurrent and GreenThread are in common but Asynchronous isn't.
From above all, we could change the mode to run the code as the running strategy we configure. However, it only accepts 'awaitable' function to run asynchronously in Python. In the other word, you must remember add keyword 'async' before function which is the target to run with multirunnable.

Quickly Start

Install this package by pip:

pip install multirunnable

Write a simple code to run it.

>>> from multirunnable import SimpleExecutor, RunningMode
>>> executor = SimpleExecutor(mode=RunningMode.Parallel, executors=3)
>>> def function(index):
...     print(f"This is function with index {index}")
... 
>>> executor.run(function=function, args={"index": f"test_param"})
This is function with index test_param
This is function with index test_param
This is function with index test_param
>>> 

Usage

This package classifies the runnable unit to be Executor and Pool.
It also supports doing some operators with running multiple tasks simultaneously like Lock, Semaphore, Event, etc.


Runnable Components


  • Executor

This is a basic unit for every running strategy.

  • Parallel -> Process
  • Concurrent -> Thread
  • Green Thread -> Greenlet object
  • Asynchronous -> Asynchronous task object

We could run an easy Parallel, Concurrent or Coroutine code with it.

from multirunnable import SimpleExecutor, RunningMode

executor = SimpleExecutor(mode=RunningMode.Parallel, executors=3)
executor.run(function=<Your target function>, args=<The arguments of target function>)
  • Pool

This Pool concept is same as below:

  • Parallel -> multiprocessing.Pool
  • Concurrent -> multiprocessing.ThreadPool
  • Green Thread -> gevent.pool.Pool
  • Asynchronous -> Doesn't support this feature
from multirunnable import SimplePool, RunningMode

pool = SimplePool(mode=RunningMode.Parallel, pool_size=3, tasks_size=10)
pool.async_apply(function=<Your target function>, args=<The arguments of target function>)

Lock Features


  • Lock

With multirunnable, it should initial Lock objects before you use it.

from multirunnable import SimpleExecutor, RunningMode
from multirunnable.adapter import Lock

# Initial Lock object
lock = Lock()

executor = SimpleExecutor(mode=RunningMode.Parallel, executors=3)
# Pass into executor or pool via parameter 'features'
executor.run(function=<Your target function>, features=lock)

It could use the Lock object via LockOperator.

from multirunnable.api import LockOperator
import time

lock = LockOperator()

def lock_function():
    lock.acquire()
    print("Running process in lock and will sleep 2 seconds.")
    time.sleep(2)
    print(f"Wake up process and release lock.")
    lock.release()

Above code with Lock function is equal to below:

import threading
import time

... # Some logic

lock = threading.Lock()

print(f"Here is sample function running with lock.")
lock.acquire()
print(f"Process in lock and it will sleep 2 seconds.")
time.sleep(2)
print(f"Wake up process and release lock.")
lock.release()

... # Some logic

Or with keyword with:

import threading
import time

... # Some logic

lock = threading.Lock()

print(f"Here is sample function running with lock.")
with lock:
    print(f"Process in lock and it will sleep 2 seconds.")
    time.sleep(2)
    print(f"Wake up process and release lock.")

... # Some logic

✨👀 Using features with Python decorator

It also could use Lock via decorator RunWith (it's AsyncRunWith with Asynchronous).

from multirunnable.api import RunWith
import time

@RunWith.Lock
def lock_function():
    print("Running process in lock and will sleep 2 seconds.")
    time.sleep(2)
    print(f"Wake up process and release lock.")

Only below features support decorator:
Lock, Semaphore, Bounded Semaphore.

  • RLock

Lock only could acquire and release one time but RLock could acquire and release multiple times.

from multirunnable.api import RLockOperator
import time

rlock = RLockOperator()

def lock_function():
    rlock.acquire()
    print("Acquire RLock 1 time")
    rlock.acquire()
    print("Acquire RLock 2 time")
    print("Running process in lock and will sleep 2 seconds.")
    time.sleep(2)
    print(f"Wake up process and release lock.")
    rlock.release()
    print("Acquire Release 1 time")
    rlock.release()
    print("Acquire Release 2 time")
  • Semaphore

Semaphore could accept multiple runnable unit in target function:

from multirunnable.api import RunWith
import time

@RunWith.Semaphore
def lock_function():
    print("Running process in lock and will sleep 2 seconds.")
    time.sleep(2)
    print(f"Wake up process and release lock.")
  • Bounded Semaphore

It's mostly same as Semaphore.


Communication Features


For features Event and Condition, they all don't support using with decorator. So it must use it via operator object.

  • Event

from multirunnable import SimpleExecutor, RunningMode, sleep
from multirunnable.api import EventOperator
from multirunnable.adapter import Event
import random


class WakeupProcess:

    __event_opt = EventOperator()

    def wake_other_process(self, *args):
        print(f"[WakeupProcess] It will keep producing something useless message.")
        while True:
            __sleep_time = random.randrange(1, 10)
            print(f"[WakeupProcess] It will sleep for {__sleep_time} seconds.")
            sleep(__sleep_time)
            self.__event_opt.set()


class SleepProcess:

    __event_opt = EventOperator()

    def go_sleep(self, *args):
        print(f"[SleepProcess] It detects the message which be produced by ProducerThread.")
        while True:
            sleep(1)
            print("[SleepProcess] ConsumerThread waiting ...")
            self.__event_opt.wait()
            print("[SleepProcess] ConsumerThread wait up.")
            self.__event_opt.clear()


if __name__ == '__main__':
    
    __wakeup_p = WakeupProcess()
    __sleep_p = SleepProcess()
  
    # Initialize Event object
    __event = Event()
    
    # # # # Run without arguments
    executor = SimpleExecutor(mode=RunningMode.Parallel, executors=3)
    executor.map_with_function(
        functions=[__wakeup_p.wake_other_process, __sleep_p.go_sleep],
        features=__event)
  • Condition

from multirunnable import SimpleExecutor, RunningMode, QueueTask, sleep
from multirunnable.api import ConditionOperator, QueueOperator
from multirunnable.adapter import Condition
from multirunnable.concurrent import MultiThreadingQueueType
import random


class ProducerProcess:

    __Queue_Name = "test_queue"

    def __init__(self):
        self.__condition_opt = ConditionOperator()
        self.__queue_opt = QueueOperator()

    def send_process(self, *args):
        print("[Producer] args: ", args)
        test_queue = self.__queue_opt.get_queue_with_name(name=self.__Queue_Name)
        print(f"[Producer] It will keep producing something useless message.")
        while True:
            __sleep_time = random.randrange(1, 10)
            print(f"[Producer] It will sleep for {__sleep_time} seconds.")
            test_queue.put(__sleep_time)
            sleep(__sleep_time)
            __condition = self.__condition_opt
            with __condition:
                self.__condition_opt.notify_all()


class ConsumerProcess:

    __Queue_Name = "test_queue"

    def __init__(self):
        self.__condition_opt = ConditionOperator()
        self.__queue_opt = QueueOperator()

    def receive_process(self, *args):
        print("[Consumer] args: ", args)
        test_queue = self.__queue_opt.get_queue_with_name(name=self.__Queue_Name)
        print(f"[Consumer] It detects the message which be produced by ProducerThread.")
        while True:
            __condition = self.__condition_opt
            with __condition:
                sleep(1)
                print("[Consumer] ConsumerThread waiting ...")
                self.__condition_opt.wait()
                __sleep_time = test_queue.get()
                print("[Consumer] ConsumerThread re-start.")
                print(f"[Consumer] ProducerThread sleep {__sleep_time} seconds.")


class ExampleOceanSystem:

    __Executor_Number = 1

    __producer_p = ProducerProcess()
    __consumer_p = ConsumerProcess()

    @classmethod
    def main_run(cls):
        # Initialize Condition object
        __condition = Condition()

        # Initialize Queue object
        __task = QueueTask()
        __task.name = "test_queue"
        __task.queue_type = MultiThreadingQueueType.Queue
        __task.value = []

        # Initialize and run ocean-simple-executor
        __exe = SimpleExecutor(mode=RunningMode.Concurrent, executors=cls.__Executor_Number)
        # # # # Run without arguments
        __exe.map_with_function(
            functions=[cls.__producer_p.send_process, cls.__consumer_p.receive_process],
            queue_tasks=__task,
            features=__condition)

        
if __name__ == '__main__':

    print("[MainProcess] This is system client: ")
    system = ExampleOceanSystem()
    system.main_run()
    print("[MainProcess] Finish. ")

Queue


The Queue in multirunnable classify to different type by running strategy. For usage, it should do 2 things: initial and get.

  • Queue

It must use Queue feature with object QueueTask. It could configure some info like name, type and value. Name is a key of the queue object. Type means which one Queue object type you want to use.

For example, we want to set a Queue with name "test_queue", type is multiprocessing.Queue:

from multirunnable import QueueTask
from multirunnable.parallel import MultiProcessingQueueType

test_queue_task = QueueTask()
test_queue_task.name = "test_queue"
test_queue_task.queue_type = MultiProcessingQueueType.Queue
test_queue_task.value = [f"value_{i}" for i in range(20)]

We could get the queue object via QueueOperator:

from multirunnable.api import QueueOperator

queue = QueueOperator.get_queue_with_name(name="test_queue")

Also, we need to pass it by parameter 'queue_task' before we use it.

from multirunnable import SimpleExecutor, RunningMode

executor = SimpleExecutor(mode=RunningMode.Parallel, executors=3)
executor.run(function=<Your target function>, queue_tasks=test_queue_task)

Others


  • Retry Mechanism

It's possible that occurs unexpected something when running. Sometimes, it needs to catch that exceptions or errors to do some handling, it even needs to do something finally and keep going run the code. That's the reason this feature exists.

Below is the life cycle of runnable unit (worker):

image

It could use the feature via Python decorator retry (It's async_retry with Asynchronous).

from multirunnable.api import retry
import multirunnable

@retry
def target_fail_function(*args, **kwargs):
    print("It will raise exception after 3 seconds ...")
    multirunnable.sleep(3)
    raise Exception("Test for error")

It absolutely could configure timeout time (Default value is 1).

from multirunnable.api import retry
import multirunnable

@retry(timeout=3)
def target_fail_function(*args, **kwargs):
    print("It will raise exception after 3 seconds ...")
    multirunnable.sleep(3)
    raise Exception("Test for error")

It would be decorated as a 'retry' object after adds decorator on it. So we could add some features you need.

  • Initialization

The function which should be run first before run target function.
Default implementation is doing nothing.
The usage is decorating as target function annotation name and call .initialization method.

@target_fail_function.initialization
def initial():
    print("This is testing initialization")
  • Done Handling

It will return value after run completely target function. This feature argument receives the result. You could do some result-handling here to reach your own target, and it will return it out.
Default implementation is doing nothing, just return the result it gets.
The usage is decorating as target function annotation name and call .done_handling method.

@target_fail_function.done_handling
def done(result):
    print("This is testing done process")
    print("Get something result: ", result)
  • Final Handling

It's the feature run something which MUST to do. For example, close IO.
Default implementation is doing nothing.
The usage is decorating as target function annotation name and call .final_handling method.

@target_fail_function.final_handling
def final():
    print("This is final process")
  • Exception & Error - Handling

Target to handle every exception or error. So the function argument absolutely receives exception or error.
Default implementation is raising any exception or error it gets.
The usage is decorating as target function annotation name and call .error_handling method.

@target_fail_function.error_handling
def error(error):
    print("This is error process")
    print("Get something error: ", error)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

multirunnable-0.15.2rc1.tar.gz (88.4 kB view hashes)

Uploaded Source

Built Distribution

multirunnable-0.15.2rc1-py3-none-any.whl (143.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page