Skip to main content

Async task layer for Streamlit: background sync/async work, polling, progress, cancellation

Project description

asynclit

Read the Docs PyPI version Python versions License

Documentation: asynclit.readthedocs.io

Small async task layer for Streamlit (and similar “sync main thread + rerun” UIs): run sync or async work on a dedicated background event loop, then poll status, results, progress, and cancellation without blocking the UI thread.

Install

pip install asynclit

Optional extras:

  • Streamlit (for a typical app environment): pip install 'asynclit[streamlit]'
  • APScheduler (optional timers/jobs): pip install 'asynclit[scheduler]'

Requires Python 3.9+.

Quick start

import streamlit as st
import asynclit

task = asynclit.run(fetch_data)

if task.done:
    st.write(task.result)
else:
    st.write("Loading…")

On each rerun, check task.done and read task.result when finished.

Example output (from the included test app tests/streamlit_apps/asynclet_poll_app.py via Streamlit AppTest):

run 0 ['wait']
run 1 ['ready:138']
... last ['ready:138']

Public API

Item Role
asynclit.run(func, /, *args, manager=None, retry=None, **kwargs) Submit func on the worker; returns a Task.
Task.done Whether the result (or error) is ready.
Task.result Result value; raises if not complete.
Task.status TaskStatus: PENDING, RUNNING, DONE, ERROR, CANCELLED.
Task.error Exception object when status is ERROR, else None.
Task.cancel() Request cancellation (running tasks use asyncio cancellation; pending tasks cancel the result future).
Task.progress Non-blocking drain of progress values (see below).
TaskManager / get_default_manager() Custom registry and cleanup() when you keep many completed tasks.
session_tasks(session_state) Dict stored on st.session_state for named tasks.
RetryPolicy Retry configuration for exception-based retries.
schedule_interval(...) / schedule_cron(...) Schedule periodic task submissions (requires asynclit[scheduler]).

Progress (Janus)

Progress is supported for async functions only.

Declare a parameter named queue or progress_queue:

  • If it is the first parameter, asynclit injects the Janus queue positionally and the remaining positional arguments to run() map to the rest of the signature.
  • Otherwise, asynclit injects the queue by keyword (queue= / progress_queue=).
async def job(queue, steps: int):
    for i in range(steps):
        await queue.async_q.put(i)
    return steps

task = asynclit.run(job, 10)
# Each rerun:
for x in task.progress:
    st.write(x)

The UI thread reads via task.progress, which pulls from the sync side of a janus queue.

Streamlit session state

import streamlit as st
import asynclit

tasks = asynclit.session_tasks(st.session_state)
if "load" not in tasks:
    tasks["load"] = asynclit.run(load_data)

task = tasks["load"]

Patterns

Named tasks (per session)

Use session_tasks(st.session_state) as a stable dict to store tasks across reruns:

tasks = asynclit.session_tasks(st.session_state)

if "load" not in tasks:
    tasks["load"] = asynclit.run(load_data)

task = tasks["load"]

Cleanup (when you create many tasks)

If you create many tasks over time, keep them in a TaskManager and periodically call cleanup() to trim completed entries:

m = asynclit.TaskManager(max_completed=256)
task = asynclit.run(load_data, manager=m)

# ... later:
m.cleanup()

Errors

If the callable raises, task.status becomes ERROR, task.error holds the exception, and reading task.result re-raises it.

if task.status == asynclit.TaskStatus.ERROR:
    st.error(f"failed: {task.error!r}")
elif task.done:
    st.write(task.result)
else:
    st.write("Loading…")

Cancellation

task.cancel() requests cancellation:

  • If the task is running, it schedules asyncio cancellation on the worker loop.
  • If the task is still pending (not yet bound on the worker loop), it cancels the result future.

Treat CANCELLED as a terminal state in UI code.

Cooperative cancellation patterns

  • Async jobs: rely on asyncio cancellation; add cancellation checkpoints when doing long CPU work (break work into chunks; await occasionally).
  • Sync jobs: cancellation is best-effort; the underlying threadpool work may continue running. Prefer chunked work that you can stop between chunks.

Retries

Use RetryPolicy for exception-based retries (opt-in per run() / TaskManager.submit()).

policy = asynclit.RetryPolicy(
    max_attempts=3,
    base_delay=0.1,
    max_delay=1.0,
    multiplier=2.0,
    jitter=0.0,
)

task = asynclit.run(fetch_data, retry=policy)

Retries stop when attempts are exhausted, the task is cancelled, or a raised exception does not match the policy.

Scheduling (APScheduler)

Install with:

pip install 'asynclit[scheduler]'

Then schedule periodic task submission on the worker loop:

asynclit.schedule_interval(load_data, seconds=60, latest_task_name="load_data")

# Later (poll the most recent task):
m = asynclit.get_default_manager()
latest = m.get("global:load_data")
if latest and latest.done:
    st.write(latest.result)

Troubleshooting / FAQ

Why does it keep showing wait?

In rerun-driven UIs, a single script run may finish before the background task completes. The usual pattern is: show wait, then on the next rerun read task.done / task.result.

In tests (or special cases), you may need to allow a small amount of wall time between reruns for the worker to finish.

How it works (short)

  • One daemon thread runs a single asyncio event loop.
  • Async callables run on that loop; sync callables run via asyncer’s asyncify (thread pool).
  • Submissions use asyncio.run_coroutine_threadsafe; results are bridged with a concurrent.futures.Future for the polling API.

Development

pip install -e '.[dev]'
pytest

Development (uv)

If you use uv, you can run tests in a fresh env like:

uv venv
uv pip install -e '.[dev]'
uv run pytest

The dev extra includes Streamlit so CI can run headless AppTest checks in tests/test_streamlit_apptest.py against the sample apps under tests/streamlit_apps/.

Docs

Build the documentation (Sphinx + MyST):

uv venv
uv pip install -e '.[docs]'
uv run python -m sphinx -b html docs docs/_build/html

License

MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asynclit-0.2.1.tar.gz (12.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

asynclit-0.2.1-py3-none-any.whl (17.1 kB view details)

Uploaded Python 3

File details

Details for the file asynclit-0.2.1.tar.gz.

File metadata

  • Download URL: asynclit-0.2.1.tar.gz
  • Upload date:
  • Size: 12.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for asynclit-0.2.1.tar.gz
Algorithm Hash digest
SHA256 5760231ac4f60c0d0099edf4a076f49426405aa8a36655d10e602f0bbea2cd84
MD5 8b5f80f021454cc446e4582618dec0aa
BLAKE2b-256 7a2fa8d0c46e14fe331892d1d937770e71301f4dd3340d3312921d02500b1d20

See more details on using hashes here.

File details

Details for the file asynclit-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: asynclit-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 17.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for asynclit-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 98850a35de58205ea289ced009f63fe2f7f35bbebdb7618b977c50ffc6989926
MD5 17ca8df56f5b63fcd085479db809c927
BLAKE2b-256 fb6fec262df478929f521e2b18537c111c115871e49632da4204819811ab157f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page