Skip to main content

async_pool_executor,its api like the concurrent.futures

Project description

pip install async_pool_executor

主要功能

主要功能是仿照 concurrent.futures 的线程池报的submit shutdown方法。

使得在做生产 消费 时候,无需学习烦人的异步 loop 、 run_until_complete ,可以直接在同步函数中 submit。
生产和消费不需要在同一个loop中,喜欢同步编程思维的人可以用这个。

实现代码

import asyncio
import atexit
import time
import traceback
from threading import Thread


class AsyncPoolExecutor:
    """
    使api和线程池一样,最好的性能做法是submit也弄成 async def,生产和消费在同一个线程同一个loop一起运行,但会对调用链路的兼容性产生破坏,从而调用方式不兼容线程池。
    """

    def __init__(self, size, loop=None):
        """

        :param size: 同时并发运行的协程任务数量。
        :param loop:
        """
        self._size = size
        self.loop = loop or asyncio.new_event_loop()
        self._sem = asyncio.Semaphore(self._size, loop=self.loop)
        self._queue = asyncio.Queue(maxsize=size, loop=self.loop)
        t = Thread(target=self._start_loop_in_new_thread)
        t.setDaemon(True)  # 设置守护线程是为了有机会触发atexit,使程序自动结束,不用手动调用shutdown
        t.start()
        self._can_be_closed_flag = False
        atexit.register(self.shutdown)


    def submit(self, func, *args, **kwargs):
        future = asyncio.run_coroutine_threadsafe(self._produce(func, *args, **kwargs), self.loop)  # 这个 run_coroutine_threadsafe 方法也有缺点,消耗的性能巨大。
        future.result()  # 阻止过快放入,放入超过队列大小后,使submit阻塞。

    async def _produce(self, func, *args, **kwargs):
        await self._queue.put((func, args, kwargs))

    async def _consume(self):
        while True:
            func, args, kwargs = await self._queue.get()
            if func == 'stop':
                break
            try:
                await func(*args, **kwargs)
            except Exception as e:
                traceback.print_exc()

    def _start_loop_in_new_thread(self, ):
        # self._loop.run_until_complete(self.__run())  # 这种也可以。
        # self._loop.run_forever()

        # asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(asyncio.wait([self._consume() for _ in range(self._size)], loop=self.loop))
        self._can_be_closed_flag = True

    def shutdown(self):
        for _ in range(self._size):
            self.submit('stop', )
        while not self._can_be_closed_flag:
            time.sleep(0.1)
        self.loop.close()
        print('关闭循环')


if __name__ == '__main__':
    import nb_log
    async def async_f(x):
        await asyncio.sleep(2)
        print(x)

    pool  =AsyncPoolExecutor(3)
    for i in range(30):
        pool.submit(async_f,i)

Project details


Release history Release notifications | RSS feed

This version

0.1

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_pool_executor-0.1.tar.gz (3.3 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page