
streamator

Backend primitive for streaming structured events and logs from long-running async jobs to a browser UI via SSE — no WebSockets, no custom infrastructure.

Pairs with streamator-react on the frontend.

Install

pip install streamator                      # core only (in-memory)
pip install "streamator[fastapi]"           # + FastAPI route helpers
pip install "streamator[dynamo]"            # + DynamoDB backend
pip install "streamator[fastapi,dynamo]"    # everything

(Extras are quoted so the brackets survive shells like zsh.)

JobEmitter

JobEmitter is the recommended API. It supports structured events, task tracking, cancellation, and result storage — all in one class.

import asyncio

from fastapi import APIRouter, FastAPI

from streamator import JobEmitter
from streamator.fastapi import add_job_routes

app = FastAPI()
router = APIRouter()

add_job_routes(app, prefix="/job")
# GET  /job/{job_id}/stream  → SSE event stream
# GET  /job/{job_id}/result  → stored result (JSON)
# POST /job/{job_id}/cancel  → cancel tracked task

@router.post("/start")
async def start():
    emitter = JobEmitter()

    async def run():
        async with emitter:  # context manager closes the emitter on exit
            for i in range(1, 6):
                emitter.emit({"event": "progress", "current": i, "total": 5})
                emitter.log(f"Step {i} of 5")
                await do_work()  # your own coroutine
            emitter.set_result({"steps_completed": 5})
            emitter.log("Done", level="success")

    task = asyncio.create_task(run())
    emitter.track(task)  # enables POST /job/{job_id}/cancel
    return {"job_id": emitter.job_id}

Instance methods

emitter.emit({"event": "progress", "step": 1})  # structured event, t added automatically
emitter.log("Working...", level="info")          # log message shorthand
emitter.track(task)                              # store asyncio.Task for cancellation
emitter.set_result({"count": 42})               # store final result for later retrieval
emitter.close()                                  # idempotent; sends done sentinel

Calling emit() or log() after close() logs a warning and no-ops.
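The close-guard semantics can be sketched with a minimal stdlib-only emitter (class and attribute names hypothetical; this is not streamator's implementation, just the contract described above):

```python
import warnings

class _SketchEmitter:
    """Stdlib sketch of the idempotent-close / no-op-after-close contract."""

    def __init__(self):
        self._closed = False
        self.events = []

    def emit(self, event: dict) -> None:
        if self._closed:
            warnings.warn("emit() after close() is a no-op")
            return
        self.events.append(event)

    def close(self) -> None:
        if self._closed:  # idempotent: a second close() does nothing
            return
        self._closed = True
        self.events.append({"event": "done"})  # terminal sentinel

e = _SketchEmitter()
e.emit({"event": "progress", "step": 1})
e.close()
e.close()                   # idempotent: still only one done sentinel
e.emit({"event": "late"})   # warns and no-ops
```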

Class methods

JobEmitter.cancel(job_id)       # cancel tracked task; no-op if unknown
JobEmitter.pop_result(job_id)   # fetch and consume result; None if not ready
JobEmitter.exists(job_id)       # check if job is in registry
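These class methods imply a process-level registry keyed by job_id. A stdlib sketch of that pattern (registry shapes and function names are assumptions, not streamator internals):

```python
import asyncio

# Hypothetical module-level registries, keyed by job_id.
_TASKS: dict[str, asyncio.Task] = {}
_RESULTS: dict[str, object] = {}

def cancel(job_id: str) -> None:
    # Unknown job_id: no-op, matching JobEmitter.cancel().
    task = _TASKS.pop(job_id, None)
    if task is not None:
        task.cancel()

def pop_result(job_id: str):
    # Fetch-and-consume; None if the result is not ready.
    return _RESULTS.pop(job_id, None)

async def main():
    async def job():
        await asyncio.sleep(60)

    t = asyncio.create_task(job())
    _TASKS["abc"] = t          # track the task under its job_id
    cancel("abc")              # cancels the tracked task
    cancel("missing")          # unknown id: no-op
    try:
        await t
    except asyncio.CancelledError:
        _RESULTS["abc"] = {"cancelled": True}
    return pop_result("abc"), pop_result("abc")

first, second = asyncio.run(main())
```

The fetch-and-consume shape means a result is handed out exactly once; a second call returns None.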

Event format on the wire

# emit() — passes through with t added
{"event": "progress", "step": 1, "t": 1.23}

# log() — standard log shape
{"event": "log", "message": "Working...", "level": "info", "t": 1.23}

# close() — terminal sentinel
{"event": "done", "t": 5.67}

Background reaper (optional)

from streamator.emitter import start_reaper

# In FastAPI lifespan:
asyncio.create_task(start_reaper(ttl_seconds=300, interval=60))

Only evicts closed jobs older than ttl_seconds. Off by default.
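The eviction rule reduces to one predicate: closed AND older than ttl_seconds. A stdlib sketch (the registry shape is an assumption for illustration):

```python
def reap(registry: dict, ttl_seconds: float, now: float) -> None:
    """Evict only jobs that are closed and whose close is older than ttl_seconds."""
    stale = [
        job_id
        for job_id, job in registry.items()
        if job["closed"] and now - job["closed_at"] > ttl_seconds
    ]
    for job_id in stale:
        del registry[job_id]

jobs = {
    "open":  {"closed": False, "closed_at": None},   # never evicted while open
    "fresh": {"closed": True,  "closed_at": 100.0},  # closed 250s ago: kept
    "stale": {"closed": True,  "closed_at": 0.0},    # closed 350s ago: evicted
}
reap(jobs, ttl_seconds=300, now=350.0)
```

Open jobs are never touched, so a slow job cannot be reaped mid-stream.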


JobLogger

JobLogger is the simpler API for log-only use cases.

import asyncio

from streamator import JobLogger
from streamator.fastapi import add_log_routes

add_log_routes(app, prefix="/log")   # deprecated — use add_job_routes instead

@router.post("/start")
async def start():
    logger = JobLogger()
    asyncio.create_task(my_job(logger))   # my_job receives the logger and calls logger.log(...)
    return {"log_job_id": logger.job_id}

add_log_routes still works but emits a DeprecationWarning. Migrate to add_job_routes + JobEmitter when convenient.


Storage backends

Memory (default) — asyncio.Queue, SSE push, zero dependencies, single-process only.

DynamoDB — persists across restarts, readable by polling, distributed-friendly.

emitter = JobEmitter(store="dynamo", table="my-jobs", ttl_days=1)

Log levels

"info" (default) · "success" · "warning" · "error"
