Async timer
The missing Python async timer.
Run something repeatedly on an interval in asyncio — useful for cache refresh, periodic polling, metrics emission, and similar background work.
Features
- Zero runtime dependencies.
- Any callable shape. Sync or async functions, generators, async generators, or callables returning any of those.
- Two delivery models. join() / wait() / async for self is single-shot fan-out (latest value, may drop intermediate ticks under slow consumers). subscribe() gives each consumer a buffered queue (every tick, optional maxsize for bounded drop-oldest).
- Scheduling modes. fixed_delay (default; next tick fires delay after the previous one finishes) or fixed_rate (anchored to wall clock; missed slots skipped + logged). Optional initial_delay and jitter.
- Trigger on demand. await timer.trigger() fires now and resumes the schedule.
- Last-value cache. timer.last_result / timer.last_tick_at: no blocking.
- Cancel anytime. Explicit cancel() or constructor cancel_aws (awaitables that stop the timer when they resolve). await cancel() waits for cleanup before returning; safe from inside the target/callbacks.
- Restartable. start() after cancel() works (raises if cancel_aws was used, since those are single-shot).
- Decorator. @async_timer.every(5) wraps a function into a Timer; original on .func.
- Groups. TimerGroup() starts/cancels a set of timers together.
- Named. name="db_refresh" shows in repr() and scopes the logger.
- Test-friendly. mock_async_timer.MockTimer replaces real sleeps with an AsyncMock.
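The difference between the two scheduling modes can be sketched with plain arithmetic. The helpers below are hypothetical and are not part of async_timer. They assume that fixed_rate slots sit at anchor + k * delay and that a slot which has already passed is skipped rather than fired late; the library's actual bookkeeping may differ.

```python
import math


def fixed_delay_sleep(delay: float) -> float:
    """fixed_delay: always sleep the full delay once the previous tick finishes."""
    return delay


def fixed_rate_sleep(
    anchor: float, delay: float, last_slot: int, now: float
) -> tuple[int, float, int]:
    """fixed_rate: sleep until the next slot on the anchor + k * delay grid.

    `last_slot` is the index of the slot that just ran and `now` is the
    time at which it finished. Returns (next_slot, sleep_for, missed),
    where `missed` counts slots that passed while the tick was running.
    """
    # First slot at or after `now`, but never the slot we just ran.
    next_slot = max(math.ceil((now - anchor) / delay), last_slot + 1)
    missed = next_slot - (last_slot + 1)
    sleep_for = anchor + next_slot * delay - now
    return next_slot, sleep_for, missed
```

With delay=5 and a tick that started at t=5 but only finished at t=17, this sketch skips the slots at t=10 and t=15 and sleeps 3 seconds until t=20, whereas fixed_delay would simply sleep 5 seconds and fire at t=22.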
Requirements
Python 3.9+.
Installation
pip install async-timer
Examples
FastAPI lifespan
import contextlib
import time

import uvicorn
from fastapi import FastAPI

import async_timer

DB_CACHE = {"initialised": False}

async def refresh_db():
    DB_CACHE.update(initialised=True, cur_value=time.time())

@contextlib.asynccontextmanager
async def lifespan(_app: FastAPI):
    async with async_timer.Timer(delay=5, target=refresh_db) as timer:
        await timer.wait(hit_count=1)  # wait for first tick
        yield

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def root():
    return {"db_cache": DB_CACHE}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
join()
import asyncio
import async_timer

async def main():
    timer = async_timer.Timer(12, target=lambda: 42)
    timer.start()
    val = await timer.join()  # 42, after the first tick
    await timer.cancel()

asyncio.run(main())
async for
import asyncio
import time
import async_timer

async def main():
    async with async_timer.Timer(14, target=time.time) as timer:
        async for t in timer:
            print(t)  # current time every 14 seconds

asyncio.run(main())
Decorator
import async_timer

@async_timer.every(5, mode="fixed_rate", name="db_refresh")
async def refresh_db():
    ...

async def test_refresh_db():
    await refresh_db.func()  # call the undecorated function directly (e.g. in tests)

async def main():
    refresh_db.start()
    await refresh_db.join()
    await refresh_db.cancel()
TimerGroup
import async_timer

async def lifespan():
    async with async_timer.TimerGroup() as group:
        group.add(async_timer.Timer(5, target=refresh_db))
        group.add(async_timer.Timer(60, target=prune_cache))
        yield  # both running; both cancelled on exit
Trigger now
async def force_refresh(timer):
    return await timer.trigger()
Latest value, no blocking
@async_timer.every(5)
async def refresh_db():
    return await db.fetch()

def get_cached():
    return refresh_db.last_result  # None until the first tick
Every-tick delivery via subscribe()
join() and async for over the timer itself are single-shot fan-out: they deliver the latest value, so a slow consumer can miss intermediate ticks. Use subscribe() when you need every tick:
async with timer.subscribe() as feed:
    async for value in feed:
        await log_it(value)       # never misses a tick from subscribe-time
        await asyncio.sleep(3.0)  # even though the consumer is slow
Bounded queue (drop oldest + log when full):
async with timer.subscribe(maxsize=10, name="metrics-sink") as feed:
    async for value in feed:
        await slow_export(value)
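The bounded drop-oldest behaviour can be pictured with a small standard-library sketch. DropOldestBuffer is a hypothetical class written for illustration only; it is synchronous and is not how async_timer implements subscriptions. It assumes that a full buffer evicts its oldest entry to make room and counts each eviction.

```python
from collections import deque


class DropOldestBuffer:
    """Bounded FIFO that evicts the oldest item when full (illustrative only)."""

    def __init__(self, maxsize: int) -> None:
        if maxsize <= 0:
            raise ValueError("maxsize must be positive")
        self._items: deque = deque()
        self._maxsize = maxsize
        self.dropped_count = 0  # number of items evicted so far

    def put(self, item) -> None:
        # When full, discard the oldest entry so the newest always fits.
        if len(self._items) >= self._maxsize:
            self._items.popleft()
            self.dropped_count += 1
        self._items.append(item)

    def get(self):
        # Return the oldest surviving item; raises IndexError when empty.
        return self._items.popleft()

    @property
    def qsize(self) -> int:
        return len(self._items)
```

Putting five values into a buffer with maxsize=3 leaves the three newest and a dropped_count of 2, which is the trade-off a bounded subscription makes: memory stays capped, and the consumer sees a gap instead of a growing backlog.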
Multiple subscribers each get an independent copy:
async with timer.subscribe() as a, timer.subscribe() as b:
    ...
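What "an independent copy" means can be sketched with one asyncio.Queue per subscriber. Broadcaster and demo below are hypothetical names used for illustration and are not async_timer's API; the sketch assumes each published value is put on every subscriber's queue.

```python
import asyncio


class Broadcaster:
    """Delivers every published value to each subscriber's own queue (illustrative only)."""

    def __init__(self) -> None:
        self._queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.append(queue)
        return queue

    def publish(self, value) -> None:
        # Each subscriber receives the value on its own queue.
        for queue in self._queues:
            queue.put_nowait(value)


async def demo() -> tuple[list[int], list[int]]:
    hub = Broadcaster()
    a, b = hub.subscribe(), hub.subscribe()
    for tick in range(3):
        hub.publish(tick)
    # Draining `a` does not consume anything from `b`.
    got_a = [a.get_nowait() for _ in range(3)]
    got_b = [b.get_nowait() for _ in range(3)]
    return got_a, got_b
```

Because the queues are separate, one slow subscriber only delays itself; the other keeps reading at its own pace.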
Consumer-side load shedding:
async with timer.subscribe() as feed:
    async for value in feed:
        if feed.qsize > 100:
            feed.drop_oldest(feed.qsize - 1)  # keep only the newest
            log.warning("shed %d ticks", feed.dropped_count)
        await slow_export(value)
drop_oldest() never swallows end-of-stream / exception sentinels. Target exceptions re-raise from the subscriber's iteration.
Thread safety
A Timer runs in a single asyncio event loop. Most state-mutating
operations must be called from the loop's thread. The following are
explicitly safe to use from any thread:
Read-only attributes (atomic under CPython's GIL):
- timer.last_result, timer.last_tick_at, timer.hit_count
- timer.is_running(), timer.delay, timer.name
- subscription.qsize, subscription.dropped_count
set_delay(new_delay) is a single attribute write — safe from any
thread; takes effect on the next sleep.
Cross-thread control methods — marshal the operation back to the timer's loop and block for completion:
# From a sync REST handler, signal handler, worker thread, etc.:
timer.cancel_threadsafe(timeout=5.0) # raises TimeoutError if exceeded
result = timer.trigger_threadsafe(timeout=5.0)
feed.close_threadsafe()
These raise RuntimeError with a clear message if called from the
timer's own loop thread (use await cancel() / await trigger()
instead), or if the timer has not been started yet, or if the bound
event loop has been closed.
Anything else (subscribe(), awaiting join() / wait(), iterating
async for over the timer or a subscription, reading from a
subscription queue) must happen on the loop's thread. From other
threads, use asyncio.run_coroutine_threadsafe(coro, loop) to
dispatch.
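Dispatching from another thread with asyncio.run_coroutine_threadsafe can be sketched as follows. The coroutine read_value is a hypothetical stand-in for any operation that must run on the loop's thread (for example awaiting join() on a timer); only standard-library calls are used.

```python
import asyncio


async def read_value() -> int:
    """Stand-in for work that must run on the loop's thread."""
    await asyncio.sleep(0)
    return 42


def call_from_worker(loop: asyncio.AbstractEventLoop, timeout: float = 5.0) -> int:
    """Schedule read_value() on `loop` from another thread and block for the result."""
    future = asyncio.run_coroutine_threadsafe(read_value(), loop)
    # concurrent.futures.Future.result() blocks this worker thread, not the loop.
    return future.result(timeout=timeout)


async def main() -> int:
    loop = asyncio.get_running_loop()
    # asyncio.to_thread (Python 3.9+) runs the blocking helper in a worker
    # thread, leaving the event loop free to execute read_value().
    return await asyncio.to_thread(call_from_worker, loop)
```

The worker thread must never call this helper from the loop's own thread: blocking on future.result() there would stall the loop that is supposed to run the coroutine.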