librelad 875a60f90f LibrePortal v0.1.0 — initial release
A free, open, self-hosted app platform (GNU AGPLv3): one-click app deploys,
Traefik reverse proxy with automatic SSL, rootless Docker support, gluetun
VPN routing, and a web dashboard to manage it all.

Free & open forever to self-host; optional paid hosted services fund it.
See PROMISE.md.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Signed-off-by: librelad <librelad@digitalangels.vip>
2026-05-21 20:37:54 +01:00

431 lines
15 KiB
JavaScript
Executable File

// Task API + Server-Sent Events feed.
//
// Single source of truth: the task file under FRONTEND_DATA/tasks/<id>.json.
// Status field defines lifecycle (queued -> running -> completed|failed|cancelled).
// We never write `current.json` or `queue.json` from here — those are gone.
//
// Push model: clients subscribe to GET /api/tasks/events (SSE). We watch the
// tasks dir with fs.watch and emit events whenever:
// - a task file is created or modified -> task.upsert (full task object)
// - a task file is deleted -> task.deleted (id only)
// - a task .log file grows -> task.log (taskId, appendedText)
//
// Latency from a bash write to a connected client receiving the event is
// typically under 50ms.
const express = require('express');
const fs = require('fs');
const fsp = require('fs').promises;
const path = require('path');
const { requireAuth } = require('../utils/middleware.js');
// Directory holding one `<id>.json` file per task plus the `<id>.log` and
// `<id>.cancel` siblings used by the routes below. Resolved relative to this
// module: two levels up, then frontend/data/tasks.
const TASKS_DIR = path.join(__dirname, '..', '..', 'frontend', 'data', 'tasks');
// Path that pokeFifo() opens write-only to nudge the task processor.
const FIFO_PATH = path.join(TASKS_DIR, '.queue.fifo');
// Base path of the processor lock; the health endpoint reads `<this>.pid`.
const PROCESSOR_LOCK = path.join(TASKS_DIR, '.processor.lock');
// =====================================================================
// SSE HUB
// =====================================================================
// Registry of connected SSE clients. Each entry is a `{ id, res }` record
// added by attachSseClient; every broadcast is written to all of them.
const sseClients = new Set();
let nextClientId = 1;

/**
 * Send one Server-Sent Event to every connected client.
 *
 * @param {string} event - SSE event name (e.g. `task.upsert`).
 * @param {*} data - Payload; serialized with JSON.stringify into the `data:` line.
 */
function sseBroadcast(event, data) {
  // Nothing to do (and nothing to serialize) when nobody is listening.
  if (sseClients.size === 0) return;
  const frame = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
  sseClients.forEach((client) => {
    try {
      client.res.write(frame);
    } catch {
      // A failing client must not stop delivery to the others; it is removed
      // from the registry by its own request 'close' handler.
    }
  });
}
/**
 * Turn an HTTP request/response pair into a long-lived SSE subscription.
 *
 * Sends the event-stream headers and an initial `ready` event, registers the
 * client in `sseClients`, and keeps the connection warm with a comment ping
 * every 25 seconds. When the request closes, the ping timer is cleared and
 * the client is removed from the registry.
 *
 * @param {object} req - Incoming request; only its 'close' event is used.
 * @param {object} res - Response the event stream is written to.
 */
