feat(system): live 1 Hz SSE stream behind admin gauges + dashboard tile
Adds /api/system/stream — a Server-Sent Events feed driven by a single per-process ticker that reads /proc directly and splices in the latest host-side metrics.json each second. Subscribers share the connection so N open tabs cost one ticker, and the ticker pauses entirely when nobody is listening. Frontend gets a singleton LiveSystem EventSource manager with auto- reconnect, Page-Visibility integration (closes on tab hide), and last- sample replay for late subscribers. Admin -> System gauges and the dashboard memory + disk tile now tick at 1 Hz; trend charts and the per-app table keep their 30 s poll because the underlying files only regenerate once a minute. Also adds /api/system/history as a thin range-query wrapper over the existing 24 h JSON ring buffer — the binary ring backend will slot in behind it in the next phase without changing the response shape. Signed-off-by: librelad <librelad@digitalangels.vip>
This commit is contained in:
parent
31c71a212d
commit
9f7ad8f177
@ -1,26 +1,35 @@
|
||||
// Live system metrics — the fast path behind the Admin → System gauges.
|
||||
// Live system metrics — the fast path behind the Admin → System gauges and the
|
||||
// dashboard "pulse" tiles.
|
||||
//
|
||||
// The periodic, host-side picture (disk, network, docker, per-app, 24h history)
|
||||
// is produced by the webui_system_metrics generator into frontend/data/system/.
|
||||
// This endpoint exists only to make the headline gauges *tick* every couple of
|
||||
// seconds, so it is deliberately the cheapest, safest thing it can be:
|
||||
// - reads only /proc (no subprocess spawn, no docker, no privileges)
|
||||
// - CPU% from an in-memory delta of the previous /proc/stat read
|
||||
// - a single-flight + short cache so N open tabs cause ~1 /proc read/sec
|
||||
// Periodic host-side data (disks, network, docker, per-app, 24 h history) is
|
||||
// produced by the webui_system_metrics generator into frontend/data/system/.
|
||||
// This file serves the *live* path: CPU / memory / load read straight from
|
||||
// /proc, optionally fused with the latest host JSON snapshot so a single SSE
|
||||
// message carries everything a client needs to draw a frame.
|
||||
//
|
||||
// Endpoints:
|
||||
// GET /live — single-shot JSON snapshot (kept for callers that still poll)
|
||||
// GET /stream — Server-Sent Events; pushes a fused sample once per second.
|
||||
// One /proc read per second across all subscribers (shared
|
||||
// ticker), so 100 open tabs still cost one read/sec.
|
||||
//
|
||||
// Namespace note: this runs *inside* the libreportal container. /proc/stat,
|
||||
// /proc/meminfo and /proc/loadavg are not namespaced, so they report host-wide
|
||||
// values that match the generator's numbers. /proc/net/dev IS per-netns (it
|
||||
// would show only this container's traffic), so network is intentionally left
|
||||
// to the host-side generator and not served here.
|
||||
// would show only this container's traffic), so the host generator owns
|
||||
// network/disk and we splice its latest snapshot into each SSE message.
|
||||
const express = require('express');
|
||||
const fs = require('fs').promises;
|
||||
const path = require('path');
|
||||
const os = require('os');
|
||||
|
||||
const router = express.Router();
|
||||
|
||||
const CORES = os.cpus().length || 1;
|
||||
const MIN_INTERVAL_MS = 750; // serve cache to anything faster than this
|
||||
const STREAM_TICK_MS = 1000; // SSE push cadence — 1 Hz live feel
|
||||
const HEARTBEAT_MS = 25000; // SSE comment frame to keep proxies from idling out
|
||||
const HOST_JSON_DIR = path.join(__dirname, '..', '..', 'frontend', 'data', 'system');
|
||||
|
||||
let prevCpu = null; // { total, idle } from the last read
|
||||
let cache = null; // { sample, at }
|
||||
@ -102,4 +111,163 @@ router.get('/live', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SSE live stream
|
||||
// ---------------------------------------------------------------------------
|
||||
// One ticker for the whole process. Subscribers join/leave; the ticker only
|
||||
// runs while at least one is connected, so an idle WebUI costs nothing.
|
||||
|
||||
const subscribers = new Set();
|
||||
let tickHandle = null;
|
||||
let heartbeatHandle = null;
|
||||
let lastSample = null;
|
||||
let hostJson = { metrics: null, disk: null, memory: null, apps: null };
|
||||
let hostJsonLoadedAt = 0;
|
||||
const HOST_JSON_REFRESH_MS = 5000; // re-read host snapshots every 5 s (they regen at most 1×/min)
|
||||
|
||||
// Read a JSON file but never throw — missing/invalid → previous value.
|
||||
async function readJsonSafe(file, fallback = null) {
|
||||
try {
|
||||
const txt = await fs.readFile(file, 'utf8');
|
||||
return JSON.parse(txt);
|
||||
} catch (_) {
|
||||
return fallback;
|
||||
}
|
||||
}
|
||||
|
||||
// Refresh the cached host-side JSON if it's been at least HOST_JSON_REFRESH_MS
|
||||
// since the last read. Cheap when the files haven't changed because the OS
|
||||
// page cache makes the read essentially free.
|
||||
async function refreshHostJson(now) {
|
||||
if (now - hostJsonLoadedAt < HOST_JSON_REFRESH_MS) return;
|
||||
hostJsonLoadedAt = now;
|
||||
const [metrics, disk, memory, apps] = await Promise.all([
|
||||
readJsonSafe(path.join(HOST_JSON_DIR, 'metrics.json'), hostJson.metrics),
|
||||
readJsonSafe(path.join(HOST_JSON_DIR, 'disk_usage.json'), hostJson.disk),
|
||||
readJsonSafe(path.join(HOST_JSON_DIR, 'memory_usage.json'), hostJson.memory),
|
||||
readJsonSafe(path.join(HOST_JSON_DIR, 'metrics_apps.json'), hostJson.apps)
|
||||
]);
|
||||
hostJson = { metrics, disk, memory, apps };
|
||||
}
|
||||
|
||||
function ssePayload(s) {
|
||||
// Fuse the live in-container sample with the latest host-side snapshot so
|
||||
// a client gets everything it needs from one stream. The host fields tick
|
||||
// slowly (≤ 1/min) but live alongside the 1 Hz CPU/mem feed.
|
||||
const m = hostJson.metrics || {};
|
||||
return {
|
||||
t: s.t,
|
||||
cpu: s.cpu,
|
||||
memory: s.memory,
|
||||
disks: Array.isArray(m.disks) ? m.disks : [],
|
||||
network: m.network || { rx_rate: 0, tx_rate: 0 },
|
||||
docker: m.docker || null,
|
||||
apps: (hostJson.apps && Array.isArray(hostJson.apps.apps)) ? hostJson.apps.apps : []
|
||||
};
|
||||
}
|
||||
|
||||
async function tick() {
|
||||
if (subscribers.size === 0) { // nothing to do — defensive
|
||||
stopTicker();
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const s = await sample();
|
||||
const now = Date.now();
|
||||
cache = { sample: s, at: now };
|
||||
await refreshHostJson(now);
|
||||
const payload = ssePayload(s);
|
||||
lastSample = payload;
|
||||
const frame = `data: ${JSON.stringify(payload)}\n\n`;
|
||||
for (const res of subscribers) {
|
||||
try { res.write(frame); } catch (_) { /* will be reaped on close */ }
|
||||
}
|
||||
} catch (_) { /* swallow — try again next tick */ }
|
||||
}
|
||||
|
||||
function startTicker() {
|
||||
if (tickHandle) return;
|
||||
tick(); // fire immediately so the first frame is fresh
|
||||
tickHandle = setInterval(tick, STREAM_TICK_MS);
|
||||
// Heartbeat keeps proxies (Traefik/nginx) from idling the connection out;
|
||||
// SSE comments start with ":" and are ignored by EventSource.
|
||||
heartbeatHandle = setInterval(() => {
|
||||
for (const res of subscribers) {
|
||||
try { res.write(': hb\n\n'); } catch (_) {}
|
||||
}
|
||||
}, HEARTBEAT_MS);
|
||||
}
|
||||
|
||||
function stopTicker() {
|
||||
if (tickHandle) { clearInterval(tickHandle); tickHandle = null; }
|
||||
if (heartbeatHandle) { clearInterval(heartbeatHandle); heartbeatHandle = null; }
|
||||
}
|
||||
|
||||
router.get('/stream', async (req, res) => {
|
||||
// SSE handshake. `no-transform` tells the compression middleware not to
|
||||
// gzip this response (gzip buffers and would break streaming). `X-Accel-
|
||||
// Buffering: no` tells nginx/Traefik to flush each event immediately.
|
||||
res.set({
|
||||
'Content-Type': 'text/event-stream; charset=utf-8',
|
||||
'Cache-Control': 'no-store, no-transform',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no'
|
||||
});
|
||||
res.flushHeaders?.();
|
||||
// Initial "retry" hint — if the connection dies the browser will reopen
|
||||
// after this many ms (default 3000 is fine but explicit is clearer).
|
||||
res.write('retry: 3000\n\n');
|
||||
|
||||
subscribers.add(res);
|
||||
startTicker();
|
||||
|
||||
// If we already have a fresh sample, ship it right now so the client doesn't
|
||||
// have to wait STREAM_TICK_MS for its first frame.
|
||||
if (lastSample) {
|
||||
try { res.write(`data: ${JSON.stringify(lastSample)}\n\n`); } catch (_) {}
|
||||
}
|
||||
|
||||
const cleanup = () => {
|
||||
subscribers.delete(res);
|
||||
if (subscribers.size === 0) stopTicker();
|
||||
};
|
||||
req.on('close', cleanup);
|
||||
req.on('error', cleanup);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// History range query
|
||||
// ---------------------------------------------------------------------------
|
||||
// Reads the 24 h ring buffer the host-side generator writes and returns a
|
||||
// slice. `range` is minutes back from now (1..1440). `keys` is a comma-list of
|
||||
// metric names to project (defaults to the whole point).
|
||||
//
|
||||
// This is a thin wrapper today — the buffer lives on disk as metrics_history.
|
||||
// json. A binary ring-buffer backend will slot in here in Phase 2 without
|
||||
// changing the response shape.
|
||||
router.get('/history', async (req, res) => {
|
||||
const range = Math.max(1, Math.min(1440, parseInt(req.query.range, 10) || 60));
|
||||
const keys = typeof req.query.keys === 'string' && req.query.keys.length
|
||||
? req.query.keys.split(',').map((s) => s.trim()).filter(Boolean)
|
||||
: null;
|
||||
try {
|
||||
const file = path.join(HOST_JSON_DIR, 'metrics_history.json');
|
||||
const raw = await fs.readFile(file, 'utf8');
|
||||
const parsed = JSON.parse(raw);
|
||||
const all = Array.isArray(parsed?.points) ? parsed.points : [];
|
||||
const sliced = all.slice(-range);
|
||||
const points = keys
|
||||
? sliced.map((p) => {
|
||||
const out = { t: p.t };
|
||||
for (const k of keys) if (k in p) out[k] = p[k];
|
||||
return out;
|
||||
})
|
||||
: sliced;
|
||||
res.set('Cache-Control', 'no-store');
|
||||
res.json({ range, points, updated: parsed?.updated || null });
|
||||
} catch (_) {
|
||||
res.status(404).json({ error: 'history_unavailable', points: [] });
|
||||
}
|
||||
});
|
||||
|
||||
module.exports = router;
|
||||
|
||||
@ -88,6 +88,7 @@
|
||||
<script src="/js/utils/ui-helpers.js"></script>
|
||||
<script src="/js/utils/router.js"></script>
|
||||
<script src="/js/utils/data-loader.js"></script>
|
||||
<script src="/js/utils/system-live.js"></script>
|
||||
<script src="/js/utils/dismissible.js"></script>
|
||||
<script src="/js/components/eo-modal.js"></script>
|
||||
<script src="/js/components/dashboard.js"></script>
|
||||
|
||||
@ -1,13 +1,21 @@
|
||||
// Admin → System — in-depth host + per-app statistics. Live ring gauges for the
|
||||
// headline numbers, SVG trend charts driven by the metrics history ring buffer,
|
||||
// a Docker summary, and a per-app resource table. Data comes from the
|
||||
// frontend/data/system/*.json files the webui_system_metrics generator refreshes
|
||||
// every minute; this page just polls and draws. Renders into #config-section.
|
||||
// a Docker summary, and a per-app resource table.
|
||||
//
|
||||
// Two data paths:
|
||||
// - Live (1 Hz, via LiveSystem SSE): CPU%, memory, load, disks, network,
|
||||
// docker totals. Updates the gauges in place each second so they tick like
|
||||
// a real instrument.
|
||||
// - Periodic (every 30 s, via fetch): the history ring buffer + per-app
|
||||
// table. These regenerate at most once a minute on the host, so polling
|
||||
// them faster would be wasted bandwidth.
|
||||
// Renders into #config-section.
|
||||
class AdminSystem {
|
||||
constructor(rootId = 'config-section') {
|
||||
this.rootId = rootId;
|
||||
this.range = 60; // minutes of history to chart
|
||||
this._timer = null;
|
||||
this._unsubLive = null;
|
||||
this.d = {};
|
||||
}
|
||||
|
||||
@ -18,15 +26,28 @@ class AdminSystem {
|
||||
if (r) r.innerHTML = '<div class="admin-page"><div class="backup-empty-state">Loading system stats…</div></div>';
|
||||
await this.refresh();
|
||||
this.bind();
|
||||
// Data regenerates ~1/min; poll at 30s so the view tracks it without
|
||||
// hammering. Stop once the user has navigated off this page.
|
||||
this._stopLive();
|
||||
// 1 Hz live gauges via the shared EventSource manager.
|
||||
if (window.LiveSystem) {
|
||||
this._unsubLive = window.LiveSystem.subscribe((s) => this._applyLive(s));
|
||||
}
|
||||
// History/per-app refresh stays slower — those files only regenerate
|
||||
// once a minute on the host. Stop both paths once the user navigates off.
|
||||
if (this._timer) clearInterval(this._timer);
|
||||
this._timer = setInterval(() => {
|
||||
if (!document.querySelector('.sys-page')) { clearInterval(this._timer); this._timer = null; return; }
|
||||
if (!document.querySelector('.sys-page')) {
|
||||
clearInterval(this._timer); this._timer = null;
|
||||
this._stopLive();
|
||||
return;
|
||||
}
|
||||
this.refresh();
|
||||
}, 30000);
|
||||
}
|
||||
|
||||
_stopLive() {
|
||||
if (this._unsubLive) { try { this._unsubLive(); } catch (_) {} this._unsubLive = null; }
|
||||
}
|
||||
|
||||
async refresh() {
|
||||
const [metrics, history, apps, appsHist, info] = await Promise.all([
|
||||
this.fetchJson('/data/system/metrics.json'),
|
||||
@ -86,6 +107,43 @@ class AdminSystem {
|
||||
</div>`;
|
||||
}
|
||||
|
||||
// Shared by full render() and the 1 Hz live path so both produce identical
|
||||
// gauge markup; only `this.d.metrics` differs in source.
|
||||
_gaugesHtml() {
|
||||
const C = window.LPCharts;
|
||||
const m = this.d.metrics || {};
|
||||
const cpu = m.cpu || {}, mem = m.memory || {};
|
||||
const disks = Array.isArray(m.disks) ? m.disks : [];
|
||||
const rootDisk = disks.find(d => d.mount === '/') || disks[0] || {};
|
||||
return `
|
||||
${C.gauge(cpu.percent || 0, { label: 'CPU', sublabel: `${cpu.cores || '?'} cores` })}
|
||||
${C.gauge(mem.percent || 0, { label: 'Memory', sublabel: `${this.bytes(mem.used)} / ${this.bytes(mem.total)}` })}
|
||||
${C.gauge(rootDisk.percent || 0, { label: 'Disk', sublabel: rootDisk.mount || '/' })}
|
||||
${C.gauge(cpu.load1_percent || 0, { label: 'Load', display: (cpu.load1 ?? 0), suffix: '', sublabel: `1m · ${cpu.load5 ?? '–'}/${cpu.load15 ?? '–'}` })}`;
|
||||
}
|
||||
|
||||
// Fold a live SSE sample into this.d.metrics and refresh the in-page
|
||||
// gauges + "updated" stamp without rebuilding the heavier sections.
|
||||
// The payload shape matches the host generator's metrics.json so we can
|
||||
// assign straight in; absent fields keep their previous value.
|
||||
_applyLive(s) {
|
||||
if (!s || !document.querySelector('.sys-page')) return;
|
||||
const m = this.d.metrics || {};
|
||||
this.d.metrics = {
|
||||
...m,
|
||||
cpu: s.cpu || m.cpu,
|
||||
memory: s.memory || m.memory,
|
||||
disks: Array.isArray(s.disks) && s.disks.length ? s.disks : m.disks,
|
||||
network: s.network || m.network,
|
||||
docker: s.docker || m.docker,
|
||||
updated: new Date(s.t || Date.now()).toISOString()
|
||||
};
|
||||
const gaugesEl = document.querySelector('.sys-page .sys-gauges');
|
||||
if (gaugesEl) gaugesEl.innerHTML = this._gaugesHtml();
|
||||
const subEl = document.querySelector('.sys-page .page-header-title p');
|
||||
if (subEl) subEl.textContent = `Live host and per-app statistics. Updated ${new Date(s.t || Date.now()).toLocaleTimeString()}.`;
|
||||
}
|
||||
|
||||
render() {
|
||||
const root = this.root();
|
||||
if (!root) return;
|
||||
@ -96,14 +154,7 @@ class AdminSystem {
|
||||
const disks = Array.isArray(m.disks) ? m.disks : [];
|
||||
const rootDisk = disks.find(d => d.mount === '/') || disks[0] || {};
|
||||
|
||||
// Gauges
|
||||
const gauges = `
|
||||
<div class="sys-gauges">
|
||||
${C.gauge(cpu.percent || 0, { label: 'CPU', sublabel: `${cpu.cores || '?'} cores` })}
|
||||
${C.gauge(mem.percent || 0, { label: 'Memory', sublabel: `${this.bytes(mem.used)} / ${this.bytes(mem.total)}` })}
|
||||
${C.gauge(rootDisk.percent || 0, { label: 'Disk', sublabel: rootDisk.mount || '/' })}
|
||||
${C.gauge(cpu.load1_percent || 0, { label: 'Load', display: (cpu.load1 ?? 0), suffix: '', sublabel: `1m · ${cpu.load5 ?? '–'}/${cpu.load15 ?? '–'}` })}
|
||||
</div>`;
|
||||
const gauges = `<div class="sys-gauges">${this._gaugesHtml()}</div>`;
|
||||
|
||||
// Trend charts
|
||||
const rx = this.series('net_rx'), tx = this.series('net_tx');
|
||||
|
||||
@ -446,6 +446,11 @@ async function loadSystemInfo() {
|
||||
// Update disk usage chart
|
||||
updateDiskChart(diskChartData);
|
||||
|
||||
// Attach the 1 Hz live stream so the headline values tick like an
|
||||
// instrument. The static fetch above gave us a complete first paint; the
|
||||
// stream takes over for tactile updates without a page refresh.
|
||||
attachDashboardLive();
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error loading system info:', error);
|
||||
// Fallback values
|
||||
@ -464,6 +469,41 @@ async function loadSystemInfo() {
|
||||
}
|
||||
}
|
||||
|
||||
// Wire the dashboard's headline values to the 1 Hz live SSE stream. Idempotent
|
||||
// — repeated calls swap the previous subscription for a fresh one so a SPA
|
||||
// re-mount of the dashboard doesn't double up. We listen via LiveSystem (a
|
||||
// singleton EventSource manager), so adding this subscription is essentially
|
||||
// free even with the Admin → System page open in another tab — same backend
|
||||
// ticker feeds both. Cleanup hangs off a route-change check: if the dashboard
|
||||
// DOM goes away we drop the sub on the next sample.
|
||||
let _dashboardLiveUnsub = null;
|
||||
function attachDashboardLive() {
|
||||
if (!window.LiveSystem) return;
|
||||
if (_dashboardLiveUnsub) { try { _dashboardLiveUnsub(); } catch (_) {} _dashboardLiveUnsub = null; }
|
||||
_dashboardLiveUnsub = window.LiveSystem.subscribe((s) => {
|
||||
const memoryEl = document.getElementById('memory-info');
|
||||
const diskCard = document.getElementById('disk-circle-fill');
|
||||
if (!memoryEl && !diskCard) {
|
||||
// Dashboard isn't on screen anymore — release the sub.
|
||||
if (_dashboardLiveUnsub) { try { _dashboardLiveUnsub(); } catch (_) {} _dashboardLiveUnsub = null; }
|
||||
return;
|
||||
}
|
||||
if (memoryEl && s && s.memory && Number.isFinite(s.memory.total) && s.memory.total > 0) {
|
||||
const usedGb = (s.memory.used / 1073741824).toFixed(2);
|
||||
const totalGb = (s.memory.total / 1073741824).toFixed(2);
|
||||
const pct = (s.memory.percent ?? 0).toFixed(1);
|
||||
const next = `${usedGb} GB / ${totalGb} GB - ${pct}%`;
|
||||
if (memoryEl.textContent !== next) memoryEl.textContent = next;
|
||||
}
|
||||
if (diskCard && Array.isArray(s?.disks) && s.disks.length) {
|
||||
const root = s.disks.find((d) => d.mount === '/') || s.disks[0];
|
||||
if (root && Number.isFinite(root.total) && root.total > 0) {
|
||||
updateDiskChart({ used: root.used, total: root.total });
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for dashboard elements to be available
|
||||
function waitForDashboardElements() {
|
||||
return new Promise((resolve) => {
|
||||
|
||||
111
containers/libreportal/frontend/js/utils/system-live.js
Normal file
111
containers/libreportal/frontend/js/utils/system-live.js
Normal file
@ -0,0 +1,111 @@
|
||||
// LibrePortal — live system telemetry client.
|
||||
//
|
||||
// Singleton EventSource manager. Subscribers register a callback that receives
|
||||
// each fused live sample pushed by the backend /api/system/stream endpoint.
|
||||
//
|
||||
// Design intent:
|
||||
// - One EventSource per tab no matter how many widgets subscribe — joining a
|
||||
// stream is cheap, but on a busy page (dashboard + admin → system both
|
||||
// mounted) several connections would each pin their own /proc reader on
|
||||
// the backend. One shared connection means one backend ticker, full stop.
|
||||
// - The connection only opens while at least one subscriber wants it, and
|
||||
// closes the moment the last unsubscribes — an idle WebUI tab uses zero
|
||||
// bandwidth and no server resources.
|
||||
// - Page Visibility integration: a backgrounded tab closes the stream and
|
||||
// reopens it on return, so a phone left in the pocket doesn't keep a
|
||||
// connection warm.
|
||||
// - Auto-reconnect with capped exponential backoff (1 s → 30 s).
|
||||
// - Late subscribers get the last-known sample synchronously so the UI can
|
||||
// draw a frame before the next push lands.
|
||||
window.LiveSystem = (() => {
|
||||
const ENDPOINT = '/api/system/stream';
|
||||
const MAX_BACKOFF_MS = 30000;
|
||||
|
||||
let es = null;
|
||||
let last = null;
|
||||
let backoff = 0;
|
||||
let reopenTimer = null;
|
||||
const subs = new Set();
|
||||
|
||||
function emit(payload) {
|
||||
for (const fn of subs) {
|
||||
try { fn(payload); } catch (_) { /* a misbehaving sub mustn't poison others */ }
|
||||
}
|
||||
}
|
||||
|
||||
function open() {
|
||||
if (es || subs.size === 0 || document.hidden) return;
|
||||
try {
|
||||
es = new EventSource(ENDPOINT);
|
||||
} catch (_) {
|
||||
scheduleReopen();
|
||||
return;
|
||||
}
|
||||
es.onopen = () => { backoff = 0; };
|
||||
es.onmessage = (ev) => {
|
||||
// Ignore the keepalive heartbeat (sent as a comment, doesn't fire
|
||||
// onmessage, but a stray empty data: line might).
|
||||
if (!ev.data) return;
|
||||
let payload;
|
||||
try { payload = JSON.parse(ev.data); } catch (_) { return; }
|
||||
last = payload;
|
||||
emit(payload);
|
||||
};
|
||||
es.onerror = () => {
|
||||
// EventSource auto-reconnects on its own, but only while the connection
|
||||
// hasn't been closed. If the server returned 401/5xx the browser may
|
||||
// close it; force a manual cycle with backoff so we don't hammer.
|
||||
close();
|
||||
scheduleReopen();
|
||||
};
|
||||
}
|
||||
|
||||
function close() {
|
||||
if (es) {
|
||||
try { es.close(); } catch (_) {}
|
||||
es = null;
|
||||
}
|
||||
}
|
||||
|
||||
function scheduleReopen() {
|
||||
if (reopenTimer || subs.size === 0) return;
|
||||
backoff = backoff ? Math.min(MAX_BACKOFF_MS, backoff * 2) : 1000;
|
||||
reopenTimer = setTimeout(() => { reopenTimer = null; open(); }, backoff);
|
||||
}
|
||||
|
||||
// Visibility: hide → close, show → reopen if anyone still cares.
|
||||
document.addEventListener('visibilitychange', () => {
|
||||
if (document.hidden) {
|
||||
close();
|
||||
if (reopenTimer) { clearTimeout(reopenTimer); reopenTimer = null; }
|
||||
} else {
|
||||
open();
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
// Register a callback to receive each live sample. Returns an
|
||||
// unsubscribe function. If a sample is already in hand it fires
|
||||
// synchronously so callers can render immediately.
|
||||
subscribe(fn) {
|
||||
if (typeof fn !== 'function') return () => {};
|
||||
subs.add(fn);
|
||||
if (last) {
|
||||
try { fn(last); } catch (_) {}
|
||||
}
|
||||
open();
|
||||
return () => {
|
||||
subs.delete(fn);
|
||||
if (subs.size === 0) {
|
||||
close();
|
||||
if (reopenTimer) { clearTimeout(reopenTimer); reopenTimer = null; }
|
||||
}
|
||||
};
|
||||
},
|
||||
// Fetch the last sample synchronously (or null if the stream hasn't
|
||||
// produced one yet). Useful for snapshot-style reads without a sub.
|
||||
get last() { return last; },
|
||||
// Test hook: count of active subscribers.
|
||||
get subCount() { return subs.size; }
|
||||
};
|
||||
})();
|
||||
Loading…
x
Reference in New Issue
Block a user