#!/bin/bash # ============================================================================ # LibrePortal Task Processor — v2 # # Design goals: # 1. Single source of truth: the task file (`.json`). `status` is the # only field anyone reads to know what a task is doing. # 2. Single instance: enforced via flock. A second process exits cleanly. # 3. Push-based: the daemon blocks on a FIFO read and wakes within # milliseconds when Node writes a task id to the FIFO. # 4. Subprocess isolation: every command runs inside `setsid bash -c …` # so that a non-zero exit (or signal, or `exit 1` from checkSuccess in # non-interactive mode) kills the subprocess, not the processor. # 5. Atomic writes: every state transition is `tmp + mv`. Readers never # see a half-written file. # 6. Heartbeat: the processor stamps `heartbeat_at` every 5s while a task # runs. Stale heartbeats (>60s on a `running` task) are recovered to # `failed` on the next idle cycle. # 7. Cancellable: a `.cancel` marker file triggers SIGTERM → SIGKILL # to the task's process group. # ============================================================================ script_task_processor_flag="$1" # Source guard — DO NOT remove. mainLoop is an infinite loop. [[ "$script_task_processor_flag" != "start_script" ]] && return 0 2>/dev/null # ============================================================================ # PATHS & CONSTANTS # ============================================================================ TASK_DIR="${TASK_DIR:-/docker/containers/libreportal/frontend/data/tasks}" LOCK_FILE="$TASK_DIR/.processor.lock" FIFO="$TASK_DIR/.queue.fifo" LOG_FILE="$TASK_DIR/task_processor.log" # Cron's default PATH is minimal (/usr/bin:/bin) and doesn't include the # /usr/local/bin where the libreportal CLI lives. Augment it so eval'd commands # can find their binaries. 
export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:$PATH"

HEARTBEAT_INTERVAL=5      # seconds between heartbeat_at stamps while a task runs
HEARTBEAT_STALE_SECS=60   # a running task older than this is treated as orphaned
IDLE_POLL_SECS=3          # max time we'll block on FIFO before re-scanning anyway

# ============================================================================
# LOGGING
# ============================================================================

# Append one timestamped line to $LOG_FILE (via sudo tee). logDebug only
# writes when LOG_LEVEL=DEBUG.
logInfo()  { echo "$(date '+%Y-%m-%d %H:%M:%S') [INFO ] $*" | sudo tee -a "$LOG_FILE" >/dev/null; }
logError() { echo "$(date '+%Y-%m-%d %H:%M:%S') [ERROR] $*" | sudo tee -a "$LOG_FILE" >/dev/null; }
logDebug() { [[ "${LOG_LEVEL:-INFO}" == "DEBUG" ]] && echo "$(date '+%Y-%m-%d %H:%M:%S') [DEBUG] $*" | sudo tee -a "$LOG_FILE" >/dev/null; }

# ============================================================================
# FILE OPS — atomic writes, JSON helpers
# ============================================================================

# Atomically replace a file's content. POSIX rename(2) is atomic on the same
# filesystem, so concurrent readers either see the old or the new file —
# never a half-written one.
#
# Arguments: $1 - target path, $2 - new content (written without a trailing
#            newline being added).
# Returns:   0 on success; non-zero if the temp file could not be written or
#            moved. On failure the target is left untouched and the temp file
#            is removed.
writeAtomic() {
  local target="$1"
  local content="$2"
  local tmp="${target}.tmp.$$"

  # Never rename a temp file whose write failed: that would publish a
  # truncated file and defeat the whole point of tmp + mv.
  if ! printf '%s' "$content" | sudo tee "$tmp" >/dev/null; then
    sudo rm -f "$tmp" 2>/dev/null
    return 1
  fi
  sudo chmod 644 "$tmp" 2>/dev/null   # best effort, as before
  if ! sudo mv "$tmp" "$target"; then
    sudo rm -f "$tmp" 2>/dev/null
    return 1
  fi
}

