#!/bin/bash
# ============================================================================
# LibrePortal Task Processor — v2
#
# Design goals:
#   1. Single source of truth: the task file (`.json`). `status` is the
#      only field anyone reads to know what a task is doing.
#   2. Single instance: enforced via flock. A second process exits cleanly.
#   3. Push-based: the daemon blocks on a FIFO read and wakes within
#      milliseconds when Node writes a task id to the FIFO.
#   4. Subprocess isolation: every command is eval'd inside a backgrounded
#      `( … ) &` subshell that gets its own process group (`set -m`), so a
#      non-zero exit (or signal, or `exit 1` from checkSuccess in
#      non-interactive mode) kills the subshell, not the processor.
#   5. Atomic writes: every state transition is `tmp + mv`. Readers never
#      see a half-written file.
#   6. Heartbeat: the processor stamps `heartbeat_at` every 5s while a task
#      runs. Stale heartbeats (>60s on a `running` task) are recovered to
#      `failed` on the next idle cycle.
#   7. Cancellable: a `.cancel` marker file triggers SIGTERM → SIGKILL
#      to the task's process group.
# ============================================================================

script_task_processor_flag="$1"

# Source guard — DO NOT remove. mainLoop is an infinite loop, so sourcing this
# file without the "start_script" flag must return before anything runs.
# NOTE(review): `return` only succeeds when the file is *sourced*. When it is
# executed directly, `return` fails (its error is silenced by 2>/dev/null) and
# execution continues whatever the flag is — confirm that is intended for the
# standalone (systemd) launch path.
[[ "$script_task_processor_flag" != "start_script" ]] && return 0 2>/dev/null

# --- Load the privilege helpers + docker-type config ------------------------
# systemd launches this script standalone, so the de-sudo helpers
# (runFileOp/runFileWrite) and the config they key off (rooted vs rootless) are
# NOT otherwise in scope. Without them every privileged write into the
# docker-install-owned task dir fails ("command not found") and tasks loop
# forever. Load them here — these files are pure function/var defs, safe to
# source, no side effects.
# Find the directory this script lives in (systemd starts us from inside the
# scripts tree). From it we load the relocatable path roots: paths.sh sets
# docker_dir / containers_dir / configs_dir / …, because the data and backup
# roots cannot be derived from this file's location alone.
LP_SELF_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" 2>/dev/null && pwd)"
if [[ -n "${install_scripts_dir:-}" ]]; then
  LP_SCRIPTS="$install_scripts_dir"
else
  # Fallback: two directories above this file, with a trailing slash.
  LP_SCRIPTS="$(cd "$LP_SELF_DIR/../.." 2>/dev/null && pwd)/"
fi
if [[ -f "${LP_SCRIPTS}source/paths.sh" ]]; then
  source "${LP_SCRIPTS}source/paths.sh"
fi
# Presumably paths.sh may provide install_scripts_dir; if it did, prefer it.
LP_SCRIPTS="${install_scripts_dir:-$LP_SCRIPTS}"

# Docker install type / user (rooted vs rootless), consumed by the helpers.
LP_DOCKER_CFG="${configs_dir:-/libreportal-system/configs/}general/general_docker_install"
if [[ -f "$LP_DOCKER_CFG" ]]; then
  # Keep only the CFG_DOCKER_INSTALL_TYPE / CFG_DOCKER_INSTALL_USER assignment
  # lines, strip trailing `# …` comments, and evaluate what is left.
  # NOTE(review): eval trusts this config file's contents — it must not be
  # writable by untrusted users.
  eval "$(grep -E '^CFG_DOCKER_INSTALL_(TYPE|USER)=' "$LP_DOCKER_CFG" \
    | sed 's/[[:space:]]*#.*//')"
fi

# Defaults for anything paths.sh did not set (or left empty).
: "${sudo_user_name:=libreportal}"
: "${containers_dir:=/libreportal-containers/}"
: "${docker_dir:=/libreportal-system}"

# Privileged-write helpers and install-type checks; each is optional.
for _lp_f in docker/command/run_privileged.sh \
             docker/command/docker_run_install.sh \
             checks/requirements/check_install_type.sh; do
  if [[ -f "${LP_SCRIPTS}${_lp_f}" ]]; then
    source "${LP_SCRIPTS}${_lp_f}"
  fi
