LibrePortal/scripts/crontab/task/crontab_task_processor.sh
librelad 099751b72c fix(rootless): task processor status writes via runFileWrite, not bare redirect
updateTaskFields wrote its temp with a plain 'jq … > "$tmp"' shell redirect,
which runs as the processor's own user (the manager). But TASK_DIR is owned by
the docker install user and the manager can't create files in it, so the
redirect failed and the status write silently no-op'd — every task stayed
'queued', got reprocessed in an endless loop, and follow-on tasks (e.g. the
setup 'finalize' after 'config') never ran. The fix mirrors writeAtomic:
capture jq's output, write the temp through runFileWrite (the privileged
helper), then chmod + atomic mv.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: librelad <librelad@digitalangels.vip>
2026-05-24 14:36:22 +01:00

461 lines
18 KiB
Bash
Executable File

#!/bin/bash
# ============================================================================
# LibrePortal Task Processor — v2
#
# Design goals:
# 1. Single source of truth: the task file (`<id>.json`). `status` is the
# only field anyone reads to know what a task is doing.
# 2. Single instance: enforced via flock. A second process exits cleanly.
# 3. Push-based: the daemon blocks on a FIFO read and wakes within
# milliseconds when Node writes a task id to the FIFO.
# 4. Subprocess isolation: every command runs via `eval` inside a
# backgrounded subshell (given its own process group with `set -m`) so
# that a non-zero exit (or signal, or `exit 1` from checkSuccess in
# non-interactive mode) kills the subshell, not the processor.
# 5. Atomic writes: every state transition is `tmp + mv`. Readers never
# see a half-written file.
# 6. Heartbeat: the processor stamps `heartbeat_at` every 5s while a task
# runs. Stale heartbeats (>60s on a `running` task) are recovered to
# `failed` on the next idle cycle.
# 7. Cancellable: a `<id>.cancel` marker file triggers SIGTERM → SIGKILL
# to the task's process group.
# ============================================================================
# The first positional argument decides whether this file actually starts the
# processor or merely gets sourced.
script_task_processor_flag="$1"
# Source guard — DO NOT remove. mainLoop is an infinite loop, so a sourcing
# script that did not pass "start_script" must stop reading this file here.
# The stderr redirect hides the complaint `return` prints when it is used
# outside a function or sourced file.
if [[ "$script_task_processor_flag" != "start_script" ]]; then
  return 0 2>/dev/null
