Skip to main content

async_pool_executor,its api like the concurrent.futures

Project description

pip install async_pool_executor

主要功能

主要功能是仿照 concurrent.futures 的线程池报的submit shutdown方法。

使得在做生产 消费 时候,无需学习烦人的异步 loop 、 run_until_complete ,可以直接在同步函数中 submit。
生产和消费不需要在同一个loop中,喜欢同步编程思维的人可以用这个。

实现代码

import asyncio
import atexit
import time
import traceback
from threading import Thread


class AsyncPoolExecutor:
    """
    使api和线程池一样,最好的性能做法是submit也弄成 async def,生产和消费在同一个线程同一个loop一起运行,但会对调用链路的兼容性产生破坏,从而调用方式不兼容线程池。
    """

    def __init__(self, size, loop=None):
        """

        :param size: 同时并发运行的协程任务数量。
        :param loop:
        """
        self._size = size
        self.loop = loop or asyncio.new_event_loop()
        self._sem = asyncio.Semaphore(self._size, loop=self.loop)
        self._queue = asyncio.Queue(maxsize=size, loop=self.loop)
        t = Thread(target=self._start_loop_in_new_thread)
        t.setDaemon(True)  # 设置守护线程是为了有机会触发atexit,使程序自动结束,不用手动调用shutdown
        t.start()
        self._can_be_closed_flag = False
        atexit.register(self.shutdown)


    def submit(self, func, *args, **kwargs):
        future = asyncio.run_coroutine_threadsafe(self._produce(func, *args, **kwargs), self.loop)  # 这个 run_coroutine_threadsafe 方法也有缺点,消耗的性能巨大。
        future.result()  # 阻止过快放入,放入超过队列大小后,使submit阻塞。

    async def _produce(self, func, *args, **kwargs):
        await self._queue.put((func, args, kwargs))

    async def _consume(self):
        while True:
            func, args, kwargs = await self._queue.get()
            if func == 'stop':
                break
            try:
                await func(*args, **kwargs)
            except Exception as e:
                traceback.print_exc()

    def _start_loop_in_new_thread(self, ):
        # self._loop.run_until_complete(self.__run())  # 这种也可以。
        # self._loop.run_forever()

        # asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(asyncio.wait([self._consume() for _ in range(self._size)], loop=self.loop))
        self._can_be_closed_flag = True

    def shutdown(self):
        for _ in range(self._size):
            self.submit('stop', )
        while not self._can_be_closed_flag:
            time.sleep(0.1)
        self.loop.close()
        print('关闭循环')


if __name__ == '__main__':
    import nb_log
    async def async_f(x):
        await asyncio.sleep(2)
        print(x)

    pool  =AsyncPoolExecutor(3)
    for i in range(30):
        pool.submit(async_f,i)

Project details


Release history Release notifications | RSS feed

This version

0.1

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_pool_executor-0.1.tar.gz (3.3 kB view details)

Uploaded Source

File details

Details for the file async_pool_executor-0.1.tar.gz.

File metadata

  • Download URL: async_pool_executor-0.1.tar.gz
  • Upload date:
  • Size: 3.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.20.1 setuptools/52.0.0 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.6.6

File hashes

Hashes for async_pool_executor-0.1.tar.gz
Algorithm Hash digest
SHA256 9170b912ce8531920c80eef0f752c4e8c96d46840bfd4e08956d478749f72ace
MD5 77ca00d2bab98f00bf46a99f2c9e5a56
BLAKE2b-256 36d8be61cec9d5f39ef23eb1ef33725bcbddf4b3564b00500f060cd71bb6b753

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page