flexible_thread_pool ,auto expand thread and reduce threads. both support sync and asyncio,fast than concurrent.futures.ThreadpoolExecutor
Project description
flexible_thread_pool
自动弹性伸缩线程池,完美兼容 sync / async 函数,性能是官方
concurrent.futures.ThreadPoolExecutor的 200%+。
特性
- ✅ 弹性伸缩 — 根据任务提交频率和执行耗时,自动扩缩线程数,无需人工估算
- ✅ 返回 Future —
submit()返回标准concurrent.futures.Future,支持result()获取返回值 - ✅ 支持 map — 内置
Executor.map方法,批量提交更便捷 - ✅ 完美兼容 asyncio — 自动检测
async def函数并使用事件循环调度,无需手动await - ✅ 协程跨 Loop 安全 — 每个工作线程拥有独立事件循环(
threading.local),彻底解决 "attached to a different loop" 错误 - ✅ 指定外部 Loop — 支持通过
specify_async_loop参数绑定外部事件循环,满足连接池等特殊场景 - ✅ 有界队列背压 — 严格队列大小限制,防止内存溢出
- ✅ 自然生命周期 — 利用
daemon=False+MIN_WORKERS,脚本任务完成后自动退出
安装
pip install flexible_thread_pool
依赖:nb_log
快速开始
基本用法
from flexible_thread_pool import FlexibleThreadPool
def sync_task(x):
return x * 2
pool = FlexibleThreadPool(max_workers=100)
# submit 返回 Future,可获取结果
fut = pool.submit(sync_task, 42)
print(fut.result()) # 84
# 批量提交(自动处理超时)
results = pool.map(sync_task, [1, 2, 3, 4], timeout=5)
for res in results:
print(res)
异步函数支持
import asyncio
from flexible_thread_pool import FlexibleThreadPool
async def async_task(x):
await asyncio.sleep(0.1)
return x * 2
pool = FlexibleThreadPool(max_workers=100)
# 自动调度 async def 函数
fut = pool.submit(async_task, 21)
print(fut.result()) # 42
无需手动创建事件循环,线程池自动为每个工作线程分配独立的
asyncio.new_event_loop(),避免跨线程 loop 冲突。
指定外部事件循环
某些异步库(如 aiohttp、asyncpg)要求连接池与事件循环绑定在同一线程。可以通过 specify_async_loop 解决:
import asyncio
from flexible_thread_pool import FlexibleThreadPool
loop = asyncio.new_event_loop()
pool = FlexibleThreadPool(
max_workers=100,
specify_async_loop=loop,
is_auto_start_specify_async_loop_in_child_thread=True,
)
自动退出 vs 常驻进程
| 类名 | MIN_WORKERS | 行为 |
|---|---|---|
FlexibleThreadPool |
0 |
任务全部完成后自动退出 |
FlexibleThreadPoolMinWorkers0 |
0 |
同上,显式别名 |
FlexibleThreadPoolMinWorkers1 |
1 |
至少保留 1 个常驻线程,进程永不退出 |
from flexible_thread_pool import (
FlexibleThreadPool,
FlexibleThreadPoolMinWorkers1,
)
# 脚本结束后自动退出
pool = FlexibleThreadPool(max_workers=100)
pool.MIN_WORKERS = 0 # 默认就是 0
# 常驻进程,永不结束
pool = FlexibleThreadPoolMinWorkers1(max_workers=100)
# 或动态设置
pool.MIN_WORKERS = 1
为什么选择 FlexibleThreadPool?
官方线程池的缺陷
import time
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(500)
def f(x):
time.sleep(10) # 模拟 IO 阻塞
print(x)
for i in range(10000):
time.sleep(100) # 模拟低频提交
pool.submit(f, i)
- 低峰期浪费:即使每隔 100 秒才提交一个任务,官方线程池依然会创建 500 个线程
- 无自动缩容:流量高峰过后,线程数不会自动降低
FlexibleThreadPool 的智能策略
| 场景 | 官方 ThreadPoolExecutor | FlexibleThreadPool |
|---|---|---|
| 任务稀疏(隔 100s 提交一次) | 盲目扩容到 500 线程 | 只开 1 个线程 |
| 流量高峰(9:00-10:00) → 低峰 | 始终保持 500 线程 | 高峰 500,低峰自动降至 ~5 |
| 函数耗时降低(100s → 1s) | 保持 500 线程 | 自动从 500 降至 ~100 |
| 函数耗时增加(1s → 100s) | 保持 500 线程 | 自动从 ~100 升至 500 |
FlexibleThreadPool 无需用户分析函数耗时,自动计算出最合理的线程数量。
性能对比
在 Win11 + AMD R5 4600U 单核单进程测试:
线程池 | 吞吐量 (ops/s)
-------------------------------|---------------
concurrent.futures.ThreadPoolExecutor | ~10000
FlexibleThreadPool | ~30000+ 🚀
性能提升约 200%,且线程数越高峰值吞吐优势越明显。
API
FlexibleThreadPool(max_workers, work_queue_maxsize, specify_async_loop, is_auto_start_specify_async_loop_in_child_thread)
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
max_workers |
int |
cpu_count * 5 |
最大线程数 |
work_queue_maxsize |
int |
10 |
工作队列最大长度(背压控制) |
specify_async_loop |
AbstractEventLoop |
None |
指定外部事件循环 |
is_auto_start_specify_async_loop_in_child_thread |
bool |
True |
是否自动启动指定 loop |
实例方法
submit(func, *args, **kwargs)→Future— 提交任务,返回 Future 对象map(func, *iterables, timeout, chunksize)— 批量提交(继承自Executor.map)shutdown(wait=True)— 优雅关闭(当前依赖 KEEP_ALIVE_TIME 自动清理,无需显式调用)
实例属性
| 属性 | 类型 | 默认值 | 说明 |
|---|---|---|---|
KEEP_ALIVE_TIME |
float |
5.0 |
空闲线程存活超时(秒) |
MIN_WORKERS |
int |
0 |
最小常驻线程数 |
max_workers |
int |
— | 最大线程数(构造函数传入) |
链接
- GitHub: https://github.com/ydf0509/flexible_thread_pool
- PyPI: https://pypi.org/project/flexible_thread_pool/
- 另一个弹性线程池实现: threadpool_executor_shrink_able
如有问题或建议,欢迎提交 Issue 或 PR。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file flexible_thread_pool-0.3.tar.gz.
File metadata
- Download URL: flexible_thread_pool-0.3.tar.gz
- Upload date:
- Size: 9.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.25
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
19ecb69754872c58ff2139dc883d877f8ba8e03dc90c503d10fbcd926d8f570f
|
|
| MD5 |
fc7bde4d729cdb5cd82e777953d0598e
|
|
| BLAKE2b-256 |
aadc7792040326a2a894060383a54c4e8ce929a0b28222da96fa24b7667e6a92
|