async_pool_executor,its api like the concurrent.futures
Project description
pip install async_pool_executor
主要功能
主要功能是仿照 concurrent.futures 的线程池报的submit shutdown方法。
使得在做生产 消费 时候,无需学习烦人的异步 loop 、 run_until_complete ,可以直接在同步函数中 submit。
生产和消费不需要在同一个loop中,喜欢同步编程思维的人可以用这个。
实现代码
import asyncio
import atexit
import time
import traceback
from threading import Thread
class AsyncPoolExecutor:
"""
使api和线程池一样,最好的性能做法是submit也弄成 async def,生产和消费在同一个线程同一个loop一起运行,但会对调用链路的兼容性产生破坏,从而调用方式不兼容线程池。
"""
def __init__(self, size, loop=None):
"""
:param size: 同时并发运行的协程任务数量。
:param loop:
"""
self._size = size
self.loop = loop or asyncio.new_event_loop()
self._sem = asyncio.Semaphore(self._size, loop=self.loop)
self._queue = asyncio.Queue(maxsize=size, loop=self.loop)
t = Thread(target=self._start_loop_in_new_thread)
t.setDaemon(True) # 设置守护线程是为了有机会触发atexit,使程序自动结束,不用手动调用shutdown
t.start()
self._can_be_closed_flag = False
atexit.register(self.shutdown)
def submit(self, func, *args, **kwargs):
future = asyncio.run_coroutine_threadsafe(self._produce(func, *args, **kwargs), self.loop) # 这个 run_coroutine_threadsafe 方法也有缺点,消耗的性能巨大。
future.result() # 阻止过快放入,放入超过队列大小后,使submit阻塞。
async def _produce(self, func, *args, **kwargs):
await self._queue.put((func, args, kwargs))
async def _consume(self):
while True:
func, args, kwargs = await self._queue.get()
if func == 'stop':
break
try:
await func(*args, **kwargs)
except Exception as e:
traceback.print_exc()
def _start_loop_in_new_thread(self, ):
# self._loop.run_until_complete(self.__run()) # 这种也可以。
# self._loop.run_forever()
# asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(asyncio.wait([self._consume() for _ in range(self._size)], loop=self.loop))
self._can_be_closed_flag = True
def shutdown(self):
for _ in range(self._size):
self.submit('stop', )
while not self._can_be_closed_flag:
time.sleep(0.1)
self.loop.close()
print('关闭循环')
if __name__ == '__main__':
import nb_log
async def async_f(x):
await asyncio.sleep(2)
print(x)
pool =AsyncPoolExecutor(3)
for i in range(30):
pool.submit(async_f,i)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.