feat: switch from tmux to simple nohup for remote jobs
CI / Check, test, lint (push) Successful in 28s
CI / Check, test, lint (push) Successful in 28s
- Remove tmux-based execution; jobs now run via nohup + shell script
- Remove p-agent daemon from workspace
- Remove `p attach` command (use `p logs -f` instead)
- Add `-d/--detach` flag to start job without streaming
- Stream output via SSH `tail -F`
- Ctrl+C detaches from stream; job keeps running
- `p stop` kills via PID file
- Worker requirements reduced to just rsync (no tmux needed)
This commit is contained in:
Generated
+7
-14
@@ -73,12 +73,6 @@ version = "1.5.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "base64"
|
|
||||||
version = "0.22.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bitflags"
|
name = "bitflags"
|
||||||
version = "2.11.1"
|
version = "2.11.1"
|
||||||
@@ -476,7 +470,6 @@ name = "p"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"base64",
|
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"ctrlc",
|
"ctrlc",
|
||||||
@@ -484,16 +477,10 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
|
"shell-words",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "p-agent"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"anyhow",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.17"
|
version = "0.2.17"
|
||||||
@@ -619,6 +606,12 @@ dependencies = [
|
|||||||
"unsafe-libyaml",
|
"unsafe-libyaml",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "shell-words"
|
||||||
|
version = "1.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "dc6fe69c597f9c37bfeeeeeb33da3530379845f10be461a66d16d03eca2ded77"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "shlex"
|
name = "shlex"
|
||||||
version = "1.3.0"
|
version = "1.3.0"
|
||||||
|
|||||||
+1
-1
@@ -1,3 +1,3 @@
|
|||||||
[workspace]
|
[workspace]
|
||||||
members = ["p", "agent"]
|
members = ["p"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|||||||
@@ -1,11 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "p-agent"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[[bin]]
|
|
||||||
name = "p-agent"
|
|
||||||
path = "src/main.rs"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
anyhow = "1"
|
|
||||||
@@ -1,4 +0,0 @@
|
|||||||
fn main() {
|
|
||||||
println!("p-agent — worker-side agent for p");
|
|
||||||
println!("(not yet implemented)");
|
|
||||||
}
|
|
||||||
+2
-2
@@ -22,7 +22,7 @@ dirs = "5"
|
|||||||
uuid = { version = "1", features = ["v4"] }
|
uuid = { version = "1", features = ["v4"] }
|
||||||
# Timestamps and duration display
|
# Timestamps and duration display
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
# Encoding commands passed through SSH → tmux → bash
|
# Shell escaping for commands
|
||||||
base64 = "0.22"
|
shell-words = "1"
|
||||||
# Ctrl+C handling (detach message)
|
# Ctrl+C handling (detach message)
|
||||||
ctrlc = "3"
|
ctrlc = "3"
|
||||||
|
|||||||
+1
-7
@@ -5,7 +5,7 @@ use clap::{Parser, Subcommand};
|
|||||||
name = "p",
|
name = "p",
|
||||||
about = "Push jobs to remote worker machines",
|
about = "Push jobs to remote worker machines",
|
||||||
long_about = "\
|
long_about = "\
|
||||||
Push jobs to remote worker machines with directory sync and attach/detach support.
|
Push jobs to remote worker machines with directory sync and log streaming.
|
||||||
|
|
||||||
Run a command on the default worker:
|
Run a command on the default worker:
|
||||||
p -- <command>
|
p -- <command>
|
||||||
@@ -33,12 +33,6 @@ pub enum Command {
|
|||||||
all: bool,
|
all: bool,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Re-attach to the tmux session of a running job
|
|
||||||
Attach {
|
|
||||||
/// Job ID or unambiguous prefix
|
|
||||||
job_id: String,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// Print captured output of a job (running or finished)
|
/// Print captured output of a job (running or finished)
|
||||||
Logs {
|
Logs {
|
||||||
/// Job ID or unambiguous prefix
|
/// Job ID or unambiguous prefix
|
||||||
|
|||||||
@@ -1,97 +0,0 @@
|
|||||||
use anyhow::{Context, Result};
|
|
||||||
use chrono::Utc;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
config, db,
|
|
||||||
job::{Job, JobStatus},
|
|
||||||
ssh,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub fn execute(job_id: &str) -> Result<()> {
|
|
||||||
let cfg = config::load()?;
|
|
||||||
let job = db::find(job_id)?.with_context(|| format!("job '{}' not found", job_id))?;
|
|
||||||
|
|
||||||
// Allow attach on Running (normal) and Unknown (worker was temporarily
|
|
||||||
// unreachable — the job may still be live).
|
|
||||||
match job.status {
|
|
||||||
JobStatus::Running | JobStatus::Unknown => {}
|
|
||||||
_ => anyhow::bail!(
|
|
||||||
"job {} is not running (status: {})\n\
|
|
||||||
use 'p logs {}' to view its output",
|
|
||||||
job.short_id(),
|
|
||||||
job.status.as_str(),
|
|
||||||
job.short_id(),
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
let worker = cfg.resolve_worker(Some(&job.worker))?.clone();
|
|
||||||
let session = format!("p-{}", job.short_id());
|
|
||||||
let sid = job.short_id().to_string();
|
|
||||||
|
|
||||||
{
|
|
||||||
let sid = sid.clone();
|
|
||||||
ctrlc::set_handler(move || {
|
|
||||||
eprintln!(
|
|
||||||
"\nDetached. Use 'p attach {sid}' to re-attach or 'p logs {sid}' to view output."
|
|
||||||
);
|
|
||||||
std::process::exit(0);
|
|
||||||
})
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
let attach_status = ssh::run_interactive(&worker, &format!("tmux attach -t '{}'", session))?;
|
|
||||||
|
|
||||||
// If tmux exited non-zero the session doesn't exist on the worker.
|
|
||||||
if !attach_status.success() {
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &job.id);
|
|
||||||
if exit_code.is_none() {
|
|
||||||
// No exit code either — the job was lost without completing
|
|
||||||
// (e.g. worker restarted, tmux server killed).
|
|
||||||
eprintln!(
|
|
||||||
"error: tmux session for job {} no longer exists on '{}'.",
|
|
||||||
sid, worker.name
|
|
||||||
);
|
|
||||||
eprintln!("The job was likely interrupted (worker restart or tmux server exit).");
|
|
||||||
eprintln!("Use 'p logs {}' to see whatever output was captured.", sid);
|
|
||||||
db::save(&Job {
|
|
||||||
status: JobStatus::Unknown,
|
|
||||||
..job
|
|
||||||
})?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
// Exit code exists — job finished cleanly but the session was already
|
|
||||||
// cleaned up. Fall through to normal reconciliation.
|
|
||||||
}
|
|
||||||
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &job.id);
|
|
||||||
let finished_at =
|
|
||||||
ssh::read_job_finished_at(&worker, &job.id).unwrap_or_else(|| Utc::now().timestamp());
|
|
||||||
|
|
||||||
if let Some(ec) = exit_code {
|
|
||||||
ssh::run_capture(
|
|
||||||
&worker,
|
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
|
||||||
)
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
db::save(&Job {
|
|
||||||
status: if ec == 0 {
|
|
||||||
JobStatus::Done
|
|
||||||
} else {
|
|
||||||
JobStatus::Failed
|
|
||||||
},
|
|
||||||
exit_code: Some(ec),
|
|
||||||
finished_at: Some(finished_at),
|
|
||||||
..job
|
|
||||||
})?;
|
|
||||||
|
|
||||||
eprintln!("Job {} finished with exit code {}.", sid, ec);
|
|
||||||
} else {
|
|
||||||
eprintln!(
|
|
||||||
"Detached from job {}. Use 'p attach {}' to re-attach.",
|
|
||||||
sid, sid
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
+49
-3
@@ -1,6 +1,11 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
use chrono::Utc;
|
||||||
|
|
||||||
use crate::{config, db, job::JobStatus, ssh};
|
use crate::{
|
||||||
|
config, db,
|
||||||
|
job::{Job, JobStatus},
|
||||||
|
ssh,
|
||||||
|
};
|
||||||
|
|
||||||
pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
||||||
let cfg = config::load()?;
|
let cfg = config::load()?;
|
||||||
@@ -19,10 +24,12 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
|||||||
job.short_id()
|
job.short_id()
|
||||||
);
|
);
|
||||||
print_log(worker, &log)?;
|
print_log(worker, &log)?;
|
||||||
|
reconcile_and_print_exitcode(worker, &job)?;
|
||||||
} else {
|
} else {
|
||||||
// Stream live output and exit automatically when the job finishes.
|
// Stream live output and exit automatically when the job finishes.
|
||||||
//
|
//
|
||||||
// The script backgrounds `tail -f` (so its stdout still flows to the
|
// Use -F (--follow=name) to handle the file not existing yet.
|
||||||
|
// The script backgrounds `tail -F` (so its stdout still flows to the
|
||||||
// client through the SSH pipe), then polls for the exitcode file that
|
// client through the SSH pipe), then polls for the exitcode file that
|
||||||
// run.sh writes the moment the job exits. A 1-second sleep after
|
// run.sh writes the moment the job exits. A 1-second sleep after
|
||||||
// detecting the exitcode file gives tail time to drain any bytes that
|
// detecting the exitcode file gives tail time to drain any bytes that
|
||||||
@@ -33,7 +40,7 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
|||||||
// shell, terminating the whole script including the tail background job.
|
// shell, terminating the whole script including the tail background job.
|
||||||
let exitcode_path = format!("~/.p/jobs/{}/exitcode", job.id);
|
let exitcode_path = format!("~/.p/jobs/{}/exitcode", job.id);
|
||||||
let follow_cmd = format!(
|
let follow_cmd = format!(
|
||||||
"tail -n +1 -f {log} & \
|
"tail -n +1 -F {log} & \
|
||||||
TAIL_PID=$!; \
|
TAIL_PID=$!; \
|
||||||
while ! [ -f {exitcode} ]; do sleep 1; done; \
|
while ! [ -f {exitcode} ]; do sleep 1; done; \
|
||||||
sleep 1; \
|
sleep 1; \
|
||||||
@@ -43,9 +50,16 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
|||||||
exitcode = exitcode_path,
|
exitcode = exitcode_path,
|
||||||
);
|
);
|
||||||
ssh::run_output(worker, &follow_cmd)?;
|
ssh::run_output(worker, &follow_cmd)?;
|
||||||
|
|
||||||
|
// Reconcile state after streaming ends
|
||||||
|
reconcile_and_print_exitcode(worker, &job)?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
print_log(worker, &log)?;
|
print_log(worker, &log)?;
|
||||||
|
if job.status == JobStatus::Running {
|
||||||
|
// Check if job has finished
|
||||||
|
reconcile_and_print_exitcode(worker, &job)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -59,3 +73,35 @@ fn print_log(worker: &crate::config::WorkerConfig, log: &str) -> Result<()> {
|
|||||||
print!("{}", out);
|
print!("{}", out);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if the job has finished and update local state. Print exit code if done.
|
||||||
|
fn reconcile_and_print_exitcode(worker: &crate::config::WorkerConfig, job: &Job) -> Result<()> {
|
||||||
|
if job.status != JobStatus::Running {
|
||||||
|
// Already finished
|
||||||
|
if let Some(ec) = job.exit_code {
|
||||||
|
eprintln!("[Job done: exit {}]", ec);
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let exit_code = ssh::read_job_exitcode(worker, &job.id);
|
||||||
|
if let Some(ec) = exit_code {
|
||||||
|
let finished_at =
|
||||||
|
ssh::read_job_finished_at(worker, &job.id).unwrap_or_else(|| Utc::now().timestamp());
|
||||||
|
|
||||||
|
db::save(&Job {
|
||||||
|
status: if ec == 0 {
|
||||||
|
JobStatus::Done
|
||||||
|
} else {
|
||||||
|
JobStatus::Failed
|
||||||
|
},
|
||||||
|
exit_code: Some(ec),
|
||||||
|
finished_at: Some(finished_at),
|
||||||
|
..job.clone()
|
||||||
|
})?;
|
||||||
|
|
||||||
|
eprintln!("[Job done: exit {}]", ec);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
pub mod attach;
|
|
||||||
pub mod logs;
|
pub mod logs;
|
||||||
pub mod ls;
|
pub mod ls;
|
||||||
pub mod prune;
|
pub mod prune;
|
||||||
|
|||||||
@@ -16,14 +16,14 @@ pub fn execute(job_id: &str, force: bool) -> Result<()> {
|
|||||||
sid
|
sid
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// Kill the tmux session before wiping the files.
|
// Kill the job via PID before wiping the files.
|
||||||
let session = format!("p-{}", sid);
|
|
||||||
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
||||||
ssh::run_capture(
|
let pid_path = format!("~/.p/jobs/{}/pid", job.id);
|
||||||
worker,
|
let kill_cmd = format!(
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
"PID=$(cat {} 2>/dev/null) && kill \"$PID\" 2>/dev/null",
|
||||||
)
|
pid_path
|
||||||
.ok();
|
);
|
||||||
|
ssh::run_capture(worker, &kill_cmd).ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove remote job dir and work dir (best-effort — warn if unreachable).
|
// Remove remote job dir and work dir (best-effort — warn if unreachable).
|
||||||
|
|||||||
+55
-161
@@ -1,5 +1,4 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use base64::{engine::general_purpose::STANDARD as B64, Engine};
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use std::io::IsTerminal;
|
use std::io::IsTerminal;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -25,7 +24,6 @@ pub fn execute(
|
|||||||
|
|
||||||
let id = Uuid::new_v4().to_string();
|
let id = Uuid::new_v4().to_string();
|
||||||
let short_id = &id[..8];
|
let short_id = &id[..8];
|
||||||
let session = format!("p-{}", short_id);
|
|
||||||
|
|
||||||
// Resolve the remote home directory once using an absolute path so that
|
// Resolve the remote home directory once using an absolute path so that
|
||||||
// every subsequent call — SSH, rsync, and scripts on the worker — all
|
// every subsequent call — SSH, rsync, and scripts on the worker — all
|
||||||
@@ -78,27 +76,31 @@ pub fn execute(
|
|||||||
};
|
};
|
||||||
db::save(&job)?;
|
db::save(&job)?;
|
||||||
|
|
||||||
// ── 4. Write files and start the job on the worker ────────────────────────
|
// ── 4. Write metadata and start the job on the worker ─────────────────────
|
||||||
|
|
||||||
let cmd_b64 = B64.encode(&cmd_str);
|
|
||||||
let run_sh = build_run_sh(&session, &job_dir, &work_dir, &worker.name);
|
|
||||||
let run_sh_b64 = B64.encode(&run_sh);
|
|
||||||
|
|
||||||
|
// Write cmd, started_at, and the run script
|
||||||
|
let run_sh = build_run_sh(&job_dir, &work_dir, &cmd_str);
|
||||||
let setup = format!(
|
let setup = format!(
|
||||||
"printf '%s' '{cmd_b64}' > {job_dir}/cmd && \
|
"printf '%s' {} > {job_dir}/cmd && \
|
||||||
printf '%s' '{run_sh_b64}' | base64 -d > {job_dir}/run.sh && \
|
|
||||||
chmod +x {job_dir}/run.sh && \
|
|
||||||
date +%s > {job_dir}/started_at && \
|
date +%s > {job_dir}/started_at && \
|
||||||
tmux new-session -d -s '{session}' {job_dir}/run.sh",
|
touch {job_dir}/output.log && \
|
||||||
job_dir = job_dir,
|
printf '%s' {} > {job_dir}/run.sh && \
|
||||||
cmd_b64 = cmd_b64,
|
chmod +x {job_dir}/run.sh",
|
||||||
run_sh_b64 = run_sh_b64,
|
shell_words::quote(&cmd_str),
|
||||||
session = session,
|
shell_words::quote(&run_sh),
|
||||||
|
job_dir = job_dir
|
||||||
);
|
);
|
||||||
|
ssh::run_capture(&worker, &setup).context("failed to write job metadata")?;
|
||||||
|
|
||||||
ssh::run_capture(&worker, &setup).context("failed to set up job on worker")?;
|
// Start the run script with nohup
|
||||||
|
// The PID is saved to a file for later termination via `p stop`
|
||||||
|
let launch_cmd = format!(
|
||||||
|
"cd {} && nohup {}/run.sh & echo $! > {}/pid",
|
||||||
|
work_dir, job_dir, job_dir
|
||||||
|
);
|
||||||
|
ssh::run_capture(&worker, &launch_cmd).context("failed to start job on worker")?;
|
||||||
|
|
||||||
// ── 4. Attach or detach ───────────────────────────────────────────────────────
|
// ── 5. Detach or stream output ────────────────────────────────────────────
|
||||||
|
|
||||||
if detach {
|
if detach {
|
||||||
// Non-interactive mode: print only the job UUID to stdout and exit.
|
// Non-interactive mode: print only the job UUID to stdout and exit.
|
||||||
@@ -108,41 +110,43 @@ pub fn execute(
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Interactive mode: attach to the tmux session.
|
// Interactive mode: stream output.log in real-time.
|
||||||
// run.sh keeps the session alive after the job finishes (via `read`),
|
// Ctrl+C detaches from the stream — the job keeps running.
|
||||||
// so there is no race between job completion and our attach call.
|
|
||||||
//
|
|
||||||
// Ctrl+B D detaches cleanly mid-run; the job keeps going in the background.
|
|
||||||
// Any other key on the "press any key" screen triggers `tmux detach-client`
|
|
||||||
// from within run.sh — no [exited] or [detached] flash.
|
|
||||||
|
|
||||||
let sid = short_id.to_string();
|
let sid = short_id.to_string();
|
||||||
ctrlc::set_handler(move || {
|
ctrlc::set_handler(move || {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"\nDetached. Use 'p attach {sid}' to re-attach or 'p logs {sid}' to view output."
|
"\nDetached. Use 'p logs -f {}' to resume watching or 'p stop {}' to kill.",
|
||||||
|
sid, sid
|
||||||
);
|
);
|
||||||
std::process::exit(0);
|
std::process::exit(0);
|
||||||
})
|
})
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
eprintln!("Job {} started on {}.", short_id, worker.name);
|
eprintln!("Job {} started on {}.", short_id, worker.name);
|
||||||
ssh::run_interactive(&worker, &format!("tmux attach -t '{}'", session))?;
|
|
||||||
|
|
||||||
// ── 5. Reconcile status after returning ───────────────────────────────────
|
// Stream the log file, exiting when the job finishes
|
||||||
|
// Use -F (--follow=name) to handle the file not existing yet
|
||||||
|
let log_path = format!("{}/output.log", job_dir);
|
||||||
|
let exitcode_path = format!("{}/exitcode", job_dir);
|
||||||
|
let stream_cmd = format!(
|
||||||
|
"tail -n +1 -F {} & \
|
||||||
|
TAIL_PID=$!; \
|
||||||
|
while ! [ -f {} ]; do sleep 1; done; \
|
||||||
|
sleep 0.5; \
|
||||||
|
kill \"$TAIL_PID\" 2>/dev/null; \
|
||||||
|
wait \"$TAIL_PID\" 2>/dev/null",
|
||||||
|
log_path, exitcode_path
|
||||||
|
);
|
||||||
|
ssh::run_output(&worker, &stream_cmd)?;
|
||||||
|
|
||||||
|
// ── 6. Reconcile status after returning ───────────────────────────────────
|
||||||
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &id);
|
let exit_code = ssh::read_job_exitcode(&worker, &id);
|
||||||
let finished_at =
|
let finished_at =
|
||||||
ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp());
|
ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp());
|
||||||
|
|
||||||
if let Some(ec) = exit_code {
|
if let Some(ec) = exit_code {
|
||||||
// Job is done. The session might still be alive if the user Ctrl+B D'd
|
|
||||||
// from the "press any key" screen — kill it to clean up the lingering `read`.
|
|
||||||
ssh::run_capture(
|
|
||||||
&worker,
|
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
|
||||||
)
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
db::save(&Job {
|
db::save(&Job {
|
||||||
status: if ec == 0 {
|
status: if ec == 0 {
|
||||||
JobStatus::Done
|
JobStatus::Done
|
||||||
@@ -154,145 +158,35 @@ pub fn execute(
|
|||||||
..job
|
..job
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
eprintln!("Job {} finished with exit code {}.", short_id, ec);
|
eprintln!("[Job done: exit {}]", ec);
|
||||||
} else {
|
} else {
|
||||||
// User detached mid-run (Ctrl+B D). Job is still running on the worker.
|
// Connection dropped or job lost
|
||||||
db::save(&Job {
|
db::save(&Job {
|
||||||
status: JobStatus::Running,
|
status: JobStatus::Unknown,
|
||||||
..job
|
..job
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Detached from job {}. Use 'p attach {}' to re-attach or 'p logs {}' to view output.",
|
"Connection lost. Job {} may still be running. Use 'p logs -f {}' to resume.",
|
||||||
short_id, short_id, short_id
|
short_id, short_id
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build the shell script that runs inside the tmux pane.
|
/// Build the shell script that runs on the worker.
|
||||||
///
|
fn build_run_sh(job_dir: &str, work_dir: &str, cmd_str: &str) -> String {
|
||||||
/// `pub(crate)` so it can be tested.
|
|
||||||
pub(crate) fn build_run_sh(session: &str, job_dir: &str, work_dir: &str, worker: &str) -> String {
|
|
||||||
// Truncate worker name for display to avoid overflowing the status bar.
|
|
||||||
let worker_display = if worker.len() > 20 {
|
|
||||||
&worker[..20]
|
|
||||||
} else {
|
|
||||||
worker
|
|
||||||
};
|
|
||||||
|
|
||||||
format!(
|
format!(
|
||||||
// Shebang + status bar setup
|
r#"#!/bin/bash
|
||||||
"#!/bin/bash\n\
|
cd {work_dir}
|
||||||
tmux set-option -t '{session}' status on 2>/dev/null\n\
|
{cmd} 2>&1 | tee {job_dir}/output.log
|
||||||
tmux set-option -t '{session}' status-style 'fg=colour250,bg=colour235' 2>/dev/null\n\
|
EXIT_CODE=${{PIPESTATUS[0]}}
|
||||||
tmux set-option -t '{session}' status-left-length 60 2>/dev/null\n\
|
echo "$EXIT_CODE" > {job_dir}/exitcode
|
||||||
tmux set-option -t '{session}' status-right-length 20 2>/dev/null\n\
|
date +%s > {job_dir}/finished_at
|
||||||
tmux set-option -t '{session}' status-left \
|
"#,
|
||||||
'#[fg=colour39,bold] {session} #[fg=colour250,nobold]| {worker_display} ' 2>/dev/null\n\
|
|
||||||
tmux set-option -t '{session}' status-right \
|
|
||||||
'#[fg=colour250] running ' 2>/dev/null\n\
|
|
||||||
\n\
|
|
||||||
# Run the command, tee output to log\n\
|
|
||||||
cd {work_dir}\n\
|
|
||||||
cmd=$(base64 -d < {job_dir}/cmd)\n\
|
|
||||||
bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\
|
|
||||||
EXIT_CODE=${{PIPESTATUS[0]}}\n\
|
|
||||||
echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\
|
|
||||||
date +%s > {job_dir}/finished_at\n\
|
|
||||||
\n\
|
|
||||||
# Update status bar to show result\n\
|
|
||||||
if [ \"$EXIT_CODE\" -eq 0 ]; then\n\
|
|
||||||
tmux set-option -t '{session}' status-right \
|
|
||||||
'#[fg=colour2,bold] done [0] ' 2>/dev/null\n\
|
|
||||||
else\n\
|
|
||||||
tmux set-option -t '{session}' status-right \
|
|
||||||
\"#[fg=colour1,bold] done [$EXIT_CODE] \" 2>/dev/null\n\
|
|
||||||
fi\n\
|
|
||||||
\n\
|
|
||||||
# Keep the session open so the user can read final output\n\
|
|
||||||
printf '\\n\\033[2m--- done [exit %d] - press any key to detach ---\\033[0m\\n' \
|
|
||||||
\"$EXIT_CODE\"\n\
|
|
||||||
read -rn 1 -s\n\
|
|
||||||
\n\
|
|
||||||
# Detach cleanly — no [exited] / [detached] flash\n\
|
|
||||||
tmux detach-client -s '{session}' 2>/dev/null || true\n",
|
|
||||||
session = session,
|
|
||||||
worker_display = worker_display,
|
|
||||||
job_dir = job_dir,
|
|
||||||
work_dir = work_dir,
|
work_dir = work_dir,
|
||||||
|
cmd = cmd_str,
|
||||||
|
job_dir = job_dir
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn sample_script() -> String {
|
|
||||||
build_run_sh(
|
|
||||||
"p-a3f2b091",
|
|
||||||
"/home/ubuntu/.p/jobs/a3f2b091-0000-0000-0000-000000000000",
|
|
||||||
"/home/ubuntu/.p/workdirs/a3f2b091-0000-0000-0000-000000000000",
|
|
||||||
"beefy",
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_starts_with_shebang() {
|
|
||||||
assert!(sample_script().starts_with("#!/bin/bash\n"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_contains_session_name() {
|
|
||||||
assert!(sample_script().contains("p-a3f2b091"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_contains_job_and_work_dirs() {
|
|
||||||
let s = sample_script();
|
|
||||||
assert!(s.contains("/home/ubuntu/.p/jobs/a3f2b091"));
|
|
||||||
assert!(s.contains("/home/ubuntu/.p/workdirs/a3f2b091"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_captures_exit_code_via_pipestatus() {
|
|
||||||
// Must use PIPESTATUS[0] to get the command's exit code, not tee's.
|
|
||||||
assert!(sample_script().contains("PIPESTATUS[0]"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_writes_exitcode_file() {
|
|
||||||
assert!(sample_script().contains("exitcode"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_tees_output_to_log() {
|
|
||||||
assert!(sample_script().contains("tee"));
|
|
||||||
assert!(sample_script().contains("output.log"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_waits_for_keypress_before_detach() {
|
|
||||||
let s = sample_script();
|
|
||||||
assert!(s.contains("read -rn 1 -s"));
|
|
||||||
assert!(s.contains("tmux detach-client"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_sets_up_status_bar() {
|
|
||||||
let s = sample_script();
|
|
||||||
assert!(s.contains("status on"));
|
|
||||||
assert!(s.contains("beefy"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn worker_name_truncated_at_20_chars() {
|
|
||||||
let s = build_run_sh("p-test", "/j", "/w", "a-very-long-worker-name-here");
|
|
||||||
// "a-very-long-worker-name-here" is 28 chars; only first 20 should appear.
|
|
||||||
assert!(s.contains("a-very-long-worker-n"));
|
|
||||||
assert!(!s.contains("a-very-long-worker-name-here"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
+14
-6
@@ -20,14 +20,22 @@ pub fn execute(job_id: &str) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
||||||
let session = format!("p-{}", job.short_id());
|
|
||||||
let sid = job.short_id().to_string();
|
let sid = job.short_id().to_string();
|
||||||
|
|
||||||
// Kill the tmux session. This terminates the job process and run.sh.
|
// Read the PID file and kill the process
|
||||||
ssh::run_capture(
|
// Use a subshell that always succeeds to avoid SSH errors
|
||||||
worker,
|
let kill_cmd = format!(
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
"PID=$(cat ~/.p/jobs/{}/pid 2>/dev/null); \
|
||||||
)?;
|
if [ -n \"$PID\" ]; then \
|
||||||
|
kill \"$PID\" 2>/dev/null; \
|
||||||
|
sleep 1; \
|
||||||
|
kill -9 \"$PID\" 2>/dev/null; \
|
||||||
|
fi; \
|
||||||
|
echo 15 > ~/.p/jobs/{}/exitcode; \
|
||||||
|
date +%s > ~/.p/jobs/{}/finished_at",
|
||||||
|
job.id, job.id, job.id
|
||||||
|
);
|
||||||
|
ssh::run_capture(worker, &kill_cmd)?;
|
||||||
|
|
||||||
db::save(&Job {
|
db::save(&Job {
|
||||||
status: JobStatus::Stopped,
|
status: JobStatus::Stopped,
|
||||||
|
|||||||
@@ -33,6 +33,17 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> {
|
|||||||
|
|
||||||
config::save(&cfg)?;
|
config::save(&cfg)?;
|
||||||
|
|
||||||
|
// Check that the worker is reachable
|
||||||
|
let worker = cfg.get_worker(&name).unwrap();
|
||||||
|
print!("Checking worker connectivity... ");
|
||||||
|
if ssh::is_reachable(worker) {
|
||||||
|
println!("ok");
|
||||||
|
} else {
|
||||||
|
println!();
|
||||||
|
println!("note: could not reach '{}' (worker may be offline)", name);
|
||||||
|
println!(" run 'p worker ls --check' once it's available");
|
||||||
|
}
|
||||||
|
|
||||||
if first {
|
if first {
|
||||||
println!("Registered '{}' and set as default worker.", name);
|
println!("Registered '{}' and set as default worker.", name);
|
||||||
} else {
|
} else {
|
||||||
@@ -40,29 +51,5 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> {
|
|||||||
println!("Run 'p worker default {}' to make it the default.", name);
|
println!("Run 'p worker default {}' to make it the default.", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the worker has the tools p needs.
|
|
||||||
let worker = cfg.get_worker(&name).unwrap();
|
|
||||||
print!("Checking worker dependencies... ");
|
|
||||||
match ssh::check_dependencies(worker) {
|
|
||||||
Ok(ref missing) if missing.is_empty() => println!("ok"),
|
|
||||||
Ok(missing) => {
|
|
||||||
println!();
|
|
||||||
println!(
|
|
||||||
"warning: '{}' is missing required tools: {}",
|
|
||||||
name,
|
|
||||||
missing.join(", ")
|
|
||||||
);
|
|
||||||
println!(" install them and re-register, or jobs will fail at launch");
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
println!();
|
|
||||||
println!(
|
|
||||||
"note: could not reach '{}' to check dependencies (worker may be offline)",
|
|
||||||
name
|
|
||||||
);
|
|
||||||
println!(" run 'p worker ls --check' once it's available");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,7 +56,6 @@ fn main() -> Result<()> {
|
|||||||
let cli = cli::Cli::parse();
|
let cli = cli::Cli::parse();
|
||||||
match cli.command {
|
match cli.command {
|
||||||
cli::Command::Ls { all } => commands::ls::execute(all),
|
cli::Command::Ls { all } => commands::ls::execute(all),
|
||||||
cli::Command::Attach { job_id } => commands::attach::execute(&job_id),
|
|
||||||
cli::Command::Logs { job_id, follow } => commands::logs::execute(&job_id, follow),
|
cli::Command::Logs { job_id, follow } => commands::logs::execute(&job_id, follow),
|
||||||
cli::Command::Stop { job_id } => commands::stop::execute(&job_id),
|
cli::Command::Stop { job_id } => commands::stop::execute(&job_id),
|
||||||
cli::Command::Pull {
|
cli::Command::Pull {
|
||||||
|
|||||||
+4
-37
@@ -30,8 +30,8 @@ pub fn parse_connection(conn: &str) -> (String, Option<u16>) {
|
|||||||
/// Extract the bare hostname from a connection string (used as a default worker name).
|
/// Extract the bare hostname from a connection string (used as a default worker name).
|
||||||
pub fn hostname_from_connection(conn: &str) -> String {
|
pub fn hostname_from_connection(conn: &str) -> String {
|
||||||
let (user_host, _port) = parse_connection(conn);
|
let (user_host, _port) = parse_connection(conn);
|
||||||
match user_host.rsplit_once('@') {
|
match user_host.rfind('@') {
|
||||||
Some((_user, host)) => host.to_string(),
|
Some(i) => user_host[i + 1..].to_string(),
|
||||||
None => user_host,
|
None => user_host,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -74,26 +74,6 @@ fn run_with_timeout(worker: &WorkerConfig, remote_cmd: &str, timeout_secs: u32)
|
|||||||
Ok(String::from_utf8_lossy(&out.stdout).into_owned())
|
Ok(String::from_utf8_lossy(&out.stdout).into_owned())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run a command over SSH with an interactive terminal (inherits stdin/stdout/stderr).
|
|
||||||
///
|
|
||||||
/// Forces `TERM=xterm-256color` on the remote to avoid failures with terminal
|
|
||||||
/// emulators whose terminfo entry is not installed on the worker (e.g. ghostty,
|
|
||||||
/// kitty). The user's actual emulator capabilities are unaffected — only the
|
|
||||||
/// terminfo lookup on the remote side changes.
|
|
||||||
pub fn run_interactive(
|
|
||||||
worker: &WorkerConfig,
|
|
||||||
remote_cmd: &str,
|
|
||||||
) -> Result<std::process::ExitStatus> {
|
|
||||||
let mut args = vec!["-t".to_string()];
|
|
||||||
args.extend(ssh_args(worker));
|
|
||||||
args.push(format!("TERM=xterm-256color {}", remote_cmd));
|
|
||||||
|
|
||||||
Command::new("ssh")
|
|
||||||
.args(&args)
|
|
||||||
.status()
|
|
||||||
.context("failed to spawn ssh")
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run a remote command and stream its output to the local terminal via a plain
|
/// Run a remote command and stream its output to the local terminal via a plain
|
||||||
/// SSH pipe (no PTY). Use this for output streaming where interactive terminal
|
/// SSH pipe (no PTY). Use this for output streaming where interactive terminal
|
||||||
/// features are not needed. Ctrl+C on the client kills ssh and the remote process.
|
/// features are not needed. Ctrl+C on the client kills ssh and the remote process.
|
||||||
@@ -147,7 +127,7 @@ pub fn is_reachable(worker: &WorkerConfig) -> bool {
|
|||||||
|
|
||||||
// ── Job-status helpers ────────────────────────────────────────────────────────
|
// ── Job-status helpers ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/// Read the exit code written by run.sh on the worker, if the job has finished.
|
/// Read the exit code written by the job on the worker, if the job has finished.
|
||||||
pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
|
pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
|
||||||
let cmd = format!("cat ~/.p/jobs/{}/exitcode 2>/dev/null", job_id);
|
let cmd = format!("cat ~/.p/jobs/{}/exitcode 2>/dev/null", job_id);
|
||||||
run_capture(worker, &cmd)
|
run_capture(worker, &cmd)
|
||||||
@@ -155,7 +135,7 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
|
|||||||
.and_then(|s| s.trim().parse().ok())
|
.and_then(|s| s.trim().parse().ok())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read the finish timestamp written by run.sh on the worker, if the job has finished.
|
/// Read the finish timestamp written by the job on the worker, if the job has finished.
|
||||||
pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> {
|
pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> {
|
||||||
let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id);
|
let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id);
|
||||||
run_capture(worker, &cmd)
|
run_capture(worker, &cmd)
|
||||||
@@ -214,19 +194,6 @@ pub fn poll_jobs(
|
|||||||
Ok(map)
|
Ok(map)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check that the required worker-side tools (tmux, base64) are installed.
|
|
||||||
/// Returns the names of any missing tools. Uses a 5-second timeout so
|
|
||||||
/// `p worker register` doesn't hang on offline workers.
|
|
||||||
pub fn check_dependencies(worker: &WorkerConfig) -> Result<Vec<String>> {
|
|
||||||
let script = "missing=''; \
|
|
||||||
command -v tmux >/dev/null 2>&1 || missing=\"$missing tmux\"; \
|
|
||||||
command -v base64 >/dev/null 2>&1 || missing=\"$missing base64\"; \
|
|
||||||
printf '%s' \"$missing\"";
|
|
||||||
|
|
||||||
let out = run_with_timeout(worker, script, 5)?;
|
|
||||||
Ok(out.split_whitespace().map(|s| s.to_string()).collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
// ── Tests ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user