Skip to main content

A generic, async-first object pool for Python

Project description

Object Pool

一个功能丰富、支持异步的通用对象池,提供细粒度的对象生命周期管理、状态检测、超时回收和灵活的定制能力。

特性

  • 异步优先:原生支持 asyncio,在高并发 I/O 场景下性能优异
  • 🔄 完整生命周期:管理对象的 PENDINGAVAILABLEBORROWEDINVALIDDORMANTUNKNOWN 状态
  • ⏱️ 超时回收:支持对象级别的借出超时自动回收
  • 🧩 高度可定制:可自定义状态检测器、对象选择器、包装器(Wrapper)和打包器(Packer)
  • 🚦 并发控制:内置并发度限制,批量操作自动分块
  • 🔁 双 API:同时提供异步(*_async)和同步接口
  • 🧵 协程安全:基于 AsyncRLock 实现,在单事件循环多协程下安全

安装

pip install object-pool-async

或从源码安装:

git clone https://github.com/Jerry-Wu-GitHub/object-pool.git
cd object-pool
pip install -e .

快速开始

同步使用

from object_pool import Pool, Status
from datetime import timedelta

# 定义一个简单的状态检测函数
def check_status(wrapper):
    # 假设对象有一个 is_alive 属性
    if wrapper.object.is_alive():
        return Status.AVAILABLE
    return Status.INVALID

# 创建对象池
pool = Pool(detect_status=check_status)

# 添加对象
obj = MyResource()
pool.add(obj)

# 借出对象
with pool.borrow() as resource:
    resource.do_something()
# 离开上下文时自动归还

# 手动归还
pack = pool.borrow()
pack.close()   # 归还

异步使用

import asyncio
from object_pool import Pool, Status

async def check_status_async(wrapper):
    # 异步检测对象健康状态
    if await wrapper.object.ping():
        return Status.AVAILABLE
    return Status.INVALID

async def main():
    pool = Pool(detect_status=check_status_async)
    obj = AsyncResource()
    await pool.add_async(obj)

    pack = await pool.borrow_async(timeout=3.0)
    try:
        await pack.object.work()
    finally:
        pack.close()   # 归还

    # 或使用异步上下文管理器
    async with await pool.borrow_async() as resource:
        await resource.work()

asyncio.run(main())

高级用法

自定义对象选择器

默认使用随机选择。你可以实现自己的选择逻辑,例如轮询(Round-Robin)或基于权重的选择

def select_object(object_wrapers: List[ObjectWrapper[DummyObject]]) -> Optional[ObjectWrapper[DummyObject]]:
    """
    按照权重选择一个对象。
    """
    if not object_wrapers:
        return None
    weights = [
        object_wraper.weight
        for object_wraper in object_wrapers
    ]
    return choices(object_wrapers, weights)[0]

# 注意:选择器可以是同步或异步函数,池内部会自动转换

定制包装器和打包器

class MyWrapper(ObjectWrapper):
    def __init__(self, obj, **kwargs):
        super().__init__(obj, **kwargs)
        self.extra_info = kwargs.get('extra', {})

def my_packer(wrapper):
    # 返回自定义的打包对象,可以增加额外行为
    return MyPack(wrapper)

pool = Pool(detect_status=..., wrapper=MyWrapper, packer=my_packer)

使用默认数据

通过 wrapping_data 为每个对象附加默认属性:

pool = Pool(..., wrapping_data={"created_at": datetime.now(), "source": "db"})
pool.add(obj)   # ObjectWrapper 会自动包含 created_at 和 source

并发控制

async_concurrency 限制了同时执行状态检测或清理操作的并发任务数,避免瞬间压力过大:

pool = Pool(..., async_concurrency=10)   # 最多同时检测10个对象

并发与线程安全

  • 协程安全Pool 使用 AsyncRLock 保护关键操作,在单个事件循环内多协程并发访问是安全的。
  • 线程安全不保证多线程安全。如果在多线程环境中使用同步方法(addborrow 等),需要外部加锁(如 threading.Lock)。
  • 建议:在 asyncio 应用中始终使用异步 API;在传统多线程应用中请自行添加线程锁。

依赖

  • Python >= 3.10

许可证

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

object_pool_async-0.1.3.tar.gz (14.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

object_pool_async-0.1.3-py3-none-any.whl (11.4 kB view details)

Uploaded Python 3

File details

Details for the file object_pool_async-0.1.3.tar.gz.

File metadata

  • Download URL: object_pool_async-0.1.3.tar.gz
  • Upload date:
  • Size: 14.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for object_pool_async-0.1.3.tar.gz
Algorithm Hash digest
SHA256 b48466bbb2343ac17629249483c89e3487a8f1d1a3e00431ab473afc9c83dd21
MD5 33e42b7f27227027ab65fd80f6dabf0a
BLAKE2b-256 667707ffb3169a0306a3ad7bd09637de9cbe66f47cc764e630fd2d9a687353ad

See more details on using hashes here.

File details

Details for the file object_pool_async-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for object_pool_async-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 3bcf77f6c48739b41ee41a480a6856e6b1ef7c449a6e2b039ddf15fd0f9c8335
MD5 b9c833994cfceab8977afd3bd1f6fa8f
BLAKE2b-256 d1bd4ee56f68236dec8f2b553368e73c582ef06a86038d6eb89df0a0bdfc68a8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page