# Update one or more fields on a task file, atomically. Args after taskFile
# are alternating <key> <value> pairs; a trailing unpaired argument is
# ignored. Values are always stored as JSON strings (jq --arg).
#   updateTaskFields "$file" status running heartbeat_at "$(date -Iseconds)"
#
# Returns: 0 on success; 1 if the file is missing, jq is unavailable, a key
#          is not a plain identifier, or the write/rename failed. On failure
#          the task file is unchanged and no temp file is left behind.
updateTaskFields() {
  local taskFile="$1"; shift
  [[ -f "$taskFile" ]] || return 1
  command -v jq >/dev/null 2>&1 || return 1

  local jqArgs=() jqExpr='.'
  local key value
  while [[ $# -ge 2 ]]; do
    key="$1"; value="$2"; shift 2
    # The key is spliced into the jq program text (both as `.key` and as the
    # `$key` variable name), so only accept plain identifiers.
    [[ "$key" =~ ^[A-Za-z_][A-Za-z0-9_]*$ ]] || return 1
    jqArgs+=(--arg "$key" "$value")
    jqExpr="$jqExpr | .$key = \$$key"
  done

  local tmp="${taskFile}.tmp.$$"
  if jq "${jqArgs[@]}" "$jqExpr" "$taskFile" 2>/dev/null > "$tmp" \
      && sudo chmod 644 "$tmp" 2>/dev/null \
      && sudo mv "$tmp" "$taskFile"; then
    return 0
  fi

  # Any step failed: drop the (possibly partial) temp file instead of leaving
  # it next to the task file.
  rm -f "$tmp" 2>/dev/null || sudo rm -f "$tmp" 2>/dev/null
  return 1
}

# Print the value of one top-level field of a task file (empty output when the
# field is absent, null or false). Returns 1 if the file or jq is missing.
readTaskField() {
  local taskFile="$1"
  local field="$2"
  [[ -f "$taskFile" ]] || return 1
  command -v jq >/dev/null 2>&1 || return 1
  jq -r --arg f "$field" '.[$f] // empty' "$taskFile" 2>/dev/null
}

# ============================================================================
# SETUP / SINGLE INSTANCE
# ============================================================================

# Create the task directory and the wake-up FIFO, and set their permissions.
# Ownership is handed to $docker_install_user when that variable is set.
setupTaskDir() {
  sudo mkdir -p "$TASK_DIR"
  if [[ ! -p "$FIFO" ]]; then
    # Remove any stale non-FIFO file at that path before creating the FIFO.
    [[ -e "$FIFO" ]] && sudo rm -f "$FIFO"
    sudo mkfifo "$FIFO" 2>/dev/null
  fi
  sudo chmod 666 "$FIFO" 2>/dev/null
  sudo chmod 755 "$TASK_DIR" 2>/dev/null
  if [[ -n "$docker_install_user" ]]; then
    sudo chown -R "$docker_install_user":"$docker_install_user" "$TASK_DIR" 2>/dev/null
  fi
}

# Open the FIFO in read-write mode on fd 3. With <>, Linux returns from open()
# immediately (process is both reader and writer) so subsequent reads with a
# timeout actually time out instead of hanging forever waiting for a writer.
# Returns 1 when $FIFO is not a FIFO or cannot be opened.
openFifoReader() {
  [[ -p "$FIFO" ]] || return 1
  exec 3<> "$FIFO" || return 1
  return 0
}

# Take an exclusive, non-blocking flock on $LOCK_FILE (held on fd 200 for the
# life of the process). If another instance holds it, log and exit 0.
# On success the pid is recorded in "${LOCK_FILE}.pid".
acquireSingletonLock() {
  exec 200>"$LOCK_FILE"
  if ! flock -n 200; then
    logInfo "Another task processor is already running; this instance will exit."
    exit 0
  fi
  echo $$ | sudo tee "${LOCK_FILE}.pid" >/dev/null 2>&1
}

# ============================================================================
# ORPHAN RECOVERY
# ============================================================================

# `running` tasks whose heartbeat is older than HEARTBEAT_STALE_SECS, or
# tasks that have no heartbeat and a started_at older than the same threshold,
# are treated as dead (the processor that owned them is gone) and are marked
# `failed`. Running tasks with neither timestamp are skipped.
recoverOrphans() {
  command -v jq >/dev/null 2>&1 || return 0
  local now; now=$(date +%s)

  # Batch the read: one `jq` invocation across every task file emits
  # "<file>\t<heartbeat_at or started_at>" for each running task. The
  # previous loop spawned 1-2 jq processes per file, which dominated startup
  # and idle-tick latency once a few dozen task files were on disk.
  #
  # The glob is expanded without toggling `nullglob`, so the shell option the
  # caller had is left exactly as it was. An unmatched pattern either stays
  # literal or (with nullglob already on) expands to nothing; neither names
  # an existing file, so the check below covers both.
  local files=( "$TASK_DIR"/task_*.json )
  [[ -e "${files[0]}" || -L "${files[0]}" ]] || return 0

  local file stamp stampEpoch id
  while IFS=$'\t' read -r file stamp; do
    [[ -n "$file" ]] || continue
    [[ -n "$stamp" ]] || continue
    stampEpoch=$(date -d "$stamp" +%s 2>/dev/null) || continue
    if (( now - stampEpoch > HEARTBEAT_STALE_SECS )); then
      id=$(basename "$file" .json)
      logInfo "Orphan recovery: $id (heartbeat age $((now - stampEpoch))s) -> failed"
      updateTaskFields "$file" \
        status failed \
        error_message "Task interrupted — processor died mid-run." \
        completed_at "$(date -Iseconds)" \
        updated_at "$(date -Iseconds)"
    fi
  done < <(jq -r 'select(.status == "running") | "\(input_filename)\t\(.heartbeat_at // .started_at // "")"' "${files[@]}" 2>/dev/null)
}

# ============================================================================
# TASK EXECUTION
# ============================================================================

# A single task runs to completion in this function. The eval'd command lives
# in a backgrounded subshell so an `exit 1` from inside it cannot kill us.
# Run one task file to completion and record the outcome in it.
#
# Arguments: $1 - path to the task JSON file (`<task_id>.json`).
# Globals:   TASK_DIR, HEARTBEAT_INTERVAL, docker_install_user (read);
#            LIBREPORTAL_NONINTERACTIVE (exported).
# Outputs:   command stdout/stderr go to "$TASK_DIR/<task_id>.log".
# Returns:   1 if the task has no command or the `running` transition could
#            not be written (the command is NOT executed in either case);
#            otherwise the status of the final log call.
runTask() {
  local taskFile="$1"
  local taskId; taskId=$(basename "$taskFile" .json)
  local logFile="$TASK_DIR/${taskId}.log"
  local cancelMarker="$TASK_DIR/${taskId}.cancel"

  local command; command=$(readTaskField "$taskFile" command)
  if [[ -z "$command" || "$command" == "null" ]]; then
    logError "Task $taskId has no command; marking failed."
    updateTaskFields "$taskFile" \
      status failed \
      error_message "Task file has no command field." \
      completed_at "$(date -Iseconds)" \
      updated_at "$(date -Iseconds)"
    return 1
  fi

  logInfo "Task $taskId starting: $command"

  # Any pre-existing cancel marker is stale; remove it.
  [[ -f "$cancelMarker" ]] && sudo rm -f "$cancelMarker"

  # Mark running with initial heartbeat. If this write fails, the final
  # status write would fail too and the task would stay `queued`, so the
  # next dispatch pass would execute the command again. Refuse to run it.
  local now; now=$(date -Iseconds)
  if ! updateTaskFields "$taskFile" \
      status running \
      started_at "$now" \
      heartbeat_at "$now" \
      updated_at "$now"; then
    logError "Task $taskId: could not record status=running; command not executed."
    return 1
  fi

  # Initialise the log file owned by the daemon's user so the bash redirection
  # to "$logFile" (which runs as that user, NOT under sudo) can write to it.
  # Previously this used `sudo truncate` + `sudo chmod 644` which left the file
  # root-owned and unwritable to the daemon, so the redirection failed and the
  # task immediately exited with rc=1.
  # The `{ …; } 2>/dev/null` grouping is what actually hides a failed
  # redirection; a trailing `2>/dev/null` on the same command would be
  # processed too late.
  local daemonUser; daemonUser=$(id -un)
  if [[ -f "$logFile" ]]; then
    sudo chown "$daemonUser":"$daemonUser" "$logFile" 2>/dev/null
    sudo chmod 664 "$logFile" 2>/dev/null
    { : > "$logFile"; } 2>/dev/null || sudo truncate -s 0 "$logFile" 2>/dev/null
  else
    { : > "$logFile"; } 2>/dev/null \
      || sudo install -o "$daemonUser" -g "$daemonUser" -m 664 /dev/null "$logFile"
  fi

  export LIBREPORTAL_NONINTERACTIVE=1

  # Run the command in a subshell so:
  #   * `eval` inherits the daemon's full env (PATH, functions, vars). Switching
  #     to `bash -c` broke this because the libreportal CLI lives at
  #     /usr/local/bin/libreportal which a fresh bash subshell can't always find.
  #   * an `exit 1` from inside (e.g. checkSuccess in non-interactive mode)
  #     only terminates the subshell — the daemon keeps running.
  # `set -m` gives the backgrounded subshell its own process group so we can
  # kill -TERM the whole tree on cancel via `kill -TERM -$cmdPid`.
  # NOTE(review): the command string comes straight from the task file and is
  # eval'd; anything that can write task files can run arbitrary commands.
  set -m
  ( cd "$HOME" 2>/dev/null || cd /; eval "$command" ) >"$logFile" 2>&1 &
  local cmdPid=$!
  set +m

  # Heartbeat + cancel watcher. Runs alongside the command and exits as soon
  # as the command's pid is gone.
  #
  # `set -m` so the watcher subshell becomes its own process group leader —
  # we need that below to SIGKILL the bash subshell *and* any in-flight
  # jq / sudo children with a single signal, so a heartbeat update that
  # started just before the command exited can't write `status=running`
  # back over our `status=completed` write.
  #
  # Inside the loop we poll in 1-second chunks instead of one
  # `sleep $HEARTBEAT_INTERVAL`. With the long sleep the watcher only
  # noticed the command had finished after the next 5s tick, which was the
  # queued -> completed lag the user was seeing. Same trick gives us ~1s
  # cancel-marker response instead of up to 5s.
  set -m
  (
    while kill -0 "$cmdPid" 2>/dev/null; do
      updateTaskFields "$taskFile" heartbeat_at "$(date -Iseconds)" >/dev/null 2>&1
      for ((i = 0; i < HEARTBEAT_INTERVAL; i++)); do
        kill -0 "$cmdPid" 2>/dev/null || exit 0
        if [[ -f "$cancelMarker" ]]; then
          logInfo "Task $taskId cancel requested; sending SIGTERM."
          kill -TERM "-$cmdPid" 2>/dev/null
          sleep 5
          if kill -0 "$cmdPid" 2>/dev/null; then
            logInfo "Task $taskId did not exit on TERM; sending SIGKILL."
            kill -KILL "-$cmdPid" 2>/dev/null
          fi
          sudo rm -f "$cancelMarker"
          exit 0
        fi
        sleep 1
      done
    done
  ) &
  local watcherPid=$!
  set +m

  # Wait for the command to finish (or to be killed by the watcher).
  wait "$cmdPid"
  local rc=$?

  # Stop the watcher. SIGKILL on the *process group* so any jq/sudo child of
  # the watcher that's mid-write also dies — otherwise their `sudo mv` can
  # land after our final status write and stomp it back to running.
  kill -KILL -- "-$watcherPid" 2>/dev/null
  wait "$watcherPid" 2>/dev/null

  # 143 = 128+SIGTERM, 137 = 128+SIGKILL: the signals the watcher sends.
  # NOTE(review): a command killed by SIGTERM/SIGKILL from any other source
  # is also reported as "Cancelled by user." — confirm that is acceptable.
  local finalStatus="completed"
  local errorMessage=""
  if [[ -f "$cancelMarker" ]] || [[ $rc -eq 143 ]] || [[ $rc -eq 137 ]]; then
    finalStatus="cancelled"
    errorMessage="Cancelled by user."
    sudo rm -f "$cancelMarker" 2>/dev/null
  elif [[ $rc -ne 0 ]]; then
    finalStatus="failed"
    errorMessage="Command exited with status $rc."
  fi

  updateTaskFields "$taskFile" \
    status "$finalStatus" \
    completed_at "$(date -Iseconds)" \
    updated_at "$(date -Iseconds)" \
    exit_code "$rc" \
    error_message "$errorMessage"

  if [[ -n "$docker_install_user" ]]; then
    sudo chown "$docker_install_user":"$docker_install_user" "$logFile" 2>/dev/null
  fi

  case "$finalStatus" in
    completed) logInfo "Task $taskId completed (exit $rc)";;
    cancelled) logInfo "Task $taskId cancelled";;
    failed)    logError "Task $taskId failed (exit $rc)";;
  esac
}