function attachSseClient(req, res) {
  const client = { id: nextClientId++, res };

  const streamHeaders = [
    ['Content-Type', 'text/event-stream'],
    ['Cache-Control', 'no-cache, no-transform'],
    ['Connection', 'keep-alive'],
    ['X-Accel-Buffering', 'no'],
  ];
  for (const [name, value] of streamHeaders) {
    res.setHeader(name, value);
  }
  // flushHeaders may be absent on some response objects, hence the optional call.
  res.flushHeaders?.();

  // Initial hello so the client knows the connection is live.
  const hello = JSON.stringify({ at: Date.now() });
  res.write(`event: ready\ndata: ${hello}\n\n`);

  // Periodic comment line so intermediaries don't time the connection out.
  const keepAlive = setInterval(() => {
    try {
      res.write(': ping\n\n');
    } catch {
      // Cleaned up by the 'close' handler below.
    }
  }, 25_000);

  sseClients.add(client);
  req.on('close', () => {
    clearInterval(keepAlive);
    sseClients.delete(client);
  });
}
// =====================================================================
// FILESYSTEM WATCH
// =====================================================================
// Single dir watcher. Per-file tracking is kept in `logTails` so we know
// where each task's log ended last time we read it (for incremental tail).
const logTails = new Map(); // taskId -> last position in bytes
const upsertDebounce = new Map(); // filename -> timer (debounce rapid writes)

/**
 * Read a task file and broadcast its current contents as `task.upsert`.
 *
 * Outcomes:
 * - file missing (ENOENT): the task was removed between the watch event and
 *   the read, so its log cursor is dropped and `task.deleted` is broadcast;
 * - file empty or not yet valid JSON: nothing is broadcast. A half-written
 *   file is not a deletion, and telling clients it was deleted would make a
 *   live task vanish from the UI; the next event for the file retries;
 * - any other read error: logged, nothing is broadcast.
 *
 * @param {string} filename - Task file name inside TASKS_DIR (`<id>.json`).
 * @returns {Promise<void>}
 */
async function emitTaskUpsert(filename) {
  const id = filename.replace(/\.json$/, '');
  const fullPath = path.join(TASKS_DIR, filename);

  let text;
  try {
    text = await fsp.readFile(fullPath, 'utf8');
  } catch (err) {
    if (err.code === 'ENOENT') {
      logTails.delete(id);
      sseBroadcast('task.deleted', { id });
    } else {
      console.error(`[tasks] failed to read ${filename}:`, err.message);
    }
    return;
  }

  if (!text.trim()) return;

  let task;
  try {
    task = JSON.parse(text);
  } catch {
    // Caught mid-write; wait for the next event rather than emit bad state.
    return;
  }
  sseBroadcast('task.upsert', task);
}
// Per-task coalescing of log-tail passes. A single append can produce several
// watch events, and emitLogTailOnce reads the stored cursor at entry but only
// updates it after its async stat/open/read. Overlapping passes for the same
// task would therefore read the same byte range and broadcast it twice.
// To prevent that, at most one pass per task id runs at a time.
// `tailInflight` state per id:
//   absent -> idle
//   false  -> a pass is running, nothing else pending
//   true   -> a pass is running and another event arrived meanwhile, so one
//             more pass must follow the current one
const tailInflight = new Map();

/**
 * Tail the given task log, coalescing overlapping requests per task.
 *
 * @param {string} filename - Log file name inside the tasks dir (`<id>.log`).
 * @returns {Promise<void>} Settles when the pass (and any coalesced re-run)
 *   this call started has finished; settles immediately if it only flagged a
 *   re-run on an already running pass.
 */
async function emitLogTail(filename) {
  const id = filename.replace(/\.log$/, '');

  // Someone else is already tailing this task: ask them to go around again.
  if (tailInflight.has(id)) {
    tailInflight.set(id, true);
    return;
  }

  try {
    do {
      // Clear the "rerun requested" flag before each pass so that only events
      // arriving during this pass trigger another one.
      tailInflight.set(id, false);
      await emitLogTailOnce(id, filename);
    } while (tailInflight.get(id));
  } finally {
    tailInflight.delete(id);
  }
}
/**
 * Length of the longest prefix of `buf[0, len)` that does not end in the
 * middle of a multi-byte UTF-8 sequence.
 *
 * Only the last four bytes are inspected (the maximum UTF-8 sequence length).
 * Bytes that are not valid UTF-8 lead bytes are treated as complete so that
 * malformed input can never stall the tail.
 */
