From f8b28aa43f1d2aedd509d6cad5d1464009b4fa65 Mon Sep 17 00:00:00 2001 From: Valentin Haudiquet Date: Fri, 29 May 2026 15:47:03 +0200 Subject: [PATCH] feat: record job_finished_at server-side --- p/src/commands/attach.rs | 5 +++-- p/src/commands/ls.rs | 20 +++++++++-------- p/src/commands/run.rs | 6 +++-- p/src/ssh.rs | 47 +++++++++++++++++++++++++++++++--------- 4 files changed, 55 insertions(+), 23 deletions(-) diff --git a/p/src/commands/attach.rs b/p/src/commands/attach.rs index 58510ff..8dc94b5 100644 --- a/p/src/commands/attach.rs +++ b/p/src/commands/attach.rs @@ -64,7 +64,8 @@ pub fn execute(job_id: &str) -> Result<()> { } let exit_code = ssh::read_job_exitcode(&worker, &job.id); - let now = Utc::now().timestamp(); + let finished_at = + ssh::read_job_finished_at(&worker, &job.id).unwrap_or_else(|| Utc::now().timestamp()); if let Some(ec) = exit_code { ssh::run_capture( @@ -80,7 +81,7 @@ pub fn execute(job_id: &str) -> Result<()> { JobStatus::Failed }, exit_code: Some(ec), - finished_at: Some(now), + finished_at: Some(finished_at), ..job })?; diff --git a/p/src/commands/ls.rs b/p/src/commands/ls.rs index 3e881c6..7671a58 100644 --- a/p/src/commands/ls.rs +++ b/p/src/commands/ls.rs @@ -47,11 +47,13 @@ pub fn execute(all: bool) -> Result<()> { .map(|(worker_name, indices)| { let worker_cfg = cfg.get_worker(worker_name).cloned(); let ids: Vec = indices.iter().map(|&i| jobs[i].id.clone()).collect(); - std::thread::spawn(move || -> Option>> { - let w = worker_cfg?; - let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect(); - ssh::poll_jobs(&w, &id_refs).ok() - }) + std::thread::spawn( + move || -> Option>> { + let w = worker_cfg?; + let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect(); + ssh::poll_jobs(&w, &id_refs).ok() + }, + ) }) .collect(); @@ -60,14 +62,14 @@ pub fn execute(all: bool) -> Result<()> { match handle.join().unwrap_or(None) { Some(results) => { for &i in indices { - if let Some(Some(ec)) = results.get(&jobs[i].id) { - jobs[i].status = if *ec == 0 { + if let Some(Some(res)) = results.get(&jobs[i].id) { + jobs[i].status = if res.exit_code == 0 { JobStatus::Done } else { JobStatus::Failed }; - jobs[i].exit_code = Some(*ec); - jobs[i].finished_at = Some(now); + jobs[i].exit_code = Some(res.exit_code); + jobs[i].finished_at = Some(res.finished_at.unwrap_or(now)); db::save(&jobs[i])?; } } diff --git a/p/src/commands/run.rs b/p/src/commands/run.rs index 7614c19..ba98eb8 100644 --- a/p/src/commands/run.rs +++ b/p/src/commands/run.rs @@ -131,7 +131,8 @@ pub fn execute( // ── 5. Reconcile status after returning ─────────────────────────────────── let exit_code = ssh::read_job_exitcode(&worker, &id); - let now = Utc::now().timestamp(); + let finished_at = + ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp()); if let Some(ec) = exit_code { // Job is done. The session might still be alive if the user Ctrl+B D'd @@ -149,7 +150,7 @@ pub fn execute( JobStatus::Failed }, exit_code: Some(ec), - finished_at: Some(now), + finished_at: Some(finished_at), ..job })?; @@ -199,6 +200,7 @@ pub(crate) fn build_run_sh(session: &str, job_dir: &str, work_dir: &str, worker: bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\ EXIT_CODE=${{PIPESTATUS[0]}}\n\ echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\ + date +%s > {job_dir}/finished_at\n\ \n\ # Update status bar to show result\n\ if [ \"$EXIT_CODE\" -eq 0 ]; then\n\ diff --git a/p/src/ssh.rs b/p/src/ssh.rs index 75fed86..e51ffca 100644 --- a/p/src/ssh.rs +++ b/p/src/ssh.rs @@ -155,11 +155,29 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option { .and_then(|s| s.trim().parse().ok()) } +/// Read the finish timestamp written by run.sh on the worker, if the job has finished. +pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option { + let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id); + run_capture(worker, &cmd) + .ok() + .and_then(|s| s.trim().parse().ok()) +} + +/// Result of polling a single job from the worker. +pub struct JobPollResult { + pub exit_code: i32, + /// Unix timestamp recorded server-side when the job finished. + pub finished_at: Option, +} + /// Poll multiple jobs on one worker in a single SSH call. /// Uses a 3-second connect timeout so `p ls` stays responsive with /// unreachable workers. -/// Returns a map of job_id → exit_code (None = still running). -pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result>> { +/// Returns a map of job_id → JobPollResult (None = still running). +pub fn poll_jobs( + worker: &WorkerConfig, + job_ids: &[&str], +) -> Result>> { if job_ids.is_empty() { return Ok(HashMap::new()); } @@ -167,7 +185,8 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result/dev/null); \ - echo \"$id:$ec\"; \ + fa=$(cat ~/.p/jobs/$id/finished_at 2>/dev/null); \ + echo \"$id:$ec:$fa\"; \ done", job_ids.join(" ") ); @@ -175,14 +194,22 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result = line.splitn(3, ':').collect(); + if parts.len() < 2 { + continue; } + let id = parts[0]; + let ec_str = parts[1]; + let fa_str = if parts.len() == 3 { parts[2] } else { "" }; + let result = if ec_str.is_empty() { + None + } else { + ec_str.trim().parse::().ok().map(|ec| JobPollResult { + exit_code: ec, + finished_at: fa_str.trim().parse().ok(), + }) + }; + map.insert(id.to_string(), result); } Ok(map) }