feat: record job_finished_at server-side

This commit is contained in:
2026-05-29 15:47:03 +02:00
parent 0fbd3f9952
commit f8b28aa43f
4 changed files with 55 additions and 23 deletions
+3 -2
View File
@@ -64,7 +64,8 @@ pub fn execute(job_id: &str) -> Result<()> {
} }
let exit_code = ssh::read_job_exitcode(&worker, &job.id); let exit_code = ssh::read_job_exitcode(&worker, &job.id);
let now = Utc::now().timestamp(); let finished_at =
ssh::read_job_finished_at(&worker, &job.id).unwrap_or_else(|| Utc::now().timestamp());
if let Some(ec) = exit_code { if let Some(ec) = exit_code {
ssh::run_capture( ssh::run_capture(
@@ -80,7 +81,7 @@ pub fn execute(job_id: &str) -> Result<()> {
JobStatus::Failed JobStatus::Failed
}, },
exit_code: Some(ec), exit_code: Some(ec),
finished_at: Some(now), finished_at: Some(finished_at),
..job ..job
})?; })?;
+8 -6
View File
@@ -47,11 +47,13 @@ pub fn execute(all: bool) -> Result<()> {
.map(|(worker_name, indices)| { .map(|(worker_name, indices)| {
let worker_cfg = cfg.get_worker(worker_name).cloned(); let worker_cfg = cfg.get_worker(worker_name).cloned();
let ids: Vec<String> = indices.iter().map(|&i| jobs[i].id.clone()).collect(); let ids: Vec<String> = indices.iter().map(|&i| jobs[i].id.clone()).collect();
std::thread::spawn(move || -> Option<HashMap<String, Option<i32>>> { std::thread::spawn(
move || -> Option<HashMap<String, Option<ssh::JobPollResult>>> {
let w = worker_cfg?; let w = worker_cfg?;
let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect(); let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
ssh::poll_jobs(&w, &id_refs).ok() ssh::poll_jobs(&w, &id_refs).ok()
}) },
)
}) })
.collect(); .collect();
@@ -60,14 +62,14 @@ pub fn execute(all: bool) -> Result<()> {
match handle.join().unwrap_or(None) { match handle.join().unwrap_or(None) {
Some(results) => { Some(results) => {
for &i in indices { for &i in indices {
if let Some(Some(ec)) = results.get(&jobs[i].id) { if let Some(Some(res)) = results.get(&jobs[i].id) {
jobs[i].status = if *ec == 0 { jobs[i].status = if res.exit_code == 0 {
JobStatus::Done JobStatus::Done
} else { } else {
JobStatus::Failed JobStatus::Failed
}; };
jobs[i].exit_code = Some(*ec); jobs[i].exit_code = Some(res.exit_code);
jobs[i].finished_at = Some(now); jobs[i].finished_at = Some(res.finished_at.unwrap_or(now));
db::save(&jobs[i])?; db::save(&jobs[i])?;
} }
} }
+4 -2
View File
@@ -131,7 +131,8 @@ pub fn execute(
// ── 5. Reconcile status after returning ─────────────────────────────────── // ── 5. Reconcile status after returning ───────────────────────────────────
let exit_code = ssh::read_job_exitcode(&worker, &id); let exit_code = ssh::read_job_exitcode(&worker, &id);
let now = Utc::now().timestamp(); let finished_at =
ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp());
if let Some(ec) = exit_code { if let Some(ec) = exit_code {
// Job is done. The session might still be alive if the user Ctrl+B D'd // Job is done. The session might still be alive if the user Ctrl+B D'd
@@ -149,7 +150,7 @@ pub fn execute(
JobStatus::Failed JobStatus::Failed
}, },
exit_code: Some(ec), exit_code: Some(ec),
finished_at: Some(now), finished_at: Some(finished_at),
..job ..job
})?; })?;
@@ -199,6 +200,7 @@ pub(crate) fn build_run_sh(session: &str, job_dir: &str, work_dir: &str, worker:
bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\ bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\
EXIT_CODE=${{PIPESTATUS[0]}}\n\ EXIT_CODE=${{PIPESTATUS[0]}}\n\
echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\ echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\
date +%s > {job_dir}/finished_at\n\
\n\ \n\
# Update status bar to show result\n\ # Update status bar to show result\n\
if [ \"$EXIT_CODE\" -eq 0 ]; then\n\ if [ \"$EXIT_CODE\" -eq 0 ]; then\n\
+35 -8
View File
@@ -155,11 +155,29 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
.and_then(|s| s.trim().parse().ok()) .and_then(|s| s.trim().parse().ok())
} }
/// Read the finish timestamp written by run.sh on the worker, if the job has finished.
pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> {
let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id);
run_capture(worker, &cmd)
.ok()
.and_then(|s| s.trim().parse().ok())
}
/// Result of polling a single job from the worker.
pub struct JobPollResult {
pub exit_code: i32,
/// Unix timestamp recorded server-side when the job finished.
pub finished_at: Option<i64>,
}
/// Poll multiple jobs on one worker in a single SSH call. /// Poll multiple jobs on one worker in a single SSH call.
/// Uses a 3-second connect timeout so `p ls` stays responsive with /// Uses a 3-second connect timeout so `p ls` stays responsive with
/// unreachable workers. /// unreachable workers.
/// Returns a map of job_id → exit_code (None = still running). /// Returns a map of job_id → JobPollResult (None = still running).
pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<String, Option<i32>>> { pub fn poll_jobs(
worker: &WorkerConfig,
job_ids: &[&str],
) -> Result<HashMap<String, Option<JobPollResult>>> {
if job_ids.is_empty() { if job_ids.is_empty() {
return Ok(HashMap::new()); return Ok(HashMap::new());
} }
@@ -167,7 +185,8 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<Stri
let script = format!( let script = format!(
"for id in {}; do \ "for id in {}; do \
ec=$(cat ~/.p/jobs/$id/exitcode 2>/dev/null); \ ec=$(cat ~/.p/jobs/$id/exitcode 2>/dev/null); \
echo \"$id:$ec\"; \ fa=$(cat ~/.p/jobs/$id/finished_at 2>/dev/null); \
echo \"$id:$ec:$fa\"; \
done", done",
job_ids.join(" ") job_ids.join(" ")
); );
@@ -175,14 +194,22 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<Stri
let output = run_with_timeout(worker, &script, 3)?; let output = run_with_timeout(worker, &script, 3)?;
let mut map = HashMap::new(); let mut map = HashMap::new();
for line in output.lines() { for line in output.lines() {
if let Some((id, ec)) = line.split_once(':') { let parts: Vec<&str> = line.splitn(3, ':').collect();
let exit_code = if ec.is_empty() { if parts.len() < 2 {
continue;
}
let id = parts[0];
let ec_str = parts[1];
let fa_str = if parts.len() == 3 { parts[2] } else { "" };
let result = if ec_str.is_empty() {
None None
} else { } else {
ec.trim().parse().ok() ec_str.trim().parse::<i32>().ok().map(|ec| JobPollResult {
exit_code: ec,
finished_at: fa_str.trim().parse().ok(),
})
}; };
map.insert(id.to_string(), exit_code); map.insert(id.to_string(), result);
}
} }
Ok(map) Ok(map)
} }