diff --git a/p/src/commands/ls.rs b/p/src/commands/ls.rs index d2f99f0..3e881c6 100644 --- a/p/src/commands/ls.rs +++ b/p/src/commands/ls.rs @@ -27,8 +27,10 @@ pub fn execute(all: bool) -> Result<()> { // ── Reconcile running jobs ──────────────────────────────────────────────── // - // Group running jobs by worker, then make one SSH call per worker to - // check all of them at once. Update and persist any that have finished. + // Poll each worker in its own thread so an unreachable worker doesn't + // block the display of jobs on reachable ones. poll_jobs uses a 3-second + // SSH connect timeout, so the whole ls completes in ~3 s worst-case + // regardless of how many workers are down. let mut by_worker: HashMap> = HashMap::new(); for (i, job) in jobs.iter().enumerate() { @@ -37,23 +39,28 @@ pub fn execute(all: bool) -> Result<()> { } } - for (worker_name, indices) in &by_worker { - let Some(worker_cfg) = cfg.get_worker(worker_name) else { - // Worker was deregistered; mark jobs unknown. - for &i in indices { - jobs[i].status = JobStatus::Unknown; - } - continue; - }; + // Stable vec so we can zip with JoinHandles after collecting. + let poll_tasks: Vec<(String, Vec)> = by_worker.into_iter().collect(); - let ids: Vec<&str> = indices.iter().map(|&i| jobs[i].id.as_str()).collect(); + let handles: Vec<_> = poll_tasks + .iter() + .map(|(worker_name, indices)| { + let worker_cfg = cfg.get_worker(worker_name).cloned(); + let ids: Vec = indices.iter().map(|&i| jobs[i].id.clone()).collect(); + std::thread::spawn(move || -> Option>> { + let w = worker_cfg?; + let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect(); + ssh::poll_jobs(&w, &id_refs).ok() + }) + }) + .collect(); - match ssh::poll_jobs(worker_cfg, &ids) { - Ok(results) => { - let now = chrono::Utc::now().timestamp(); + let now = chrono::Utc::now().timestamp(); + for (handle, (_, indices)) in handles.into_iter().zip(poll_tasks.iter()) { + match handle.join().unwrap_or(None) { + Some(results) => { for &i in indices { - let id = jobs[i].id.clone(); - if let Some(Some(ec)) = results.get(&id) { + if let Some(Some(ec)) = results.get(&jobs[i].id) { jobs[i].status = if *ec == 0 { JobStatus::Done } else { @@ -65,8 +72,7 @@ pub fn execute(all: bool) -> Result<()> { } } } - Err(_) => { - // Worker unreachable; show unknown rather than stale "running". + None => { for &i in indices { jobs[i].status = JobStatus::Unknown; } @@ -81,39 +87,36 @@ pub fn execute(all: bool) -> Result<()> { } fn print_table(jobs: &[Job]) { - // Compute column widths from content, with a minimum equal to the header. - let id_w = 8; // always show 8-char short IDs + let id_w = 8; let worker_w = jobs .iter() .map(|j| j.worker.len()) .max() .unwrap_or(0) - .max(6); // "WORKER" + .max(6); let cwd_w = jobs .iter() .map(|j| j.cwd_display().len().min(24)) .max() .unwrap_or(0) - .max(3); // "CWD" + .max(3); let cmd_w = jobs .iter() .map(|j| j.command_display(24).len()) .max() .unwrap_or(0) - .max(7); // "COMMAND" + .max(7); let status_w = jobs .iter() .map(|j| j.status_display().len()) .max() .unwrap_or(0) - .max(6); // "STATUS" + .max(6); - // Header println!( "{: String { if s.len() > max { format!("{}…", &s[..max.saturating_sub(1)]) diff --git a/p/src/commands/worker/register.rs b/p/src/commands/worker/register.rs index 74d7d05..1e7f110 100644 --- a/p/src/commands/worker/register.rs +++ b/p/src/commands/worker/register.rs @@ -37,7 +37,31 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> { println!("Registered '{}' and set as default worker.", name); } else { println!("Registered '{}'.", name); - println!("Run 'p default {}' to make it the default.", name); + println!("Run 'p worker default {}' to make it the default.", name); + } + + // Check that the worker has the tools p needs. + let worker = cfg.get_worker(&name).unwrap(); + print!("Checking worker dependencies... "); + match ssh::check_dependencies(worker) { + Ok(ref missing) if missing.is_empty() => println!("ok"), + Ok(missing) => { + println!(); + println!( + "warning: '{}' is missing required tools: {}", + name, + missing.join(", ") + ); + println!(" install them and re-register, or jobs will fail at launch"); + } + Err(_) => { + println!(); + println!( + "note: could not reach '{}' to check dependencies (worker may be offline)", + name + ); + println!(" run 'p worker ls --check' once it's available"); + } } Ok(()) diff --git a/p/src/ssh.rs b/p/src/ssh.rs index d2d51a8..75fed86 100644 --- a/p/src/ssh.rs +++ b/p/src/ssh.rs @@ -50,6 +50,30 @@ pub fn ssh_args(worker: &WorkerConfig) -> Vec { // ── SSH execution ───────────────────────────────────────────────────────────── +/// Run a command over SSH with a connect timeout and BatchMode (no interactive +/// prompts). Used for non-blocking checks where hanging is unacceptable. +fn run_with_timeout(worker: &WorkerConfig, remote_cmd: &str, timeout_secs: u32) -> Result { + let mut args = vec![ + "-o".to_string(), + format!("ConnectTimeout={}", timeout_secs), + "-o".to_string(), + "BatchMode=yes".to_string(), + ]; + args.extend(ssh_args(worker)); + args.push(remote_cmd.to_string()); + + let out = Command::new("ssh") + .args(&args) + .output() + .context("failed to spawn ssh")?; + + if !out.status.success() { + let err = String::from_utf8_lossy(&out.stderr); + anyhow::bail!("ssh command failed: {}", err.trim()); + } + Ok(String::from_utf8_lossy(&out.stdout).into_owned()) +} + /// Run a command over SSH with an interactive terminal (inherits stdin/stdout/stderr). /// /// Forces `TERM=xterm-256color` on the remote to avoid failures with terminal @@ -132,13 +156,14 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option { } /// Poll multiple jobs on one worker in a single SSH call. +/// Uses a 3-second connect timeout so `p ls` stays responsive with +/// unreachable workers. /// Returns a map of job_id → exit_code (None = still running). pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result>> { if job_ids.is_empty() { return Ok(HashMap::new()); } - // One SSH call: for each job, emit "UUID:exitcode" or "UUID:" if still running. let script = format!( "for id in {}; do \ ec=$(cat ~/.p/jobs/$id/exitcode 2>/dev/null); \ @@ -147,7 +172,7 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result Result Result> { + let script = "missing=''; \ + command -v tmux >/dev/null 2>&1 || missing=\"$missing tmux\"; \ + command -v base64 >/dev/null 2>&1 || missing=\"$missing base64\"; \ + printf '%s' \"$missing\""; + + let out = run_with_timeout(worker, script, 5)?; + Ok(out.split_whitespace().map(|s| s.to_string()).collect()) +} + // ── Tests ───────────────────────────────────────────────────────────────────── #[cfg(test)] diff --git a/p/src/sync.rs b/p/src/sync.rs index 3e7af35..61c9b60 100644 --- a/p/src/sync.rs +++ b/p/src/sync.rs @@ -13,7 +13,7 @@ pub fn push_dir(worker: &WorkerConfig, local_dir: &Path, remote_path: &str) -> R let (user_host, port) = parse_connection(&worker.connection); let mut cmd = Command::new("rsync"); - cmd.args(["-az", "--filter=:- .gitignore"]); + cmd.args(["-az", "--info=progress2", "--filter=:- .gitignore"]); if let Some(p) = port { cmd.arg(format!("-e=ssh -p {}", p));