execute with timeout
Project description
timeout-executor
how to install
$ pip install timeout_executor
# or
$ pip install "timeout_executor[uvloop]"
# or
$ pip install "timeout_executor[jinja]"
how to use
from __future__ import annotations
import time
import anyio
from timeout_executor import AsyncResult, TimeoutExecutor
def sample_sync_func(x: float) -> str:
time.sleep(x)
return "done"
async def sample_async_func(x: float) -> str:
await anyio.sleep(x)
return "done"
def main() -> None:
executor = TimeoutExecutor(2)
result = executor.apply(sample_sync_func, 10)
assert isinstance(result, AsyncResult)
try:
value = result.result()
except Exception as exc:
assert isinstance(exc, TimeoutError)
result = executor.apply(sample_async_func, 1)
assert isinstance(result, AsyncResult)
value = result.result()
assert value == "done"
result = executor.apply(lambda: "done")
assert isinstance(result, AsyncResult)
value = result.result()
assert value == "done"
async def async_main() -> None:
executor = TimeoutExecutor(2)
result = await executor.delay(sample_sync_func, 10)
assert isinstance(result, AsyncResult)
try:
value = await result.delay()
except Exception as exc:
assert isinstance(exc, TimeoutError)
result = await executor.delay(sample_async_func, 1)
assert isinstance(result, AsyncResult)
value = await result.delay()
assert value == "done"
result = await executor.delay(lambda: "done")
assert isinstance(result, AsyncResult)
value = await result.delay()
assert value == "done"
if __name__ == "__main__":
main()
anyio.run(async_main)
License
MIT, see LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
timeout_executor-0.9.1.tar.gz
(13.9 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file timeout_executor-0.9.1.tar.gz.
File metadata
- Download URL: timeout_executor-0.9.1.tar.gz
- Upload date:
- Size: 13.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.4.22
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e499b05897c8e46dcabd95ea62e30fc589b2441f0416c7cfd84b3a82effcfe6d
|
|
| MD5 |
3f10d00bcc42b6a78fa4ef2b7b1e0cb8
|
|
| BLAKE2b-256 |
2386077caa9fd4df1dc29ade7af5186065ca5c244aa8496a88d4d6606686cfab
|
File details
Details for the file timeout_executor-0.9.1-py3-none-any.whl.
File metadata
- Download URL: timeout_executor-0.9.1-py3-none-any.whl
- Upload date:
- Size: 18.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.4.22
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1f7f0ce010253c0dbb49a659717b77ff4d6cb43a6a160ca6db376dd9e857ce29
|
|
| MD5 |
5e9f9c387cca41d14dfccfcfad899a5c
|
|
| BLAKE2b-256 |
22fa76e4bee654f838e248572292e29164ac0a0fee2d01b6e6df4268312e97c6
|