// Task API + Server-Sent Events feed. // // Single source of truth: the task file under FRONTEND_DATA/tasks/.json. // Status field defines lifecycle (queued -> running -> completed|failed|cancelled). // We never write `current.json` or `queue.json` from here — those are gone. // // Push model: clients subscribe to GET /api/tasks/events (SSE). We watch the // tasks dir with fs.watch and emit events whenever: // - a task file is created or modified -> task.upsert (full task object) // - a task file is deleted -> task.deleted (id only) // - a task .log file grows -> task.log (taskId, appendedText) // // Latency from a bash write to a connected client receiving the event is // typically under 50ms. const express = require('express'); const fs = require('fs'); const fsp = require('fs').promises; const path = require('path'); const { requireAuth } = require('../utils/middleware.js'); const TASKS_DIR = path.join(__dirname, '..', '..', 'frontend', 'data', 'tasks'); const FIFO_PATH = path.join(TASKS_DIR, '.queue.fifo'); const PROCESSOR_LOCK = path.join(TASKS_DIR, '.processor.lock'); // ===================================================================== // SSE HUB // ===================================================================== // One Set of `res` objects. Every event goes to all of them. const sseClients = new Set(); let nextClientId = 1; function sseBroadcast(event, data) { if (sseClients.size === 0) return; const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; for (const client of sseClients) { try { client.res.write(payload); } catch { /* client gone; cleanup below */ } } } function attachSseClient(req, res) { const id = nextClientId++; res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache, no-transform'); res.setHeader('Connection', 'keep-alive'); res.setHeader('X-Accel-Buffering', 'no'); res.flushHeaders?.(); // Initial hello so the client knows the connection is live. res.write(`event: ready\ndata: ${JSON.stringify({ at: Date.now() })}\n\n`); // Periodic comment-ping so intermediaries don't time the connection out. const ping = setInterval(() => { try { res.write(': ping\n\n'); } catch { /* will be cleaned on close */ } }, 25_000); const client = { id, res }; sseClients.add(client); req.on('close', () => { clearInterval(ping); sseClients.delete(client); }); } // ===================================================================== // FILESYSTEM WATCH // ===================================================================== // Single dir watcher. Per-file tracking is kept in `logTails` so we know // where each task's log ended last time we read it (for incremental tail). const logTails = new Map(); // taskId -> last position in bytes const upsertDebounce = new Map(); // filename -> timer (debounce rapid writes) async function emitTaskUpsert(filename) { const id = filename.replace(/\.json$/, ''); const fullPath = path.join(TASKS_DIR, filename); try { const text = await fsp.readFile(fullPath, 'utf8'); if (!text.trim()) return; const task = JSON.parse(text); sseBroadcast('task.upsert', task); } catch { // File may have been deleted between watch event and read. Treat as deleted. sseBroadcast('task.deleted', { id }); } } // Per-task coalescing. fs.watch fires multiple events for a single append // (a `change` and sometimes a `rename`), and a naive emitLogTail reads // `prev` at entry but only writes `logTails.set(...)` after stat+open+read. // Concurrent invocations therefore see the same `prev`, read the same // range, and broadcast the chunk twice — clients render duplicate lines. // State per id: `undefined` = idle, `false` = running, `true` = running and // another event arrived while running (re-run after current pass). const tailInflight = new Map(); async function emitLogTail(filename) { const id = filename.replace(/\.log$/, ''); if (tailInflight.has(id)) { tailInflight.set(id, true); return; } tailInflight.set(id, false); try { while (true) { await emitLogTailOnce(id, filename); if (tailInflight.get(id)) { tailInflight.set(id, false); continue; } break; } } finally { tailInflight.delete(id); } } async function emitLogTailOnce(id, filename) { const fullPath = path.join(TASKS_DIR, filename); let stat; try { stat = await fsp.stat(fullPath); } catch { return; } const prev = logTails.get(id) || 0; // Truncated? Reset the cursor. const start = stat.size < prev ? 0 : prev; if (stat.size === start) return; let chunk; try { const fh = await fsp.open(fullPath, 'r'); try { const buf = Buffer.alloc(stat.size - start); await fh.read(buf, 0, buf.length, start); chunk = buf.toString('utf8'); } finally { await fh.close(); } } catch { return; } logTails.set(id, stat.size); if (chunk) sseBroadcast('task.log', { id, chunk }); } function startTasksWatcher() { if (!fs.existsSync(TASKS_DIR)) { try { fs.mkdirSync(TASKS_DIR, { recursive: true }); } catch {} } // Single recursive=false watch on the tasks dir is enough — task files and // their .log siblings live there. try { fs.watch(TASKS_DIR, { persistent: true }, (eventType, filename) => { if (!filename) return; // Skip hidden files (.processor.lock, .queue.fifo, …). if (filename.startsWith('.')) return; // .json -> task upsert/delete if (filename.startsWith('task_') && filename.endsWith('.json')) { clearTimeout(upsertDebounce.get(filename)); upsertDebounce.set(filename, setTimeout(() => { upsertDebounce.delete(filename); const fullPath = path.join(TASKS_DIR, filename); fs.access(fullPath, (err) => { if (err) { const id = filename.replace(/\.json$/, ''); logTails.delete(id); sseBroadcast('task.deleted', { id }); } else { emitTaskUpsert(filename).catch(() => {}); } }); }, 30)); return; } // .log -> incremental tail if (filename.startsWith('task_') && filename.endsWith('.log')) { emitLogTail(filename).catch(() => {}); return; } }); } catch (err) { console.error('[tasks] failed to start fs watcher:', err.message); } } // ===================================================================== // FIFO WAKE-UP // ===================================================================== // Best-effort poke at the bash processor. Never throws: if the FIFO doesn't // exist or no reader is attached, we ignore the error and rely on the // processor's idle timeout (≤3s) to pick the task up. // Per-task fs.watchFile polling fallback. fs.watch (inotify) is the primary // notifier but can miss events on Docker bind-mounts; this 1s polling pass // ensures status flips reach SSE within ~1s even if inotify silently drops. // Self-disarms once the task hits a terminal state. const activePolls = new Map(); function armActiveTaskPoll(taskId) { if (activePolls.has(taskId)) return; const filePath = path.join(TASKS_DIR, `${taskId}.json`); let lastStatus = null; fs.watchFile(filePath, { interval: 1000 }, () => { fs.readFile(filePath, 'utf8', (err, data) => { if (err) { disarmActiveTaskPoll(taskId); return; } let task; try { task = JSON.parse(data); } catch { return; } if (task.status !== lastStatus) { lastStatus = task.status; sseBroadcast('task.upsert', task); } if (task.status === 'completed' || task.status === 'failed' || task.status === 'cancelled') { disarmActiveTaskPoll(taskId); } }); }); activePolls.set(taskId, filePath); } function disarmActiveTaskPoll(taskId) { const fp = activePolls.get(taskId); if (!fp) return; fs.unwatchFile(fp); activePolls.delete(taskId); } function pokeFifo(taskId) { fs.open(FIFO_PATH, fs.constants.O_WRONLY | fs.constants.O_NONBLOCK, (err, fd) => { if (err) return; // No reader / FIFO missing — fine. fs.write(fd, `${taskId}\n`, () => fs.close(fd, () => {})); }); } // ===================================================================== // HELPERS // ===================================================================== function generateTaskId() { return `task_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`; } function isValidTaskId(id) { return typeof id === 'string' && /^task_[0-9]+_[a-z0-9]+$/i.test(id); } async function readTask(id) { const text = await fsp.readFile(path.join(TASKS_DIR, `${id}.json`), 'utf8'); return JSON.parse(text); } async function writeTaskAtomic(id, task) { const final = path.join(TASKS_DIR, `${id}.json`); const tmp = `${final}.tmp.${process.pid}.${Date.now()}`; await fsp.writeFile(tmp, JSON.stringify(task, null, 2), 'utf8'); await fsp.rename(tmp, final); } // ===================================================================== // ROUTES // ===================================================================== const router = express.Router(); // SSE feed. Held open for the life of the page. router.get('/events', requireAuth, (req, res) => { attachSseClient(req, res); }); // List all tasks (returns lightweight summaries). router.get('/', requireAuth, async (req, res) => { try { const entries = await fsp.readdir(TASKS_DIR); const out = []; for (const entry of entries) { if (!entry.startsWith('task_') || !entry.endsWith('.json')) continue; try { const text = await fsp.readFile(path.join(TASKS_DIR, entry), 'utf8'); if (!text.trim()) continue; const task = JSON.parse(text); out.push(task); } catch { /* skip unreadable entries */ } } out.sort((a, b) => String(b.createdAt || b.created_at || '').localeCompare(String(a.createdAt || a.created_at || ''))); res.json(out); } catch (err) { res.status(500).json({ error: err.message }); } }); // Read a single task. router.get('/:id', requireAuth, async (req, res) => { const { id } = req.params; if (!isValidTaskId(id)) return res.status(400).json({ error: 'Invalid task id' }); try { const task = await readTask(id); res.json(task); } catch (err) { if (err.code === 'ENOENT') return res.status(404).json({ error: 'Task not found' }); res.status(500).json({ error: err.message }); } }); // Create a task. router.post('/', requireAuth, async (req, res) => { try { const { command, type = 'custom', app = null, config = '' } = req.body || {}; if (typeof command !== 'string' || !command.trim()) { return res.status(400).json({ error: '`command` is required' }); } const id = generateTaskId(); const task = { id, command, type, app, config, status: 'queued', createdAt: new Date().toISOString(), startedAt: null, completedAt: null, heartbeatAt: null, exitCode: null, errorMessage: null }; await writeTaskAtomic(id, task); pokeFifo(id); sseBroadcast('task.upsert', task); // fs.watch is unreliable on Docker bind-mounts; add a 1s polling // fallback for this task until it terminates so a missed inotify // event can't strand it as "running" in the UI. armActiveTaskPoll(id); res.status(201).json(task); } catch (err) { res.status(500).json({ error: err.message }); } }); // Cancel a task. Drops a `.cancel` marker that the processor's heartbeat // loop notices and then SIGTERMs the running command. router.post('/:id/cancel', requireAuth, async (req, res) => { const { id } = req.params; if (!isValidTaskId(id)) return res.status(400).json({ error: 'Invalid task id' }); try { const task = await readTask(id); if (task.status !== 'running' && task.status !== 'queued') { return res.status(409).json({ error: `Task is ${task.status}; cannot cancel.` }); } if (task.status === 'queued') { const updated = { ...task, status: 'cancelled', completedAt: new Date().toISOString(), errorMessage: 'Cancelled before start.' }; await writeTaskAtomic(id, updated); sseBroadcast('task.upsert', updated); return res.json(updated); } // Drop the cancel marker; processor picks it up within HEARTBEAT_INTERVAL. await fsp.writeFile(path.join(TASKS_DIR, `${id}.cancel`), '', 'utf8'); res.json({ ok: true }); } catch (err) { if (err.code === 'ENOENT') return res.status(404).json({ error: 'Task not found' }); res.status(500).json({ error: err.message }); } }); // Read full log (non-streaming). Use `position` for incremental polling // fallback if SSE isn't connected for some reason. router.get('/:id/log', requireAuth, async (req, res) => { const { id } = req.params; if (!isValidTaskId(id)) return res.status(400).json({ error: 'Invalid task id' }); const pos = parseInt(req.query.position, 10) || 0; try { const text = await fsp.readFile(path.join(TASKS_DIR, `${id}.log`), 'utf8'); res.type('text/plain').send(pos > 0 ? text.slice(pos) : text); } catch (err) { if (err.code === 'ENOENT') return res.status(404).type('text/plain').send(''); res.status(500).json({ error: err.message }); } }); // Delete a task. Default behaviour blocks deletion of running/queued tasks // so the user can't accidentally orphan an in-flight workload — but // `?force=1` overrides that, which is needed when a stuck task can't be // cancelled (e.g. the bash processor has died, the cancel marker isn't // being picked up, etc.) and the user just wants the row gone. router.delete('/:id', requireAuth, async (req, res) => { const { id } = req.params; if (!isValidTaskId(id)) return res.status(400).json({ error: 'Invalid task id' }); const force = req.query.force === '1' || req.query.force === 'true'; try { const task = await readTask(id); if (!force && (task.status === 'running' || task.status === 'queued')) { return res.status(409).json({ error: `Task is ${task.status}; cancel first.` }); } await fsp.unlink(path.join(TASKS_DIR, `${id}.json`)).catch(() => {}); await fsp.unlink(path.join(TASKS_DIR, `${id}.log`)).catch(() => {}); // Drop any leftover cancel marker so the processor (when it does come // back up) doesn't try to act on a task file that no longer exists. await fsp.unlink(path.join(TASKS_DIR, `${id}.cancel`)).catch(() => {}); logTails.delete(id); sseBroadcast('task.deleted', { id }); res.json({ ok: true, forced: force }); } catch (err) { if (err.code === 'ENOENT') return res.status(404).json({ error: 'Task not found' }); res.status(500).json({ error: err.message }); } }); // Health-check style endpoint: is the bash processor alive? router.get('/_meta/health', requireAuth, async (req, res) => { let processorAlive = false; try { const pidText = await fsp.readFile(`${PROCESSOR_LOCK}.pid`, 'utf8'); const pid = parseInt(pidText.trim(), 10); processorAlive = Number.isFinite(pid) && pid > 0; // We can't easily verify the pid is alive from inside the container if the // processor runs on the host — but the existence of the pid file is a // reasonable proxy. } catch { /* processor not running */ } res.json({ processorAlive, sseClients: sseClients.size, tasksDir: TASKS_DIR }); }); // Init the watcher exactly once when the module is required. startTasksWatcher(); module.exports = router;