streamator

Backend logging primitive for long-running async jobs

Backend primitive for streaming structured events and logs from long-running async jobs to a browser UI via SSE — no WebSockets, no custom infrastructure.

Pairs with streamator-react on the frontend.

Install

pip install streamator                      # core only (in-memory)
pip install "streamator[fastapi]"           # + FastAPI route helpers
pip install "streamator[dynamo]"            # + DynamoDB backend
pip install "streamator[fastapi,dynamo]"    # everything

JobEmitter

JobEmitter is the recommended API. It supports structured events, task tracking, cancellation, and result storage — all in one class.

import asyncio

from fastapi import FastAPI

from streamator import JobEmitter
from streamator.fastapi import add_job_routes

app = FastAPI()
add_job_routes(app, prefix="/job")
# GET  /job/{job_id}/stream  → SSE event stream
# GET  /job/{job_id}/result  → stored result (JSON)
# POST /job/{job_id}/cancel  → cancel tracked task

@app.post("/start")
async def start():
    emitter = JobEmitter()

    async def run():
        async with emitter:
            for i in range(1, 6):
                emitter.emit({"event": "progress", "current": i, "total": 5})
                emitter.log(f"Step {i} of 5")
                await do_work()  # your own coroutine
            emitter.set_result({"steps_completed": 5})
            emitter.log("Done", level="success")

    task = asyncio.create_task(run())
    emitter.track(task)
    return {"job_id": emitter.job_id}

Instance methods

emitter.emit({"event": "progress", "step": 1})  # structured event, t added automatically
emitter.log("Working...", level="info")         # log message shorthand
emitter.track(task)                             # store asyncio.Task for cancellation
emitter.set_result({"count": 42})               # store final result for later retrieval
emitter.close()                                 # idempotent; sends done sentinel

Calling emit() or log() after close() logs a warning and no-ops. Non-JSON-serializable values in emit() are converted to strings automatically.
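These semantics are easy to picture with a toy stand-in. The class below is not streamator's implementation, just a minimal sketch of the behavior described above: non-JSON-serializable values fall back to strings, close() is idempotent, and emit() after close() warns and no-ops.

```python
import json
import time
import warnings


class MiniEmitter:
    """Toy stand-in illustrating the close/serialization semantics above."""

    def __init__(self):
        self._start = time.monotonic()
        self._closed = False
        self.events = []

    def emit(self, payload: dict):
        if self._closed:
            warnings.warn("emit() after close() is a no-op")
            return
        # default=str converts non-JSON-serializable values to strings
        safe = json.loads(json.dumps(payload, default=str))
        safe["t"] = round(time.monotonic() - self._start, 2)
        self.events.append(safe)

    def close(self):
        if self._closed:  # idempotent: second close does nothing
            return
        self._closed = True
        self.events.append({"event": "done", "t": round(time.monotonic() - self._start, 2)})


e = MiniEmitter()
e.emit({"event": "progress", "path": object()})  # object() is stringified
e.close()
e.close()                 # no-op
e.emit({"event": "late"})  # warns and no-ops
assert [ev["event"] for ev in e.events] == ["progress", "done"]
```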

Class methods

JobEmitter.cancel(job_id)       # cancel tracked task; no-op if unknown
JobEmitter.pop_result(job_id)   # fetch and consume result; None if not ready
JobEmitter.exists(job_id)       # check if job is in registry
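A plausible sketch of the registry pattern behind these class methods (hypothetical names and state; streamator keeps its own): a module-level dict keyed by job_id lets any request handler cancel or collect a job started elsewhere in the same process.

```python
import asyncio
import uuid

# Hypothetical module-level registries, keyed by job_id.
_TASKS: dict[str, asyncio.Task] = {}
_RESULTS: dict[str, object] = {}


def track(job_id: str, task: asyncio.Task) -> None:
    _TASKS[job_id] = task


def cancel(job_id: str) -> None:
    task = _TASKS.pop(job_id, None)
    if task is not None:  # no-op if unknown
        task.cancel()


def pop_result(job_id: str):
    return _RESULTS.pop(job_id, None)  # fetch-and-consume; None if not ready


def exists(job_id: str) -> bool:
    return job_id in _TASKS or job_id in _RESULTS


async def demo():
    job_id = str(uuid.uuid4())
    task = asyncio.create_task(asyncio.sleep(3600))
    track(job_id, task)
    assert exists(job_id)
    cancel(job_id)                     # cancels the sleeping task
    await asyncio.sleep(0)             # let the cancellation propagate
    assert not exists(job_id)
    assert pop_result(job_id) is None  # nothing stored yet

asyncio.run(demo())
```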

Event format on the wire

# emit() — passes through with t added
{"event": "progress", "step": 1, "t": 1.23}

# log() — standard log shape
{"event": "log", "message": "Working...", "level": "info", "t": 1.23}

# close() — terminal sentinel
{"event": "done", "t": 5.67}
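Over the stream route, each dict travels as a Server-Sent Events frame. A minimal sketch of standard SSE framing (streamator's exact serialization may differ): one `data:` line per event, terminated by a blank line, which a browser `EventSource` delivers via `onmessage`.

```python
import json


def sse_frame(event: dict) -> str:
    """Serialize one event dict as a Server-Sent Events frame."""
    return f"data: {json.dumps(event)}\n\n"


frame = sse_frame({"event": "log", "message": "Working...", "level": "info", "t": 1.23})
assert frame.startswith("data: ") and frame.endswith("\n\n")
```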

Background reaper (optional)

from streamator.emitter import start_reaper

# In FastAPI lifespan:
asyncio.create_task(start_reaper(ttl_seconds=300, interval=60))

Only evicts closed jobs older than ttl_seconds. Off by default.


JobLogger

JobLogger is the simpler API for log-only use cases.

import asyncio

from fastapi import FastAPI

from streamator import JobLogger
from streamator.fastapi import add_log_routes

app = FastAPI()
add_log_routes(app, prefix="/log")   # deprecated — use add_job_routes instead

@app.post("/start")
async def start():
    logger = JobLogger()
    asyncio.create_task(my_job(logger))  # my_job receives the logger and calls logger.log(...)
    return {"log_job_id": logger.job_id}

add_log_routes still works but emits a DeprecationWarning. Migrate to add_job_routes + JobEmitter when convenient.


Storage backends

Memory (default) — asyncio.Queue, SSE push, zero dependencies, single-process only.

DynamoDB — persists across restarts, readable by polling, distributed-friendly.

emitter = JobEmitter(store="dynamo", table="my-jobs", ttl_days=1)

Log levels

"info" (default) · "success" · "warning" · "error"