# ============================================================================
# SCAN & DISPATCH
# ============================================================================

# Process every queued task in file-mtime order (oldest first). We keep going
# until the directory has no more dispatchable `queued`/`pending` files; FIFO
# wake-ups will deliver new ones.
#
# A task whose status is still `queued`/`pending` after runTask returned (its
# file could not be updated) is skipped for the rest of this pass. Without
# that, the loop would pick the same file forever.
#
# Returns: 1 if jq is unavailable, 0 otherwise.
dispatchPending() {
  command -v jq >/dev/null 2>&1 || { logError "jq not available; cannot dispatch."; return 1; }

  local listing candidate nextFile postStatus
  local -a files
  local stuck=$'\n'   # newline-delimited list of files that refused to leave the queue

  while true; do
    # Sort by mtime (oldest first) so FIFO order is preserved, then run a
    # single `jq` over all of them — emits the path of every queued/pending
    # task, in argument order. We pick the first; runTask consumes it; loop.
    # This replaces the previous N-jq-per-pass scan that scaled poorly with
    # how many task files were on disk and was the dominant queued ->
    # running latency on healthy systems.
    listing=$(ls -tr "$TASK_DIR"/task_*.json 2>/dev/null) || listing=""
    [[ -z "$listing" ]] && return 0

    # Split on newlines only, with no pathname expansion of the results.
    files=()
    while IFS= read -r candidate; do
      [[ -n "$candidate" ]] && files+=( "$candidate" )
    done <<< "$listing"

    nextFile=""
    while IFS= read -r candidate; do
      [[ -n "$candidate" ]] || continue
      [[ "$stuck" == *$'\n'"$candidate"$'\n'* ]] && continue
      nextFile="$candidate"
      break
    done < <(jq -r 'select(.status == "queued" or .status == "pending") | input_filename' "${files[@]}" 2>/dev/null)
    [[ -z "$nextFile" ]] && return 0

    runTask "$nextFile"

    postStatus=$(readTaskField "$nextFile" status)
    if [[ "$postStatus" == "queued" || "$postStatus" == "pending" ]]; then
      logError "Task file $nextFile is still '$postStatus' after dispatch; skipping it for this pass."
      stuck+="$nextFile"$'\n'
    fi
  done
}

