Skip to main content

The Simplest and Most Powerful Concurrent Helper

Project description

The Simplest and Most Powerful Concurrent Helper

Setup

pip install concurrent-helper

Key Features

  • Simplest and powerful, very easy to use, only 2 core functions.
  • Works well both on Python2 and Python3.
  • Support for multiple concurrent modes: thread pool, process pool and independent multi-processes.
  • Support the mode of Message Queue + Service.
  • Multiple progress bar display modes.

Quick Start

import concurrent_helper
import os


def init(gpu_id):
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)


def work(task_id, gpu_id=None):
    if gpu_id is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

    print("{}: I am working on {} for {}".format(
        os.getpid(),
        os.environ.get("CUDA_VISIBLE_DEVICES"),
        task_id)
    )
    return task_id * 2


total_gpu_num = 3
todos = [(x, x % total_gpu_num) for x in range(10)]

rtvs = concurrent_helper.run_with_concurrent(work, todos, "thread")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "process")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "x-process")

rtvs = concurrent_helper.run_with_message_queue(
    init, list(range(3)),   # start 3 services
    work, list(range(10))   # has 10 tasks to do
)

Core Function: run_with_concurrent

def run_with_concurrent(
    func,
    args_list,
    concurrent_type="thread",  # ["single", "thread", "process", "x-process"]
    concurrent_num=1,
    show_process="print",  # ["", "tqdm", "print"]
    show_interval=0.01,
):
    pass

Run a function by concurrent mode.

Key Params

concurrent_type:

Param Value Description
single like normal for-loop
thread thread pool
process process pool
x-process independent multi-processes

Warning:

Arrocding to this issue: https://github.com/agronholm/pythonfutures/issues/29, there is a bug in concurrent.futures of Python2. The relevant fix upstream uses Python 3 features and cannot be backported.

This bug only happen when child-process killed by system (for exapmle, memory overflow). If you encounter this problem, use the x-process instead of process when you are using Python2.

show_process:

Param Value Description
"" don't show process
tqdm use tqdm style process bar
print print process bar info

Warning:

Please note that tqdm is not thread safe, use print if you need the guarantee of thread safe.

show_interval:

Param Value Description
>= 1 update progress bar by every N task
< 1 update progress bar by percentage

Core Function: run_with_message_queue

def run_with_message_queue(
    init_func,
    init_args_list,  # concurrent_num == len(init_args_list)
    func,
    args_list,
    show_process="print",  # ["", "tqdm", "print"]
    show_interval=0.01,
):
    pass

Run function by Message Queue + Service mode.

Fist, start N (N=len(init_args_list)) services, these services will inited by init_func.

After that, these services will obtain M (M=len(args_list)) tasks from message queue and run these by func.

Why we need Message Queue + Service mode?

In order to maximize resource utilization (like GPU), we should to start a certain number of services according to the number of resources. Then, these services will obtain tasks from the message queue and run them.

Examples

import concurrent_helper
import os


def init(gpu_id):
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)


def work(task_id, gpu_id=None):
    if gpu_id is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

    print("{}: I am working on {} for {}".format(
        os.getpid(),
        os.environ.get("CUDA_VISIBLE_DEVICES"),
        task_id)
    )
    return task_id * 2


total_gpu_num = 3
todos = [(x, x % total_gpu_num) for x in range(5)]

rtvs = concurrent_helper.run_with_concurrent(work, todos, "thread", 3)
print("----")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "process", 3)
print("----")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "x-process", 3, "tqdm")
print("----")
rtvs = concurrent_helper.run_with_message_queue(
    init, list(range(3)),
    work, list(range(5))
)
print(rtvs)

outputs:

37059: I am working on 0 for 0
37059: I am working on 1 for 1
37059: I am working on 2 for 2
[    1/5    ] ...... Fns work with thread ...... in     0.0001 seconds.
37059: I am working on 0 for 3
37059: I am working on 1 for 4
[    2/5    ] ...... Fns work with thread ...... in     0.0001 seconds.
[    3/5    ] ...... Fns work with thread ...... in     0.0003 seconds.
[    4/5    ] ...... Fns work with thread ...... in     0.0002 seconds.
[    5/5    ] ...... Fns work with thread ...... in     0.0001 seconds.
>>>>>> Fns 5 work with thread total use     0.0020 seconds.
----
37063: I am working on 0 for 0
37064: I am working on 1 for 1
37064: I am working on 0 for 3
37065: I am working on 2 for 2
37063: I am working on 1 for 4
[    1/5    ] ...... Fns work with process ...... in     0.0003 seconds.
[    2/5    ] ...... Fns work with process ...... in     0.0003 seconds.
[    3/5    ] ...... Fns work with process ...... in     0.0000 seconds.
[    4/5    ] ...... Fns work with process ...... in     0.0000 seconds.
[    5/5    ] ...... Fns work with process ...... in     0.0006 seconds.
>>>>>> Fns 5 work with process total use     0.0126 seconds.
----
37066: I am working on 0 for 0
37067: I am working on 1 for 1
37068: I am working on 2 for 2
37069: I am working on 0 for 3
37070: I am working on 1 for 4
[work / x-process]: 100%|█████████████████| 5/5 [00:00<00:00, 346.26it/s]
----
37074: I am working on 0 for 0
37075: I am working on 1 for 1
37076: I am working on 2 for 2
[    1/1    ] ...... Fns work with x-process ...... in     0.0003 seconds.
>>>>>> Fns 1 work with x-process total use     0.0085 seconds.
[    1/1    ] ...... Fns work with x-process ...... in     0.0004 seconds.
[    1/5    ] ...... Fns work with run_with_message_queue ...... in     0.0090 seconds.
>>>>>> Fns 1 work with x-process total use     0.0090 seconds.
[    1/1    ] ...... Fns work with x-process ...... in     0.0003 seconds.
>>>>>> Fns 1 work with x-process total use     0.0087 seconds.
[    2/5    ] ...... Fns work with run_with_message_queue ...... in     0.0093 seconds.
[    3/5    ] ...... Fns work with run_with_message_queue ...... in     0.0090 seconds.
37077: I am working on 0 for 3
37078: I am working on 1 for 4
[    1/1    ] ...... Fns work with x-process ...... in     0.0003 seconds.
>>>>>> Fns 1 work with x-process total use     0.0061 seconds.
[    4/5    ] ...... Fns work with run_with_message_queue ...... in     0.0063 seconds.
[    1/1    ] ...... Fns work with x-process ...... in     0.0003 seconds.
>>>>>> Fns 1 work with x-process total use     0.0060 seconds.
[    5/5    ] ...... Fns work with run_with_message_queue ...... in     0.0061 seconds.
>>>>>> Fns 5 work with run_with_message_queue total use     0.0182 seconds.
[0, 2, 4, 6, 8]

TODO

  • [DONE] Test codes.
  • [DONE] Detail docs & English describe about run_with_message_queue & More code examples.
  • [DONE] Add params show_process, show_interval to run_with_message_queue.
  • [DONE] Remove raise_exception param, it will be default action.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

concurrent_helper-1.0.2.tar.gz (6.9 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page