function completeUtf8Length(buf, len) {
  const floor = Math.max(0, len - 4);
  for (let i = len - 1; i >= floor; i--) {
    const byte = buf[i];
    if ((byte & 0xc0) === 0x80) continue; // continuation byte: keep scanning back
    let needed = 1;
    if ((byte & 0xe0) === 0xc0) needed = 2;
    else if ((byte & 0xf0) === 0xe0) needed = 3;
    else if ((byte & 0xf8) === 0xf0) needed = 4;
    return i + needed > len ? i : len;
  }
  return len;
}

/**
 * Run one incremental tail pass over a task log and broadcast new text as a
 * `task.log` event (`{ id, chunk }`).
 *
 * The per-task byte cursor lives in `logTails`. Compared with a plain
 * "read up to stat.size and decode" pass, this version:
 * - resets the cursor as soon as the file is seen to be shorter than the
 *   cursor, including truncation to zero bytes, so a later regrowth past the
 *   old cursor cannot skip data;
 * - advances the cursor only by the bytes actually read, so a file that
 *   shrinks between stat and read never produces NUL padding;
 * - holds back a trailing partial UTF-8 character until the rest of it has
 *   been written, instead of emitting a replacement character.
 *
 * stat/open/read failures are swallowed: the file may simply be gone, and the
 * next watch event retries.
 *
 * @param {string} id - Task id the log belongs to.
 * @param {string} filename - Log file name inside TASKS_DIR.
 * @returns {Promise<void>}
 */
async function emitLogTailOnce(id, filename) {
  const fullPath = path.join(TASKS_DIR, filename);

  let stat;
  try { stat = await fsp.stat(fullPath); } catch { return; }

  let start = logTails.get(id) || 0;
  if (stat.size < start) {
    // Truncated: start over from the beginning and remember that we did.
    start = 0;
    logTails.set(id, 0);
  }
  if (stat.size === start) return;

  let buf;
  let filled = 0;
  try {
    const fh = await fsp.open(fullPath, 'r');
    try {
      buf = Buffer.alloc(stat.size - start);
      while (filled < buf.length) {
        const { bytesRead } = await fh.read(buf, filled, buf.length - filled, start + filled);
        if (bytesRead === 0) break; // file shrank underneath us
        filled += bytesRead;
      }
    } finally { await fh.close(); }
  } catch { return; }

  const usable = completeUtf8Length(buf, filled);
  if (usable === 0) return;

  logTails.set(id, start + usable);
  sseBroadcast('task.log', { id, chunk: buf.toString('utf8', 0, usable) });
}
/**
 * Start the single (non-recursive) fs.watch on the tasks directory and route
 * its events:
 * - `task_*.json` -> debounced (30 ms) existence check, then either
 *   `task.deleted` (file gone; log cursor dropped) or emitTaskUpsert;
 * - `task_*.log`  -> emitLogTail (incremental tail);
 * - hidden files and anything else are ignored.
 *
 * The directory is created first if missing. Failure to start the watcher is
 * logged, not thrown. An 'error' listener is attached to the watcher because
 * an FSWatcher is an EventEmitter: without a listener, a runtime watcher
 * error would be thrown as an unhandled 'error' event and take the process
 * down.
 */
