Compare commits
2 Commits
0fbd3f9952
...
883114e2a3
| Author | SHA1 | Date | |
|---|---|---|---|
| 883114e2a3 | |||
| f8b28aa43f |
@@ -64,7 +64,8 @@ pub fn execute(job_id: &str) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &job.id);
|
let exit_code = ssh::read_job_exitcode(&worker, &job.id);
|
||||||
let now = Utc::now().timestamp();
|
let finished_at =
|
||||||
|
ssh::read_job_finished_at(&worker, &job.id).unwrap_or_else(|| Utc::now().timestamp());
|
||||||
|
|
||||||
if let Some(ec) = exit_code {
|
if let Some(ec) = exit_code {
|
||||||
ssh::run_capture(
|
ssh::run_capture(
|
||||||
@@ -80,7 +81,7 @@ pub fn execute(job_id: &str) -> Result<()> {
|
|||||||
JobStatus::Failed
|
JobStatus::Failed
|
||||||
},
|
},
|
||||||
exit_code: Some(ec),
|
exit_code: Some(ec),
|
||||||
finished_at: Some(now),
|
finished_at: Some(finished_at),
|
||||||
..job
|
..job
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
|||||||
+23
-2
@@ -20,8 +20,29 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
|||||||
);
|
);
|
||||||
print_log(worker, &log)?;
|
print_log(worker, &log)?;
|
||||||
} else {
|
} else {
|
||||||
// Stream live output. Ctrl+C kills ssh; the job keeps running.
|
// Stream live output and exit automatically when the job finishes.
|
||||||
ssh::run_output(worker, &format!("tail -n +1 -f {}", log))?;
|
//
|
||||||
|
// The script backgrounds `tail -f` (so its stdout still flows to the
|
||||||
|
// client through the SSH pipe), then polls for the exitcode file that
|
||||||
|
// run.sh writes the moment the job exits. A 1-second sleep after
|
||||||
|
// detecting the exitcode file gives tail time to drain any bytes that
|
||||||
|
// were written to the log just before exitcode appeared. `kill` +
|
||||||
|
// `wait` let tail flush its buffer before the SSH connection closes.
|
||||||
|
//
|
||||||
|
// Ctrl+C still works: it kills ssh, which sends SIGHUP to the remote
|
||||||
|
// shell, terminating the whole script including the tail background job.
|
||||||
|
let exitcode_path = format!("~/.p/jobs/{}/exitcode", job.id);
|
||||||
|
let follow_cmd = format!(
|
||||||
|
"tail -n +1 -f {log} & \
|
||||||
|
TAIL_PID=$!; \
|
||||||
|
while ! [ -f {exitcode} ]; do sleep 1; done; \
|
||||||
|
sleep 1; \
|
||||||
|
kill \"$TAIL_PID\" 2>/dev/null; \
|
||||||
|
wait \"$TAIL_PID\" 2>/dev/null",
|
||||||
|
log = log,
|
||||||
|
exitcode = exitcode_path,
|
||||||
|
);
|
||||||
|
ssh::run_output(worker, &follow_cmd)?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
print_log(worker, &log)?;
|
print_log(worker, &log)?;
|
||||||
|
|||||||
+11
-9
@@ -47,11 +47,13 @@ pub fn execute(all: bool) -> Result<()> {
|
|||||||
.map(|(worker_name, indices)| {
|
.map(|(worker_name, indices)| {
|
||||||
let worker_cfg = cfg.get_worker(worker_name).cloned();
|
let worker_cfg = cfg.get_worker(worker_name).cloned();
|
||||||
let ids: Vec<String> = indices.iter().map(|&i| jobs[i].id.clone()).collect();
|
let ids: Vec<String> = indices.iter().map(|&i| jobs[i].id.clone()).collect();
|
||||||
std::thread::spawn(move || -> Option<HashMap<String, Option<i32>>> {
|
std::thread::spawn(
|
||||||
let w = worker_cfg?;
|
move || -> Option<HashMap<String, Option<ssh::JobPollResult>>> {
|
||||||
let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
|
let w = worker_cfg?;
|
||||||
ssh::poll_jobs(&w, &id_refs).ok()
|
let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
|
||||||
})
|
ssh::poll_jobs(&w, &id_refs).ok()
|
||||||
|
},
|
||||||
|
)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@@ -60,14 +62,14 @@ pub fn execute(all: bool) -> Result<()> {
|
|||||||
match handle.join().unwrap_or(None) {
|
match handle.join().unwrap_or(None) {
|
||||||
Some(results) => {
|
Some(results) => {
|
||||||
for &i in indices {
|
for &i in indices {
|
||||||
if let Some(Some(ec)) = results.get(&jobs[i].id) {
|
if let Some(Some(res)) = results.get(&jobs[i].id) {
|
||||||
jobs[i].status = if *ec == 0 {
|
jobs[i].status = if res.exit_code == 0 {
|
||||||
JobStatus::Done
|
JobStatus::Done
|
||||||
} else {
|
} else {
|
||||||
JobStatus::Failed
|
JobStatus::Failed
|
||||||
};
|
};
|
||||||
jobs[i].exit_code = Some(*ec);
|
jobs[i].exit_code = Some(res.exit_code);
|
||||||
jobs[i].finished_at = Some(now);
|
jobs[i].finished_at = Some(res.finished_at.unwrap_or(now));
|
||||||
db::save(&jobs[i])?;
|
db::save(&jobs[i])?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -131,7 +131,8 @@ pub fn execute(
|
|||||||
// ── 5. Reconcile status after returning ───────────────────────────────────
|
// ── 5. Reconcile status after returning ───────────────────────────────────
|
||||||
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &id);
|
let exit_code = ssh::read_job_exitcode(&worker, &id);
|
||||||
let now = Utc::now().timestamp();
|
let finished_at =
|
||||||
|
ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp());
|
||||||
|
|
||||||
if let Some(ec) = exit_code {
|
if let Some(ec) = exit_code {
|
||||||
// Job is done. The session might still be alive if the user Ctrl+B D'd
|
// Job is done. The session might still be alive if the user Ctrl+B D'd
|
||||||
@@ -149,7 +150,7 @@ pub fn execute(
|
|||||||
JobStatus::Failed
|
JobStatus::Failed
|
||||||
},
|
},
|
||||||
exit_code: Some(ec),
|
exit_code: Some(ec),
|
||||||
finished_at: Some(now),
|
finished_at: Some(finished_at),
|
||||||
..job
|
..job
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
@@ -199,6 +200,7 @@ pub(crate) fn build_run_sh(session: &str, job_dir: &str, work_dir: &str, worker:
|
|||||||
bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\
|
bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\
|
||||||
EXIT_CODE=${{PIPESTATUS[0]}}\n\
|
EXIT_CODE=${{PIPESTATUS[0]}}\n\
|
||||||
echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\
|
echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\
|
||||||
|
date +%s > {job_dir}/finished_at\n\
|
||||||
\n\
|
\n\
|
||||||
# Update status bar to show result\n\
|
# Update status bar to show result\n\
|
||||||
if [ \"$EXIT_CODE\" -eq 0 ]; then\n\
|
if [ \"$EXIT_CODE\" -eq 0 ]; then\n\
|
||||||
|
|||||||
+37
-10
@@ -155,11 +155,29 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
|
|||||||
.and_then(|s| s.trim().parse().ok())
|
.and_then(|s| s.trim().parse().ok())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read the finish timestamp written by run.sh on the worker, if the job has finished.
|
||||||
|
pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> {
|
||||||
|
let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id);
|
||||||
|
run_capture(worker, &cmd)
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.trim().parse().ok())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Result of polling a single job from the worker.
|
||||||
|
pub struct JobPollResult {
|
||||||
|
pub exit_code: i32,
|
||||||
|
/// Unix timestamp recorded server-side when the job finished.
|
||||||
|
pub finished_at: Option<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Poll multiple jobs on one worker in a single SSH call.
|
/// Poll multiple jobs on one worker in a single SSH call.
|
||||||
/// Uses a 3-second connect timeout so `p ls` stays responsive with
|
/// Uses a 3-second connect timeout so `p ls` stays responsive with
|
||||||
/// unreachable workers.
|
/// unreachable workers.
|
||||||
/// Returns a map of job_id → exit_code (None = still running).
|
/// Returns a map of job_id → JobPollResult (None = still running).
|
||||||
pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<String, Option<i32>>> {
|
pub fn poll_jobs(
|
||||||
|
worker: &WorkerConfig,
|
||||||
|
job_ids: &[&str],
|
||||||
|
) -> Result<HashMap<String, Option<JobPollResult>>> {
|
||||||
if job_ids.is_empty() {
|
if job_ids.is_empty() {
|
||||||
return Ok(HashMap::new());
|
return Ok(HashMap::new());
|
||||||
}
|
}
|
||||||
@@ -167,7 +185,8 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<Stri
|
|||||||
let script = format!(
|
let script = format!(
|
||||||
"for id in {}; do \
|
"for id in {}; do \
|
||||||
ec=$(cat ~/.p/jobs/$id/exitcode 2>/dev/null); \
|
ec=$(cat ~/.p/jobs/$id/exitcode 2>/dev/null); \
|
||||||
echo \"$id:$ec\"; \
|
fa=$(cat ~/.p/jobs/$id/finished_at 2>/dev/null); \
|
||||||
|
echo \"$id:$ec:$fa\"; \
|
||||||
done",
|
done",
|
||||||
job_ids.join(" ")
|
job_ids.join(" ")
|
||||||
);
|
);
|
||||||
@@ -175,14 +194,22 @@ pub fn poll_jobs(worker: &WorkerConfig, job_ids: &[&str]) -> Result<HashMap<Stri
|
|||||||
let output = run_with_timeout(worker, &script, 3)?;
|
let output = run_with_timeout(worker, &script, 3)?;
|
||||||
let mut map = HashMap::new();
|
let mut map = HashMap::new();
|
||||||
for line in output.lines() {
|
for line in output.lines() {
|
||||||
if let Some((id, ec)) = line.split_once(':') {
|
let parts: Vec<&str> = line.splitn(3, ':').collect();
|
||||||
let exit_code = if ec.is_empty() {
|
if parts.len() < 2 {
|
||||||
None
|
continue;
|
||||||
} else {
|
|
||||||
ec.trim().parse().ok()
|
|
||||||
};
|
|
||||||
map.insert(id.to_string(), exit_code);
|
|
||||||
}
|
}
|
||||||
|
let id = parts[0];
|
||||||
|
let ec_str = parts[1];
|
||||||
|
let fa_str = if parts.len() == 3 { parts[2] } else { "" };
|
||||||
|
let result = if ec_str.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
ec_str.trim().parse::<i32>().ok().map(|ec| JobPollResult {
|
||||||
|
exit_code: ec,
|
||||||
|
finished_at: fa_str.trim().parse().ok(),
|
||||||
|
})
|
||||||
|
};
|
||||||
|
map.insert(id.to_string(), result);
|
||||||
}
|
}
|
||||||
Ok(map)
|
Ok(map)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user