done

if command -v resolveDockerInstallUser >/dev/null 2>&1; then
  resolveDockerInstallUser
fi

# ============================================================================
# PATHS & CONSTANTS
# ============================================================================
TASK_DIR="${TASK_DIR:-${containers_dir:-/libreportal-containers/}libreportal/frontend/data/tasks}"
LOCK_FILE="$TASK_DIR/.processor.lock"
FIFO="$TASK_DIR/.queue.fifo"
LOG_FILE="$TASK_DIR/task_processor.log"

# Cron's default PATH is minimal (/usr/bin:/bin) and doesn't include the
# /usr/local/bin where the libreportal CLI lives. Augment it so eval'd commands
# can find their binaries.
export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:$PATH"

HEARTBEAT_INTERVAL=5
HEARTBEAT_STALE_SECS=60
IDLE_POLL_SECS=3   # max time we'll block on FIFO before re-scanning anyway

# ============================================================================
# LOGGING
# ============================================================================
# Log lines are appended to LOG_FILE through `runFileWrite -a` (the privileged
# write helper): the processor's own user cannot create files in TASK_DIR.
logInfo()  { echo "$(date '+%Y-%m-%d %H:%M:%S') [INFO ] $*" | runFileWrite -a "$LOG_FILE"; }
logError() { echo "$(date '+%Y-%m-%d %H:%M:%S') [ERROR] $*" | runFileWrite -a "$LOG_FILE"; }
# Emits only when LOG_LEVEL=DEBUG. Returns 0 when debug is off so it never
# turns a caller's final status into a failure.
logDebug() {
  [[ "${LOG_LEVEL:-INFO}" == "DEBUG" ]] || return 0
  echo "$(date '+%Y-%m-%d %H:%M:%S') [DEBUG] $*" | runFileWrite -a "$LOG_FILE"
}

# ============================================================================
# FILE OPS — atomic writes, JSON helpers
# ============================================================================
# Temp names use $BASHPID rather than $$: inside a `( … ) &` subshell (the
# heartbeat watcher in runTask) $$ still expands to the *parent* shell's PID,
# so watcher and main loop would otherwise write through the same temp path.

# Atomically replace a file's content. POSIX rename(2) is atomic on the same
# filesystem, so concurrent readers either see the old or the new file —
# never a half-written one.
#   $1 - target path
#   $2 - new content (written verbatim, no trailing newline added)
# Returns non-zero (and removes the temp file) if the final rename fails.
writeAtomic() {
  local target="$1" content="$2"
  local tmp="${target}.tmp.$BASHPID"
  printf '%s' "$content" | runFileWrite "$tmp"
  runFileOp chmod 644 "$tmp" 2>/dev/null
  if ! runFileOp mv "$tmp" "$target"; then
    runFileOp rm -f "$tmp" 2>/dev/null
    return 1
  fi
}