# Fast path used by the FIFO wake-up. Skips the whole-dir scan + per-file jq
# spawn that dispatchPending does — with many tasks on disk that scan was the
# dominant queued -> running latency.
#
# Arguments: $1 - task id as read from the FIFO. Ids that do not look like
#            `task_<digits>_<alnum>` are ignored.
dispatchSpecific() {
  local taskId="$1"
  [[ -z "$taskId" ]] && return 0
  # Guard against junk in the FIFO.
  [[ "$taskId" =~ ^task_[0-9]+_[A-Za-z0-9]+$ ]] || return 0

  local taskFile="$TASK_DIR/${taskId}.json"
  [[ -f "$taskFile" ]] || return 0

  local s; s=$(readTaskField "$taskFile" status)
  if [[ "$s" == "queued" || "$s" == "pending" ]]; then
    runTask "$taskFile"
  fi
}

# ============================================================================
# HOUSEKEEPING
# ============================================================================

# Delete every zero-byte regular file matching "$TASK_DIR"/task_*.
cleanupZeroByteFiles() {
  # Use the bash builtin `-s` (file size > 0) instead of forking `stat` per
  # file — at 100+ task files the stat fork was a measurable share of the
  # idle-tick cost.
  local f
  for f in "$TASK_DIR"/task_*; do
    [[ -f "$f" ]] || continue
    [[ -s "$f" ]] && continue
    sudo rm -f "$f"
  done
}