function startTasksWatcher() {
  if (!fs.existsSync(TASKS_DIR)) {
    try {
      fs.mkdirSync(TASKS_DIR, { recursive: true });
    } catch {
      // If the directory really can't be created, fs.watch below fails and
      // that failure is logged.
    }
  }
  // Task files and their .log siblings all live directly in TASKS_DIR, so a
  // single non-recursive watch is enough.
  try {
    const watcher = fs.watch(TASKS_DIR, { persistent: true }, (eventType, filename) => {
      if (!filename) return;
      // Skip hidden files (.processor.lock, .queue.fifo, …).
      if (filename.startsWith('.')) return;
      // .json -> task upsert/delete
      if (filename.startsWith('task_') && filename.endsWith('.json')) {
        // Collapse bursts of events for the same file into one emission.
        clearTimeout(upsertDebounce.get(filename));
        upsertDebounce.set(filename, setTimeout(() => {
          upsertDebounce.delete(filename);
          const fullPath = path.join(TASKS_DIR, filename);
          fs.access(fullPath, (err) => {
            if (err) {
              const id = filename.replace(/\.json$/, '');
              logTails.delete(id);
              sseBroadcast('task.deleted', { id });
            } else {
              emitTaskUpsert(filename).catch(() => {});
            }
          });
        }, 30));
        return;
      }
      // .log -> incremental tail
      if (filename.startsWith('task_') && filename.endsWith('.log')) {
        emitLogTail(filename).catch(() => {});
        return;
      }
    });
    watcher.on('error', (err) => {
      console.error('[tasks] fs watcher error:', err.message);
    });
  } catch (err) {
    console.error('[tasks] failed to start fs watcher:', err.message);
  }
}
// =====================================================================
// FIFO WAKE-UP
// =====================================================================
// pokeFifo (defined after the polling helpers below) is a best-effort poke at
// the bash processor. It never throws: if the FIFO doesn't exist or no reader
// is attached, we ignore the error and rely on the processor's idle timeout
// (≤3s) to pick the task up.
// Polling fallback, one fs.watchFile (1s interval) per active task. The
// directory watcher is the primary notifier; this pass exists so a status
// change still reaches SSE clients within about a second if a watch event is
// missed (the concern noted for Docker bind-mounts). A poll removes itself
// once the task reaches a terminal status or its file can no longer be read.
const activePolls = new Map(); // taskId -> polled file path

/**
 * Begin polling a task file for status changes. No-op if already polling.
 *
 * @param {string} taskId - Id of the task whose `<id>.json` should be polled.
 */
function armActiveTaskPoll(taskId) {
  if (activePolls.has(taskId)) return;

  const filePath = path.join(TASKS_DIR, `${taskId}.json`);
  let lastStatus = null;

  const onTaskRead = (err, data) => {
    if (err) {
      disarmActiveTaskPoll(taskId);
      return;
    }
    let task;
    try {
      task = JSON.parse(data);
    } catch {
      // Unparseable right now; try again on the next change.
      return;
    }
    const status = task.status;
    // Only status transitions are broadcast from here.
    if (status !== lastStatus) {
      lastStatus = status;
      sseBroadcast('task.upsert', task);
    }
    const isTerminal = status === 'completed' || status === 'failed' || status === 'cancelled';
    if (isTerminal) {
      disarmActiveTaskPoll(taskId);
    }
  };

  fs.watchFile(filePath, { interval: 1000 }, () => {
    fs.readFile(filePath, 'utf8', onTaskRead);
  });
  activePolls.set(taskId, filePath);
}
/**
 * Stop the polling fallback for a task, if one is active.
 *
 * @param {string} taskId - Id previously passed to armActiveTaskPoll.
 */
function disarmActiveTaskPoll(taskId) {
  const watchedPath = activePolls.get(taskId);
  if (watchedPath) {
    // No listener argument: removes every watchFile listener for this path.
    fs.unwatchFile(watchedPath);
    activePolls.delete(taskId);
  }
}
/**
 * Best-effort wake-up: write the task id (plus newline) to the queue FIFO.
 *
 * The FIFO is opened write-only and non-blocking. An open failure (FIFO
 * missing, no reader attached) is silently ignored, as is a failed write;
 * the descriptor is closed either way. Never throws and reports nothing to
 * the caller.
 *
 * @param {string} taskId - Id to write to the FIFO.
 */
