Skip to main content

A generic, async-first object pool for Python

Project description

Object Pool

一个功能丰富、支持异步的通用对象池,提供细粒度的对象生命周期管理、状态检测、超时回收和灵活的定制能力。

特性

  • 异步优先:原生支持 asyncio,在高并发 I/O 场景下性能优异
  • 🔄 完整生命周期:管理对象的 PENDINGAVAILABLEBORROWEDINVALIDDORMANTUNKNOWN 状态
  • ⏱️ 超时回收:支持对象级别的借出超时自动回收
  • 🧩 高度可定制:可自定义状态检测器、对象选择器、包装器(Wrapper)和打包器(Packer)
  • 🚦 并发控制:内置并发度限制,批量操作自动分块
  • 🔁 双 API:同时提供异步(*_async)和同步接口
  • 🧵 协程安全:基于 AsyncRLock 实现,在单事件循环多协程下安全

安装

pip install object-pool

或从源码安装:

git clone https://github.com/Jerry-Wu-GitHub/object-pool.git
cd object-pool
pip install -e .

快速开始

同步使用

from object_pool import Pool, Status
from datetime import timedelta

# 定义一个简单的状态检测函数
def check_status(wrapper):
    # 假设对象有一个 is_alive 属性
    if wrapper.object.is_alive():
        return Status.AVAILABLE
    return Status.INVALID

# 创建对象池
pool = Pool(detect_status=check_status)

# 添加对象
obj = MyResource()
pool.add(obj)

# 借出对象
with pool.borrow() as resource:
    resource.do_something()
# 离开上下文时自动归还

# 手动归还
pack = pool.borrow()
pack.close()   # 归还

异步使用

import asyncio
from object_pool import Pool, Status

async def check_status_async(wrapper):
    # 异步检测对象健康状态
    if await wrapper.object.ping():
        return Status.AVAILABLE
    return Status.INVALID

async def main():
    pool = Pool(detect_status=check_status_async)
    obj = AsyncResource()
    await pool.add_async(obj)

    pack = await pool.borrow_async(timeout=3.0)
    try:
        await pack.object.work()
    finally:
        pack.close()   # 归还

    # 或使用异步上下文管理器
    async with await pool.borrow_async() as resource:
        await resource.work()

asyncio.run(main())

高级用法

自定义对象选择器

默认使用随机选择。你可以实现自己的选择逻辑,例如轮询(Round-Robin)或基于权重的选择

def select_object(object_wrapers: List[ObjectWrapper[DummyObject]]) -> Optional[ObjectWrapper[DummyObject]]:
    """
    按照权重选择一个对象。
    """
    if not object_wrapers:
        return None
    weights = [
        object_wraper.weight
        for object_wraper in object_wrapers
    ]
    return choices(object_wrapers, weights)[0]

# 注意:选择器可以是同步或异步函数,池内部会自动转换

定制包装器和打包器

class MyWrapper(ObjectWrapper):
    def __init__(self, obj, **kwargs):
        super().__init__(obj, **kwargs)
        self.extra_info = kwargs.get('extra', {})

def my_packer(wrapper):
    # 返回自定义的打包对象,可以增加额外行为
    return MyPack(wrapper)

pool = Pool(detect_status=..., wrapper=MyWrapper, packer=my_packer)

使用默认数据

通过 wrapping_data 为每个对象附加默认属性:

pool = Pool(..., wrapping_data={"created_at": datetime.now(), "source": "db"})
pool.add(obj)   # ObjectWrapper 会自动包含 created_at 和 source

并发控制

async_concurrency 限制了同时执行状态检测或清理操作的并发任务数,避免瞬间压力过大:

pool = Pool(..., async_concurrency=10)   # 最多同时检测10个对象

并发与线程安全

  • 协程安全Pool 使用 AsyncRLock 保护关键操作,在单个事件循环内多协程并发访问是安全的。
  • 线程安全不保证多线程安全。如果在多线程环境中使用同步方法(addborrow 等),需要外部加锁(如 threading.Lock)。
  • 建议:在 asyncio 应用中始终使用异步 API;在传统多线程应用中请自行添加线程锁。

依赖

  • Python >= 3.10

许可证

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

object_pool_async-0.1.2.tar.gz (12.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

object_pool_async-0.1.2-py3-none-any.whl (10.9 kB view details)

Uploaded Python 3

File details

Details for the file object_pool_async-0.1.2.tar.gz.

File metadata

  • Download URL: object_pool_async-0.1.2.tar.gz
  • Upload date:
  • Size: 12.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for object_pool_async-0.1.2.tar.gz
Algorithm Hash digest
SHA256 52b0c37306964508a843493dba7ffc3114c49704a99ac751c481b2f147d1cf27
MD5 83251fe8467d2d2529e4e8ac4929d99b
BLAKE2b-256 81a0bf1db3227136f5d97cbf92d057d6750fad87fa4e876f608bf10ccfb3aa4a

See more details on using hashes here.

File details

Details for the file object_pool_async-0.1.2-py3-none-any.whl.

File metadata

File hashes

Hashes for object_pool_async-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 abe4213d6481a36a2a4d84144cf4b049b2689251e1940fd24a5829861a8a2093
MD5 381d95430d8eea1c020ded5a088a881e
BLAKE2b-256 66644f883d94934d1dea7d89ff0124c80898a49610f0847404ce8c0d79254561

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page