fi
# ============================================================================
# PATHS & CONSTANTS
# ============================================================================
# Directory holding the task files; an already-set, non-empty TASK_DIR wins
# over the built-in default.
: "${TASK_DIR:=/docker/containers/libreportal/frontend/data/tasks}"
# Everything else the processor touches lives inside TASK_DIR.
LOCK_FILE="${TASK_DIR}/.processor.lock"
FIFO="${TASK_DIR}/.queue.fifo"
LOG_FILE="${TASK_DIR}/task_processor.log"
# Cron's default PATH is minimal (/usr/bin:/bin) and doesn't include the
# /usr/local/bin where the libreportal CLI lives. Prepend the usual system
# directories so eval'd commands can find their binaries.
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:$PATH"
export PATH
# Timing knobs, all in seconds.
HEARTBEAT_INTERVAL=5      # how often a running task's heartbeat_at is stamped
HEARTBEAT_STALE_SECS=60   # heartbeat age after which a running task is treated as dead
IDLE_POLL_SECS=3          # max time we'll block on FIFO before re-scanning anyway
# ============================================================================
# LOGGING
# ============================================================================
# Append one timestamped line to $LOG_FILE through runFileWrite (defined
# outside this file; called here with -a).
# Arguments: $1 - level tag exactly as it should appear between the brackets
#            $2… - message words, joined with single spaces
_taskProcessorLogLine() {
  local level="$1"; shift
  printf '%s [%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$level" "$*" \
    | runFileWrite -a "$LOG_FILE"
}
# Informational message. The trailing space in the tag keeps columns aligned
# with the five-letter levels.
logInfo() { _taskProcessorLogLine "INFO " "$@"; }
# Error message.
logError() { _taskProcessorLogLine "ERROR" "$@"; }
# Debug message; written only when LOG_LEVEL is exactly "DEBUG".
# Returns 0 when debug logging is off, so a skipped debug line never looks
# like a failure to the caller.
logDebug() {
  [[ "${LOG_LEVEL:-INFO}" == "DEBUG" ]] || return 0
  _taskProcessorLogLine "DEBUG" "$@"
}
# ============================================================================
# FILE OPS — atomic writes, JSON helpers
# ============================================================================
# Atomically replace a file's content. POSIX rename(2) is atomic on the same
# filesystem, so concurrent readers either see the old or the new file —
# never a half-written one.
# Replace a file's content via a temp file plus rename.
# Arguments: $1 - target path
#            $2 - full new content (written verbatim, no trailing newline added)
# Returns:   0 when the target was replaced; 1 when the temp write or the
#            rename failed, in which case the target is left untouched and
#            the temp file is removed.
writeAtomic() {
  local target="$1"; local content="$2"
  local tmp="${target}.tmp.$$"
  # Never rename a temp file that was not written successfully — that would
  # swap a good target for a missing or partial one.
  if ! printf '%s' "$content" | runFileWrite "$tmp"; then
    runFileOp rm -f "$tmp" 2>/dev/null
    return 1
  fi
  # Permission tweak stays best-effort, as before.
  runFileOp chmod 644 "$tmp" 2>/dev/null
  if ! runFileOp mv "$tmp" "$target"; then
    runFileOp rm -f "$tmp" 2>/dev/null
    return 1
  fi
  return 0
}
# Update one or more fields on a task file, atomically. Args after taskFile
# are alternating <key> <value> pairs.
# updateTaskFields "$file" status running heartbeat_at "$(date -Iseconds)"
# Set one or more top-level string fields on a task file, replacing the file
# via temp + rename.
# Arguments: $1  - task file path
#            $2… - alternating <key> <value> pairs; a trailing unpaired
#                  argument is ignored. Keys are spliced into the jq program
#                  and used as jq variable names, so they must be plain
#                  identifiers.
# Returns:   0 when the file was rewritten; 1 when the file or jq is missing,
#            jq fails or prints nothing, or any write step fails.
updateTaskFields() {
  local taskFile="$1"; shift
  [[ -f "$taskFile" ]] || return 1
  command -v jq >/dev/null 2>&1 || return 1
  local jqArgs=() jqExpr='.' key value
  while [[ $# -ge 2 ]]; do
    key="$1"; value="$2"; shift 2
    jqArgs+=(--arg "$key" "$value")
    jqExpr="$jqExpr | .$key = \$$key"
  done
  # The processor runs as the manager user, which CANNOT create files in the
  # docker-install-owned TASK_DIR, so a plain `jq … > "$tmp"` redirect fails.
  # Capture jq's output and write the temp via runFileWrite (the privileged
  # helper, like writeAtomic), then chmod + atomic mv.
  local updated
  updated=$(jq "${jqArgs[@]}" "$jqExpr" "$taskFile" 2>/dev/null) || return 1
  [[ -z "$updated" ]] && return 1
  # $$ is the same in the heartbeat subshell and the main shell; BASHPID is
  # not, so concurrent writers never share a temp path.
  local tmp="${taskFile}.tmp.${BASHPID:-$$}"
  if printf '%s' "$updated" | runFileWrite "$tmp" \
      && runFileOp chmod 644 "$tmp" 2>/dev/null \
      && runFileOp mv "$tmp" "$taskFile"; then
    return 0
  fi
  # A failed step must not leave a stray temp file next to the task file.
  runFileOp rm -f "$tmp" 2>/dev/null
  return 1
}
# Print the value of one top-level field of a task file.
# Arguments: $1 - task file path
#            $2 - field name
# Outputs:   the field's raw value; nothing when jq's `//` falls through to
#            `empty` (field absent, null or false)
# Returns:   1 when the file or jq is missing, otherwise jq's exit status.
readTaskField() {
  local taskFile="$1" field="$2"
  if [[ ! -f "$taskFile" ]]; then
    return 1
  fi
  if ! command -v jq >/dev/null 2>&1; then
    return 1
  fi
  jq -r --arg f "$field" '.[$f] // empty' "$taskFile" 2>/dev/null
}
# ============================================================================
# SETUP / SINGLE INSTANCE
# ============================================================================
# Prepare the task directory and its wake-up FIFO. Every filesystem change
# goes through runFileOp (defined outside this file).
# Globals: TASK_DIR, FIFO, docker_install_user (all read)
setupTaskDir() {
  runFileOp mkdir -p "$TASK_DIR"
  if ! [[ -p "$FIFO" ]]; then
    # Something that is not a FIFO may be squatting on the path; clear it
    # before creating the real one.
    if [[ -e "$FIFO" ]]; then
      runFileOp rm -f "$FIFO"
    fi
    runFileOp mkfifo "$FIFO" 2>/dev/null
  fi
  # FIFO is world read/write; directory is 755.
  runFileOp chmod 666 "$FIFO" 2>/dev/null
  runFileOp chmod 755 "$TASK_DIR" 2>/dev/null
  # Ownership goes to the docker install user only when one is configured.
  [[ -z "$docker_install_user" ]] \
    || runFileOp chown -R "$docker_install_user":"$docker_install_user" "$TASK_DIR" 2>/dev/null
}
# Open the FIFO in read-write mode on fd 3. With <>, Linux returns from open()
# immediately (process is both reader and writer) so subsequent reads with a
# timeout actually time out instead of hanging forever waiting for a writer.
# Attach fd 3 to the FIFO in read-write mode. Opening with <> returns from
# open() immediately, which lets later timed reads actually time out.
# Returns: 0 when fd 3 is open on the FIFO; 1 when $FIFO is not a FIFO or the
#          open failed.
openFifoReader() {
  [[ -p "$FIFO" ]] || return 1
  exec 3<> "$FIFO" || return 1
  return 0
}
# Take the single-instance lock on fd 200 and record our pid next to it.
# Exits 0 when another processor already holds the lock.
# Exits 1 when the lock file cannot be opened at all; previously that case
# fell through to flock on a bad descriptor and was misreported as "already
# running".
acquireSingletonLock() {
  # The braces scope the stderr redirect to this one attempt; fd 200 itself
  # stays open in the current shell when the open succeeds.
  if ! { exec 200>"$LOCK_FILE"; } 2>/dev/null; then
    # The open runs as the processor's own user. If that user cannot create
    # the file, have runFileOp create it owned by us (the same fallback
    # runTask uses for its log file) and try once more.
    local daemonUser; daemonUser=$(id -un)
    runFileOp install -o "$daemonUser" -g "$daemonUser" -m 664 /dev/null "$LOCK_FILE" 2>/dev/null
    if ! { exec 200>"$LOCK_FILE"; } 2>/dev/null; then
      logError "Cannot open lock file $LOCK_FILE; task processor cannot start."
      exit 1
    fi
  fi
  if ! flock -n 200; then
    logInfo "Another task processor is already running; this instance will exit."
    exit 0
  fi
  echo $$ | runFileWrite "${LOCK_FILE}.pid" 2>/dev/null
}
# ============================================================================
# ORPHAN RECOVERY
# ============================================================================
# `running` tasks whose heartbeat is older than HEARTBEAT_STALE_SECS, or
# tasks that have no heartbeat and a started_at older than the same threshold,
# are treated as dead (the processor that owned them is gone).
# Mark stale `running` tasks as failed.
# A task is stale when its heartbeat_at (or, lacking one, its started_at) is
# more than HEARTBEAT_STALE_SECS older than now. Running tasks with neither
# stamp, or with a stamp `date -d` cannot parse, are skipped.
# Returns 0 immediately when jq is unavailable or there are no task files.
recoverOrphans() {
  command -v jq >/dev/null 2>&1 || return 0
  local now; now=$(date +%s)
  # Test the first glob result instead of toggling nullglob, so the caller's
  # shell options are left alone. An unmatched pattern stays literal (or
  # expands to nothing under nullglob) and fails -e either way.
  local files=( "$TASK_DIR"/task_*.json )
  [[ -e "${files[0]}" ]] || return 0
  # Loop variables are local so nothing leaks into the shared shell namespace.
  local file stamp stampEpoch id finishedAt
  # Batch the read: one `jq` invocation across every task file emits
  # "<file>\t<heartbeat-or-started-or-empty>" for each running task.
  while IFS=$'\t' read -r file stamp; do
    [[ -n "$file" && -n "$stamp" ]] || continue
    stampEpoch=$(date -d "$stamp" +%s 2>/dev/null) || continue
    if (( now - stampEpoch > HEARTBEAT_STALE_SECS )); then
      id=$(basename "$file" .json)
      logInfo "Orphan recovery: $id (heartbeat age $((now - stampEpoch))s) -> failed"
      finishedAt=$(date -Iseconds)
      updateTaskFields "$file" \
        status failed \
        error_message "Task interrupted — processor died mid-run." \
        completed_at "$finishedAt" \
        updated_at "$finishedAt"
    fi
  done < <(jq -r 'select(.status == "running") | "\(input_filename)\t\(.heartbeat_at // .started_at // "")"' "${files[@]}" 2>/dev/null)
}
# ============================================================================
# TASK EXECUTION
# ============================================================================
# A single task runs to completion in this function. The eval'd command lives
# in a backgrounded subshell so an `exit 1` from inside it cannot kill us.
# Run one task file to completion and record the outcome in the task file.
#
# Arguments: $1 - path to the task's `<id>.json` file
# Globals:   TASK_DIR, HEARTBEAT_INTERVAL, docker_install_user (read);
#            LIBREPORTAL_NONINTERACTIVE (exported as 1)
# Returns:   1 when the task has no command or cannot be marked running;
#            otherwise the status of the closing log call.
runTask() {
  local taskFile="$1"
  local taskId; taskId=$(basename "$taskFile" .json)
  local logFile="$TASK_DIR/${taskId}.log"
  local cancelMarker="$TASK_DIR/${taskId}.cancel"
  local command; command=$(readTaskField "$taskFile" command)
  if [[ -z "$command" || "$command" == "null" ]]; then
    logError "Task $taskId has no command; marking failed."
    updateTaskFields "$taskFile" \
      status failed \
      error_message "Task file has no command field." \
      completed_at "$(date -Iseconds)" \
      updated_at "$(date -Iseconds)"
    return 1
  fi
  logInfo "Task $taskId starting: $command"
  # Any pre-existing cancel marker is stale; remove it.
  [[ -f "$cancelMarker" ]] && runFileOp rm -f "$cancelMarker"
  # Mark running with initial heartbeat. If this write fails the file still
  # says `queued`, so running the command anyway would execute it again on
  # every dispatch pass. Refuse to run instead.
  local now; now=$(date -Iseconds)
  if ! updateTaskFields "$taskFile" \
      status running \
      started_at "$now" \
      heartbeat_at "$now" \
      updated_at "$now"; then
    logError "Task $taskId could not be marked running (status write failed); not executing it."
    return 1
  fi
  # Initialise the log file owned by the daemon's user so the bash redirection
  # `>>"$logFile"` (which runs as that user, NOT under sudo) can append to it.
  # Previously this used `sudo truncate` + `sudo chmod 644` which left the file
  # root-owned and unwritable to the daemon, so the redirection failed and the
  # task immediately exited with rc=1.
  local daemonUser; daemonUser=$(id -un)
  if [[ -f "$logFile" ]]; then
    runFileOp chown "$daemonUser":"$daemonUser" "$logFile" 2>/dev/null
    runFileOp chmod 664 "$logFile" 2>/dev/null
    : > "$logFile" 2>/dev/null || runFileOp truncate -s 0 "$logFile" 2>/dev/null
  else
    : > "$logFile" 2>/dev/null || runFileOp install -o "$daemonUser" -g "$daemonUser" -m 664 /dev/null "$logFile"
  fi
  export LIBREPORTAL_NONINTERACTIVE=1
  # Run the command in a subshell so:
  # * `eval` inherits the daemon's full env (PATH, functions, vars). Switching
  #   to `bash -c` broke this because the libreportal CLI lives at
  #   /usr/local/bin/libreportal which a fresh bash subshell can't always find.
  # * an `exit 1` from inside (e.g. checkSuccess in non-interactive mode)
  #   only terminates the subshell — the daemon keeps running.
  # `set -m` gives the backgrounded subshell its own process group so we can
  # kill -TERM the whole tree on cancel via `kill -TERM -$cmdPid`.
  set -m
  ( cd "$HOME" 2>/dev/null || cd /; eval "$command" ) </dev/null >>"$logFile" 2>&1 &
  local cmdPid=$!
  set +m
  # Heartbeat + cancel watcher. Runs alongside the command and exits as soon
  # as the command's pid is gone.
  #
  # `set -m` so the watcher subshell becomes its own process group leader —
  # we need that below to SIGKILL the bash subshell *and* any in-flight
  # jq / sudo children with a single signal, so a heartbeat update that
  # started just before the command exited can't write `status=running`
  # back over our `status=completed` write.
  #
  # Inside the loop we poll in 1-second chunks instead of one
  # `sleep $HEARTBEAT_INTERVAL`. With the long sleep the watcher only
  # noticed the command had finished after the next 5s tick, which was the
  # queued -> completed lag the user was seeing. Same trick gives us ~1s
  # cancel-marker response instead of up to 5s.
  set -m
  (
    while kill -0 "$cmdPid" 2>/dev/null; do
      updateTaskFields "$taskFile" heartbeat_at "$(date -Iseconds)" >/dev/null 2>&1
      for ((i=0; i<HEARTBEAT_INTERVAL; i++)); do
        kill -0 "$cmdPid" 2>/dev/null || exit 0
        if [[ -f "$cancelMarker" ]]; then
          logInfo "Task $taskId cancel requested; sending SIGTERM."
          kill -TERM "-$cmdPid" 2>/dev/null
          sleep 5
          if kill -0 "$cmdPid" 2>/dev/null; then
            logInfo "Task $taskId did not exit on TERM; sending SIGKILL."
            kill -KILL "-$cmdPid" 2>/dev/null
          fi
          runFileOp rm -f "$cancelMarker"
          exit 0
        fi
        sleep 1
      done
    done
  ) &
  local watcherPid=$!
  set +m
  # Wait for the command to finish (or to be killed by the watcher).
  wait "$cmdPid"
  local rc=$?
  # Stop the watcher. SIGKILL on the *process group* so any jq/sudo child of
  # the watcher that's mid-write also dies — otherwise their `sudo mv` can
  # land after our final status write and stomp it back to running.
  kill -KILL -- "-$watcherPid" 2>/dev/null
  wait "$watcherPid" 2>/dev/null
  # Classify the outcome. 143 and 137 are 128+SIGTERM and 128+SIGKILL, the
  # two signals the watcher sends on cancel.
  local finalStatus="completed"
  local errorMessage=""
  if [[ -f "$cancelMarker" ]] || [[ $rc -eq 143 ]] || [[ $rc -eq 137 ]]; then
    finalStatus="cancelled"
    errorMessage="Cancelled by user."
    runFileOp rm -f "$cancelMarker" 2>/dev/null
  elif [[ $rc -ne 0 ]]; then
    finalStatus="failed"
    errorMessage="Command exited with status $rc."
  fi
  updateTaskFields "$taskFile" \
    status "$finalStatus" \
    completed_at "$(date -Iseconds)" \
    updated_at "$(date -Iseconds)" \
    exit_code "$rc" \
    error_message "$errorMessage"
  # Hand the log file back to the docker install user when one is configured.
  if [[ -n "$docker_install_user" ]]; then
    runFileOp chown "$docker_install_user":"$docker_install_user" "$logFile" 2>/dev/null
  fi
  case "$finalStatus" in
    completed) logInfo "Task $taskId completed (exit $rc)";;
    cancelled) logInfo "Task $taskId cancelled";;
    failed) logError "Task $taskId failed (exit $rc)";;
  esac
}
# ============================================================================
# SCAN & DISPATCH
# ============================================================================
# Process every queued task in created-at order. We keep going until the
# directory has no more `queued` files; FIFO wake-ups will deliver new ones.
# Run every queued/pending task, oldest task file first, until none remain.
# Returns: 0 when the queue is drained;
#          1 when jq is unavailable, or when the task that was just run is
#          still reported as queued (its status could not be updated).
dispatchPending() {
  command -v jq >/dev/null 2>&1 || { logError "jq not available; cannot dispatch."; return 1; }
  local listing line nextFile lastFile=""
  local files
  while true; do
    # Sort by mtime (oldest first) so FIFO order is preserved, then run a
    # single `jq` over all of them — it emits the path of every queued/pending
    # task, in argument order. We pick the first; runTask consumes it; loop.
    listing=$(ls -tr "$TASK_DIR"/task_*.json 2>/dev/null) || listing=""
    [[ -z "$listing" ]] && return 0
    # One path per line; reading line-by-line avoids the globbing an unquoted
    # expansion of $listing would apply to the names.
    files=()
    while IFS= read -r line; do
      [[ -n "$line" ]] && files+=( "$line" )
    done <<< "$listing"
    nextFile=$(jq -r 'select(.status == "queued" or .status == "pending") | input_filename' "${files[@]}" 2>/dev/null | head -n 1)
    [[ -z "$nextFile" ]] && return 0
    # If the file we just ran is first in line again, its status never left
    # `queued`. Stop here rather than spin on it; the next pass will retry.
    if [[ "$nextFile" == "$lastFile" ]]; then
      logError "Task file $nextFile is still queued after being dispatched; stopping this pass."
      return 1
    fi
    lastFile="$nextFile"
    runTask "$nextFile"
  done
}
# Fast path used by the FIFO wake-up. Skips the whole-dir scan + per-file jq
# spawn that dispatchPending does — with many tasks on disk that scan was the
# dominant queued -> running latency.
# Fast path for a FIFO wake-up: run exactly the task whose id was received,
# provided it is still waiting to run.
# Arguments: $1 - task id as read from the FIFO
# Returns:   0 when nothing was run; otherwise runTask's status.
dispatchSpecific() {
  local wantedId="$1"
  if [[ -z "$wantedId" ]]; then
    return 0
  fi
  # Guard against junk in the FIFO: only well-formed ids become a path.
  if ! [[ "$wantedId" =~ ^task_[0-9]+_[A-Za-z0-9]+$ ]]; then
    return 0
  fi
  local candidate="$TASK_DIR/${wantedId}.json"
  if [[ ! -f "$candidate" ]]; then
    return 0
  fi
  local currentStatus
  currentStatus=$(readTaskField "$candidate" status)
  case "$currentStatus" in
    queued|pending) runTask "$candidate" ;;
  esac
}
# ============================================================================
# HOUSEKEEPING
# ============================================================================
# Delete every regular file in TASK_DIR whose name starts with `task_` and
# whose size is zero. The pattern is `task_*`, so it covers any such file,
# not only `.json` ones.
cleanupZeroByteFiles() {
  # Local so the loop variable does not leak into the shared shell namespace.
  local f
  # Use the bash builtin `-s` (file size > 0) instead of forking `stat` per
  # file — at 100+ task files the stat fork was a measurable share of the
  # idle-tick cost.
  for f in "$TASK_DIR"/task_*; do
    [[ -f "$f" ]] || continue   # also skips the literal pattern when nothing matches
    [[ -s "$f" ]] && continue
    runFileOp rm -f "$f"
  done
}
# ============================================================================
# MAIN LOOP
# ============================================================================
# Block on the FIFO with a timeout. Two paths:
#
# * FIFO wake-up — Node poked us with the new task id. Skip the full
# housekeeping pass (orphan recovery + zero-byte cleanup re-iterate every
# task file and shell out to jq for each; with N tasks on disk that's the
# dominant queued -> running latency). Dispatch the specific task by id,
# then drain any other queued ones, then back to read.
#
# * Idle timeout — no Node activity. Run the full housekeeping pass; this
# is what catches orphaned-running tasks from a previously-killed
# processor and tasks that were created while the FIFO was unhealthy.
#
# With a healthy Node side, queued -> running latency is now bounded by one
# `read`, one jq invocation, and the runTask setup — typically well under
# 100ms regardless of how many task files are on disk.
# Daemon loop; never returns.
# Each turn either reacts to a task id read from the FIFO (fast path: run
# that task, then drain the queue) or, on timeout / read failure / missing
# FIFO, performs the full housekeeping pass (orphan recovery, queue drain,
# zero-byte cleanup).
mainLoop() {
  logInfo "Task processor v2 entering main loop"
  # Open the FIFO once and keep the descriptor. Re-opening with `< "$FIFO"` on
  # every turn would block in open() waiting for a writer and defeat the
  # read timeout.
  local haveFifo=0 wokenId readStatus
  if openFifoReader; then
    haveFifo=1
  fi
  # Startup: one full housekeeping pass so leftovers from a previous processor
  # are noticed before the fast path takes over.
  recoverOrphans
  dispatchPending
  cleanupZeroByteFiles
  while :; do
    wokenId=""
    if (( haveFifo == 1 )); then
      # fd 3 was opened in <> mode by openFifoReader; -t bounds the wait.
      read -t "$IDLE_POLL_SECS" -u 3 wokenId
      readStatus=$?
    else
      # No FIFO yet: plain sleep, then see whether one has appeared.
      sleep "$IDLE_POLL_SECS"
      readStatus=1
      if [[ -p "$FIFO" ]] && openFifoReader; then
        haveFifo=1
      fi
    fi
    if (( readStatus != 0 )) || [[ -z "$wokenId" ]]; then
      # Idle timeout (or FIFO unhealthy) — full housekeeping pass.
      recoverOrphans
      dispatchPending
      cleanupZeroByteFiles
      continue
    fi
    # Fast path: run the task we were told about, then drain whatever else
    # piled up in the meantime.
    dispatchSpecific "$wokenId"
    dispatchPending
  done
}
# ============================================================================
# ENTRY POINT
# ============================================================================
# Entry point: prepare the environment, make sure we are the only instance,
# tidy up after any previous run, then hand over to the daemon loop.
# Does not return in normal operation because mainLoop loops forever.
run_task_processor() {
  logInfo "=== LibrePortal task processor starting ==="
  # Directory/FIFO first, then the singleton lock (exits if another
  # instance already holds it).
  setupTaskDir; acquireSingletonLock
  # Pre-loop cleanup.
  cleanupZeroByteFiles; recoverOrphans
  mainLoop
}
# Start the processor. Execution only gets here when the source guard near
# the top of the file did not return early.
run_task_processor