Executor leasing, backpressure, and adaptive CPU/I/O offloading for async Python.
leasepool
leasepool helps async applications borrow local executor capacity safely. Instead
of passing raw ThreadPoolExecutor or ProcessPoolExecutor objects around your
codebase, callers acquire short-lived leases, run synchronous work, and return the
executor to the manager.
Use it when you need to run blocking I/O, legacy sync SDK calls, or CPU-heavy
functions from an asyncio application without letting unbounded executor usage
spread through the service.
Install
pip install leasepool
For local development from this repository:
pip install -e .
Project links
- GitHub: https://github.com/jackofsometrades99/leasepool
- PyPI: https://pypi.org/project/leasepool/
- Official Documentation: https://leasepool.readthedocs.io/en/latest/
Free-threaded Python / no-GIL support
leasepool is a pure Python package and does not ship native extension modules.
It is intended to work on CPython free-threaded builds.
Current support level:
- CPython free-threaded builds: Stable
- Package classifier:
Programming Language :: Python :: Free Threading :: 3 - Stable
Note that leasepool can manage executor leases safely, but functions submitted by
users are still responsible for their own thread-safety when run concurrently.
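To illustrate the note above, here is a minimal sketch that uses only the standard library, not leasepool itself. The `Counter` class and `record_hits` function are hypothetical names chosen for this example. It shows one common way a function submitted to several worker threads can guard shared state with a lock.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Counter:
    """Hypothetical shared state that several worker threads update."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.value = 0

    def increment(self) -> None:
        # The read-modify-write below is not atomic, so guard it with a lock.
        with self._lock:
            self.value += 1


def record_hits(counter: Counter, times: int) -> None:
    """Hypothetical submitted function: bumps the shared counter."""
    for _ in range(times):
        counter.increment()


counter = Counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(record_hits, counter, 1_000) for _ in range(4)]
    for future in futures:
        future.result()  # re-raise any exception from a worker

print(counter.value)  # 4000
```

The same concern applies on free-threaded builds and on regular builds alike; the lock keeps the count exact in both cases.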
Quickstart
import asyncio
import time

from leasepool import ExecutorBackend, LeasedExecutorManager


def blocking_uppercase(value: str) -> str:
    time.sleep(0.2)
    return value.upper()


async def main() -> None:
    manager = LeasedExecutorManager(
        backend=ExecutorBackend.THREAD,
        max_pools=2,
        min_pools=1,
        workers_per_pool=4,
        name_prefix="quickstart-worker",
    )
    await manager.start()
    try:
        async with await manager.acquire(owner="quickstart") as lease:
            result = await lease.run(blocking_uppercase, "hello leasepool")
            print(result)
    finally:
        await manager.stop()


asyncio.run(main())
Core usage model
A LeasedExecutorManager owns a bounded number of executor pools.
async application
  -> LeasedExecutorManager
    -> ExecutorLease
      -> ThreadPoolExecutor / ProcessPoolExecutor
The lease is the temporary right to submit work to one executor. Prefer the async context-manager style because it returns the lease automatically:
async with await manager.acquire(owner="vendor-sdk-call") as lease:
    result = await lease.run(sync_function, payload)
Manual release is also available when your control flow requires it:
lease = await manager.acquire(owner="manual-flow")
try:
    result = await lease.run(sync_function)
finally:
    await lease.release()
lease.executor is a safe proxy. It supports submit(), but prevents callers
from shutting down the internal executor or submitting after the lease expires or
has been released.
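The idea of a submit-only proxy can be sketched with the standard library. This is not leasepool's implementation: `SubmitOnlyProxy`, its `release()` method, and the use of `RuntimeError` are assumptions made for illustration. The sketch only shows how a wrapper can forward `submit()` while refusing `shutdown()` and late submissions.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class SubmitOnlyProxy:
    """Hypothetical wrapper: forwards submit() and refuses everything else."""

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._executor = executor
        self._released = False

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
        if self._released:
            raise RuntimeError("lease released; submit() is no longer allowed")
        return self._executor.submit(fn, *args, **kwargs)

    def shutdown(self, *args: Any, **kwargs: Any) -> None:
        # Callers only borrow the executor; its owner decides when it stops.
        raise RuntimeError("shutdown() is not available through a lease proxy")

    def release(self) -> None:
        self._released = True


owner_pool = ThreadPoolExecutor(max_workers=2)
proxy = SubmitOnlyProxy(owner_pool)

print(proxy.submit(pow, 2, 10).result())  # 1024

proxy.release()
try:
    proxy.submit(pow, 2, 10)
except RuntimeError as exc:
    print(f"rejected: {exc}")

owner_pool.shutdown()  # only the owner shuts the real executor down
```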
Backends
Thread backend
Use backend="thread" or ExecutorBackend.THREAD for blocking I/O:
- blocking vendor SDKs
- synchronous HTTP or database clients
- file I/O
- small blocking operations called from async services
Process backend
Use backend="process" or ExecutorBackend.PROCESS for CPU-heavy Python work
that should run across CPU cores:
manager = LeasedExecutorManager(
    backend=ExecutorBackend.PROCESS,
    max_pools=1,
    min_pools=1,
    workers_per_pool=4,
)
Functions, arguments, and return values submitted to the process backend must be picklable. Prefer top-level functions and simple serializable data. Do not submit lambdas, nested functions, sockets, database clients, or async clients.
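A quick way to see what "picklable" means in practice is to try `pickle.dumps` before submitting. The helper `is_picklable` below is a hypothetical name for this sketch and is not part of leasepool. It uses only the standard library.

```python
import math
import pickle
import threading


def is_picklable(obj: object) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# An importable module-level function is pickled by reference.
print(is_picklable(math.sqrt))
# Plain data made of dicts, lists, strings, and numbers is fine.
print(is_picklable({"n": 3, "tags": ["a", "b"]}))
# A lambda cannot be looked up by name in its module.
print(is_picklable(lambda x: x + 1))
# OS-level resources such as locks cannot be pickled.
print(is_picklable(threading.Lock()))
```

The first two checks pass and the last two fail, which matches the guidance above: top-level functions and simple data are fine, while lambdas and live resources are not.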
Interpreter backend
backend="interpreter" is reserved for Python 3.14+ InterpreterPoolExecutor.
On earlier Python versions, selecting it raises UnsupportedBackendError.
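If the backend is chosen at runtime, a small feature check can avoid the error on older interpreters. The sketch below uses only the standard library. The helper name `interpreter_pool_available` and the fallback to the thread backend are assumptions for illustration, not behaviour provided by leasepool.

```python
import concurrent.futures
import sys


def interpreter_pool_available() -> bool:
    """True when this Python provides concurrent.futures.InterpreterPoolExecutor."""
    return sys.version_info >= (3, 14) and hasattr(
        concurrent.futures, "InterpreterPoolExecutor"
    )


# Hypothetical policy: fall back to the thread backend on older Pythons.
backend = "interpreter" if interpreter_pool_available() else "thread"
print(backend)
```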
Backpressure and waiting
max_pools bounds the number of executor objects owned by one manager. If every
pool is leased, acquire() waits by default:
lease = await manager.acquire(owner="wait-for-capacity", timeout=2.0)
Fail immediately with wait=False:
from leasepool import LeaseUnavailableError
try:
    lease = await manager.acquire(owner="no-wait", wait=False)
except LeaseUnavailableError:
    # Return HTTP 503, retry later, or enqueue elsewhere.
    ...
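The two acquisition modes above (wait with a timeout, or fail immediately) can be modelled with an `asyncio.Semaphore`. This is a standard-library sketch of the backpressure idea, not leasepool's internals. `BoundedSlots` and `CapacityUnavailable` are hypothetical names.

```python
import asyncio
from typing import Optional


class CapacityUnavailable(Exception):
    """Hypothetical error for this sketch (not leasepool's exception)."""


class BoundedSlots:
    """Hypothetical stand-in for a manager with a fixed number of pools."""

    def __init__(self, max_slots: int) -> None:
        self._sem = asyncio.Semaphore(max_slots)

    async def acquire(self, *, wait: bool = True, timeout: Optional[float] = None) -> None:
        if not wait:
            if self._sem.locked():
                raise CapacityUnavailable("no free slot")
            await self._sem.acquire()  # a slot is free, so this does not block
            return
        try:
            await asyncio.wait_for(self._sem.acquire(), timeout)
        except asyncio.TimeoutError:
            raise CapacityUnavailable("timed out waiting for a slot") from None

    def release(self) -> None:
        self._sem.release()


async def demo() -> list:
    slots = BoundedSlots(max_slots=1)
    events = []
    await slots.acquire()
    events.append("first acquired")
    try:
        await slots.acquire(wait=False)  # full: fail immediately
    except CapacityUnavailable:
        events.append("no-wait rejected")
    try:
        await slots.acquire(timeout=0.05)  # full: wait, then give up
    except CapacityUnavailable:
        events.append("timed out")
    slots.release()
    await slots.acquire(wait=False)  # capacity is back
    events.append("acquired after release")
    return events


events = asyncio.run(demo())
print(events)
```

Waiting callers slow down when capacity runs out, while no-wait callers get an immediate signal they can turn into a 503 or a retry.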
Lease expiry
Each lease has a soft lifetime and a grace period:
manager = LeasedExecutorManager(
    backend="thread",
    max_pools=2,
    default_lease_seconds=300.0,
    lease_grace_seconds=15.0,
)
lease = await manager.acquire(lease_seconds=30.0, owner="bounded-operation")
After lease_seconds + lease_grace_seconds, new submissions through that lease
raise LeaseExpiredError. Existing work that was already submitted is left to the
underlying executor semantics.
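The timing can be worked through with plain arithmetic. The `LeaseWindow` dataclass below is a hypothetical model, not a leasepool type. It assumes the clock starts at acquisition and that submissions are refused once the lease time plus the grace time has elapsed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LeaseWindow:
    """Hypothetical record of one lease's timing (names are illustrative)."""

    acquired_at: float    # e.g. a time.monotonic() reading at acquisition
    lease_seconds: float  # soft lifetime
    grace_seconds: float  # extra time before submissions are refused

    @property
    def soft_deadline(self) -> float:
        return self.acquired_at + self.lease_seconds

    @property
    def hard_deadline(self) -> float:
        return self.soft_deadline + self.grace_seconds

    def accepts_submissions(self, now: float) -> bool:
        # Assumption: the boundary instant itself counts as expired.
        return now < self.hard_deadline


window = LeaseWindow(acquired_at=100.0, lease_seconds=30.0, grace_seconds=15.0)
print(window.soft_deadline)               # 130.0
print(window.hard_deadline)               # 145.0
print(window.accepts_submissions(140.0))  # True: inside the grace period
print(window.accepts_submissions(145.0))  # False: lease + grace has elapsed
```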
Adaptive sizing
You can connect desired pool count to a runtime signal:
connected_devices: set[str] = set()
manager = LeasedExecutorManager(
    backend="thread",
    max_pools=10,
    min_pools=1,
    units_per_pool=10,
    size_provider=lambda: len(connected_devices),
)
await manager.start()
connected_devices.add("device-1")
manager.notify_scale_changed()
The target executor count is:
min(max_pools, max(min_pools, ceil(size_provider() / units_per_pool)))
Idle executors above the target are shut down. Active non-expired leases are not
revoked just because the target shrinks.
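The sizing rule is easy to check by hand. The function below restates it with the standard library. `target_pool_count` is a hypothetical helper name, and the values mirror the configuration shown above (`units_per_pool=10`, `min_pools=1`, `max_pools=10`).

```python
import math


def target_pool_count(size: int, units_per_pool: int, min_pools: int, max_pools: int) -> int:
    """Restate the sizing rule: ceil(size / units), floored at min, capped at max."""
    wanted = math.ceil(size / units_per_pool)
    return min(max_pools, max(min_pools, wanted))


for size in (0, 1, 10, 11, 95, 500):
    print(size, target_pool_count(size, units_per_pool=10, min_pools=1, max_pools=10))
# 0 1
# 1 1
# 10 1
# 11 2
# 95 10
# 500 10
```

With no connected devices the count stays at `min_pools`, the eleventh device triggers a second pool, and anything beyond 100 devices is held at `max_pools`.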
WorkGrinder
WorkGrinder batches many async submitters into leased executor batches:
from leasepool import WorkGrinder
grinder = WorkGrinder(
    executor_manager=manager,
    batch_size_threshold=50,
    max_wait_seconds=2.0,
    lease_seconds=60.0,
)
await grinder.start()
try:
    result = await grinder.submit(sync_function, payload, owner="item-1")
finally:
    await grinder.stop(cancel_pending=True)
Use:
- await grinder.submit(...) to queue work and wait for its result.
- await grinder.enqueue(...) to receive an asyncio.Future immediately.
- grinder.submit_from_thread(...) from non-async code or another OS thread.
- grinder.stats() for diagnostics from the event-loop thread.
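The batching idea (flush when a size threshold is reached or when a maximum wait has passed) can be sketched with the standard library. This is not how WorkGrinder is implemented. `TinyBatcher` and its attributes are hypothetical, and a plain `ThreadPoolExecutor` stands in for a leased executor.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


class TinyBatcher:
    """Hypothetical batcher: flush on size threshold or after max_wait seconds."""

    def __init__(self, pool: ThreadPoolExecutor, batch_size: int, max_wait: float) -> None:
        self._pool = pool
        self._batch_size = batch_size
        self._max_wait = max_wait
        self._queue: asyncio.Queue = asyncio.Queue()
        self.batch_sizes: List[int] = []  # recorded for inspection only

    async def submit(self, fn: Callable[..., Any], *args: Any) -> Any:
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((fn, args, future))
        return await future

    async def _run_one(self, fn, args, future) -> None:
        loop = asyncio.get_running_loop()
        try:
            result = await loop.run_in_executor(self._pool, fn, *args)
        except Exception as exc:
            if not future.done():
                future.set_exception(exc)
        else:
            if not future.done():
                future.set_result(result)

    async def run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self._queue.get()]  # block until the first job
            deadline = loop.time() + self._max_wait
            while len(batch) < self._batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self._queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            self.batch_sizes.append(len(batch))
            # Run the whole batch concurrently on the pool.
            await asyncio.gather(*(self._run_one(*job) for job in batch))


async def demo():
    with ThreadPoolExecutor(max_workers=4) as pool:
        batcher = TinyBatcher(pool, batch_size=3, max_wait=0.05)
        worker = asyncio.create_task(batcher.run())
        results = await asyncio.gather(*(batcher.submit(pow, n, 2) for n in range(5)))
        worker.cancel()
        await asyncio.gather(worker, return_exceptions=True)
        return results, batcher.batch_sizes


results, batch_sizes = asyncio.run(demo())
print(results)      # [0, 1, 4, 9, 16]
print(batch_sizes)  # how the five jobs were grouped
```

Each submitter still awaits its own result, while the work reaches the executor in groups of at most `batch_size`.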
Process worker log forwarding
Worker-process logs do not automatically flow through the parent process logger. Enable forwarding when using the process backend:
import logging
from leasepool import LeasedExecutorManager, ProcessLoggingConfig
manager = LeasedExecutorManager(
    backend="process",
    max_pools=1,
    min_pools=1,
    workers_per_pool=2,
    process_logging=ProcessLoggingConfig(
        enabled=True,
        level="INFO",
        target_logger=logging.getLogger("leasepool.process"),
    ),
)
A convenience form is also available:
manager = LeasedExecutorManager(
    backend="process",
    max_pools=1,
    forward_process_logs=True,
    process_log_level="INFO",
)
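For background, the standard library's usual pattern for moving log records from workers to a parent is `QueueHandler` plus `QueueListener`. The sketch below shows that pattern in a single process with a `queue.Queue`. Whether leasepool uses the same mechanism is an assumption, and `ListHandler` and the `sketch.worker` logger name are hypothetical.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical handler that collects messages so the demo can print them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.name}:{record.levelname}:{record.getMessage()}")


# Parent side: a listener drains the queue into the target handler.
# Across real processes this would be a multiprocessing queue instead.
log_queue: queue.Queue = queue.Queue()
collector = ListHandler()
listener = logging.handlers.QueueListener(log_queue, collector)
listener.start()

# Worker side: the logger's only handler pushes records onto the queue.
worker_logger = logging.getLogger("sketch.worker")
worker_logger.setLevel(logging.INFO)
worker_logger.propagate = False
worker_logger.addHandler(logging.handlers.QueueHandler(log_queue))

worker_logger.info("crunched %d items", 3)
worker_logger.debug("not forwarded: below the INFO level")

listener.stop()  # processes pending records, then joins the listener thread
print(collector.messages)  # ['sketch.worker:INFO:crunched 3 items']
```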
Diagnostics
print(manager.backend)
print(manager.available_count)
print(manager.leased_count)
print(manager.total_count)
print(manager.desired_executor_count())
print(manager.stats())
manager.stats() includes backend, sizing, counts, workers per pool, and current
lease expiry information.
Examples
The examples/ directory is designed as a learning path:
python examples/00_quickstart_thread_backend.py
python examples/07_process_backend_cpu_work.py
python examples/08_work_grinder_submit.py
python examples/13_process_log_forwarding.py
python examples/14_complete_library_walkthrough.py
For process-backend examples, run files directly instead of pasting them into a
REPL. ProcessPoolExecutor needs importable top-level functions.
See README_EXAMPLES.md for the complete example map.
What leasepool is not
leasepool is not a distributed task queue. It does not provide persistence,
remote workers, retries across machines, scheduling, or broker integration. It
manages local executor capacity inside one Python process.
Download files
File details
Details for the file leasepool-0.1.1.tar.gz.
File metadata
- Download URL: leasepool-0.1.1.tar.gz
- Upload date:
- Size: 863.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1c5a1df589031ac16eddb234fc39fec979be0cacfaefe66d814b6d7353e05594 |
| MD5 | fb42c2a4b3a6e73a858db3bb6cc137c9 |
| BLAKE2b-256 | 64316d76f20fcf67709d1f10cd50e6fa841f8a4fc365d8ae6acf1cc9da247c14 |
File details
Details for the file leasepool-0.1.1-py3-none-any.whl.
File metadata
- Download URL: leasepool-0.1.1-py3-none-any.whl
- Upload date:
- Size: 21.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 69b37e57013350938432dea6a572d780f5c113761dcfc8b03fe9db17ff412f5a |
| MD5 | 23f33e523f6534f693ef235f639a3498 |
| BLAKE2b-256 | 94d2b17e9876a210d15408fb153f776a232cf797a2576b7040bd23f6569bf7cc |