leasepool

Executor leasing, backpressure, and adaptive CPU/I/O offloading for async Python.

leasepool helps async applications borrow local executor capacity safely. Instead of passing raw ThreadPoolExecutor or ProcessPoolExecutor objects around your codebase, callers acquire short-lived leases, run synchronous work, and return the executor to the manager.

Use it when you need to run blocking I/O, legacy sync SDK calls, or CPU-heavy functions from an asyncio application without letting unbounded executor usage spread through the service.

Install

pip install leasepool

For local development from this repository:

pip install -e .

Free-threaded Python / no-GIL support

leasepool is a pure Python package and does not ship native extension modules. It is intended to work on CPython free-threaded builds.

Current support level:

  • CPython free-threaded builds: Stable
  • Package classifier: Programming Language :: Python :: Free Threading :: 3 - Stable

Note that leasepool can manage executor leases safely, but functions submitted by users are still responsible for their own thread-safety when run concurrently.

Quickstart

import asyncio
import time

from leasepool import ExecutorBackend, LeasedExecutorManager


def blocking_uppercase(value: str) -> str:
    time.sleep(0.2)
    return value.upper()


async def main() -> None:
    manager = LeasedExecutorManager(
        backend=ExecutorBackend.THREAD,
        max_pools=2,
        min_pools=1,
        workers_per_pool=4,
        name_prefix="quickstart-worker",
    )

    await manager.start()

    try:
        async with await manager.acquire(owner="quickstart") as lease:
            result = await lease.run(blocking_uppercase, "hello leasepool")

        print(result)
    finally:
        await manager.stop()


asyncio.run(main())

Core usage model

A LeasedExecutorManager owns a bounded number of executor pools.

async application
  -> LeasedExecutorManager
  -> ExecutorLease
  -> ThreadPoolExecutor / ProcessPoolExecutor

The lease is the temporary right to submit work to one executor. Prefer the async context-manager style because it returns the lease automatically:

async with await manager.acquire(owner="vendor-sdk-call") as lease:
    result = await lease.run(sync_function, payload)

Manual release is also available when your control flow requires it:

lease = await manager.acquire(owner="manual-flow")
try:
    result = await lease.run(sync_function)
finally:
    await lease.release()

lease.executor is a safe proxy. It supports submit(), but prevents callers from shutting down the internal executor or submitting after the lease expires or has been released.
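To make the proxy idea concrete, here is a minimal standard-library sketch of a wrapper that exposes submit() but not shutdown(), and refuses work once it has been released. The class name LeaseProxySketch and its release() method are hypothetical and only illustrate the pattern; this is not leasepool's implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class LeaseProxySketch:
    """Hypothetical wrapper: exposes submit() only, never shutdown()."""

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._executor = executor
        self._released = False

    def submit(self, fn, /, *args, **kwargs) -> Future:
        if self._released:
            raise RuntimeError("lease already released")
        return self._executor.submit(fn, *args, **kwargs)

    def release(self) -> None:
        # Only marks the proxy unusable; the real executor keeps running
        # so that its owner can hand it to the next caller.
        self._released = True


pool = ThreadPoolExecutor(max_workers=2)
proxy = LeaseProxySketch(pool)
doubled = proxy.submit(lambda x: x * 2, 21).result()

proxy.release()
try:
    proxy.submit(print, "too late")
    rejected = False
except RuntimeError:
    rejected = True

pool.shutdown()
```

Because callers only ever hold the proxy, they have no handle with which to shut down an executor that other parts of the service still depend on.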

Backends

Thread backend

Use backend="thread" or ExecutorBackend.THREAD for blocking I/O:

  • blocking vendor SDKs
  • synchronous HTTP or database clients
  • file I/O
  • small blocking operations called from async services
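The reason these workloads belong on threads can be shown with the standard library alone. The sketch below does not use leasepool; blocking_call and heartbeat are hypothetical names. It runs a blocking function through loop.run_in_executor and counts how often a heartbeat coroutine still gets scheduled in the meantime.

```python
import asyncio
import time


def blocking_call() -> str:
    time.sleep(0.2)  # stands in for a sync SDK or database call
    return "done"


async def heartbeat(ticks: list) -> None:
    # Keeps ticking only while the event loop is free to schedule it.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.02)


async def main() -> tuple:
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, blocking_call)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)
    return result, len(ticks)


result, tick_count = asyncio.run(main())
```

If blocking_call were invoked directly inside main(), the heartbeat would not tick at all during the 0.2 second sleep.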

Process backend

Use backend="process" or ExecutorBackend.PROCESS for CPU-heavy Python work that should run across CPU cores:

manager = LeasedExecutorManager(
    backend=ExecutorBackend.PROCESS,
    max_pools=1,
    min_pools=1,
    workers_per_pool=4,
)

Functions, arguments, and return values submitted to the process backend must be picklable. Prefer top-level functions and simple serializable data. Do not submit lambdas, nested functions, sockets, database clients, or async clients.
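A quick way to see which objects satisfy this constraint is to try pickling them up front. The helper below is a hypothetical pre-flight check built on the standard pickle module, not part of leasepool, and it is only a heuristic: it serializes the object once just to see whether that succeeds.

```python
import math
import pickle


def is_picklable(obj: object) -> bool:
    """Return True if pickle can serialize obj with the default protocol."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


checks = {
    # Importable functions are pickled by reference to their module path.
    "builtin function": is_picklable(math.sqrt),
    "plain data": is_picklable({"ids": [1, 2, 3]}),
    # A lambda has no importable name, so pickling it fails.
    "lambda": is_picklable(lambda x: x + 1),
}
```

The same check applies to arguments and return values, since they cross the process boundary in the same way.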

Interpreter backend

backend="interpreter" is reserved for concurrent.futures.InterpreterPoolExecutor, which requires Python 3.14 or later. On earlier Python versions, selecting it raises UnsupportedBackendError.
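If the same code has to run on several Python versions, one option is to detect the executor before choosing a backend string. This is a sketch using only the standard library; the helper name interpreter_pool_available and the fallback to "thread" are assumptions made for illustration, not leasepool behavior.

```python
import concurrent.futures


def interpreter_pool_available() -> bool:
    # getattr with a default covers Pythons and builds where the
    # executor class is not provided by concurrent.futures.
    return getattr(concurrent.futures, "InterpreterPoolExecutor", None) is not None


# Hypothetical policy: fall back to threads when interpreters are unavailable.
backend = "interpreter" if interpreter_pool_available() else "thread"
```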

Backpressure and waiting

max_pools bounds the number of executor objects owned by one manager. If every pool is leased, acquire() waits by default; pass timeout to bound the wait:

lease = await manager.acquire(owner="wait-for-capacity", timeout=2.0)

Fail immediately with wait=False:

from leasepool import LeaseUnavailableError

try:
    lease = await manager.acquire(owner="no-wait", wait=False)
except LeaseUnavailableError:
    # Return HTTP 503, retry later, or enqueue elsewhere.
    ...
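The waiting and fail-fast modes can be pictured as a bounded semaphore. The sketch below uses only asyncio; BoundedSlots and SlotsUnavailable are hypothetical names, and raising the same error on timeout is an assumption of this sketch, since the behavior of acquire() when its timeout elapses is not described here.

```python
import asyncio
from typing import Optional


class SlotsUnavailable(Exception):
    """Hypothetical stand-in for a 'no capacity' error."""


class BoundedSlots:
    def __init__(self, max_slots: int) -> None:
        self._sem = asyncio.Semaphore(max_slots)

    async def acquire(self, *, wait: bool = True, timeout: Optional[float] = None) -> None:
        if not wait:
            if self._sem.locked():
                raise SlotsUnavailable("all slots are in use")
            await self._sem.acquire()  # a slot is free, so this does not block
            return
        try:
            await asyncio.wait_for(self._sem.acquire(), timeout)
        except asyncio.TimeoutError:
            raise SlotsUnavailable("timed out waiting for a slot") from None

    def release(self) -> None:
        self._sem.release()


async def demo() -> list:
    slots = BoundedSlots(max_slots=1)
    events = []
    await slots.acquire()
    events.append("first acquired")
    try:
        await slots.acquire(wait=False)
    except SlotsUnavailable:
        events.append("fail-fast rejected")
    try:
        await slots.acquire(timeout=0.05)
    except SlotsUnavailable:
        events.append("timed out")
    slots.release()
    await slots.acquire(wait=False)
    events.append("acquired after release")
    return events


events = asyncio.run(demo())
```

Failing fast suits request handlers that can shed load, while a bounded wait suits background jobs that can tolerate a short delay.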

Lease expiry

Each lease has a soft lifetime and a grace period:

manager = LeasedExecutorManager(
    backend="thread",
    max_pools=2,
    default_lease_seconds=300.0,
    lease_grace_seconds=15.0,
)

lease = await manager.acquire(lease_seconds=30.0, owner="bounded-operation")

Once lease_seconds + lease_grace_seconds have elapsed (45 seconds for the lease above), new submissions through that lease raise LeaseExpiredError. Work that was already submitted is left to the underlying executor's semantics.
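The timing arithmetic can be worked through with a small helper. This is an illustrative sketch only: lease_phase and the phase names are hypothetical, and how leasepool treats the exact boundary instants is not specified here.

```python
def lease_phase(elapsed: float, lease_seconds: float, lease_grace_seconds: float) -> str:
    """Classify a lease by how long it has been held (all values in seconds)."""
    if elapsed <= lease_seconds:
        return "active"
    if elapsed <= lease_seconds + lease_grace_seconds:
        return "grace"
    return "expired"  # new submissions would be rejected from here on


# A 30 second lease with a 15 second grace period, checked at three instants.
phases = [lease_phase(t, 30.0, 15.0) for t in (10.0, 40.0, 50.0)]
```

With these values the lease is active at 10 seconds, in its grace period at 40 seconds, and expired at 50 seconds.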

Adaptive sizing

You can connect desired pool count to a runtime signal:

connected_devices: set[str] = set()

manager = LeasedExecutorManager(
    backend="thread",
    max_pools=10,
    min_pools=1,
    units_per_pool=10,
    size_provider=lambda: len(connected_devices),
)

await manager.start()

connected_devices.add("device-1")
manager.notify_scale_changed()

The target executor count is:

min(max_pools, max(min_pools, ceil(size_provider() / units_per_pool)))

Idle executors above the target are shut down. Active non-expired leases are not revoked just because the target shrinks.
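The sizing rule is easy to check by hand. The function below restates it in plain Python; target_pool_count is a hypothetical name used only for this worked example.

```python
import math


def target_pool_count(size: int, units_per_pool: int, min_pools: int, max_pools: int) -> int:
    """Pools wanted for `size` units, clamped to [min_pools, max_pools]."""
    wanted = math.ceil(size / units_per_pool)
    return min(max_pools, max(min_pools, wanted))


# Same settings as the manager above: 10 units per pool, 1 to 10 pools.
targets = {
    n: target_pool_count(n, units_per_pool=10, min_pools=1, max_pools=10)
    for n in (0, 1, 25, 250)
}
```

With no connected devices the count stays at min_pools (1), 25 devices need 3 pools, and 250 devices are capped at max_pools (10).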

WorkGrinder

WorkGrinder collects work from many async submitters and runs it in batches on leased executors:

from leasepool import WorkGrinder

grinder = WorkGrinder(
    executor_manager=manager,
    batch_size_threshold=50,
    max_wait_seconds=2.0,
    lease_seconds=60.0,
)

await grinder.start()

try:
    result = await grinder.submit(sync_function, payload, owner="item-1")
finally:
    await grinder.stop(cancel_pending=True)

Use:

  • await grinder.submit(...) to queue work and wait for its result.
  • await grinder.enqueue(...) to receive an asyncio.Future immediately.
  • grinder.submit_from_thread(...) from non-async code or another OS thread.
  • grinder.stats() for diagnostics from the event-loop thread.
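The parameter names batch_size_threshold and max_wait_seconds suggest a "flush when full or when the wait runs out" policy. The asyncio sketch below shows that general pattern under that assumption; collect_batch is a hypothetical helper, not WorkGrinder's actual batching code.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, batch_size_threshold: int, max_wait_seconds: float) -> list:
    """Wait for one item, then gather more until the batch is full or time is up."""
    loop = asyncio.get_running_loop()
    batch = [await queue.get()]
    deadline = loop.time() + max_wait_seconds
    while len(batch) < batch_size_threshold:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch


async def demo() -> list:
    queue = asyncio.Queue()
    for item in range(5):
        queue.put_nowait(item)
    first = await collect_batch(queue, batch_size_threshold=3, max_wait_seconds=0.05)
    second = await collect_batch(queue, batch_size_threshold=3, max_wait_seconds=0.05)
    return [first, second]


batches = asyncio.run(demo())
```

The first batch fills to the threshold immediately, while the second is flushed by the timer with only two items.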

Process worker log forwarding

Worker-process logs do not automatically flow through the parent process logger. Enable forwarding when using the process backend:

import logging

from leasepool import LeasedExecutorManager, ProcessLoggingConfig

manager = LeasedExecutorManager(
    backend="process",
    max_pools=1,
    min_pools=1,
    workers_per_pool=2,
    process_logging=ProcessLoggingConfig(
        enabled=True,
        level="INFO",
        target_logger=logging.getLogger("leasepool.process"),
    ),
)

A convenience form is also available:

manager = LeasedExecutorManager(
    backend="process",
    max_pools=1,
    forward_process_logs=True,
    process_log_level="INFO",
)
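For background, the standard library's usual answer to cross-process logging is a queue: workers attach a QueueHandler, and the parent drains the queue with a QueueListener. The sketch below shows that pattern in a single process with queue.Queue for brevity (a multiprocessing queue would take its place across processes). It is an assumption that leasepool's forwarding resembles this; ListHandler and the logger name are hypothetical.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Collects formatted messages so the result is easy to inspect."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


log_queue = queue.Queue()
collected = ListHandler()

# "Parent" side: drain the queue into the real handlers.
listener = logging.handlers.QueueListener(log_queue, collected)
listener.start()

# "Worker" side: the logger only knows about the queue.
worker_logger = logging.getLogger("sketch.worker")
worker_logger.setLevel(logging.INFO)
worker_logger.propagate = False
worker_logger.addHandler(logging.handlers.QueueHandler(log_queue))
worker_logger.info("chunk %d done", 1)

listener.stop()  # processes everything queued so far, then joins the thread
```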

Diagnostics

print(manager.backend)
print(manager.available_count)
print(manager.leased_count)
print(manager.total_count)
print(manager.desired_executor_count())
print(manager.stats())

manager.stats() includes backend, sizing, counts, workers per pool, and current lease expiry information.

Examples

The examples/ directory is designed as a learning path:

python examples/00_quickstart_thread_backend.py
python examples/07_process_backend_cpu_work.py
python examples/08_work_grinder_submit.py
python examples/13_process_log_forwarding.py
python examples/14_complete_library_walkthrough.py

For process-backend examples, run the files directly instead of pasting them into a REPL, because ProcessPoolExecutor needs importable top-level functions.

See README_EXAMPLES.md for the complete example map.

What leasepool is not

leasepool is not a distributed task queue. It does not provide persistence, remote workers, retries across machines, scheduling, or broker integration. It manages local executor capacity inside one Python process.
