// Persistent metrics-history writer.
//
// Runs alongside the SSE ticker inside the libreportal container. Every
// minute, on the bucket boundary, it composes a single sample from /proc plus
// the latest host-side JSON snapshots and appends it to the 1-minute ring;
// every 5 minutes it also pushes a (5-pt average) point into the 5-minute
// ring. Independent of whether any client is subscribed to /api/system/stream
// — the trend charts must keep filling even when nobody's watching.
//
// On startup, if the 1-minute ring is empty but the legacy
// metrics_history.json exists, we backfill from it so first paint already has
// 24 h of data.

const fs = require('fs').promises;
const path = require('path');
const { MetricsRing } = require('./metrics-ring.js');

// Bucket widths, in seconds.
const ONE_MIN = 60;
const FIVE_MIN = 5 * ONE_MIN;

// Ring capacities, in points.
const ONE_MIN_CAP = 24 * 60;      // 24 h of 1-minute points
const FIVE_MIN_CAP = 7 * 24 * 12; // 7 d of 5-minute points

// Directory holding the host-side JSON snapshots; the ring files live there too.
const HOST_JSON_DIR = path.join(__dirname, '..', '..', 'frontend', 'data', 'system');
const RING_1M = path.join(HOST_JSON_DIR, 'metrics_ring_1m.bin');
const RING_5M = path.join(HOST_JSON_DIR, 'metrics_ring_5m.bin');
const LEGACY_HIST = path.join(HOST_JSON_DIR, 'metrics_history.json');

const ring1 = new MetricsRing({
  file: RING_1M,
  capacity: ONE_MIN_CAP,
  bucketSec: ONE_MIN,
});
const ring5 = new MetricsRing({
  file: RING_5M,
  capacity: FIVE_MIN_CAP,
  bucketSec: FIVE_MIN,
});

// The read functions are injected through start() so we don't have a circular
// require with system-routes.js (which also wants to use sample()).
let readSample = null;
let readHostJson = null;

// Writer lifecycle state.
let tickHandle = null;
let started = false;

// Floor a unix-seconds timestamp `t` to the start of its `sec`-wide bucket.
function floorBucket(t, sec) {
  return Math.floor(t / sec) * sec;
}

// Read and parse a JSON file. Resolves to the parsed value, or to null when
// the file cannot be read or does not contain valid JSON.
async function safeJson(file) {
  let parsed = null;
  try {
    const text = await fs.readFile(file, 'utf8');
    parsed = JSON.parse(text);
  } catch (_) {
    // Best effort: callers treat null as "no snapshot available".
  }
  return parsed;
}

// Build a metrics point from a live /proc sample + the latest host JSON.
async function composePoint(t) { const live = await readSample(); const hostMetrics = await safeJson(path.join(HOST_JSON_DIR, 'metrics.json')); const disks = Array.isArray(hostMetrics?.disks) ? hostMetrics.disks : []; const rootDisk = disks.find(d => d.mount === '/') || disks[0] || {}; const net = hostMetrics?.network || {}; return { t, cpu: Number(live?.cpu?.percent) || 0, mem: Number(live?.memory?.percent) || 0, swap: Number(live?.memory?.swap_percent) || 0, disk: Number(rootDisk.percent) || 0, load1: Number(live?.cpu?.load1) || 0, net_rx: Number(net.rx_rate) || 0, net_tx: Number(net.tx_rate) || 0, }; } // Average the latest 5 1-min points into one 5-min bucket. Keeps the same // shape as composePoint() so it slots straight into ring5.append. function averagePoints(pts, t) { if (!pts.length) return null; const sum = { cpu: 0, mem: 0, swap: 0, disk: 0, load1: 0, net_rx: 0, net_tx: 0 }; for (const p of pts) for (const k of Object.keys(sum)) sum[k] += Number(p[k]) || 0; const n = pts.length; return { t, cpu: sum.cpu / n, mem: sum.mem / n, swap: sum.swap / n, disk: sum.disk / n, load1: sum.load1 / n, net_rx: sum.net_rx / n, net_tx: sum.net_tx / n, }; } // Backfill the 1-min ring from the legacy JSON if and only if the ring is // empty. Idempotent; safe to call on every startup. async function backfillFromLegacy() { await ring1.open(); if ((await ring1.lastT()) > 0) return false; const j = await safeJson(LEGACY_HIST); const pts = Array.isArray(j?.points) ? 
j.points : []; if (!pts.length) return false; let last = 0; let appended = 0; for (const p of pts) { const t = floorBucket(Number(p.t) || 0, ONE_MIN); if (t <= last) continue; // points must advance monotonically last = t; await ring1.append({ t, cpu: Number(p.cpu) || 0, mem: Number(p.mem) || 0, swap: Number(p.swap) || 0, disk: Number(p.disk) || 0, load1: Number(p.load1) || 0, net_rx: Number(p.net_rx) || 0, net_tx: Number(p.net_tx) || 0, }); appended++; } return appended; } // Read one minute / five minute slice in the format the API returns. async function read(rangeMin, tier) { const r = tier === '5m' ? ring5 : ring1; const pts = await r.readLast(rangeMin); return pts; } // Single tick. Fires once per minute (give or take a few ms drift) and writes // at most one 1m point + optionally one 5m point. Idempotent within a bucket // — if the ring's last_t already matches the bucket we're about to write, // skip. async function tick() { try { const now = Math.floor(Date.now() / 1000); const bucket1 = floorBucket(now, ONE_MIN); const last1 = await ring1.lastT(); if (bucket1 <= last1) return; // already wrote this minute const point = await composePoint(bucket1); await ring1.append(point); const bucket5 = floorBucket(now, FIVE_MIN); const last5 = await ring5.lastT(); if (bucket5 > last5 && (now - bucket5) < ONE_MIN * 2) { // We've just crossed a 5-min boundary; average the last 5 1-min // points to form the 5-min point. Window the average to the // 5-min bucket so a long run-up doesn't smear into the new one. const recent = await ring1.readLast(5); const inWindow = recent.filter(p => p.t >= bucket5 && p.t < bucket5 + FIVE_MIN); const avgPts = inWindow.length ? inWindow : recent; const avg = averagePoints(avgPts, bucket5); if (avg) await ring5.append(avg); } } catch (err) { // Swallow — a single failed tick mustn't kill the writer. The next // boundary will retry. Log loudly enough to be findable but not so // loudly that a missing JSON file spams the console. 
if (process.env.METRICS_DEBUG) console.error('metrics-writer tick:', err.message); } } // Public API. Pass in the read functions so we don't double-require system- // routes.js (which owns the shared cpu/mem sampler). function start({ sampleFn, hostJsonFn } = {}) { if (started) return; started = true; readSample = sampleFn; readHostJson = hostJsonFn; // Defer the first real tick to the start of the next minute so the // boundary is clean. In the meantime, kick a backfill in the background. backfillFromLegacy().catch(() => {}); const align = () => { const ms = Date.now(); const toNextMin = ONE_MIN * 1000 - (ms % (ONE_MIN * 1000)); setTimeout(() => { tick(); tickHandle = setInterval(tick, ONE_MIN * 1000); }, toNextMin + 200); // tiny offset so the host generator has finished its own bucket }; align(); } function stop() { if (tickHandle) { clearInterval(tickHandle); tickHandle = null; } started = false; Promise.all([ring1.close(), ring5.close()]).catch(() => {}); } module.exports = { start, stop, read, // exposed for tests / introspection _ring1: ring1, _ring5: ring5, ONE_MIN_CAP, FIVE_MIN_CAP, };