execute with timeout

timeout-executor

how to install

$ pip install timeout_executor
# or
$ pip install "timeout_executor[uvloop]"
# or
$ pip install "timeout_executor[jinja]"

how to use

from __future__ import annotations

import time

import anyio

from timeout_executor import AsyncResult, TimeoutExecutor


def sample_sync_func(x: float) -> str:
    time.sleep(x)
    return "done"


async def sample_async_func(x: float) -> str:
    await anyio.sleep(x)
    return "done"


def main() -> None:
    # Calls made through this executor run with a 2-second timeout.
    executor = TimeoutExecutor(2)

    # A sync function that sleeps for 10 seconds exceeds the timeout.
    result = executor.apply(sample_sync_func, 10)
    assert isinstance(result, AsyncResult)

    try:
        value = result.result()
    except Exception as exc:
        assert isinstance(exc, TimeoutError)

    # An async function that finishes within the timeout returns its value.
    result = executor.apply(sample_async_func, 1)
    assert isinstance(result, AsyncResult)
    value = result.result()
    assert value == "done"

    # Lambdas are accepted as well.
    result = executor.apply(lambda: "done")
    assert isinstance(result, AsyncResult)
    value = result.result()
    assert value == "done"


async def async_main() -> None:
    # Same flow as main(), using the awaitable delay() calls instead of
    # apply() and result().
    executor = TimeoutExecutor(2)

    # A sync function that sleeps for 10 seconds exceeds the timeout.
    result = await executor.delay(sample_sync_func, 10)
    assert isinstance(result, AsyncResult)

    try:
        value = await result.delay()
    except Exception as exc:
        assert isinstance(exc, TimeoutError)

    # An async function that finishes within the timeout returns its value.
    result = await executor.delay(sample_async_func, 1)
    assert isinstance(result, AsyncResult)
    value = await result.delay()
    assert value == "done"

    # Lambdas are accepted as well.
    result = await executor.delay(lambda: "done")
    assert isinstance(result, AsyncResult)
    value = await result.delay()
    assert value == "done"


if __name__ == "__main__":
    main()
    anyio.run(async_main)

License

MIT, see LICENSE.
