Skip to main content

一个全面的Python并发编程增强库,为标准线程、进程和Qt线程提供了更高级的抽象和异常处理机制

Project description

nsthread

版本: 0.0.6 | GitHub | Python 3.13+

nsthread 是一个全面的 Python 并发编程增强库,为标准线程、进程和 Qt 线程提供了更高级的抽象和异常处理机制。作者: sandorn sandorn@live.cn

功能特点

  • 增强的线程管理:提供 ThreadManagerQtThreadManager 用于集中管理线程生命周期
  • 异常安全的执行:通过 safe_call 装饰器提供统一的异常捕获和日志记录
  • 单例线程支持:实现 SingletonThreadComposedSingletonThread 等单例模式线程
  • Qt 集成:提供基于 PyQt6 的线程增强实现
  • Future 模式支持:多种线程池实现,简化异步任务处理
  • 进程管理:提供增强的进程处理能力
  • 生产者-消费者模式:实现同步和异步版本的任务处理框架
  • 丰富的装饰器:提供多种线程相关的装饰器,简化并发编程

安装

基本安装

pip install nsthread

可选依赖

如果需要使用Qt相关功能,请单独安装PyQt6:

pip install PyQt6

快速开始

基本线程使用

from nsthread import SafeThread
import time

def worker_function():
    for i in range(5):
        print(f"Working: {i}")
        time.sleep(1)

# 创建并启动安全线程
thread = SafeThread(target=worker_function)
thread.start()
thread.join()

使用装饰器简化并发编程

from nsthread import thread_safe, run_in_thread, safe_call
import time

# 确保函数线程安全
@thread_safe
def shared_resource_access(data):
    # 访问共享资源的代码
    time.sleep(0.1)  # 模拟操作
    return data * 2

# 在单独的线程中执行函数
@run_in_thread
def background_task(param):
    time.sleep(2)  # 模拟耗时操作
    return f"处理完成: {param}"

# 安全执行可能抛出异常的函数
@safe_call()
def risky_operation(value):
    if value < 0:
        raise ValueError("值不能为负数")
    return value * value

# 使用这些装饰过的函数
result1 = shared_resource_access(5)
result2 = background_task("测试")  # 立即返回,函数在后台执行
result3 = risky_operation(-1)  # 捕获异常并记录,返回默认值None

使用线程池处理批量任务

from nsthread import EnhancedThreadPool
import time

def process_item(item):
    """处理单个项目的函数"""
    time.sleep(0.5)  # 模拟处理时间
    return item * item

# 创建线程池
pool = EnhancedThreadPool(max_workers=4)

# 提交多个任务
results = []
for i in range(10):
    future = pool.submit(process_item, i)
    results.append(future)

# 获取所有结果
final_results = [future.result() for future in results]
print(f"处理结果: {final_results}")

# 关闭线程池
pool.shutdown()

生产者-消费者模式示例

from nsthread import Production
import time
import random

def producer():
    """生产者函数,生成任务项"""
    time.sleep(random.uniform(0.1, 0.5))  # 模拟生产时间
    return random.randint(1, 100)

def consumer(item):
    """消费者函数,处理任务项"""
    time.sleep(random.uniform(0.2, 0.7))  # 模拟消费时间
    return item * 2

# 创建生产系统
production = Production(queue_size=5)

# 启动系统,2个生产者,3个消费者
production.start(num_producers=2, num_consumers=3, producer_fn=producer, consumer_fn=consumer)

# 运行一段时间
print("生产系统运行中...")
time.sleep(5)

# 等待所有任务处理完成
print("等待所有任务处理完成...")
production.wait_until_done(timeout=10)

# 获取所有结果
results = production.get_result_list()
print(f"处理结果数量: {len(results)}")

# 关闭系统
production.shutdown()
print("生产系统已关闭")

## 核心模块

### exception.py
提供 `safe_call` 装饰器用于捕获和处理函数执行过程中的异常确保异常被妥善记录而不导致程序崩溃

### thread.py
提供标准 Python 线程的增强实现包括 `ThreadBase`、`SafeThread`、`ThreadManager` 等类

### qt_thread.py
提供基于 PyQt6 的线程增强实现包括 `QtThreadBase`、`QtSafeThread`、`QtThreadManager` 等类

### singleton.py
提供单例模式的实现用于创建单例线程

### futures.py
提供 Future 模式的实现简化异步任务处理

### process.py
提供增强的进程处理能力

### wraps.py
提供功能增强的装饰器集合

## 使用示例

### 基本线程使用

