Skip to main content

Async task layer for Streamlit: background sync/async work, polling, progress, cancellation

Project description

asynclit

Read the Docs PyPI version Python versions License

Documentation: asynclit.readthedocs.io

Small async task layer for Streamlit (and similar “sync main thread + rerun” UIs): run sync or async work on a dedicated background event loop, then poll status, results, progress, and cancellation without blocking the UI thread.

Install

pip install asynclit

Optional extras:

  • Streamlit (for a typical app environment): pip install 'asynclit[streamlit]'
  • APScheduler (optional timers/jobs): pip install 'asynclit[scheduler]'

Requires Python 3.9+.

Quick start

import streamlit as st
import asynclit

task = asynclit.run(fetch_data)

if task.done:
    st.write(task.result)
else:
    st.write("Loading…")

On each rerun, check task.done and read task.result when finished.

Example output (from the included test app tests/streamlit_apps/asynclet_poll_app.py via Streamlit AppTest):

run 0 ['wait']
run 1 ['ready:138']
... last ['ready:138']

Public API

Item Role
asynclit.run(func, /, *args, manager=None, retry=None, **kwargs) Submit func on the worker; returns a Task.
Task.done Whether the result (or error) is ready.
Task.result Result value; raises if not complete.
Task.status TaskStatus: PENDING, RUNNING, DONE, ERROR, CANCELLED.
Task.error Exception object when status is ERROR, else None.
Task.cancel() Request cancellation (running tasks use asyncio cancellation; pending tasks cancel the result future).
Task.progress Non-blocking drain of progress values (see below).
TaskManager / get_default_manager() Custom registry and cleanup() when you keep many completed tasks.
session_tasks(session_state) Dict stored on st.session_state for named tasks.
RetryPolicy Retry configuration for exception-based retries.
schedule_interval(...) / schedule_cron(...) Schedule periodic task submissions (requires asynclit[scheduler]).

Progress (Janus)

Progress is supported for async functions only.

Declare a parameter named queue or progress_queue:

  • If it is the first parameter, asynclit injects the Janus queue positionally and the remaining positional arguments to run() map to the rest of the signature.
  • Otherwise, asynclit injects the queue by keyword (queue= / progress_queue=).
async def job(queue, steps: int):
    for i in range(steps):
        await queue.async_q.put(i)
    return steps

task = asynclit.run(job, 10)
# Each rerun:
for x in task.progress:
    st.write(x)

The UI thread reads via task.progress, which pulls from the sync side of a janus queue.

Streamlit session state

import streamlit as st
import asynclit

tasks = asynclit.session_tasks(st.session_state)
if "load" not in tasks:
    tasks["load"] = asynclit.run(load_data)

task = tasks["load"]

Patterns

Named tasks (per session)

Use session_tasks(st.session_state) as a stable dict to store tasks across reruns:

tasks = asynclit.session_tasks(st.session_state)

if "load" not in tasks:
    tasks["load"] = asynclit.run(load_data)

task = tasks["load"]

Cleanup (when you create many tasks)

If you create many tasks over time, keep them in a TaskManager and periodically call cleanup() to trim completed entries:

m = asynclit.TaskManager(max_completed=256)
task = asynclit.run(load_data, manager=m)

# ... later:
m.cleanup()

Errors

If the callable raises, task.status becomes ERROR, task.error holds the exception, and reading task.result re-raises it.

if task.status == asynclit.TaskStatus.ERROR:
    st.error(f"failed: {task.error!r}")
elif task.done:
    st.write(task.result)
else:
    st.write("Loading…")

Cancellation

task.cancel() requests cancellation:

  • If the task is running, it schedules asyncio cancellation on the worker loop.
  • If the task is still pending (not yet bound on the worker loop), it cancels the result future.

Treat CANCELLED as a terminal state in UI code.

Cooperative cancellation patterns

  • Async jobs: rely on asyncio cancellation; add cancellation checkpoints when doing long CPU work (break work into chunks; await occasionally).
  • Sync jobs: cancellation is best-effort; the underlying threadpool work may continue running. Prefer chunked work that you can stop between chunks.

Retries

Use RetryPolicy for exception-based retries (opt-in per run() / TaskManager.submit()).

policy = asynclit.RetryPolicy(
    max_attempts=3,
    base_delay=0.1,
    max_delay=1.0,
    multiplier=2.0,
    jitter=0.0,
)

task = asynclit.run(fetch_data, retry=policy)

Retries stop when attempts are exhausted, the task is cancelled, or a raised exception does not match the policy.

Scheduling (APScheduler)

Install with:

pip install 'asynclit[scheduler]'

Then schedule periodic task submission on the worker loop:

asynclit.schedule_interval(load_data, seconds=60, latest_task_name="load_data")

# Later (poll the most recent task):
m = asynclit.get_default_manager()
latest = m.get("global:load_data")
if latest and latest.done:
    st.write(latest.result)

Troubleshooting / FAQ

Why does it keep showing wait?

In rerun-driven UIs, a single script run may finish before the background task completes. The usual pattern is: show wait, then on the next rerun read task.done / task.result.

In tests (or special cases), you may need to allow a small amount of wall time between reruns for the worker to finish.

How it works (short)

  • One daemon thread runs a single asyncio event loop.
  • Async callables run on that loop; sync callables run via asyncer’s asyncify (thread pool).
  • Submissions use asyncio.run_coroutine_threadsafe; results are bridged with a concurrent.futures.Future for the polling API.

Development

pip install -e '.[dev]'
pytest

Development (uv)

If you use uv, you can run tests in a fresh env like:

uv venv
uv pip install -e '.[dev]'
uv run pytest

The dev extra includes Streamlit so CI can run headless AppTest checks in tests/test_streamlit_apptest.py against the sample apps under tests/streamlit_apps/.

Docs

Build the documentation (Sphinx + MyST):

uv venv
uv pip install -e '.[docs]'
uv run python -m sphinx -b html docs docs/_build/html

License

MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asynclit-0.2.2.tar.gz (12.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

asynclit-0.2.2-py3-none-any.whl (14.7 kB view details)

Uploaded Python 3

File details

Details for the file asynclit-0.2.2.tar.gz.

File metadata

  • Download URL: asynclit-0.2.2.tar.gz
  • Upload date:
  • Size: 12.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for asynclit-0.2.2.tar.gz
Algorithm Hash digest
SHA256 94034ced83598f22b1cebf42554110b6da6ba224f1003e9296f04e0dfa733cf4
MD5 f4ba186198798646947ccd2895c1327b
BLAKE2b-256 1292bd4d5c2d280344ff0c1001ce5b070e16954ece3434ce6c70abe146c42d2d

See more details on using hashes here.

File details

Details for the file asynclit-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: asynclit-0.2.2-py3-none-any.whl
  • Upload date:
  • Size: 14.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for asynclit-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 20557c8d414fee7ac3009a61ce095dff0342730653d9c55de477f1b73cfaba19
MD5 93bff630b784adeb05001e90ab05dfcb
BLAKE2b-256 096cf8e1f36be8c199e9901830f320308eaf6166a54b8d7ae35043cd7eb37a79

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page