

mgf-livepush

Realtime push pipeline for mgf-common consumers: an SSE streaming primitive, a pluggable pub/sub broker seam (Redis adapter + an in-process mock), and a per-key connection-slot cap. A sibling of mgf-common under the mgf.* namespace.

Extracted from PlasmaMapper's Phase-4 push surface after the pre-release core redesign hardened it (heartbeat cadence, slot lifecycle, disconnect cleanup, and the dev/prod-proxy buffering recipe below). Because of the broker seam, a consumer can swap Redis pub/sub for Redis Streams or NATS without touching the SSE machinery.

Install

pip install "mgf-livepush[redis,fastapi]"   # adapters are opt-in extras

Quickstart (FastAPI)

from fastapi import APIRouter, Request
from redis.asyncio import Redis
from mgf.livepush import SSEStreamService, RedisBroker, SlotCap
from mgf.livepush.fastapi import sse_streaming_response

router = APIRouter()
redis = Redis.from_url("redis://localhost:6379/0")
sse = SSEStreamService(RedisBroker(redis), SlotCap(redis, cap=20))

@router.get("/events/stream")
async def stream(request: Request, tenant_id: str):
    # tenant_id is a query parameter here for illustration; resolve it from your auth layer in practice.
    # 429 (Retry-After: 5) automatically when the per-key cap is full.
    return await sse_streaming_response(
        sse, request, channel=f"events:{tenant_id}", slot_key=str(tenant_id)
    )

Publisher side, anywhere:

import json  # event is any JSON-serializable payload; call this from any coroutine
await RedisBroker(redis).publish(f"events:{tenant_id}", json.dumps(event))

The core is framework-agnostic: open_stream(...) returns an SSEStream with .headers and a .body async byte iterator. Wrap .body in whatever streaming response your framework uses. Tests can use MockBroker, which needs no Redis.
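
As a rough illustration of what a framework-agnostic SSE body looks like, here is a minimal sketch that does not use this library's code. InProcessBroker, frame_event, and sse_body are hypothetical names, and the 15-second heartbeat default is an assumption rather than the library's actual cadence.

```python
import asyncio
from typing import AsyncIterator


class InProcessBroker:
    """Hypothetical in-process pub/sub: fan-out over asyncio queues."""

    def __init__(self) -> None:
        self._subscribers: dict = {}  # channel -> set of queues

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(channel, set()).add(queue)
        return queue

    def unsubscribe(self, channel: str, queue: asyncio.Queue) -> None:
        self._subscribers.get(channel, set()).discard(queue)

    async def publish(self, channel: str, message: str) -> None:
        # Copy the set so a concurrent unsubscribe cannot break iteration.
        for queue in list(self._subscribers.get(channel, ())):
            queue.put_nowait(message)


def frame_event(data: str) -> bytes:
    """Encode a payload as one SSE event: a data: line per payload line, then a blank line."""
    body = "".join(f"data: {line}\n" for line in data.split("\n"))
    return (body + "\n").encode("utf-8")


async def sse_body(
    broker: InProcessBroker, channel: str, heartbeat_seconds: float = 15.0
) -> AsyncIterator[bytes]:
    """Yield framed events, plus an SSE comment line whenever the channel is idle."""
    queue = broker.subscribe(channel)
    try:
        while True:
            try:
                message = await asyncio.wait_for(queue.get(), timeout=heartbeat_seconds)
            except asyncio.TimeoutError:
                # Lines starting with ":" are comments; clients ignore them,
                # but they keep idle connections from being reaped.
                yield b": heartbeat\n\n"
                continue
            yield frame_event(message)
    finally:
        # Runs when the consumer closes the iterator, e.g. on client disconnect.
        broker.unsubscribe(channel, queue)
```

With Starlette or FastAPI, a body like this would be passed to StreamingResponse(..., media_type="text/event-stream"); other frameworks have an equivalent streaming response type.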

The dev/prod-proxy SSE recipe (read this — it's the tarpit)

A FastAPI text/event-stream endpoint behind a proxy (the SvelteKit Vite dev server, a +server.ts [...path] forward, nginx, AWS ALB) can have its body silently buffered, which breaks your latency budget even though the API side is correct. To get live push through a proxy:

  1. Response headers (set by this lib): Cache-Control: no-cache, no-transform + X-Accel-Buffering: no. If your proxy gzips, also force Content-Encoding: identity on the proxied response.
  2. Node fetch forwarding of a streamed body needs duplex: 'half'.
  3. Playwright: waitForLoadState('networkidle') never fires with an open EventSource (the long-lived connection counts as in-flight) — wait on a concrete readiness signal instead.
  4. Playwright: webServer spawns before globalSetup, so any env the proxy needs must be declared statically in webServer.env, not threaded from globalSetup.
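
Step 1 can be written down as a small sketch. The header values come from the list above; the function names and the force_identity flag are hypothetical, and the library's own sse_headers may be shaped differently.

```python
def proxy_safe_sse_headers(force_identity: bool = False) -> dict:
    """Build response headers that ask intermediaries not to cache, transform, or buffer."""
    headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache, no-transform",
        "X-Accel-Buffering": "no",  # honored by nginx; other proxies may ignore it
    }
    if force_identity:
        # Only needed when a proxy in the path would otherwise compress the stream.
        headers["Content-Encoding"] = "identity"
    return headers


def is_uncompressed(received: dict) -> bool:
    """Return True when the headers a client received show no content compression."""
    encoding = ""
    for name, value in received.items():
        if name.lower() == "content-encoding":  # header names are case-insensitive
            encoding = value.strip().lower()
    return encoding in ("", "identity")
```

Running is_uncompressed against the headers seen on the client side of the proxy is a quick smoke test for a gzip layer that reintroduces buffering.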

Public surface

Name                                         What
SSEStreamService                             acquire slot → subscribe → stream (open_stream)
SSEStream / sse_headers                      the (headers, body) result + the proxy-safe headers
Broker / Subscription                        the pub/sub port (Protocols)
RedisBroker / MockBroker                     adapters ([redis] extra / in-process)
SlotCap                                      per-key INCR/DECR connection cap, fail-open, TTL
CapExceededError / LivePushError             typed errors (subclass AppError)
mgf.livepush.fastapi.sse_streaming_response  one-line FastAPI wrapper ([fastapi] extra)
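
To make the "per-key INCR/DECR connection cap, fail-open, TTL" idea concrete, here is a minimal sketch under stated assumptions. It is not SlotCap's implementation: SlotCapSketch, InMemoryCounters, the slots: key prefix, and the one-hour TTL are all hypothetical. The store only needs async incr, decr, and expire methods, which redis.asyncio.Redis also provides.

```python
import asyncio


class InMemoryCounters:
    """Hypothetical stand-in exposing the incr/decr/expire calls used below."""

    def __init__(self) -> None:
        self.values: dict = {}
        self.ttls: dict = {}

    async def incr(self, key: str) -> int:
        self.values[key] = self.values.get(key, 0) + 1
        return self.values[key]

    async def decr(self, key: str) -> int:
        self.values[key] = self.values.get(key, 0) - 1
        return self.values[key]

    async def expire(self, key: str, seconds: int) -> bool:
        self.ttls[key] = seconds  # recorded only; this fake never evicts keys
        return True


class SlotCapSketch:
    def __init__(self, store, cap: int, ttl_seconds: int = 3600) -> None:
        self.store = store
        self.cap = cap
        self.ttl_seconds = ttl_seconds

    async def acquire(self, key: str) -> bool:
        """Take a slot for key; return False when the cap is already full."""
        name = f"slots:{key}"
        try:
            count = await self.store.incr(name)
            # The TTL bounds how long a counter leaked by a crashed worker survives.
            await self.store.expire(name, self.ttl_seconds)
        except Exception:
            return True  # fail open: a store outage must not block streaming
        if count <= self.cap:
            return True
        await self.release(key)  # roll back the increment that went over the cap
        return False

    async def release(self, key: str) -> None:
        """Give the slot back; errors are swallowed and the TTL cleans up eventually."""
        try:
            await self.store.decr(f"slots:{key}")
        except Exception:
            pass


async def demo() -> list:
    cap = SlotCapSketch(InMemoryCounters(), cap=2)
    return [await cap.acquire("tenant-1") for _ in range(3)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the quickstart, a refused slot is what surfaces as the 429 with Retry-After: 5. Whatever the implementation, release should run in a finally block around the stream so that disconnects return their slot.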