# Update one or more string fields on a task file, atomically. Args after
# taskFile are alternating key/value pairs:
#   updateTaskFields "$file" status running heartbeat_at "$(date -Iseconds)"
# Returns 1 if the file is missing, jq is unavailable, jq fails, or any step
# of the privileged temp-write + rename fails (the temp file is removed).
updateTaskFields() {
  local taskFile="$1"; shift
  [[ -f "$taskFile" ]] || return 1
  command -v jq >/dev/null 2>&1 || return 1

  local jqArgs=() jqExpr='.'
  local key value
  while [[ $# -ge 2 ]]; do
    key="$1"; value="$2"; shift 2
    jqArgs+=(--arg "$key" "$value")
    jqExpr="$jqExpr | .$key = \$$key"
  done

  # The processor runs as the manager user, which CANNOT create files in the
  # docker-install-owned TASK_DIR. A plain `jq … > "$tmp"` redirect therefore
  # failed silently, so neither `status: running` nor `status: completed` was
  # ever written and the task was reprocessed forever. Capture jq's output and
  # write the temp via runFileWrite (the privileged helper), then chmod +
  # atomic mv.
  local updated
  updated=$(jq "${jqArgs[@]}" "$jqExpr" "$taskFile" 2>/dev/null) || return 1
  [[ -z "$updated" ]] && return 1

  local tmp="${taskFile}.tmp.$BASHPID"
  if printf '%s' "$updated" | runFileWrite "$tmp" \
    && runFileOp chmod 644 "$tmp" 2>/dev/null \
    && runFileOp mv "$tmp" "$taskFile"; then
    return 0
  fi
  # Don't leave a stray temp file behind when any step failed.
  runFileOp rm -f "$tmp" 2>/dev/null
  return 1
}

# Print one top-level field of a task file (empty when absent/null).
readTaskField() {
  local taskFile="$1"; local field="$2"
  [[ -f "$taskFile" ]] || return 1
  command -v jq >/dev/null 2>&1 || return 1
  jq -r --arg f "$field" '.[$f] // empty' "$taskFile" 2>/dev/null
}

# ============================================================================
# SETUP / SINGLE INSTANCE
# ============================================================================
setupTaskDir() {
  runFileOp mkdir -p "$TASK_DIR"
  if [[ ! -p "$FIFO" ]]; then
    # Remove any stale non-FIFO file at that path before creating the FIFO.
    [[ -e "$FIFO" ]] && runFileOp rm -f "$FIFO"
    runFileOp mkfifo "$FIFO" 2>/dev/null
  fi
  runFileOp chmod 666 "$FIFO" 2>/dev/null
  runFileOp chmod 755 "$TASK_DIR" 2>/dev/null

  # The processor (manager user) can't create files in the docker-install-owned
  # task dir, so pre-create the lock AS the dir owner, world-writable, so the
  # `exec 200>"$LOCK_FILE"` in acquireSingletonLock (run as the manager) can open
  # it. Create-if-absent to keep a stable inode for flock across restarts.
  [[ -e "$LOCK_FILE" ]] || runFileOp install -m 666 /dev/null "$LOCK_FILE" 2>/dev/null
  runFileOp chmod 666 "$LOCK_FILE" 2>/dev/null

  # Establish ownership via the root-owned helper: the unprivileged dir owner
  # can't reclaim files an earlier run left root/manager-owned (e.g. a root-owned
  # task_processor.log), which would then block the daemon's log appends.
  runOwnership taskdir 2>/dev/null
}

# Open the FIFO in read-write mode on fd 3. With <>, Linux returns from open()
# immediately (process is both reader and writer) so subsequent reads with a
# timeout actually time out instead of hanging forever waiting for a writer.
openFifoReader() {
  if [[ -p "$FIFO" ]]; then
    exec 3<> "$FIFO" || return 1
    return 0
  fi
  return 1
}

# Hold an exclusive flock on fd 200 for the life of the process.
# Exits 0 if another processor already holds it, and exits 1 (with a distinct
# error) if the lock file itself cannot be opened — previously that case fell
# through to flock on a bad fd and was misreported as "already running".
acquireSingletonLock() {
  if ! exec 200>"$LOCK_FILE"; then
    logError "Cannot open singleton lock file $LOCK_FILE; task processor exiting."
    exit 1
  fi
  if ! flock -n 200; then
    logInfo "Another task processor is already running; this instance will exit."
    exit 0
  fi
  echo $$ | runFileWrite "${LOCK_FILE}.pid" 2>/dev/null
}

# ============================================================================
# ORPHAN RECOVERY
# ============================================================================
# `running` tasks whose heartbeat is older than HEARTBEAT_STALE_SECS, or
# tasks that have no heartbeat and a started_at older than the same threshold,
# are treated as dead (the processor that owned them is gone).
recoverOrphans() {
  command -v jq >/dev/null 2>&1 || return 0
  local now; now=$(date +%s)

  # Batch the read: one `jq` invocation across every task file emits
  # "<file>\t<heartbeat_at, else started_at>" for each running task, instead of
  # spawning 1-2 jq processes per file.
  shopt -s nullglob
  local files=( "$TASK_DIR"/task_*.json )
  shopt -u nullglob
  [[ ${#files[@]} -eq 0 ]] && return 0

  local file stamp stampEpoch id
  while IFS=$'\t' read -r file stamp; do
    [[ -n "$file" ]] || continue
    [[ -n "$stamp" ]] || continue
    stampEpoch=$(date -d "$stamp" +%s 2>/dev/null) || continue
    if (( now - stampEpoch > HEARTBEAT_STALE_SECS )); then
      id=$(basename "$file" .json)
      logInfo "Orphan recovery: $id (heartbeat age $((now - stampEpoch))s) -> failed"
      updateTaskFields "$file" \
        status failed \
        error_message "Task interrupted — processor died mid-run." \
        completed_at "$(date -Iseconds)" \
        updated_at "$(date -Iseconds)" \
        || logError "Orphan recovery: could not mark $id failed in $file."
    fi
  done < <(jq -r 'select(.status == "running") | "\(input_filename)\t\(.heartbeat_at // .started_at // "")"' "${files[@]}" 2>/dev/null)
}

# ============================================================================
# TASK EXECUTION
# ============================================================================
# Run a single task to completion. The command is eval'd inside a
# backgrounded `( … )` subshell with its own process group, so an `exit 1`
# from inside it cannot kill the processor, and cancel can signal the tree.
#   $1 - path to the task_*.json file
# Final status is one of completed / failed / cancelled (exit codes 143 and
# 137, i.e. TERM/KILL, are also classed as cancelled).
runTask() {
  local taskFile="$1"
  local taskId; taskId=$(basename "$taskFile" .json)
  local logFile="$TASK_DIR/${taskId}.log"
  local cancelMarker="$TASK_DIR/${taskId}.cancel"
  local command; command=$(readTaskField "$taskFile" command)

  if [[ -z "$command" || "$command" == "null" ]]; then
    logError "Task $taskId has no command; marking failed."
    updateTaskFields "$taskFile" \
      status failed \
      error_message "Task file has no command field." \
      completed_at "$(date -Iseconds)" \
      updated_at "$(date -Iseconds)" \
      || logError "Task $taskId: could not write failed status to $taskFile."
    return 1
  fi

  logInfo "Task $taskId starting: $command"

  # Any pre-existing cancel marker is stale; remove it.
  [[ -f "$cancelMarker" ]] && runFileOp rm -f "$cancelMarker"

  # Mark running with initial heartbeat.
  local now; now=$(date -Iseconds)
  updateTaskFields "$taskFile" \
    status running \
    started_at "$now" \
    heartbeat_at "$now" \
    updated_at "$now" \
    || logError "Task $taskId: could not write running status to $taskFile."

  # TASK_DIR is owned by the docker install user, so the manager-user
  # processor can't create the log there directly. Create/truncate it AS the
  # dir owner via runFileOp and leave it world-writable for the run, so the
  # `>"$logFile"` redirection below (which runs as the manager, NOT under sudo)
  # can open it. Re-owned to the dir owner when the task finishes.
  runFileOp install -m 666 /dev/null "$logFile" 2>/dev/null \
    || runFileOp truncate -s 0 "$logFile" 2>/dev/null
  runFileOp chmod 666 "$logFile" 2>/dev/null

  export LIBREPORTAL_NONINTERACTIVE=1

  # Run the command in a subshell so:
  #  * `eval` inherits the daemon's full env (PATH, functions, vars). Switching
  #    to `bash -c` broke this because the libreportal CLI lives at
  #    /usr/local/bin/libreportal which a fresh bash subshell can't always find.
  #  * an `exit 1` from inside (e.g. checkSuccess in non-interactive mode)
  #    only terminates the subshell — the daemon keeps running.
  # `set -m` gives the backgrounded subshell its own process group so we can
  # kill -TERM the whole tree on cancel via `kill -TERM -$cmdPid`.
  set -m
  ( cd "$HOME" 2>/dev/null || cd /; eval "$command" ) >"$logFile" 2>&1 &
  local cmdPid=$!
  set +m

  # Heartbeat + cancel watcher. Runs alongside the command and exits as soon
  # as the command's pid is gone.
  #
  # `set -m` makes the watcher subshell its own process-group leader, so one
  # SIGKILL below takes out the subshell *and* any in-flight jq / sudo
  # children — a heartbeat update that started just before the command exited
  # then can't write `status=running` back over the final status write.
  #
  # The inner loop polls once per second for HEARTBEAT_INTERVAL seconds rather
  # than sleeping the whole interval, so command exit and cancel markers are
  # noticed within ~1s instead of up to 5s.
  set -m
  (
    while kill -0 "$cmdPid" 2>/dev/null; do
      updateTaskFields "$taskFile" heartbeat_at "$(date -Iseconds)" >/dev/null 2>&1
      for ((i = 0; i < HEARTBEAT_INTERVAL; i++)); do
        kill -0 "$cmdPid" 2>/dev/null || exit 0
        if [[ -f "$cancelMarker" ]]; then
          logInfo "Task $taskId cancel requested; sending SIGTERM."
          kill -TERM "-$cmdPid" 2>/dev/null
          sleep 5
          if kill -0 "$cmdPid" 2>/dev/null; then
            logInfo "Task $taskId did not exit on TERM; sending SIGKILL."
            kill -KILL "-$cmdPid" 2>/dev/null
          fi
          runFileOp rm -f "$cancelMarker"
          exit 0
        fi
        sleep 1
      done
    done
  ) &
  local watcherPid=$!
  set +m

  # Wait for the command to finish (or to be killed by the watcher).
  wait "$cmdPid"
  local rc=$?

  # Stop the watcher. SIGKILL on the *process group* so any jq/sudo child of
  # the watcher that's mid-write also dies — otherwise a late `sudo mv` could
  # land after our final status write and stomp it back to running.
  kill -KILL -- "-$watcherPid" 2>/dev/null
  wait "$watcherPid" 2>/dev/null
  # A watcher killed mid-heartbeat may leave its own temp file (named with its
  # $BASHPID, which equals $watcherPid) behind.
  runFileOp rm -f "${taskFile}.tmp.${watcherPid}" 2>/dev/null

  local finalStatus="completed"
  local errorMessage=""
  if [[ -f "$cancelMarker" ]] || [[ $rc -eq 143 ]] || [[ $rc -eq 137 ]]; then
    finalStatus="cancelled"
    errorMessage="Cancelled by user."
    runFileOp rm -f "$cancelMarker" 2>/dev/null
  elif [[ $rc -ne 0 ]]; then
    finalStatus="failed"
    errorMessage="Command exited with status $rc."
  fi

  updateTaskFields "$taskFile" \
    status "$finalStatus" \
    completed_at "$(date -Iseconds)" \
    updated_at "$(date -Iseconds)" \
    exit_code "$rc" \
    error_message "$errorMessage" \
    || logError "Task $taskId: could not write final status '$finalStatus' to $taskFile."

  if [[ -n "$docker_install_user" ]]; then
    runFileOp chown "$docker_install_user":"$docker_install_user" "$logFile" 2>/dev/null
  fi

  case "$finalStatus" in
    completed) logInfo "Task $taskId completed (exit $rc)";;
    cancelled) logInfo "Task $taskId cancelled";;
    failed)    logError "Task $taskId failed (exit $rc)";;
  esac
}

# ============================================================================
# SCAN & DISPATCH
# ============================================================================
# Process every queued task in created-at order. We keep going until the
# directory has no more `queued` files; FIFO wake-ups will deliver new ones.
#
# Returns 0 when nothing runnable is left. Returns 1 when jq is missing, or
# when a task is still queued/pending after runTask handled it — that means
# its status could not be rewritten, and re-selecting it at once would either
# hot-loop or re-run the command. In that case draining stops and the next
# idle pass (every IDLE_POLL_SECS) retries.
dispatchPending() {
  command -v jq >/dev/null 2>&1 || { logError "jq not available; cannot dispatch."; return 1; }

  local -a files
  local nextFile status
  while true; do
    # Oldest mtime first so FIFO order is preserved, then a single `jq` over
    # all of them emits the path of every queued/pending task in argument
    # order. We take the first; runTask consumes it; loop. `ls -t` is used
    # only for the mtime ordering. mapfile reads one path per line without
    # word-splitting or globbing, which assumes task file names contain no
    # newlines.
    mapfile -t files < <(ls -tr "$TASK_DIR"/task_*.json 2>/dev/null)
    (( ${#files[@]} == 0 )) && return 0

    nextFile=$(jq -r 'select(.status == "queued" or .status == "pending") | input_filename' \
      "${files[@]}" 2>/dev/null | head -n 1)
    [[ -z "$nextFile" ]] && return 0

    runTask "$nextFile"

    status=$(readTaskField "$nextFile" status)
    if [[ "$status" == "queued" || "$status" == "pending" ]]; then
      logError "Task $(basename "$nextFile" .json) still '$status' after dispatch; deferring to the next idle pass."
      return 1
    fi
  done
}

# Fast path used by the FIFO wake-up: dispatch one task by id without
# scanning the whole task directory.
#   $1 - task id as read from the FIFO (validated before use)
dispatchSpecific() {
  local taskId="$1"
  [[ -z "$taskId" ]] && return 0
  # Guard against junk in the FIFO.
  [[ "$taskId" =~ ^task_[0-9]+_[A-Za-z0-9]+$ ]] || return 0
  local taskFile="$TASK_DIR/${taskId}.json"
  [[ -f "$taskFile" ]] || return 0
  local s; s=$(readTaskField "$taskFile" status)
  if [[ "$s" == "queued" || "$s" == "pending" ]]; then
    runTask "$taskFile"
  fi
}

# ============================================================================
# HOUSEKEEPING
# ============================================================================
# Delete every empty task_* file in TASK_DIR (task json, logs, markers alike).
cleanupZeroByteFiles() {
  # Use the bash builtin `-s` (file size > 0) instead of forking `stat` per
  # file — at 100+ task files the stat fork was a measurable share of the
  # idle-tick cost.
  local f
  for f in "$TASK_DIR"/task_*; do
    [[ -f "$f" ]] || continue
    [[ -s "$f" ]] && continue
    runFileOp rm -f "$f"
  done
}

# ============================================================================
# MAIN LOOP
# ============================================================================
# Block on the FIFO with a timeout. Two paths:
#
# * FIFO wake-up — Node poked us with the new task id. Skip the full
#   housekeeping pass (orphan recovery + zero-byte cleanup touch every task
#   file). Dispatch the specific task by id, then drain any other queued
#   ones, then back to read.
#
# * Idle timeout — no Node activity. Run the full housekeeping pass; this
#   is what catches orphaned-running tasks from a previously-killed
#   processor and tasks that were created while the FIFO was unhealthy.
#
# With a healthy Node side, queued -> running latency is bounded by one
# `read`, one jq invocation, and the runTask setup.
mainLoop() {
  logInfo "Task processor v2 entering main loop"

  # Open the FIFO once and reuse the descriptor. `< "$FIFO"` per-iteration would
  # block at open() waiting for a writer, defeating the read timeout entirely.
  local fifoReady=0
  if openFifoReader; then fifoReady=1; fi

  # One full housekeeping pass at startup so anything left behind from a
  # previous processor gets noticed before we enter the fast path.
  recoverOrphans
  dispatchPending
  cleanupZeroByteFiles

  local signal rc
  while true; do
    signal=""
    if [[ "$fifoReady" -eq 1 ]]; then
      # `read -u 3` reads from the fd opened in <> mode; -t is the timeout;
      # -r keeps backslashes literal so they can't be used to slip past the
      # task-id check in dispatchSpecific.
      read -r -t "$IDLE_POLL_SECS" -u 3 signal
      rc=$?
    else
      sleep "$IDLE_POLL_SECS"
      rc=1
      # Try to (re)open the FIFO if it's appeared since startup.
      [[ -p "$FIFO" ]] && openFifoReader && fifoReady=1
    fi

    if [[ $rc -eq 0 && -n "$signal" ]]; then
      # Fast path: dispatch the specific task we were poked about, then drain
      # anything else that piled up while we were running it.
      dispatchSpecific "$signal"
      dispatchPending
    else
      # Idle timeout (or FIFO unhealthy) — full housekeeping pass.
      recoverOrphans
      dispatchPending
      cleanupZeroByteFiles
    fi
  done
}

# ============================================================================
# ENTRY POINT
# ============================================================================
run_task_processor() {
  logInfo "=== LibrePortal task processor starting ==="
  setupTaskDir
  acquireSingletonLock
  cleanupZeroByteFiles
  recoverOrphans
  mainLoop
}

run_task_processor