async_pool_executor,its api like the concurrent.futures
Project description
pip install async_pool_executor
主要功能
主要功能是仿照 concurrent.futures 的线程池报的submit shutdown方法。
使得在做生产 消费 时候,无需学习烦人的异步 loop 、 run_until_complete ,可以直接在同步函数中 submit。
生产和消费不需要在同一个loop中,喜欢同步编程思维的人可以用这个。
实现代码
import asyncio
import atexit
import time
import traceback
from threading import Thread
class AsyncPoolExecutor:
"""
使api和线程池一样,最好的性能做法是submit也弄成 async def,生产和消费在同一个线程同一个loop一起运行,但会对调用链路的兼容性产生破坏,从而调用方式不兼容线程池。
"""
def __init__(self, size, loop=None):
"""
:param size: 同时并发运行的协程任务数量。
:param loop:
"""
self._size = size
self.loop = loop or asyncio.new_event_loop()
self._sem = asyncio.Semaphore(self._size, loop=self.loop)
self._queue = asyncio.Queue(maxsize=size, loop=self.loop)
t = Thread(target=self._start_loop_in_new_thread)
t.setDaemon(True) # 设置守护线程是为了有机会触发atexit,使程序自动结束,不用手动调用shutdown
t.start()
self._can_be_closed_flag = False
atexit.register(self.shutdown)
def submit(self, func, *args, **kwargs):
future = asyncio.run_coroutine_threadsafe(self._produce(func, *args, **kwargs), self.loop) # 这个 run_coroutine_threadsafe 方法也有缺点,消耗的性能巨大。
future.result() # 阻止过快放入,放入超过队列大小后,使submit阻塞。
async def _produce(self, func, *args, **kwargs):
await self._queue.put((func, args, kwargs))
async def _consume(self):
while True:
func, args, kwargs = await self._queue.get()
if func == 'stop':
break
try:
await func(*args, **kwargs)
except Exception as e:
traceback.print_exc()
def _start_loop_in_new_thread(self, ):
# self._loop.run_until_complete(self.__run()) # 这种也可以。
# self._loop.run_forever()
# asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(asyncio.wait([self._consume() for _ in range(self._size)], loop=self.loop))
self._can_be_closed_flag = True
def shutdown(self):
for _ in range(self._size):
self.submit('stop', )
while not self._can_be_closed_flag:
time.sleep(0.1)
self.loop.close()
print('关闭循环')
if __name__ == '__main__':
import nb_log
async def async_f(x):
await asyncio.sleep(2)
print(x)
pool =AsyncPoolExecutor(3)
for i in range(30):
pool.submit(async_f,i)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file async_pool_executor-0.1.tar.gz
.
File metadata
- Download URL: async_pool_executor-0.1.tar.gz
- Upload date:
- Size: 3.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.20.1 setuptools/52.0.0 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.6.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9170b912ce8531920c80eef0f752c4e8c96d46840bfd4e08956d478749f72ace |
|
MD5 | 77ca00d2bab98f00bf46a99f2c9e5a56 |
|
BLAKE2b-256 | 36d8be61cec9d5f39ef23eb1ef33725bcbddf4b3564b00500f060cd71bb6b753 |