/**
 * TaskEventBus — single SSE connection for the page.
 *
 * Connects to /api/tasks/events (by default). Translates the server's SSE
 * events (task.upsert, task.deleted, task.log) into the existing window-level
 * CustomEvents that the rest of the UI already listens for:
 *   - taskCreated   (when a brand-new task appears)
 *   - taskUpdated   (status change while still active)
 *   - taskCompleted (status -> completed | failed | cancelled)
 *   - taskLog       (new log lines for a running task)
 *   - taskDeleted   (task removed)
 *
 * A `taskBusReady` CustomEvent is also dispatched whenever the server sends
 * its `ready` SSE event.
 *
 * The bus also exposes a `tasks` Map keyed by id holding the latest known
 * task object — components can read this synchronously instead of fetching.
 */
class TaskEventBus {
  /**
   * @param {object} [options]
   * @param {string} [options.url='/api/tasks/events'] SSE endpoint to connect to.
   * @param {number} [options.reconnectDelayMs=3000] Delay, in milliseconds,
   *   between a connection error and the next connection attempt.
   */
  constructor(options) {
    const { url = '/api/tasks/events', reconnectDelayMs = 3000 } = options || {};
    this.url = url;
    this.reconnectDelayMs = reconnectDelayMs;

    this.tasks = new Map(); // id -> latest task object
    this.eventSource = null;
    this.reconnectTimer = null;
    this.connected = false;
    // Track previous status per task so we can decide created vs updated vs completed.
    this._lastStatus = new Map();
  }

  /**
   * Open the SSE connection. Does nothing when a connection already exists.
   * If a reconnect is pending, the connection is opened right away and the
   * pending retry is cancelled.
   */
  start() {
    if (this.eventSource) return;
    this._open();
  }

  /**
   * Cancel any pending reconnect and close the current connection.
   * The `tasks` map is left untouched.
   */
  stop() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
    this.connected = false;
  }

  // Convenience accessors used by UI components.

  /**
   * @param {*} id Task id.
   * @returns {object|null} Latest known task object, or null when unknown.
   */
  getTask(id) {
    return this.tasks.get(id) || null;
  }

  /**
   * @returns {object[]} Known tasks whose status is running, queued or pending.
   */
  getRunningTasks() {
    return Array.from(this.tasks.values()).filter((task) =>
      TaskEventBus.ACTIVE_STATUSES.includes(task.status)
    );
  }

  /**
   * @param {string} appName Value compared against each task's `app` field.
   * @returns {object[]} Active tasks belonging to the given app.
   */
  getRunningForApp(appName) {
    return this.getRunningTasks().filter((task) => task.app === appName);
  }

  // ---- internals --------------------------------------------------------

  /**
   * Create the EventSource and wire up its listeners.
   *
   * Safe to call at any time: it never creates a second connection while one
   * exists, and it cancels a pending reconnect timer so that the timer cannot
   * later open a duplicate connection on top of this one.
   */
  _open() {
    if (this.eventSource) return;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    let source;
    try {
      source = new EventSource(this.url);
    } catch (err) {
      console.warn('TaskEventBus: could not open event stream, will retry', err);
      this._scheduleReconnect();
      return;
    }
    this.eventSource = source;

    source.addEventListener('ready', () => {
      this.connected = true;
      window.dispatchEvent(new CustomEvent('taskBusReady'));
    });

    source.addEventListener('task.upsert', (e) => {
      const task = this._parsePayload(e, 'task.upsert');
      if (!task || !task.id) return;
      this._handleUpsert(task);
    });

    source.addEventListener('task.deleted', (e) => {
      const payload = this._parsePayload(e, 'task.deleted');
      if (!payload || !payload.id) return;
      this.tasks.delete(payload.id);
      this._lastStatus.delete(payload.id);
      window.dispatchEvent(new CustomEvent('taskDeleted', { detail: { id: payload.id } }));
    });

    source.addEventListener('task.log', (e) => {
      const payload = this._parsePayload(e, 'task.log');
      if (!payload || !payload.id || typeof payload.chunk !== 'string') return;
      window.dispatchEvent(
        new CustomEvent('taskLog', { detail: { id: payload.id, chunk: payload.chunk } })
      );
    });

    source.onerror = () => {
      // Browser will auto-retry, but we want a deterministic backoff on top
      // so we don't hammer the server during a long outage.
      // Close the source that actually errored, not whatever is current.
      source.close();
      // An error from a source that has since been replaced or stopped must
      // not tear down the live connection or schedule a reconnect.
      if (this.eventSource !== source) return;
      this.connected = false;
      this.eventSource = null;
      this._scheduleReconnect();
    };
  }

  /**
   * Parse the JSON body of an SSE message.
   *
   * @param {{data: string}} event The SSE message event.
   * @param {string} eventName SSE event name, used only in the warning text.
   * @returns {*} The parsed value, or null when the body is not valid JSON.
   */
  _parsePayload(event, eventName) {
    try {
      return JSON.parse(event.data);
    } catch (err) {
      // Still best-effort (the message is dropped), but no longer silent.
      console.warn(`TaskEventBus: ignoring malformed ${eventName} payload`, err);
      return null;
    }
  }

  /** Arrange a single reconnect attempt after `reconnectDelayMs`. */
  _scheduleReconnect() {
    if (this.reconnectTimer !== null) return;
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this._open();
    }, this.reconnectDelayMs);
  }

  /**
   * Store the task and dispatch the matching window events:
   * taskCreated when the id was not in `tasks` before; then taskCompleted
   * when the status is terminal and the previously recorded status was not,
   * otherwise taskUpdated for ids that were already known.
   *
   * @param {object} task Parsed task object with a truthy `id`.
   */
  _handleUpsert(task) {
    const prevStatus = this._lastStatus.get(task.id);
    const isNew = !this.tasks.has(task.id);
    this.tasks.set(task.id, task);
    this._lastStatus.set(task.id, task.status);

    const detail = {
      taskId: task.id,
      appName: task.app || null,
      action: task.type || 'unknown',
      status: task.status,
      task,
      timestamp: Date.now()
    };

    if (isNew) {
      window.dispatchEvent(new CustomEvent('taskCreated', { detail }));
    }

    const isTerminal = TaskEventBus.TERMINAL_STATUSES.includes(task.status);
    const wasTerminal = TaskEventBus.TERMINAL_STATUSES.includes(prevStatus);
    if (isTerminal && !wasTerminal) {
      window.dispatchEvent(new CustomEvent('taskCompleted', { detail }));
    } else if (!isNew) {
      window.dispatchEvent(new CustomEvent('taskUpdated', { detail }));
    }
  }
}

// Status groups, attached to the class so no extra top-level names are declared.
TaskEventBus.ACTIVE_STATUSES = Object.freeze(['running', 'queued', 'pending']);
TaskEventBus.TERMINAL_STATUSES = Object.freeze(['completed', 'failed', 'cancelled']);

// One instance per page. Guarded so that merely loading this file where no
// `window` exists (unit tests, server-side rendering) does not throw.
if (typeof window !== 'undefined') {
  window.taskEventBus = window.taskEventBus || new TaskEventBus();
  window.TaskEventBus = TaskEventBus;
}