```python
from nsthread import SafeThread
import time

def worker_function():
    for i in range(5):
        print(f"Working: {i}")
        time.sleep(1)

# 创建并启动安全线程
thread = SafeThread(target=worker_function)
thread.start()
thread.join()

使用 ThreadManager 管理多个线程

from nsthread import ThreadManager
import time

def task(name, count):
    for i in range(count):
        print(f"Task {name}: {i}")
        time.sleep(0.5)
    return f"Task {name} completed"

# 创建线程管理器
manager = ThreadManager()

# 添加并启动多个线程
manager.create_thread(target=task, args=("A", 3))
manager.create_thread(target=task, args=("B", 5))

# 等待所有线程完成
manager.wait_all_completed()

# 获取所有线程的结果
results = manager.get_all_result()
print(f"All results: {results}")

使用 safe_call 装饰器

from nsthread import safe_call

def might_raise_error(value):
    if value < 0:
        raise ValueError("Value cannot be negative")
    return value * 2

# 使用装饰器包装函数
@safe_call()
def safe_function(value):
    return might_raise_error(value)

# 安全地调用可能抛出异常的函数
safe_function(10)  # 返回 20
safe_function(-1)  # 记录异常但不会导致程序崩溃

Qt 线程使用

from nsthread import QtSafeThread
from PyQt6.QtWidgets import QApplication
import sys
import time

def qt_worker():
    for i in range(10):
        print(f"Qt thread working: {i}")
        time.sleep(0.5)
    return "Qt thread completed"

app = QApplication(sys.argv)

# 创建并启动 Qt 安全线程
qt_thread = QtSafeThread(target=qt_worker)
qt_thread.result_ready.connect(lambda result: print(f"Result: {result}"))
qt_thread.start()

# 运行 Qt 应用程序主循环
sys.exit(app.exec())

使用场景

  • 简单并发任务: 使用 ThreadBase 或 SafeThread
  • 批量任务处理: 使用 BaseThreadPool 或 EnhancedThreadPool
  • 动态资源管理: 使用 DynamicThreadPool 或 ThreadPoolManager
  • 跨进程并行: 使用 CustomProcess 或 run_custom_process
  • UI响应式应用: 使用 QtThreadBase 或 QtSafeThread
  • 复杂任务流: 使用 Production 或 AsyncProduction
  • 异步函数执行: 使用 AsyncFunction
  • 线程安全保证: 使用 thread_safe 装饰器
  • 单例线程: 使用 SingletonThread 或 SingletonQtThread

API 参考

核心装饰器

safe_call

def safe_call(log_level: int = logging.ERROR, default_return: Any = None):
    """捕获函数执行过程中的异常并记录日志
    
    参数:
        log_level: 日志记录级别
        default_return: 发生异常时的默认返回值
    """

thread_safe

def thread_safe(func: Callable[..., T]) -> Callable[..., T]:
    """确保函数在多线程环境下的安全性
    
    参数:
        func: 要保护的目标函数
    
    返回:
        线程安全的函数包装器
    """

run_in_thread

def run_in_thread(func: Callable[..., T]) -> Callable[..., T]:
    """在单独的线程中执行函数
    
    参数:
        func: 要在新线程中执行的目标函数
    
    返回:
        异步执行的函数包装器
    """

线程基础类

ThreadBase

class ThreadBase(threading.Thread):
    """基础线程类,提供线程生命周期管理和结果存储功能
    
    属性:
        result: 线程执行的结果
        error: 线程执行过程中的异常(如果有)
        is_stopped: 线程是否已停止
    
    方法:
        stop(): 安全地停止线程
        join(timeout=None): 等待线程完成
        get_result(block=True, timeout=None): 获取线程执行结果
    """

SafeThread

class SafeThread(ThreadBase):
    """继承自 ThreadBase,提供额外的异常处理机制
    
    特性:
        - 自动捕获并记录线程执行过程中的异常
        - 提供更安全的线程停止机制
        - 支持结果获取和超时控制
    """

线程管理器

ThreadManager

class ThreadManager(SingletonMixin):
    """线程管理器,用于集中管理多个线程的创建、启动、停止和结果获取
    
    类方法:
        create_thread(target, *args, **kwargs): 创建并启动一个新线程
        create_safe_thread(target, *args, **kwargs): 创建并启动一个安全线程
        add_thread(thread): 添加一个已创建的线程
        stop_all(wait=True): 停止所有管理的线程
        wait_all_completed(timeout=None): 等待所有线程完成
        get_all_result(): 获取所有线程的执行结果
        get_active_count(): 获取当前活跃的线程数量
    """

线程池

BaseThreadPool

class BaseThreadPool:
    """基础线程池实现
    
    参数:
        max_workers: 最大工作线程数
        name_prefix: 线程名称前缀
    
    方法:
        submit(func, *args, **kwargs): 提交任务到线程池
        shutdown(wait=True): 关闭线程池
        get_workers_count(): 获取当前工作线程数量
    """

EnhancedThreadPool

class EnhancedThreadPool(BaseThreadPool):
    """增强型线程池,提供更多高级功能
    
    特性:
        - 任务优先级支持
        - 任务取消功能
        - 动态调整线程池大小
        - 任务完成回调
    """

AsyncFunction

class AsyncFunction:
    """函数异步执行包装器
    
    参数:
        fn: 要执行的目标函数
        *args: 传递给目标函数的参数
        **kwargs: 传递给目标函数的关键字参数
        max_workers: 可选,指定线程池最大工作线程数
    
    属性:
        result: 异步执行的结果
    """

单例线程

SingletonThread

class SingletonThread(SingletonMixin, SafeThread):
    """单例线程类,确保同一目标函数只有一个线程实例
    
    特性:
        - 单例模式保证唯一性
        - 自动线程管理
        - 安全停止机制
    """

Qt 线程

QtThreadBase

class QtThreadBase(QThread):
    """基于QThread的增强型线程基类
    
    信号:
        finished_signal: 线程完成时发射,携带执行结果
        error_signal: 线程发生错误时发射,携带异常信息
    
    方法:
        stop(): 安全停止线程
        get_result(): 获取线程执行结果
    """

QtSafeThread

class QtSafeThread(QtThreadBase):
    """基于QThread的安全线程类
    
    特性:
        - 自动异常捕获和信号发射
        - 安全的资源管理
        - 结果获取机制
    """

生产者-消费者模式

Production

class Production:
    """同步多线程生产者-消费者模式实现
    
    参数:
        queue_size: 任务队列的最大容量
    
    方法:
        start(num_producers, num_consumers, producer_fn, consumer_fn): 启动生产系统
        shutdown(): 关闭生产系统
        wait_until_done(timeout=None): 等待所有任务处理完成
        get_result(): 获取一个处理结果
        get_result_list(): 获取所有处理结果
    """

AsyncProduction

class AsyncProduction:
    """异步协程生产者-消费者模式实现
    
    参数:
        queue_size: 异步任务队列的最大容量
    
    方法:
        start(num_producers, num_consumers, producer_fn, consumer_fn): 启动异步生产系统
        shutdown(): 关闭异步生产系统
        wait_until_done(timeout=None): 等待所有任务处理完成
    """

进程管理

CustomProcess

class CustomProcess(multiprocessing.Process):
    """自定义进程类,提供增强的进程管理功能
    
    特性:
        - 结果返回机制
        - 异常处理
        - 优雅的进程终止
    """

run_custom_process

def run_custom_process(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """在新进程中执行函数并返回结果
    
    参数:
        func: 要在新进程中执行的函数
        *args: 传递给函数的位置参数
        **kwargs: 传递给函数的关键字参数
    
    返回:
        函数执行的结果
    """

测试

项目包含完整的测试套件,使用以下命令运行测试:

pytest

开发工具

项目使用以下工具进行代码质量保证:

  • Ruff: 用于代码风格检查和格式化
  • basedpyright: 用于静态类型检查
  • mypy: 可选的静态类型检查器

贡献指南

  1. Fork 项目仓库
  2. 创建功能分支
  3. 提交代码更改
  4. 运行测试确保代码质量
  5. 提交 Pull Request

许可证

本项目采用 MIT 许可证。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nsthread-0.0.6.tar.gz (45.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

nsthread-0.0.6-py3-none-any.whl (39.1 kB view details)

Uploaded Python 3

File details

Details for the file nsthread-0.0.6.tar.gz.

File metadata

  • Download URL: nsthread-0.0.6.tar.gz
  • Upload date:
  • Size: 45.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for nsthread-0.0.6.tar.gz
Algorithm Hash digest
SHA256 5fd6ed32c8bbf8e38aff178c3eb0ef2e3115423f652c6f8255fdf9db6cfee776
MD5 a160cc9e545943ea6398316a49d22265
BLAKE2b-256 9d1ef0f71b4dc01646f0e40044c1daf5a2d9f80e9d9b6ac3c8d394a7e1ca43d0

See more details on using hashes here.

File details

Details for the file nsthread-0.0.6-py3-none-any.whl.

File metadata

  • Download URL: nsthread-0.0.6-py3-none-any.whl
  • Upload date:
  • Size: 39.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for nsthread-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 baaad84045ee97495914489101e3299e2ad9988bad8da11b8782699a5e028dbf
MD5 a69377155860ddf5ace875929e5bb4fb
BLAKE2b-256 74734a25699457a2662c9652a5449809e5c7e408ec9b5238dd715ba6bea74d12

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page