function pokeFifo(taskId) {
  const openFlags = fs.constants.O_WRONLY | fs.constants.O_NONBLOCK;
  fs.open(FIFO_PATH, openFlags, (openErr, fd) => {
    if (openErr) return; // No reader / FIFO missing — fine.
    fs.write(fd, `${taskId}\n`, () => {
      fs.close(fd, () => {});
    });
  });
}
// =====================================================================
// HELPERS
// =====================================================================
/**
 * Build a new task id of the form `task_<epoch ms>_<9 base-36 chars>`.
 *
 * The random suffix is padded to a fixed nine characters.
 * `Math.random().toString(36)` can produce fewer than nine fractional digits
 * (and none at all for 0), which would otherwise yield a short or empty
 * suffix and an id that isValidTaskId rejects.
 *
 * @returns {string} Freshly generated task id.
 */
function generateTaskId() {
  const suffix = Math.random().toString(36).slice(2, 11).padEnd(9, '0');
  return `task_${Date.now()}_${suffix}`;
}
/**
 * Check that a value has the shape `task_<digits>_<alphanumerics>`
 * (case-insensitive). Because only letters, digits and underscores can pass,
 * a valid id is safe to interpolate into a file name under the tasks dir.
 *
 * @param {*} id - Candidate id (typically `req.params.id`).
 * @returns {boolean} True only for strings matching the pattern.
 */
function isValidTaskId(id) {
  if (typeof id !== 'string') {
    return false;
  }
  return /^task_[0-9]+_[a-z0-9]+$/i.test(id);
}
/**
 * Load and parse a task file.
 *
 * @param {string} id - Task id; the file read is `<TASKS_DIR>/<id>.json`.
 * @returns {Promise<*>} The parsed JSON content.
 * @throws Rejects with the fs error (e.g. `code === 'ENOENT'`) when the file
 *   cannot be read, or with a SyntaxError when it is not valid JSON.
 */
async function readTask(id) {
  const taskPath = path.join(TASKS_DIR, `${id}.json`);
  const raw = await fsp.readFile(taskPath, 'utf8');
  return JSON.parse(raw);
}
/**
 * Persist a task as `<TASKS_DIR>/<id>.json` without exposing a partial file.
 *
 * The JSON is written to a sibling temp file (`<final>.tmp.<pid>.<ms>`) and
 * then renamed over the final path. If the write or the rename fails, the
 * temp file is removed (best-effort) before the error is rethrown, so failed
 * writes don't leave stray files in the tasks directory.
 *
 * @param {string} id - Task id.
 * @param {*} task - Value to serialize (pretty-printed with 2-space indent).
 * @returns {Promise<void>}
 * @throws Rejects with the original serialization, write, or rename error.
 */
async function writeTaskAtomic(id, task) {
  const final = path.join(TASKS_DIR, `${id}.json`);
  const tmp = `${final}.tmp.${process.pid}.${Date.now()}`;
  try {
    await fsp.writeFile(tmp, JSON.stringify(task, null, 2), 'utf8');
    await fsp.rename(tmp, final);
  } catch (err) {
    // Cleanup is best-effort; the temp file may not exist at all.
    await fsp.unlink(tmp).catch(() => {});
    throw err;
  }
}
// =====================================================================
// ROUTES
// =====================================================================
const router = express.Router();

