refactor for "p worker" subcommand
This commit is contained in:
31
p/src/cli.rs
31
p/src/cli.rs
@@ -26,7 +26,7 @@ pub enum Command {
|
||||
/// List all jobs across all workers
|
||||
Ls,
|
||||
|
||||
/// Re-attach to the console of a running job
|
||||
/// Re-attach to the tmux session of a running job
|
||||
Attach {
|
||||
/// Job ID or unambiguous prefix
|
||||
job_id: String,
|
||||
@@ -66,7 +66,23 @@ pub enum Command {
|
||||
force: bool,
|
||||
},
|
||||
|
||||
/// Register a remote worker
|
||||
/// Manage registered workers
|
||||
Worker {
|
||||
#[command(subcommand)]
|
||||
command: WorkerCommand,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
pub enum WorkerCommand {
|
||||
/// List registered workers
|
||||
Ls {
|
||||
/// Also probe reachability over SSH (slow)
|
||||
#[arg(short, long)]
|
||||
check: bool,
|
||||
},
|
||||
|
||||
/// Register a new worker
|
||||
Register {
|
||||
/// SSH connection string: user@host, user@host:port, or an SSH config alias
|
||||
connection: String,
|
||||
@@ -75,16 +91,15 @@ pub enum Command {
|
||||
name: Option<String>,
|
||||
},
|
||||
|
||||
/// List registered workers
|
||||
Workers {
|
||||
/// Check reachability of each worker over SSH
|
||||
#[arg(short, long)]
|
||||
check: bool,
|
||||
/// Unregister a worker
|
||||
Rm {
|
||||
/// Worker name
|
||||
name: String,
|
||||
},
|
||||
|
||||
/// Set the default worker
|
||||
Default {
|
||||
/// Worker name
|
||||
worker: String,
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1,5 +1,142 @@
|
||||
use anyhow::Result;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{
|
||||
config, db,
|
||||
job::{Job, JobStatus},
|
||||
ssh,
|
||||
};
|
||||
|
||||
pub fn execute() -> Result<()> {
|
||||
anyhow::bail!("not yet implemented")
|
||||
let cfg = config::load()?;
|
||||
let mut jobs = db::list()?;
|
||||
|
||||
if jobs.is_empty() {
|
||||
println!("No jobs yet. Run 'p -- <command>' to start one.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// ── Reconcile running jobs ────────────────────────────────────────────────
|
||||
//
|
||||
// Group running jobs by worker, then make one SSH call per worker to
|
||||
// check all of them at once. Update and persist any that have finished.
|
||||
|
||||
let mut by_worker: HashMap<String, Vec<usize>> = HashMap::new();
|
||||
for (i, job) in jobs.iter().enumerate() {
|
||||
if job.status == JobStatus::Running {
|
||||
by_worker.entry(job.worker.clone()).or_default().push(i);
|
||||
}
|
||||
}
|
||||
|
||||
for (worker_name, indices) in &by_worker {
|
||||
let Some(worker_cfg) = cfg.get_worker(worker_name) else {
|
||||
// Worker was deregistered; mark jobs unknown.
|
||||
for &i in indices {
|
||||
jobs[i].status = JobStatus::Unknown;
|
||||
}
|
||||
continue;
|
||||
};
|
||||
|
||||
let ids: Vec<&str> = indices.iter().map(|&i| jobs[i].id.as_str()).collect();
|
||||
|
||||
match ssh::poll_jobs(worker_cfg, &ids) {
|
||||
Ok(results) => {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
for &i in indices {
|
||||
let id = jobs[i].id.clone();
|
||||
if let Some(maybe_ec) = results.get(&id) {
|
||||
if let Some(ec) = maybe_ec {
|
||||
jobs[i].status = if *ec == 0 {
|
||||
JobStatus::Done
|
||||
} else {
|
||||
JobStatus::Failed
|
||||
};
|
||||
jobs[i].exit_code = Some(*ec);
|
||||
jobs[i].finished_at = Some(now);
|
||||
db::save(&jobs[i])?;
|
||||
}
|
||||
// None means still running — no update needed.
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// Worker unreachable; show unknown rather than stale "running".
|
||||
for &i in indices {
|
||||
jobs[i].status = JobStatus::Unknown;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Display ───────────────────────────────────────────────────────────────
|
||||
|
||||
print_table(&jobs);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_table(jobs: &[Job]) {
|
||||
// Compute column widths from content, with a minimum equal to the header.
|
||||
let id_w = 8; // always show 8-char short IDs
|
||||
let worker_w = jobs
|
||||
.iter()
|
||||
.map(|j| j.worker.len())
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
.max(6); // "WORKER"
|
||||
let cwd_w = jobs
|
||||
.iter()
|
||||
.map(|j| j.cwd_display().len().min(24))
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
.max(3); // "CWD"
|
||||
let cmd_w = jobs
|
||||
.iter()
|
||||
.map(|j| j.command_display(24).len())
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
.max(7); // "COMMAND"
|
||||
let status_w = jobs
|
||||
.iter()
|
||||
.map(|j| j.status_display().len())
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
.max(6); // "STATUS"
|
||||
|
||||
// Header
|
||||
println!(
|
||||
"{:<id_w$} {:<worker_w$} {:<cwd_w$} {:<cmd_w$} {:<status_w$} DURATION",
|
||||
"ID", "WORKER", "CWD", "COMMAND", "STATUS",
|
||||
);
|
||||
// Separator
|
||||
println!(
|
||||
"{:<id_w$} {:<worker_w$} {:<cwd_w$} {:<cmd_w$} {:<status_w$} --------",
|
||||
"-".repeat(id_w),
|
||||
"-".repeat(worker_w),
|
||||
"-".repeat(cwd_w),
|
||||
"-".repeat(cmd_w),
|
||||
"-".repeat(status_w),
|
||||
);
|
||||
|
||||
for job in jobs {
|
||||
let cwd = truncate(&job.cwd_display(), cwd_w);
|
||||
let cmd = truncate(&job.command_display(cmd_w), cmd_w);
|
||||
println!(
|
||||
"{:<id_w$} {:<worker_w$} {:<cwd_w$} {:<cmd_w$} {:<status_w$} {}",
|
||||
job.short_id(),
|
||||
job.worker,
|
||||
cwd,
|
||||
cmd,
|
||||
job.status_display(),
|
||||
job.duration_display(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Truncate a string to `max` display characters, appending `…` if needed.
|
||||
fn truncate(s: &str, max: usize) -> String {
|
||||
if s.len() > max {
|
||||
format!("{}…", &s[..max.saturating_sub(1)])
|
||||
} else {
|
||||
s.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,7 @@ pub mod attach;
|
||||
pub mod logs;
|
||||
pub mod ls;
|
||||
pub mod pull;
|
||||
pub mod register;
|
||||
pub mod rm;
|
||||
pub mod run;
|
||||
pub mod set_default;
|
||||
pub mod stop;
|
||||
pub mod workers;
|
||||
pub mod worker;
|
||||
|
||||
4
p/src/commands/worker/mod.rs
Normal file
4
p/src/commands/worker/mod.rs
Normal file
@@ -0,0 +1,4 @@
|
||||
pub mod ls;
|
||||
pub mod register;
|
||||
pub mod rm;
|
||||
pub mod set_default;
|
||||
6
p/src/commands/worker/rm.rs
Normal file
6
p/src/commands/worker/rm.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
use anyhow::Result;
|
||||
|
||||
pub fn execute(name: &str) -> Result<()> {
|
||||
let _ = name;
|
||||
anyhow::bail!("not yet implemented")
|
||||
}
|
||||
@@ -63,10 +63,13 @@ fn main() -> Result<()> {
|
||||
local_dest,
|
||||
} => commands::pull::execute(&job_id, &remote_path, local_dest.as_deref()),
|
||||
cli::Command::Rm { job_id, force } => commands::rm::execute(&job_id, force),
|
||||
cli::Command::Register { connection, name } => {
|
||||
commands::register::execute(&connection, name.as_deref())
|
||||
cli::Command::Worker { command } => match command {
|
||||
cli::WorkerCommand::Ls { check } => commands::worker::ls::execute(check),
|
||||
cli::WorkerCommand::Register { connection, name } => {
|
||||
commands::worker::register::execute(&connection, name.as_deref())
|
||||
}
|
||||
cli::Command::Workers { check } => commands::workers::execute(check),
|
||||
cli::Command::Default { worker } => commands::set_default::execute(&worker),
|
||||
cli::WorkerCommand::Rm { name } => commands::worker::rm::execute(&name),
|
||||
cli::WorkerCommand::Default { name } => commands::worker::set_default::execute(&name),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user