
Real-time browser dashboard for Python logging — zero config, non-blocking


pulselog

Non-blocking Python logger with a live browser dashboard.

pip install pulselog

Every log.info() call costs 1.8µs. Zero config. Browser opens automatically.


Benchmark

Tested on Python 3.12.9, Windows, dashboard disabled (dashboard=False).

Scenario                                   Throughput     Notes
Single-thread burst                        355.8k / sec   p50=1.8µs · p99=6.0µs · p99.9=39.7µs
Multi-thread burst (8 threads)             301.4k / sec   matches single-thread — zero contention
Sustained (5s)                             427.6k / sec   2.1M records logged
Queue saturation                           383.9k / sec   0 drops — worker drained fast enough
Realistic (info + save + warn + error)     216.4k / sec   mixed call types with kwargs
Fast path (info_fast)                      455.4k / sec   no kwargs — zero dict allocation
Mixed workload (6 threads, varied calls)   339.9k / sec   1.86M records, 0 drops

A typical ML training loop logs 10–100 records/sec.
At 216.4k/sec on the realistic mixed benchmark, PulseLog handles roughly 2,164× that load with headroom to spare.
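To sanity-check per-call figures like these on your own machine, a rough timing harness is enough. Nothing below is pulselog-specific — the lambda stands in for a log call, and you can swap in a real logger method to reproduce the numbers:

```python
import time

def measure_per_call_us(fn, n=100_000):
    """Average per-call cost in microseconds over n calls."""
    start = time.perf_counter_ns()
    for _ in range(n):
        fn()
    elapsed_ns = time.perf_counter_ns() - start
    return elapsed_ns / n / 1000  # ns per call -> µs per call

# stand-in for something like log.info_fast("step")
cost = measure_per_call_us(lambda: None)
```

Loop overhead is included in the measurement, so treat the result as an upper bound on the call itself.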

Latency under concurrent load — p99 with 4 background threads flooding the queue:

                            p50      p99      p99.9
No contention               1.8µs    6.0µs    39.7µs
Under load (4 bg threads)   2.1µs    5.7µs    27.6µs

p99 is lower under load than idle — per-thread sharding means concurrent producers create zero interference with each other.

v0.1.2 → v0.2.0 improvements
  Single-thread       263k → 356k / sec    +35%
  Multi-thread         41k → 301k / sec    +633%
  Fast path (new)        — → 455k / sec    new
  Realistic           160k → 216k / sec    +35%
  p99.9 latency       87.9 → 39.7 µs       −55%
  Dropped records        0 → 0             still 0

Quick start

from pulselog import Logger

log = Logger("my-app")

log.info("training started", epoch=1)
log.warning("learning rate too high", lr=0.1)
log.save("epoch-1", {"acc": 0.91, "loss": 0.23}, status="DONE", progress=33)

log.shutdown()

A browser tab opens at http://localhost:5678 and streams every log in real time.


Why pulselog?

Standard logging blocks the calling thread on every write — waiting for a file, a socket, or a database. In tight loops (ML training, data pipelines, inference servers) this adds up fast.

pulselog never blocks. Every log call enqueues a record in O(1) and returns immediately. A daemon worker drains the queue every 10ms and pushes batches to the dashboard over WebSocket.

log.info()             ← O(1), ~1.8µs, never blocks
      │
      ▼
  ShardedLogQueue       ← per-thread deques, zero cross-thread contention
      │                    each thread writes to its own private deque
      │
      ▼  every 10ms (adaptive — halves under load)
BackgroundWorker        ← daemon thread, fan-drains all shards
      │
      ├──▶ DashboardServer.broadcast()  ← WebSocket → live browser
      │
      └──▶ (custom handlers)

Why per-thread sharding matters:
A single shared queue means all producer threads compete for the same lock on every put(). At 8 threads, that bottleneck cut throughput from 356k to 41k/sec — an 87% collapse. Per-thread sharding eliminates the shared state entirely. Each thread appends to its own deque (a GIL-atomic operation) and the worker fan-drains all shards once per cycle. Result: multi-thread throughput matches single-thread.
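The sharding idea can be sketched in a few lines. This is a toy illustration, not the real ShardedLogQueue API — the class and method names are invented for the example:

```python
import threading
from collections import deque

class ShardSketch:
    """Toy per-thread sharded queue (illustrative names only)."""
    def __init__(self):
        self._local = threading.local()
        self._shards = []
        self._register_lock = threading.Lock()  # taken once per thread lifetime

    def put(self, record):
        shard = getattr(self._local, "shard", None)
        if shard is None:
            # first put() from this thread: create and register its private deque
            shard = self._local.shard = deque()
            with self._register_lock:
                self._shards.append(shard)
        shard.append(record)  # GIL-atomic append — no lock on the hot path

    def drain(self):
        # worker side: fan-drain every registered shard in one pass
        out = []
        for shard in self._shards:
            while shard:
                out.append(shard.popleft())
        return out
```

Each producer touches only its own deque after registration, so adding threads adds shards rather than lock contention.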


Dashboard

Single self-contained HTML file served over WebSocket — no build step, no CDN, no npm.

Logs tab

  • Colour-coded by level — DEBUG gray · INFO blue · WARNING amber · ERROR/CRITICAL red
  • Level filter + full-text search
  • Virtual list rendering — 100k+ logs with zero browser lag
  • Auto-scroll with manual scroll override
  • Export session as JSON

Checkpoints tab

  • Progress bar per checkpoint
  • Overall progress = average across all checkpoints
  • Expandable JSON data viewer
  • Status badges — DONE ✅ · IN_PROGRESS 🟡 · FAILED 🔴 · SKIPPED ⚫

API

Logger

log = Logger(
    name             = "my-app",
    host             = "localhost",
    port             = 5678,          # auto-increments if taken
    auto_open        = True,           # open browser on start
    dashboard        = True,           # False for CI / production
    checkpoint_path  = ".pulselog/checkpoints.db",
    level            = "DEBUG",
    worker_interval  = 0.01,           # drain interval in seconds (default 10ms)
    queue_size       = 100_000,        # max records before oldest evicted
    overflow         = "drop",         # "drop" | "block" | "raise"
)

Logging

log.debug("msg", **extra)
log.info("msg", **extra)
log.warning("msg", **extra)
log.error("msg", **extra)
log.critical("msg", **extra)

# kwargs appear as structured metadata in the dashboard
log.info("request handled", user_id=42, latency_ms=12, status=200)

# exception() captures the current traceback automatically
try:
    result = model.predict(x)
except Exception:
    log.exception("prediction failed", input_shape=str(x.shape))

Zero-allocation fast paths

When you call log.info("msg", key=val), Python builds the {"key": val} dict before the function is entered — in the C layer, before any pulselog code runs. At 216k/sec that's 216k dict allocations/sec you cannot avoid with **kwargs syntax.
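You can observe the per-call dict allocation directly — every call that passes keyword arguments hands the callee a freshly built dict:

```python
def takes_kwargs(**kwargs):
    # Python materialises a new dict for **kwargs on every call,
    # before the function body even runs
    return kwargs

a = takes_kwargs(key=1)
b = takes_kwargs(key=1)
# a and b hold equal contents but are two distinct dict objects:
# one allocation per call, paid whether or not the callee uses them
```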

For calls where you don't need per-record metadata, use the fast-path variants:

log.info_fast("step done")       # ~455k/sec — no dict allocated, ever
log.debug_fast("heartbeat")
log.warning_fast("queue high")
log.error_fast("connection lost")
log.critical_fast("out of memory")

When to use which:

# Tight loop — no metadata needed → use fast path
for step in range(100_000):
    log.info_fast("step")               # 455k/sec

# Need metadata → use standard API
log.info("step", loss=loss, acc=acc)    # 216k/sec — kwargs cost is unavoidable

Checkpoints

log.save(
    name      = "epoch-5",
    data      = {"loss": 0.31, "acc": 0.94},
    status    = "DONE",        # "DONE" | "IN_PROGRESS" | "FAILED" | "SKIPPED"
    note      = "best so far",
    progress  = 50             # 0–100, shown as progress bar in dashboard
)

result = log.load("epoch-5")        # → dict | None  (never raises)
names  = log.checkpoints()          # → list[str], most recent first
log.delete_checkpoint("epoch-3")

Context and grouping

# Tag groups subsequent logs under a label (per-thread — safe for concurrent use)
log.tag("training")

# Context manager — restores the previous tag on exit, even on exception
with log.context(tag="validation"):
    log.info("val loss", loss=0.41)
# tag is restored here

# Visual divider in the dashboard stream
log.divider("epoch boundary")

Utilities

log.flush(timeout=2.0)    # drain queue synchronously — returns False if timeout hit
log.shutdown()            # graceful teardown (also called automatically on exit)

stats = log.stats()
# {
#   "records_dropped":  int,
#   "drop_rate":        float,   # e.g. 0.04 = 4%
#   "queue_size":       int,
#   "queue_capacity":   int,
#   "queue_fill_pct":   float,
#   "checkpoints_saved": int,
#   "dashboard_clients": int,
#   "uptime_seconds":   float,
# }

stdlib logging integration

Drop-in bridge — all structured fields (lineno, filename, funcName, exc_info) are forwarded to the dashboard.

import logging
from pulselog.handler import PulseHandler

logging.getLogger().addHandler(PulseHandler("my-app"))

logging.info("this appears in the dashboard")
logging.error("with traceback", exc_info=True)  # traceback preserved
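The shape of such a bridge is a standard logging.Handler subclass. The sketch below is a stand-in, not PulseHandler's actual implementation — it just shows how stdlib record fields get forwarded as structured data:

```python
import logging

class CollectingHandler(logging.Handler):
    """Illustrative stand-in for a dashboard bridge: turns each stdlib
    LogRecord into a structured dict instead of a formatted string."""
    def __init__(self):
        super().__init__()
        self.forwarded = []

    def emit(self, record):
        self.forwarded.append({
            "message":  record.getMessage(),   # fully-interpolated message
            "level":    record.levelname,
            "lineno":   record.lineno,
            "funcName": record.funcName,
            "exc_info": record.exc_info,       # traceback tuple, if any
        })

logger = logging.getLogger("bridge-demo")
logger.setLevel(logging.INFO)
handler = CollectingHandler()
logger.addHandler(handler)
logger.info("hello")
```

Because emit() receives the raw LogRecord, structured fields survive the bridge instead of being flattened into text.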

Configuration

Priority (highest → lowest): Logger() kwargs → env vars → pulselog.toml → defaults

Environment variables

PULSELOG_DASHBOARD=false
PULSELOG_HOST=0.0.0.0
PULSELOG_PORT=8080
PULSELOG_AUTO_OPEN=false
PULSELOG_CHECKPOINT_PATH=/data/checkpoints.db
PULSELOG_LEVEL=INFO
PULSELOG_WORKER_INTERVAL=0.01

pulselog.toml (place in project root)

[pulselog]
host            = "0.0.0.0"
port            = 8080
auto_open       = false
level           = "INFO"
worker_interval = 0.01
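The precedence chain above reduces to a first-non-None lookup. A sketch (the helper name is illustrative, not part of the public API):

```python
import os

def resolve_setting(kwarg, env_name, toml_value, default):
    """Logger() kwarg > environment variable > pulselog.toml > default."""
    if kwarg is not None:
        return kwarg
    env = os.environ.get(env_name)
    if env is not None:
        return env  # env values arrive as strings; a real resolver would coerce types
    if toml_value is not None:
        return toml_value
    return default
```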

Production usage

# Disable dashboard, keep checkpoints, log to stderr on drop
log = Logger(
    "prod",
    dashboard        = False,
    checkpoint_path  = "/data/checkpoints.db",
    overflow         = "drop",   # never block — warn on stderr instead
)

With dashboard=False:

  • No threads started beyond the background worker, no port bound, no browser opened
  • Checkpoint reads/writes still work
  • CI environments (CI=true) disable the dashboard automatically

ML training example

from pulselog import Logger

log = Logger("resnet-training")

prev_loss = float("inf")

for epoch in range(1, 11):
    loss, acc = train_epoch(epoch)

    log.info("epoch", loss=loss, acc=acc)

    log.save(
        f"epoch-{epoch}",
        {"loss": loss, "acc": acc},
        status   = "DONE",
        progress = epoch * 10,
    )

    if loss > prev_loss * 1.5:
        log.warning("loss spike", epoch=epoch, loss=loss)
    prev_loss = loss

log.shutdown()


Data engineering example

from pulselog import Logger

log = Logger("etl-pipeline")

with log.context("ingestion"):
    log.info("loading source", table="events", rows=1_200_000)
    records = ingest()
    log.info("ingestion complete", rows=len(records))

with log.context("validation"):
    errors = validate(records)
    if errors:
        log.warning("schema errors found", count=len(errors))
    log.save("validation", {"errors": len(errors), "rows": len(records)},
             status="DONE", progress=40)

with log.context("feature_engineering"):
    features = {}
    for feat in ["activity_7d", "churn_score", "ltv_estimate"]:
        result = compute_feature(feat, records)
        features[feat] = result
        log.info("feature computed", name=feat, coverage=result.coverage)
        if result.null_rate > 0.03:
            log.warning("high null rate", feat=feat, null_rate=result.null_rate)

with log.context("warehouse_write"):
    rows_written = write_to_warehouse(features)
    log.info("write complete", rows=rows_written, target="bigquery://features")
    log.save("pipeline_run", {"rows": rows_written, "features": 3},
             status="DONE", progress=100)

log.shutdown()

Design notes

Per-thread sharding — ShardedLogQueue gives each producer thread a private deque. put() appends to the caller's own deque — no lock, no shared state, no GIL contention between threads. The background worker registers each thread's deque on first use (one lock acquisition per thread lifetime) and fan-drains all shards every cycle. This is why multi-thread throughput matches single-thread.

Lock-free put() — deque.append() is GIL-atomic in CPython. The hot path acquires no mutex. The threading.Event wake signal fires only on empty→non-empty transitions (~100/sec at steady state), not on every put() (which would be 300k+/sec).
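The GIL-atomicity claim is easy to exercise: many threads appending to one shared deque with no lock lose nothing.

```python
import threading
from collections import deque

shared = deque()

def producer(n=10_000):
    for i in range(n):
        shared.append(i)  # deque.append is GIL-atomic: no lock required

threads = [threading.Thread(target=producer) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# 8 threads x 10,000 unlocked appends, zero lost records
```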

Batch timestamps — LogRecord.timestamp is set to None at creation. The worker stamps time.time() once per drain cycle and fills every record in the batch. This moves ~300k time.time() syscalls/sec down to ~100/sec, at the cost of sub-10ms timestamp precision within a batch — acceptable for all logging use cases.
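In miniature (plain dicts standing in for LogRecord):

```python
import time

# hot path: records are created with timestamp=None, no syscall per record...
batch = [{"msg": f"record-{i}", "timestamp": None} for i in range(1_000)]

# ...worker side: one time.time() call stamps the entire drained batch
now = time.time()
for rec in batch:
    rec["timestamp"] = now
```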

__slots__ on LogRecord — eliminates the per-instance __dict__ (~240 bytes each). At 300k records/sec, the original @dataclass design generated ~72 MB/sec of heap churn. With __slots__, allocation pressure drops by ~3× and GC pause frequency falls accordingly. This is why p99.9 dropped from 87.9µs to 39.7µs.
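The difference is visible on any pair of classes (these are illustrative, not pulselog's actual record types):

```python
class PlainRecord:
    def __init__(self, msg, level):
        self.msg, self.level = msg, level

class SlottedRecord:
    __slots__ = ("msg", "level")  # fixed attribute layout, no __dict__
    def __init__(self, msg, level):
        self.msg, self.level = msg, level

plain = PlainRecord("x", "INFO")      # carries a per-instance __dict__
slotted = SlottedRecord("x", "INFO")  # does not
```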

Worker — wakes immediately on new records via threading.Event, falls back to polling every 10ms. Adaptive: halves the interval when queue exceeds 50% capacity, restores it when calm. Drain rate is ~17–19M records/sec — the worker has 50× headroom over the producer ceiling.

Drop policy — when a shard is full, the oldest record is evicted and a stderr warning is emitted every 1,000 drops. Configure overflow="block" to pause the caller instead, or overflow="raise" to surface the error explicitly.
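Drop-oldest semantics fall out of the standard library for free: a bounded deque evicts from the far end on overflow.

```python
from collections import deque

# deque(maxlen=...) silently discards the oldest element when full,
# which is exactly the "drop" overflow behaviour described above
shard = deque(maxlen=3)
for i in range(5):
    shard.append(i)
# records 0 and 1 (the oldest) have been evicted
```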

Shutdown — atexit and SIGTERM both call shutdown() once (guarded against double-invocation). flush() accepts a configurable timeout and returns False if the queue wasn't fully drained in time.

Thread safety — tag() and context() use threading.local() so each thread maintains its own tag state independently. The overflow strategy is resolved to a bound method at __init__ time — no string comparisons on the hot path.
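The threading.local() pattern behind per-thread tags fits in a few lines (function names here are illustrative, not the public API):

```python
import threading

_state = threading.local()

def set_tag(label):
    # each thread sees only its own attribute on the local object
    _state.tag = label

def current_tag():
    # a thread that never called set_tag has no attribute at all
    return getattr(_state, "tag", None)
```

Two threads can hold different tags simultaneously without any locking, because the attribute lives in per-thread storage.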


Requirements

  • Python ≥ 3.8
  • websockets ≥ 11.0 (only needed with dashboard=True)

pip install pulselog            # includes websockets

License

MIT

Author

DevBuddy