# ============================================================================
# MAIN LOOP
# ============================================================================

# Block on the FIFO with a timeout. Two paths:
#
#   * FIFO wake-up — Node poked us with the new task id. Skip the full
#     housekeeping pass (orphan recovery + zero-byte cleanup re-iterate every
#     task file and shell out to jq for each; with N tasks on disk that's the
#     dominant queued -> running latency). Dispatch the specific task by id,
#     then drain any other queued ones, then back to read.
#
#   * Idle timeout — no Node activity.
#     Run the full housekeeping pass; this is what catches orphaned-running
#     tasks from a previously-killed processor and tasks that were created
#     while the FIFO was unhealthy.
#
# With a healthy Node side, queued -> running latency is now bounded by one
# `read`, one jq invocation, and the runTask setup — typically well under
# 100ms regardless of how many task files are on disk.
#
# mainLoop never returns.
mainLoop() {
  logInfo "Task processor v2 entering main loop"

  # Open the FIFO once and reuse the descriptor. `< "$FIFO"` per-iteration would
  # block at open() waiting for a writer, defeating the read timeout entirely.
  local fifoReady=0
  if openFifoReader; then fifoReady=1; fi

  # One full housekeeping pass at startup so anything left behind from a
  # previous processor gets noticed before we enter the fast path.
  recoverOrphans
  dispatchPending
  cleanupZeroByteFiles

  local signal rc
  while true; do
    signal=""
    if [[ "$fifoReady" -eq 1 ]]; then
      # `read -u 3` reads from the fd we opened in <> mode. -t is the timeout;
      # -r keeps backslashes literal so the id is taken exactly as written.
      read -r -t "$IDLE_POLL_SECS" -u 3 signal
      rc=$?
    else
      sleep "$IDLE_POLL_SECS"
      rc=1
      # Try to (re)open the FIFO if it's appeared since startup.
      [[ -p "$FIFO" ]] && openFifoReader && fifoReady=1
    fi

    if [[ $rc -eq 0 && -n "$signal" ]]; then
      # Fast path: dispatch the specific task we were poked about, then drain
      # anything else that piled up while we were running it.
      dispatchSpecific "$signal"
      dispatchPending
    else
      # Idle timeout (or FIFO unhealthy) — full housekeeping pass.
      recoverOrphans
      dispatchPending
      cleanupZeroByteFiles
    fi
  done
}

# ============================================================================
# ENTRY POINT
# ============================================================================

# Prepare the task directory, take the singleton lock, do an initial cleanup
# and enter the main loop.
run_task_processor() {
  # setupTaskDir must come first: the log file lives inside TASK_DIR, so a
  # log call made before the directory exists is lost on a fresh install.
  setupTaskDir
  logInfo "=== LibrePortal task processor starting ==="
  acquireSingletonLock
  cleanupZeroByteFiles
  recoverOrphans
  mainLoop
}

run_task_processor