Backend logging primitive for long-running async jobs
streamator
Backend primitive for streaming structured events and logs from long-running async jobs to a browser UI via SSE — no WebSockets, no custom infrastructure.
Pairs with streamator-react on the frontend.
Install
pip install streamator # core only (in-memory)
pip install "streamator[fastapi]" # + FastAPI route helpers
pip install "streamator[dynamo]" # + DynamoDB backend
pip install "streamator[fastapi,dynamo]" # everything (quotes keep shells like zsh from expanding the brackets)
JobEmitter
JobEmitter is the recommended API. It supports structured events, task tracking,
cancellation, and result storage — all in one class.
import asyncio

from streamator import JobEmitter
from streamator.fastapi import add_job_routes

# app, router and do_work() belong to your own application.
add_job_routes(app, prefix="/job")
# GET /job/{job_id}/stream → SSE event stream
# GET /job/{job_id}/result → stored result (JSON)
# POST /job/{job_id}/cancel → cancel tracked task

@router.post("/start")
async def start():
    emitter = JobEmitter()

    async def run():
        async with emitter:
            for i in range(1, 6):
                emitter.emit({"event": "progress", "current": i, "total": 5})
                emitter.log(f"Step {i} of 5")
                await do_work()
            emitter.set_result({"steps_completed": 5})
            emitter.log("Done", level="success")

    task = asyncio.create_task(run())
    emitter.track(task)
    return {"job_id": emitter.job_id}
Instance methods
emitter.emit({"event": "progress", "step": 1}) # structured event, t added automatically
emitter.log("Working...", level="info") # log message shorthand
emitter.track(task) # store asyncio.Task for cancellation
emitter.set_result({"count": 42}) # store final result for later retrieval
emitter.close() # idempotent; sends done sentinel
Calling emit() or log() after close() logs a warning and no-ops. Non-JSON-serializable values in emit() are converted to strings automatically.
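The string fallback for non-serializable values can be pictured with the standard library alone. The sketch below uses json.dumps with default=str, which is one common way to get this behaviour; whether streamator does it this way internally is an assumption, and to_wire is a hypothetical helper name.

```python
import json
from datetime import datetime
from pathlib import PurePosixPath


def to_wire(event: dict) -> str:
    """Serialize an event, falling back to str() for values json cannot encode."""
    # default=str is only called for objects the encoder does not know.
    return json.dumps(event, default=str)


payload = to_wire({
    "event": "progress",
    "started": datetime(2024, 1, 2, 3, 4, 5),   # not JSON-serializable
    "path": PurePosixPath("/tmp/out.csv"),      # not JSON-serializable
})
print(payload)
```

Values that are already JSON-friendly (numbers, strings, lists, dicts) pass through unchanged; only the unsupported ones become strings.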
Class methods
JobEmitter.cancel(job_id) # cancel tracked task; no-op if unknown
JobEmitter.pop_result(job_id) # fetch and consume result; None if not ready
JobEmitter.exists(job_id) # check if job is in registry
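To make the "cancel tracked task; no-op if unknown" behaviour concrete, here is a minimal stdlib-only sketch of a registry that maps job IDs to asyncio tasks. The names _tasks, track, cancel and demo are hypothetical and only illustrate the idea; this is not streamator's actual implementation.

```python
import asyncio
from typing import Dict, List

# Hypothetical registry: job_id -> tracked asyncio.Task
_tasks: Dict[str, asyncio.Task] = {}


def track(job_id: str, task: asyncio.Task) -> None:
    _tasks[job_id] = task


def cancel(job_id: str) -> bool:
    """Cancel the tracked task; return False (no-op) if unknown or finished."""
    task = _tasks.get(job_id)
    if task is None or task.done():
        return False
    task.cancel()
    return True


async def demo() -> List[str]:
    seen: List[str] = []

    async def job() -> None:
        try:
            await asyncio.sleep(60)        # stands in for long-running work
        except asyncio.CancelledError:
            seen.append("cleanup")         # a job can clean up before exiting
            raise

    task = asyncio.create_task(job())
    track("abc", task)
    await asyncio.sleep(0)                 # let the job start running
    seen.append(f"cancel known: {cancel('abc')}")
    seen.append(f"cancel unknown: {cancel('nope')}")
    try:
        await task
    except asyncio.CancelledError:
        seen.append("cancelled")
    return seen


result = asyncio.run(demo())
print(result)
```

Re-raising CancelledError inside the job, as done here, lets asyncio mark the task as cancelled after the cleanup step has run.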
Event format on the wire
# emit() — passes through with t added
{"event": "progress", "step": 1, "t": 1.23}
# log() — standard log shape
{"event": "log", "message": "Working...", "level": "info", "t": 1.23}
# close() — terminal sentinel
{"event": "done", "t": 5.67}
Background reaper (optional)
from streamator.emitter import start_reaper
# In FastAPI lifespan:
asyncio.create_task(start_reaper(ttl_seconds=300, interval=60))
The reaper evicts only closed jobs older than ttl_seconds; running jobs are never evicted. It is off by default.
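The eviction rule (closed and older than ttl_seconds) can be sketched as a pure function over a registry. JobRecord, closed_at and evict_expired are hypothetical names chosen for illustration; how streamator stores job state is not described here.

```python
import time
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class JobRecord:
    """Hypothetical registry entry; closed_at stays None while the job runs."""
    closed_at: Optional[float] = None


def evict_expired(
    registry: Dict[str, JobRecord],
    ttl_seconds: float,
    now: Optional[float] = None,
) -> List[str]:
    """Remove closed jobs older than ttl_seconds and return their IDs."""
    now = time.monotonic() if now is None else now
    expired = [
        job_id
        for job_id, record in registry.items()
        if record.closed_at is not None and now - record.closed_at > ttl_seconds
    ]
    for job_id in expired:
        del registry[job_id]
    return expired


registry = {
    "running": JobRecord(),               # still open: never evicted
    "fresh": JobRecord(closed_at=950.0),  # closed 50 s ago: kept
    "stale": JobRecord(closed_at=100.0),  # closed 900 s ago: evicted
}
evicted = evict_expired(registry, ttl_seconds=300, now=1000.0)
print(evicted, sorted(registry))
```

A background reaper would call a function like this in a loop, sleeping for the configured interval between passes.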
JobLogger
JobLogger is the simpler API for log-only use cases.
import asyncio

from streamator import JobLogger
from streamator.fastapi import add_log_routes

# app, router and my_job() belong to your own application.
add_log_routes(app, prefix="/log")  # deprecated: use add_job_routes instead

@router.post("/start")
async def start():
    logger = JobLogger()
    asyncio.create_task(my_job(logger))
    return {"log_job_id": logger.job_id}
add_log_routes still works but emits a DeprecationWarning. Migrate to
add_job_routes + JobEmitter when convenient.
Storage backends
Memory (default) — asyncio.Queue, SSE push, zero dependencies, single-process only.
DynamoDB — persists across restarts, readable by polling, distributed-friendly.
emitter = JobEmitter(store="dynamo", table="my-jobs", ttl_days=1)
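The memory backend's push model can be pictured as a producer and a consumer sharing one asyncio.Queue, with a sentinel marking the end of the stream. This is a stdlib-only sketch of the pattern, not streamator's internals; DONE, producer, consumer and main are hypothetical names.

```python
import asyncio
from typing import List

DONE = {"event": "done"}  # hypothetical terminal sentinel


async def producer(queue: asyncio.Queue) -> None:
    """The job side: push events as they happen, then the sentinel."""
    for i in range(1, 4):
        await queue.put({"event": "progress", "current": i, "total": 3})
    await queue.put(DONE)


async def consumer(queue: asyncio.Queue) -> List[dict]:
    """The stream side: forward events until the sentinel arrives."""
    received: List[dict] = []
    while True:
        event = await queue.get()
        received.append(event)
        if event["event"] == "done":
            return received


async def main() -> List[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    received = await consumer(queue)
    await producer_task
    return received


received = asyncio.run(main())
print(received)
```

Because the queue lives in one process's memory, a request served by a different worker process cannot see it, which is why the memory backend is single-process only.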
Log levels
"info" (default) · "success" · "warning" · "error"
Download files
File details
Details for the file streamator-0.2.1.tar.gz.
File metadata
- Download URL: streamator-0.2.1.tar.gz
- Upload date:
- Size: 7.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6f7e2a5b084561b065c20cff5550f7ee385a687f5f64d7cf610a2f6f1984bae4 |
| MD5 | 52c4408e5705ca2785cb8bf44b8beb28 |
| BLAKE2b-256 | 9df52e8f256993b1efbc8003562b6b70cbea9e1113bcb223610270311725e51c |
File details
Details for the file streamator-0.2.1-py3-none-any.whl.
File metadata
- Download URL: streamator-0.2.1-py3-none-any.whl
- Upload date:
- Size: 6.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9be7a110d3943af13af11993b7f7b77ae81644a479d1ba872ec8b56827e696f4 |
| MD5 | afb5599d9c020bf24c63509ac7a3cb57 |
| BLAKE2b-256 | 3bf69e3fda26f21fb2c049b5fffcc5d08c0d933e11b840c21cf54f0e2db607c3 |