feat: parallel ls polling with timeout, rsync progress, system dependency check on worker register
All checks were successful
CI / Check, test, lint (push) Successful in 29s

This commit is contained in:
2026-05-20 18:06:01 +02:00
parent b27e92b6f7
commit a8730ca0f5
4 changed files with 95 additions and 31 deletions

View File

@@ -27,8 +27,10 @@ pub fn execute(all: bool) -> Result<()> {
// ── Reconcile running jobs ──────────────────────────────────────────────── // ── Reconcile running jobs ────────────────────────────────────────────────
// //
// Group running jobs by worker, then make one SSH call per worker to // Poll each worker in its own thread so an unreachable worker doesn't
// check all of them at once. Update and persist any that have finished. // block the display of jobs on reachable ones. poll_jobs uses a 3-second
// SSH connect timeout, so unreachable workers add ~3 s to ls in total
// (not ~3 s each), regardless of how many workers are down.
let mut by_worker: HashMap<String, Vec<usize>> = HashMap::new(); let mut by_worker: HashMap<String, Vec<usize>> = HashMap::new();
for (i, job) in jobs.iter().enumerate() { for (i, job) in jobs.iter().enumerate() {
@@ -37,23 +39,28 @@ pub fn execute(all: bool) -> Result<()> {
} }
} }
for (worker_name, indices) in &by_worker { // Stable vec so we can zip with JoinHandles after collecting.
let Some(worker_cfg) = cfg.get_worker(worker_name) else { let poll_tasks: Vec<(String, Vec<usize>)> = by_worker.into_iter().collect();
// Worker was deregistered; mark jobs unknown.
for &i in indices {
jobs[i].status = JobStatus::Unknown;
}
continue;
};
let ids: Vec<&str> = indices.iter().map(|&i| jobs[i].id.as_str()).collect(); let handles: Vec<_> = poll_tasks
.iter()
.map(|(worker_name, indices)| {
let worker_cfg = cfg.get_worker(worker_name).cloned();
let ids: Vec<String> = indices.iter().map(|&i| jobs[i].id.clone()).collect();
std::thread::spawn(move || -> Option<HashMap<String, Option<i32>>> {
let w = worker_cfg?;
let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
ssh::poll_jobs(&w, &id_refs).ok()
})
})
.collect();
match ssh::poll_jobs(worker_cfg, &ids) { let now = chrono::Utc::now().timestamp();
Ok(results) => { for (handle, (_, indices)) in handles.into_iter().zip(poll_tasks.iter()) {
let now = chrono::Utc::now().timestamp(); match handle.join().unwrap_or(None) {
Some(results) => {
for &i in indices { for &i in indices {
let id = jobs[i].id.clone(); if let Some(Some(ec)) = results.get(&jobs[i].id) {
if let Some(Some(ec)) = results.get(&id) {
jobs[i].status = if *ec == 0 { jobs[i].status = if *ec == 0 {
JobStatus::Done JobStatus::Done
} else { } else {
@@ -65,8 +72,7 @@ pub fn execute(all: bool) -> Result<()> {
} }
} }
} }
Err(_) => { None => {
// Worker unreachable; show unknown rather than stale "running".
for &i in indices { for &i in indices {
jobs[i].status = JobStatus::Unknown; jobs[i].status = JobStatus::Unknown;
} }
@@ -81,39 +87,36 @@ pub fn execute(all: bool) -> Result<()> {
} }
fn print_table(jobs: &[Job]) { fn print_table(jobs: &[Job]) {
// Compute column widths from content, with a minimum equal to the header. let id_w = 8;
let id_w = 8; // always show 8-char short IDs
let worker_w = jobs let worker_w = jobs
.iter() .iter()
.map(|j| j.worker.len()) .map(|j| j.worker.len())
.max() .max()
.unwrap_or(0) .unwrap_or(0)
.max(6); // "WORKER" .max(6);
let cwd_w = jobs let cwd_w = jobs
.iter() .iter()
.map(|j| j.cwd_display().len().min(24)) .map(|j| j.cwd_display().len().min(24))
.max() .max()
.unwrap_or(0) .unwrap_or(0)
.max(3); // "CWD" .max(3);
let cmd_w = jobs let cmd_w = jobs
.iter() .iter()
.map(|j| j.command_display(24).len()) .map(|j| j.command_display(24).len())
.max() .max()
.unwrap_or(0) .unwrap_or(0)
.max(7); // "COMMAND" .max(7);
let status_w = jobs let status_w = jobs
.iter() .iter()
.map(|j| j.status_display().len()) .map(|j| j.status_display().len())
.max() .max()
.unwrap_or(0) .unwrap_or(0)
.max(6); // "STATUS" .max(6);
// Header
println!( println!(
"{:<id_w$} {:<worker_w$} {:<cwd_w$} {:<cmd_w$} {:<status_w$} DURATION", "{:<id_w$} {:<worker_w$} {:<cwd_w$} {:<cmd_w$} {:<status_w$} DURATION",
"ID", "WORKER", "CWD", "COMMAND", "STATUS", "ID", "WORKER", "CWD", "COMMAND", "STATUS",
); );
// Separator
println!( println!(
"{:<id_w$} {:<worker_w$} {:<cwd_w$} {:<cmd_w$} {:<status_w$} --------", "{:<id_w$} {:<worker_w$} {:<cwd_w$} {:<cmd_w$} {:<status_w$} --------",
"-".repeat(id_w), "-".repeat(id_w),
@@ -138,7 +141,6 @@ fn print_table(jobs: &[Job]) {
} }
} }
/// Truncate a string to `max` display characters, appending `…` if needed.
fn truncate(s: &str, max: usize) -> String { fn truncate(s: &str, max: usize) -> String {
if s.len() > max { if s.len() > max {
format!("{}", &s[..max.saturating_sub(1)]) format!("{}", &s[..max.saturating_sub(1)])

View File

@@ -37,7 +37,31 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> {
println!("Registered '{}' and set as default worker.", name); println!("Registered '{}' and set as default worker.", name);
} else { } else {
println!("Registered '{}'.", name); println!("Registered '{}'.", name);
println!("Run 'p default {}' to make it the default.", name); println!("Run 'p worker default {}' to make it the default.", name);
}
// Check that the worker has the tools p needs.
let worker = cfg.get_worker(&name).unwrap();
print!("Checking worker dependencies... ");
match ssh::check_dependencies(worker) {
Ok(ref missing) if missing.is_empty() => println!("ok"),
Ok(missing) => {
println!();
println!(
"warning: '{}' is missing required tools: {}",
name,
missing.join(", ")
);
println!(" install them and re-register, or jobs will fail at launch");
}
Err(_) => {
println!();
println!(
"note: could not reach '{}' to check dependencies (worker may be offline)",
name
);
println!(" run 'p worker ls --check' once it's available");
}
} }
Ok(()) Ok(())

View File

@@ -50,6 +50,30 @@ pub fn ssh_args(worker: &WorkerConfig) -> Vec<String> {
// ── SSH execution ───────────────────────────────────────────────────────────── // ── SSH execution ─────────────────────────────────────────────────────────────
/// Execute `remote_cmd` on `worker` through the `ssh` binary and return its
/// captured stdout as a (lossily decoded) UTF-8 string.
///
/// Two options are placed ahead of the worker's own SSH arguments:
/// `ConnectTimeout=<timeout_secs>` and `BatchMode=yes`, so the call neither
/// waits indefinitely for a connection nor stops on an interactive prompt.
///
/// # Errors
///
/// Fails if `ssh` cannot be spawned, or if it exits with a non-success
/// status; in the latter case the trimmed stderr is included in the message.
///
/// NOTE(review): `timeout_secs` is only handed to `ConnectTimeout`; nothing
/// here limits how long the remote command itself may run — confirm that is
/// acceptable for every caller.
fn run_with_timeout(worker: &WorkerConfig, remote_cmd: &str, timeout_secs: u32) -> Result<String> {
    let connect_timeout = format!("ConnectTimeout={}", timeout_secs);

    // Argument order matters: timeout/batch options first, then the worker's
    // connection arguments, and the remote command last.
    let output = Command::new("ssh")
        .args(["-o", connect_timeout.as_str(), "-o", "BatchMode=yes"])
        .args(ssh_args(worker))
        .arg(remote_cmd)
        .output()
        .context("failed to spawn ssh")?;

    if output.status.success() {
        Ok(String::from_utf8_lossy(&output.stdout).into_owned())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr);
        Err(anyhow::anyhow!("ssh command failed: {}", stderr.trim()))
    }
}
/// Run a command over SSH with an interactive terminal (inherits stdin/stdout/stderr). /// Run a command over SSH with an interactive terminal (inherits stdin/stdout/stderr).
/// ///
/// Forces `TERM=xterm-256color` on the remote to avoid failures with terminal /// Forces `TERM=xterm-256color` on the remote to avoid failures with terminal
@@ -132,13 +156,14 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
} }
/// Poll multiple jobs on one worker in a single SSH call. /// Poll multiple jobs on one worker in a single SSH call.
/// Uses a 3-second connect timeout so `p ls` stays responsive with
/// unreachable workers.
/// Returns a map of job_id → exit_code (None = still running). /// Returns a map of job_id → exit_code (None = still running).
pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<String, Option<i32>>> { pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<String, Option<i32>>> {
if job_ids.is_empty() { if job_ids.is_empty() {
return Ok(HashMap::new()); return Ok(HashMap::new());
} }
// One SSH call: for each job, emit "UUID:exitcode" or "UUID:" if still running.
let script = format!( let script = format!(
"for id in {}; do \ "for id in {}; do \
ec=$(cat ~/.p/jobs/$id/exitcode 2>/dev/null); \ ec=$(cat ~/.p/jobs/$id/exitcode 2>/dev/null); \
@@ -147,7 +172,7 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<Stri
job_ids.join(" ") job_ids.join(" ")
); );
let output = run_capture(worker, &script)?; let output = run_with_timeout(worker, &script, 3)?;
let mut map = HashMap::new(); let mut map = HashMap::new();
for line in output.lines() { for line in output.lines() {
if let Some((id, ec)) = line.split_once(':') { if let Some((id, ec)) = line.split_once(':') {
@@ -162,6 +187,19 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<Stri
Ok(map) Ok(map)
} }
/// Probe a worker for the remote tools `p` relies on (tmux, base64).
///
/// A single shell snippet is run over SSH; it tests each tool with
/// `command -v` and prints the names of the ones that are absent. The
/// returned vector holds those names and is empty when nothing is missing.
///
/// The snippet is run via `run_with_timeout` with a 5-second value so that
/// `p worker register` doesn't hang on offline workers.
///
/// # Errors
///
/// Propagates any error from `run_with_timeout` (ssh could not be spawned or
/// exited unsuccessfully).
pub fn check_dependencies(worker: &WorkerConfig) -> Result<Vec<String>> {
    // Accumulates missing tool names in $missing, separated by spaces.
    const PROBE: &str = "missing=''; \
        command -v tmux >/dev/null 2>&1 || missing=\"$missing tmux\"; \
        command -v base64 >/dev/null 2>&1 || missing=\"$missing base64\"; \
        printf '%s' \"$missing\"";

    let report = run_with_timeout(worker, PROBE, 5)?;
    let missing: Vec<String> = report.split_whitespace().map(str::to_owned).collect();
    Ok(missing)
}
// ── Tests ───────────────────────────────────────────────────────────────────── // ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)] #[cfg(test)]

View File

@@ -13,7 +13,7 @@ pub fn push_dir(worker: &WorkerConfig, local_dir: &Path, remote_path: &str) -> R
let (user_host, port) = parse_connection(&worker.connection); let (user_host, port) = parse_connection(&worker.connection);
let mut cmd = Command::new("rsync"); let mut cmd = Command::new("rsync");
cmd.args(["-az", "--filter=:- .gitignore"]); cmd.args(["-az", "--info=progress2", "--filter=:- .gitignore"]);
if let Some(p) = port { if let Some(p) = port {
cmd.arg(format!("-e=ssh -p {}", p)); cmd.arg(format!("-e=ssh -p {}", p));