// GET /events — the SSE feed. The response is handed to attachSseClient and
// stays open until the client disconnects.
router.get('/events', requireAuth, (req, res) => attachSseClient(req, res));
// GET / — list tasks. Responds with every parseable `task_*.json` object in
// the tasks dir (full task objects), newest first by `createdAt` /
// `created_at`. Empty or unreadable files are skipped; a failure to list the
// directory yields a 500 with the error message.
router.get('/', requireAuth, async (req, res) => {
  const createdKey = (task) => String(task.createdAt || task.created_at || '');
  try {
    const names = await fsp.readdir(TASKS_DIR);
    const taskFiles = names.filter((name) => name.startsWith('task_') && name.endsWith('.json'));
    const tasks = [];
    for (const name of taskFiles) {
      try {
        const raw = await fsp.readFile(path.join(TASKS_DIR, name), 'utf8');
        if (raw.trim()) {
          tasks.push(JSON.parse(raw));
        }
      } catch {
        // Unreadable or unparseable entry: leave it out of the listing.
      }
    }
    // Descending: compare b's key against a's.
    tasks.sort((a, b) => createdKey(b).localeCompare(createdKey(a)));
    res.json(tasks);
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// GET /:id — read one task.
//   400 if the id is malformed, 404 if the task file does not exist,
//   500 (with the error message) for any other failure.
router.get('/:id', requireAuth, async (req, res) => {
  const taskId = req.params.id;
  if (!isValidTaskId(taskId)) {
    return res.status(400).json({ error: 'Invalid task id' });
  }
  try {
    res.json(await readTask(taskId));
  } catch (err) {
    if (err.code === 'ENOENT') {
      return res.status(404).json({ error: 'Task not found' });
    }
    res.status(500).json({ error: err.message });
  }
});
// POST / — create a task.
// Body: `command` (required, non-blank string) plus optional `type`
// (default 'custom'), `app` (default null) and `config` (default '').
// The task is written in `queued` state, the processor is poked through the
// FIFO, the new task is broadcast, and a per-task polling fallback is armed.
// Responds 201 with the task, 400 when `command` is missing, 500 on failure.
router.post('/', requireAuth, async (req, res) => {
  try {
    const body = req.body || {};
    const { command, type = 'custom', app = null, config = '' } = body;

    const hasCommand = typeof command === 'string' && Boolean(command.trim());
    if (!hasCommand) {
      return res.status(400).json({ error: '`command` is required' });
    }

    const id = generateTaskId();
    const createdAt = new Date().toISOString();
    const task = {
      id,
      command,
      type,
      app,
      config,
      status: 'queued',
      createdAt,
      startedAt: null,
      completedAt: null,
      heartbeatAt: null,
      exitCode: null,
      errorMessage: null
    };

    await writeTaskAtomic(id, task);
    pokeFifo(id);
    sseBroadcast('task.upsert', task);
    // The directory watcher can miss events (noted for Docker bind-mounts),
    // so poll this task's file once a second until it reaches a terminal
    // state; that keeps a missed event from stranding it as "running" in
    // the UI.
    armActiveTaskPoll(id);

    res.status(201).json(task);
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// POST /:id/cancel — cancel a task.
//   queued  -> rewritten as `cancelled` right here and broadcast;
//   running -> an empty `<id>.cancel` marker file is dropped for the
//              processor, which is expected to notice it in its heartbeat
//              loop and SIGTERM the running command;
//   other   -> 409, the task can no longer be cancelled.
// Also: 400 for a malformed id, 404 if the task file is missing, 500 otherwise.
router.post('/:id/cancel', requireAuth, async (req, res) => {
  const taskId = req.params.id;
  if (!isValidTaskId(taskId)) {
    return res.status(400).json({ error: 'Invalid task id' });
  }
  try {
    const task = await readTask(taskId);
    switch (task.status) {
      case 'queued': {
        const cancelled = {
          ...task,
          status: 'cancelled',
          completedAt: new Date().toISOString(),
          errorMessage: 'Cancelled before start.'
        };
        await writeTaskAtomic(taskId, cancelled);
        sseBroadcast('task.upsert', cancelled);
        return res.json(cancelled);
      }
      case 'running': {
        // Marker only; the processor performs the actual cancellation.
        const markerPath = path.join(TASKS_DIR, `${taskId}.cancel`);
        await fsp.writeFile(markerPath, '', 'utf8');
        return res.json({ ok: true });
      }
      default:
        return res.status(409).json({ error: `Task is ${task.status}; cannot cancel.` });
    }
  } catch (err) {
    if (err.code === 'ENOENT') {
      return res.status(404).json({ error: 'Task not found' });
    }
    res.status(500).json({ error: err.message });
  }
});
// GET /:id/log — return the task's log as text/plain (non-streaming).
// With `?position=N` (N > 0) only the text from string offset N onward is
// sent, which lets a client poll incrementally when SSE is not connected.
// A missing log yields 404 with an empty text body; a malformed id yields 400.
router.get('/:id/log', requireAuth, async (req, res) => {
  const taskId = req.params.id;
  if (!isValidTaskId(taskId)) {
    return res.status(400).json({ error: 'Invalid task id' });
  }
  // Missing or non-numeric position falls back to 0 (whole log).
  const offset = parseInt(req.query.position, 10) || 0;
  try {
    const logPath = path.join(TASKS_DIR, `${taskId}.log`);
    const text = await fsp.readFile(logPath, 'utf8');
    const body = offset > 0 ? text.slice(offset) : text;
    res.type('text/plain').send(body);
  } catch (err) {
    if (err.code === 'ENOENT') {
      return res.status(404).type('text/plain').send('');
    }
    res.status(500).json({ error: err.message });
  }
});
// DELETE /:id — remove a task and its sibling files.
// By default a running or queued task is refused with 409 so an in-flight
// workload isn't orphaned by accident. `?force=1` (or `force=true`) skips that
// check, for stuck tasks that can't be cancelled (e.g. the processor has died
// and is not picking up cancel markers) when the user just wants the row gone.
// Also: 400 for a malformed id, 404 if the task file is missing, 500 otherwise.
router.delete('/:id', requireAuth, async (req, res) => {
  const taskId = req.params.id;
  if (!isValidTaskId(taskId)) {
    return res.status(400).json({ error: 'Invalid task id' });
  }
  const forceParam = req.query.force;
  const force = forceParam === '1' || forceParam === 'true';
  try {
    const task = await readTask(taskId);
    const inFlight = task.status === 'running' || task.status === 'queued';
    if (inFlight && !force) {
      return res.status(409).json({ error: `Task is ${task.status}; cancel first.` });
    }
    // Remove the task file, its log, and any leftover cancel marker (so a
    // processor that comes back later doesn't act on a task that no longer
    // exists). Each removal is best-effort.
    for (const ext of ['json', 'log', 'cancel']) {
      await fsp.unlink(path.join(TASKS_DIR, `${taskId}.${ext}`)).catch(() => {});
    }
    logTails.delete(taskId);
    sseBroadcast('task.deleted', { id: taskId });
    res.json({ ok: true, forced: force });
  } catch (err) {
    if (err.code === 'ENOENT') {
      return res.status(404).json({ error: 'Task not found' });
    }
    res.status(500).json({ error: err.message });
  }
});
// GET /_meta/health — coarse liveness report.
// `processorAlive` is true when `<PROCESSOR_LOCK>.pid` can be read and holds
// a positive integer. The pid itself is NOT probed: per the original note,
// the processor may run on the host while this code runs in a container, so
// the presence of a plausible pid file is used as a proxy.
router.get('/_meta/health', requireAuth, async (req, res) => {
  const readProcessorPid = async () => {
    const raw = await fsp.readFile(`${PROCESSOR_LOCK}.pid`, 'utf8');
    return parseInt(raw.trim(), 10);
  };

  let processorAlive;
  try {
    const pid = await readProcessorPid();
    processorAlive = Number.isFinite(pid) && pid > 0;
  } catch {
    // No readable pid file: treat the processor as not running.
    processorAlive = false;
  }

  res.json({
    processorAlive,
    sseClients: sseClients.size,
    tasksDir: TASKS_DIR
  });
});
// Start the tasks-directory watcher as a side effect of loading this module.
// Node caches required modules, so this runs once per process. The router
// with all routes registered above is the module's only export.
startTasksWatcher();
module.exports = router;