Skip to main content

flexible_thread_pool ,auto expand thread and reduce threads. both support sync and asyncio,fast than concurrent.futures.ThreadpoolExecutor

Project description

flexible_thread_pool

自动弹性伸缩线程池,完美兼容 sync / async 函数,性能是官方 concurrent.futures.ThreadPoolExecutor200%+

PyPI Python License


特性

  • 弹性伸缩 — 根据任务提交频率和执行耗时,自动扩缩线程数,无需人工估算
  • 返回 Futuresubmit() 返回标准 concurrent.futures.Future,支持 result() 获取返回值
  • 支持 map — 内置 Executor.map 方法,批量提交更便捷
  • 完美兼容 asyncio — 自动检测 async def 函数并使用事件循环调度,无需手动 await
  • 协程跨 Loop 安全 — 每个工作线程拥有独立事件循环(threading.local),彻底解决 "attached to a different loop" 错误
  • 指定外部 Loop — 支持通过 specify_async_loop 参数绑定外部事件循环,满足连接池等特殊场景
  • 有界队列背压 — 严格队列大小限制,防止内存溢出
  • 自然生命周期 — 利用 daemon=False + MIN_WORKERS,脚本任务完成后自动退出

安装

pip install flexible_thread_pool

依赖:nb_log


快速开始

基本用法

from flexible_thread_pool import FlexibleThreadPool


def sync_task(x):
    return x * 2


pool = FlexibleThreadPool(max_workers=100)

# submit 返回 Future,可获取结果
fut = pool.submit(sync_task, 42)
print(fut.result())  # 84

# 批量提交(自动处理超时)
results = pool.map(sync_task, [1, 2, 3, 4], timeout=5)
for res in results:
    print(res)

异步函数支持

import asyncio
from flexible_thread_pool import FlexibleThreadPool


async def async_task(x):
    await asyncio.sleep(0.1)
    return x * 2


pool = FlexibleThreadPool(max_workers=100)

# 自动调度 async def 函数
fut = pool.submit(async_task, 21)
print(fut.result())  # 42

无需手动创建事件循环,线程池自动为每个工作线程分配独立的 asyncio.new_event_loop(),避免跨线程 loop 冲突。

指定外部事件循环

某些异步库(如 aiohttp、asyncpg)要求连接池与事件循环绑定在同一线程。可以通过 specify_async_loop 解决:

import asyncio
from flexible_thread_pool import FlexibleThreadPool

loop = asyncio.new_event_loop()

pool = FlexibleThreadPool(
    max_workers=100,
    specify_async_loop=loop,
    is_auto_start_specify_async_loop_in_child_thread=True,
)

自动退出 vs 常驻进程

类名 MIN_WORKERS 行为
FlexibleThreadPool 0 任务全部完成后自动退出
FlexibleThreadPoolMinWorkers0 0 同上,显式别名
FlexibleThreadPoolMinWorkers1 1 至少保留 1 个常驻线程,进程永不退出
from flexible_thread_pool import (
    FlexibleThreadPool,
    FlexibleThreadPoolMinWorkers1,
)

# 脚本结束后自动退出
pool = FlexibleThreadPool(max_workers=100)
pool.MIN_WORKERS = 0  # 默认就是 0

# 常驻进程,永不结束
pool = FlexibleThreadPoolMinWorkers1(max_workers=100)

# 或动态设置
pool.MIN_WORKERS = 1

为什么选择 FlexibleThreadPool?

官方线程池的缺陷

import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(500)


def f(x):
    time.sleep(10)  # 模拟 IO 阻塞
    print(x)


for i in range(10000):
    time.sleep(100)  # 模拟低频提交
    pool.submit(f, i)
  • 低峰期浪费:即使每隔 100 秒才提交一个任务,官方线程池依然会创建 500 个线程
  • 无自动缩容:流量高峰过后,线程数不会自动降低

FlexibleThreadPool 的智能策略

场景 官方 ThreadPoolExecutor FlexibleThreadPool
任务稀疏(隔 100s 提交一次) 盲目扩容到 500 线程 只开 1 个线程
流量高峰(9:00-10:00) → 低峰 始终保持 500 线程 高峰 500,低峰自动降至 ~5
函数耗时降低(100s → 1s) 保持 500 线程 自动从 500 降至 ~100
函数耗时增加(1s → 100s) 保持 500 线程 自动从 ~100 升至 500

FlexibleThreadPool 无需用户分析函数耗时,自动计算出最合理的线程数量。


性能对比

Win11 + AMD R5 4600U 单核单进程测试:

线程池                          | 吞吐量 (ops/s)
-------------------------------|---------------
concurrent.futures.ThreadPoolExecutor |  ~10000
FlexibleThreadPool              |  ~30000+  🚀

性能提升约 200%,且线程数越高峰值吞吐优势越明显。


API

FlexibleThreadPool(max_workers, work_queue_maxsize, specify_async_loop, is_auto_start_specify_async_loop_in_child_thread)

参数 类型 默认值 说明
max_workers int cpu_count * 5 最大线程数
work_queue_maxsize int 10 工作队列最大长度(背压控制)
specify_async_loop AbstractEventLoop None 指定外部事件循环
is_auto_start_specify_async_loop_in_child_thread bool True 是否自动启动指定 loop

实例方法

  • submit(func, *args, **kwargs)Future — 提交任务,返回 Future 对象
  • map(func, *iterables, timeout, chunksize) — 批量提交(继承自 Executor.map
  • shutdown(wait=True) — 优雅关闭(当前依赖 KEEP_ALIVE_TIME 自动清理,无需显式调用)

实例属性

属性 类型 默认值 说明
KEEP_ALIVE_TIME float 5.0 空闲线程存活超时(秒)
MIN_WORKERS int 0 最小常驻线程数
max_workers int 最大线程数(构造函数传入)

链接


如有问题或建议,欢迎提交 Issue 或 PR。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flexible_thread_pool-0.3.tar.gz (9.1 kB view details)

Uploaded Source

File details

Details for the file flexible_thread_pool-0.3.tar.gz.

File metadata

  • Download URL: flexible_thread_pool-0.3.tar.gz
  • Upload date:
  • Size: 9.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for flexible_thread_pool-0.3.tar.gz
Algorithm Hash digest
SHA256 19ecb69754872c58ff2139dc883d877f8ba8e03dc90c503d10fbcd926d8f570f
MD5 fc7bde4d729cdb5cd82e777953d0598e
BLAKE2b-256 aadc7792040326a2a894060383a54c4e8ce929a0b28222da96fa24b7667e6a92

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page