Compare commits

..

2 Commits

Author SHA1 Message Date
vhaudiquet 06c1d81190 feat: switch from tmux to simple nohup for remote jobs
CI / Check, test, lint (push) Successful in 28s
- Remove tmux-based execution; jobs now run via nohup + shell script
- Remove p-agent daemon from workspace
- Remove `p attach` command (use `p logs -f` instead)
- Add `-d/--detach` flag to start job without streaming
- Stream output via SSH tail -F
- Ctrl+C detaches from stream; job keeps running
- `p stop` kills via PID file
- Worker requirements reduced to just rsync (no tmux needed)
2026-06-06 12:26:58 +02:00
vhaudiquet 92132bc37a spec: update spec
CI / Check, test, lint (push) Failing after 11m1s
- Default `p -- <cmd>` now streams logs via SSH (like tail -f)
   - Ctrl+C detaches from stream; job keeps running on worker
   - Add `-d/--detach` flag to start job without streaming
   - Remove `p attach` command (use `p logs -f` instead)
   - Remove p-agent daemon; jobs launched via nohup over SSH
   - Simplify worker requirements: only rsync needed (no tmux, no agent)
   - Jobs managed via ad-hoc SSH: kill $(cat pid), tail -f output.log
2026-06-06 02:12:53 +02:00
16 changed files with 204 additions and 467 deletions
Generated
+7 -14
View File
@@ -73,12 +73,6 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.11.1" version = "2.11.1"
@@ -476,7 +470,6 @@ name = "p"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64",
"chrono", "chrono",
"clap", "clap",
"ctrlc", "ctrlc",
@@ -484,16 +477,10 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml", "serde_yaml",
"shell-words",
"uuid", "uuid",
] ]
[[package]]
name = "p-agent"
version = "0.1.0"
dependencies = [
"anyhow",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.17" version = "0.2.17"
@@ -619,6 +606,12 @@ dependencies = [
"unsafe-libyaml", "unsafe-libyaml",
] ]
[[package]]
name = "shell-words"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6fe69c597f9c37bfeeeeeb33da3530379845f10be461a66d16d03eca2ded77"
[[package]] [[package]]
name = "shlex" name = "shlex"
version = "1.3.0" version = "1.3.0"
+1 -1
View File
@@ -1,3 +1,3 @@
[workspace] [workspace]
members = ["p", "agent"] members = ["p"]
resolver = "2" resolver = "2"
+51 -89
View File
@@ -1,14 +1,15 @@
# `p` — push jobs to worker # `p` — push jobs to worker
A small Rust CLI utility to push command-line jobs to remote worker machines, A small Rust CLI utility to push command-line jobs to remote worker machines,
with directory sync, job management, and attach/detach support. with directory sync, job management, and log streaming.
## Motivation ## Motivation
The common developer workflow of "run this build/test/script on a more powerful The common developer workflow of "run this build/test/script on a more powerful
remote machine" currently requires manually chaining `rsync`, `ssh`, and `tmux`. remote machine" currently requires manually chaining `rsync` and `ssh` with
a way to keep the job alive in the background (e.g. `nohup`, `tmux`).
`p` wraps that entire flow into a single ergonomic command, while adding proper `p` wraps that entire flow into a single ergonomic command, while adding proper
job tracking, log capture, and attach/detach mechanics. job tracking and log capture.
## Core Concepts ## Core Concepts
@@ -26,13 +27,6 @@ directory. Each job has:
- Start time, end time, exit code - Start time, end time, exit code
- Captured output log - Captured output log
### p-agent
A small Rust (?) binary, automatically uploaded and started by `p` on first use
of a worker. It manages job lifecycle, log capture, and status tracking on
the worker side. The user never manually installs or configures it.
`p` checks the agent version on each connection and re-uploads if outdated.
Communication happens over SSH port forwarding — no extra open ports needed.
## CLI Reference ## CLI Reference
### Running jobs ### Running jobs
@@ -41,14 +35,19 @@ Communication happens over SSH port forwarding — no extra open ports needed.
p -- <command> p -- <command>
``` ```
Sync the current directory to the default worker and run `<command>` on it. Sync the current directory to the default worker and run `<command>` on it.
Attaches to the job's tmux session immediately. `Ctrl+B D` detaches without Streams the job's output directly to the terminal (like `tail -f`). This feels
killing the job. `Ctrl+C` sends SIGINT to the running process (standard behavior). like running the command locally.
When the job finishes, the session stays open and displays: - `Ctrl+C` detaches from the log stream — the job keeps running on the worker.
Use `p logs -f <job-id>` to resume watching.
- Use `p stop <job-id>` to kill a running job.
- If the network connection drops, the job keeps running on the worker.
Use `p logs -f <job-id>` to resume watching.
When the job finishes, `p` prints the exit code and exits:
``` ```
--- Job done [exit 0]. Press any key to detach. --- [Job done: exit 0]
``` ```
This lets the user read final output before returning to their shell.
``` ```
p <worker> -- <command> p <worker> -- <command>
@@ -59,7 +58,14 @@ Same, but targets a specific named worker.
p [-n | --no-sync] -- <command> p [-n | --no-sync] -- <command>
``` ```
Run `<command>` on the worker without syncing the current directory first. Run `<command>` on the worker without syncing the current directory first.
Useful for commands that need no local files (e.g. `p -n -- htop`). Useful for commands that need no local files.
```
p [-d | --detach] -- <command>
```
Run `<command>` and immediately detach — do not stream output to the terminal.
The job starts on the worker and `p` prints the job ID. Useful for fire-and-forget
jobs. Use `p logs -f <job-id>` to watch later.
### Job management ### Job management
@@ -71,19 +77,12 @@ completed jobs (done, failed, stopped).
Shows: ID (short), worker, original CWD, command, status, duration. Shows: ID (short), worker, original CWD, command, status, duration.
Style inspired by `docker ps` / `lxc list`. Style inspired by `docker ps` / `lxc list`.
```
p attach <job-id>
```
Re-attach to the tmux session of a running job. Supports partial IDs.
Behaves identically to the initial attach: `Ctrl+B D` detaches, and if the job
has already finished the "press any key" screen is shown.
Only works on **running** jobs. For finished jobs, use `p logs`.
``` ```
p logs <job-id> p logs <job-id>
``` ```
Print the captured output of a job (running or finished). Supports `-f` to Print the captured output of a job (running or finished). Supports `-f` to
follow a running job's output without attaching to its TTY. follow a running job's output in real-time. `Ctrl+C` detaches without stopping
the job.
``` ```
p stop <job-id> p stop <job-id>
@@ -148,62 +147,40 @@ Set the default worker.
- No automatic sync-back after job completion. Use `p pull` to retrieve - No automatic sync-back after job completion. Use `p pull` to retrieve
specific artifacts. specific artifacts.
## Attach / Detach Mechanics ## Execution Model
Jobs run inside a `tmux` session on the worker. `p` attaches to the session No persistent agent daemon is needed. Jobs are launched and managed via
immediately after starting the job. ad-hoc SSH commands:
### Status bar 1. `p -- <command>` syncs the directory, then runs via SSH:
The tmux session has a custom status bar showing:
``` ```
p-<short-id> beefy make [running] 0:02:14 nohup bash -c '<command> 2>&1 | tee output.log; echo ${PIPESTATUS[0]} > exitcode' & echo $! > pid
``` ```
Fields: job short-ID, worker name, command (truncated), status, elapsed time. 2. The client streams `output.log` in real-time over a separate SSH connection.
3. `Ctrl+C` closes the SSH stream — the job keeps running.
4. `p stop <job-id>` runs `kill $(cat pid)` over SSH.
5. `p logs -f <job-id>` tails the log file over SSH.
6. `p ls` reads the local job DB and SSH-polls to reconcile state when needed.
### Key bindings while attached > **Worker requirements:** `rsync` must be available on the worker.
| Key | Effect |
|---|---|
| `Ctrl+B D` | Detach from session. Job keeps running. |
| `Ctrl+C` | Sends SIGINT to the foreground process (standard terminal behavior). |
### On job completion
When the job's process exits, `run.sh` writes the exit code and then displays:
```
--- Job done [exit 0]. Press any key to detach. ---
```
The tmux session stays open (`remain-on-exit on` for the window) so the user
can scroll through final output. Pressing any key detaches the client and
returns to the local shell. `p` then reads the exit code and prints a summary.
### `p attach` on a finished job
If the job has already finished and the tmux session is still open (user has
not yet pressed a key), `p attach` reconnects to the "press any key" screen.
Once the key is pressed, the session closes. For a fully-closed session, use
`p logs` instead.
> **Worker requirements:** `tmux` and `rsync` must be available on the worker
> (standard on most Linux systems). The `p-agent` binary is auto-uploaded by `p`.
--- ---
## Job Status & Notification ## Job Status & Tracking
The **p-agent** runs as a lightweight background process on the worker The client maintains a local job database (`~/.local/share/p/jobs/<uuid>.json`).
(started automatically, not a system service). It: `p ls` reads from this local store for fast output.
- Manages job launch and tmux session creation ### State reconciliation
- Tees output to `output.log` When a job is running, the client periodically checks if `exitcode` exists on
- Writes `exitcode` on completion the worker. If the client was offline or the connection dropped, the next
- Notifies the client over the SSH reverse tunnel when a job finishes `p ls` SSH-polls workers to reconcile state. Jobs with unknown status are
marked accordingly.
The client maintains a local job database (`~/.local/share/p/jobs/<uuid>.json`) ### Connection drops during streaming
mirroring job state. `p ls` reads from this local store (fast, no SSH), If the SSH connection drops while `p` is streaming output, `p` exits with an
updated in real time while attached, and via agent notifications otherwise. error message showing the job ID. The job continues running on the worker.
Resume watching with `p logs -f <job-id>`.
### Degraded mode (agent unreachable / client was offline)
If the client missed a completion notification, `p ls` marks affected jobs as
`unknown`. The next `p ls` SSH-polls all workers with known-running jobs to
reconcile state.
## Worker-side Layout ## Worker-side Layout
@@ -211,8 +188,6 @@ All data lives under `~/.p/` on the worker (no root access required).
``` ```
~/.p/ ~/.p/
bin/
p-agent # auto-uploaded by p, versioned
jobs/ jobs/
<uuid>/ <uuid>/
cmd # command string cmd # command string
@@ -221,7 +196,8 @@ All data lives under `~/.p/` on the worker (no root access required).
started_at # unix timestamp started_at # unix timestamp
output.log # combined stdout+stderr, always captured output.log # combined stdout+stderr, always captured
exitcode # written on completion; absent = still running exitcode # written on completion; absent = still running
tmux_session # tmux session name (e.g. "p-<short-uuid>") pid # process ID of the running job
workdirs/ workdirs/
<uuid>/ # rsync'd copy of client CWD for this job <uuid>/ # rsync'd copy of client CWD for this job
``` ```
@@ -252,23 +228,9 @@ b004f123 cloud ~/scripts ./bench.sh done [1] 0:00:47
## Open Questions ## Open Questions
- **Worker arch detection**: `p-agent` must be compiled for the worker's
architecture. Options: (a) ship common targets and detect via SSH, (b)
compile on the worker if a Rust toolchain is present, (c) require user to
specify arch in worker config.
Maybe we could also implement the agent core in the form of a shell script?
At least the entry point, which could do some detection and install or something.
We will see on implementation what works... Small rust binary seems nice,
but we want support for amd64, aarch64 and riscv64.
- **Multiple jobs from the same CWD**: each gets its own `workdirs/<uuid>/`, - **Multiple jobs from the same CWD**: each gets its own `workdirs/<uuid>/`,
so they're fully isolated. This may use significant disk space — `p rm` so they're fully isolated. This may use significant disk space — `p rm`
should prompt to clean up. should prompt to clean up.
- **Non-Linux workers**: tmux availability and path conventions may differ on - **Non-Linux workers**: path conventions may differ on macOS workers. Out of
macOS workers. Out of scope for now. scope for now.
- **Ctrl+C → detach** (future): it would be nicer if Ctrl+C detached the
session instead of sending SIGINT to the job, matching the spirit of the
tool. This requires per-session tmux key table configuration and is deferred.
-11
View File
@@ -1,11 +0,0 @@
[package]
name = "p-agent"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "p-agent"
path = "src/main.rs"
[dependencies]
anyhow = "1"
-4
View File
@@ -1,4 +0,0 @@
/// Entry point of the `p-agent` placeholder binary.
///
/// Prints a two-line banner to stdout and exits; no agent behavior exists yet.
fn main() {
    let banner = [
        "p-agent — worker-side agent for p",
        "(not yet implemented)",
    ];
    for line in banner {
        println!("{}", line);
    }
}
+2 -2
View File
@@ -22,7 +22,7 @@ dirs = "5"
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4"] }
# Timestamps and duration display # Timestamps and duration display
chrono = "0.4" chrono = "0.4"
# Encoding commands passed through SSH → tmux → bash # Shell escaping for commands
base64 = "0.22" shell-words = "1"
# Ctrl+C handling (detach message) # Ctrl+C handling (detach message)
ctrlc = "3" ctrlc = "3"
+1 -7
View File
@@ -5,7 +5,7 @@ use clap::{Parser, Subcommand};
name = "p", name = "p",
about = "Push jobs to remote worker machines", about = "Push jobs to remote worker machines",
long_about = "\ long_about = "\
Push jobs to remote worker machines with directory sync and attach/detach support. Push jobs to remote worker machines with directory sync and log streaming.
Run a command on the default worker: Run a command on the default worker:
p -- <command> p -- <command>
@@ -33,12 +33,6 @@ pub enum Command {
all: bool, all: bool,
}, },
/// Re-attach to the tmux session of a running job
Attach {
/// Job ID or unambiguous prefix
job_id: String,
},
/// Print captured output of a job (running or finished) /// Print captured output of a job (running or finished)
Logs { Logs {
/// Job ID or unambiguous prefix /// Job ID or unambiguous prefix
-97
View File
@@ -1,97 +0,0 @@
use anyhow::{Context, Result};
use chrono::Utc;
use crate::{
config, db,
job::{Job, JobStatus},
ssh,
};
/// Re-attach to the tmux session of a job, then reconcile the local job record.
///
/// `job_id` is resolved through `db::find`; an unknown ID is an error. Only jobs
/// whose local status is `Running` or `Unknown` can be attached to — any other
/// status bails with a hint to use `p logs`.
///
/// After `tmux attach` returns, the job's exit code is read from the worker
/// exactly once and one of three outcomes is handled:
///
/// * exit code present — the job finished: the tmux session is killed
///   (best-effort), the record is saved as `Done`/`Failed`, and a summary is printed;
/// * no exit code and `tmux attach` failed — the session is gone without the job
///   having completed: the record is saved as `Unknown`;
/// * no exit code and `tmux attach` succeeded — the user detached mid-run; the
///   record is left untouched.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, the job or its worker cannot
/// be resolved, the job is not attachable, the SSH session cannot be started, or
/// the local job record cannot be saved.
pub fn execute(job_id: &str) -> Result<()> {
    let cfg = config::load()?;
    let job = db::find(job_id)?.with_context(|| format!("job '{}' not found", job_id))?;

    // Allow attach on Running (normal) and Unknown (worker was temporarily
    // unreachable — the job may still be live).
    match job.status {
        JobStatus::Running | JobStatus::Unknown => {}
        _ => anyhow::bail!(
            "job {} is not running (status: {})\n\
             use 'p logs {}' to view its output",
            job.short_id(),
            job.status.as_str(),
            job.short_id(),
        ),
    }

    let worker = cfg.resolve_worker(Some(&job.worker))?.clone();
    let session = format!("p-{}", job.short_id());
    let sid = job.short_id().to_string();

    {
        // The handler needs its own copy of the short ID; `sid` is still used below.
        let sid = sid.clone();
        // Best-effort: failing to install the handler is deliberately ignored.
        ctrlc::set_handler(move || {
            eprintln!(
                "\nDetached. Use 'p attach {sid}' to re-attach or 'p logs {sid}' to view output."
            );
            std::process::exit(0);
        })
        .ok();
    }

    let attach_status = ssh::run_interactive(&worker, &format!("tmux attach -t '{}'", session))?;

    // Read the exit code a single time: every branch below depends on the same
    // answer, and each read is a separate SSH round-trip.
    let exit_code = ssh::read_job_exitcode(&worker, &job.id);

    match exit_code {
        Some(ec) => {
            // Only a finished job has a meaningful finish time, so fetch it here
            // rather than unconditionally.
            let finished_at = ssh::read_job_finished_at(&worker, &job.id)
                .unwrap_or_else(|| Utc::now().timestamp());

            // The session may still be alive; kill it (best-effort) to clean up.
            ssh::run_capture(
                &worker,
                &format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
            )
            .ok();

            db::save(&Job {
                status: if ec == 0 {
                    JobStatus::Done
                } else {
                    JobStatus::Failed
                },
                exit_code: Some(ec),
                finished_at: Some(finished_at),
                ..job
            })?;

            eprintln!("Job {} finished with exit code {}.", sid, ec);
        }
        None if !attach_status.success() => {
            // tmux exited non-zero and there is no exit code either — the job was
            // lost without completing (e.g. worker restarted, tmux server killed).
            eprintln!(
                "error: tmux session for job {} no longer exists on '{}'.",
                sid, worker.name
            );
            eprintln!("The job was likely interrupted (worker restart or tmux server exit).");
            eprintln!("Use 'p logs {}' to see whatever output was captured.", sid);
            db::save(&Job {
                status: JobStatus::Unknown,
                ..job
            })?;
        }
        None => {
            // Attach succeeded and the job has not written an exit code: the user
            // detached while the job was still running.
            eprintln!(
                "Detached from job {}. Use 'p attach {}' to re-attach.",
                sid, sid
            );
        }
    }

    Ok(())
}
+49 -3
View File
@@ -1,6 +1,11 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chrono::Utc;
use crate::{config, db, job::JobStatus, ssh}; use crate::{
config, db,
job::{Job, JobStatus},
ssh,
};
pub fn execute(job_id: &str, follow: bool) -> Result<()> { pub fn execute(job_id: &str, follow: bool) -> Result<()> {
let cfg = config::load()?; let cfg = config::load()?;
@@ -19,10 +24,12 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
job.short_id() job.short_id()
); );
print_log(worker, &log)?; print_log(worker, &log)?;
reconcile_and_print_exitcode(worker, &job)?;
} else { } else {
// Stream live output and exit automatically when the job finishes. // Stream live output and exit automatically when the job finishes.
// //
// The script backgrounds `tail -f` (so its stdout still flows to the // Use -F (--follow=name) to handle the file not existing yet.
// The script backgrounds `tail -F` (so its stdout still flows to the
// client through the SSH pipe), then polls for the exitcode file that // client through the SSH pipe), then polls for the exitcode file that
// run.sh writes the moment the job exits. A 1-second sleep after // run.sh writes the moment the job exits. A 1-second sleep after
// detecting the exitcode file gives tail time to drain any bytes that // detecting the exitcode file gives tail time to drain any bytes that
@@ -33,7 +40,7 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
// shell, terminating the whole script including the tail background job. // shell, terminating the whole script including the tail background job.
let exitcode_path = format!("~/.p/jobs/{}/exitcode", job.id); let exitcode_path = format!("~/.p/jobs/{}/exitcode", job.id);
let follow_cmd = format!( let follow_cmd = format!(
"tail -n +1 -f {log} & \ "tail -n +1 -F {log} & \
TAIL_PID=$!; \ TAIL_PID=$!; \
while ! [ -f {exitcode} ]; do sleep 1; done; \ while ! [ -f {exitcode} ]; do sleep 1; done; \
sleep 1; \ sleep 1; \
@@ -43,9 +50,16 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
exitcode = exitcode_path, exitcode = exitcode_path,
); );
ssh::run_output(worker, &follow_cmd)?; ssh::run_output(worker, &follow_cmd)?;
// Reconcile state after streaming ends
reconcile_and_print_exitcode(worker, &job)?;
} }
} else { } else {
print_log(worker, &log)?; print_log(worker, &log)?;
if job.status == JobStatus::Running {
// Check if job has finished
reconcile_and_print_exitcode(worker, &job)?;
}
} }
Ok(()) Ok(())
@@ -59,3 +73,35 @@ fn print_log(worker: &crate::config::WorkerConfig, log: &str) -> Result<()> {
print!("{}", out); print!("{}", out);
Ok(()) Ok(())
} }
/// Check whether the job has finished, update the local record, and print the
/// exit code when one is known.
///
/// * For jobs whose local status is `Running` or `Unknown`, the worker is asked
///   for the job's exit code. If one exists, the record is saved as `Done`
///   (exit 0) or `Failed` (non-zero) together with the finish time (falling back
///   to "now" when the worker has none), and `[Job done: exit N]` is printed to
///   stderr. `Unknown` is included because that status means contact with the
///   job was lost, not that it ended — the job may have completed since.
/// * For every other status no SSH call is made; the locally stored exit code,
///   if any, is printed.
///
/// If the worker reports no exit code, nothing is saved. A non-`Running` job
/// still gets its locally stored exit code printed in that case.
///
/// # Errors
///
/// Returns an error only if saving the updated job record fails.
fn reconcile_and_print_exitcode(worker: &crate::config::WorkerConfig, job: &Job) -> Result<()> {
    // Statuses other than Running/Unknown are treated as settled: trust the local record.
    let may_have_finished = matches!(job.status, JobStatus::Running | JobStatus::Unknown);

    let remote_exit_code = if may_have_finished {
        ssh::read_job_exitcode(worker, &job.id)
    } else {
        None
    };

    match remote_exit_code {
        Some(ec) => {
            let finished_at = ssh::read_job_finished_at(worker, &job.id)
                .unwrap_or_else(|| Utc::now().timestamp());

            db::save(&Job {
                status: if ec == 0 {
                    JobStatus::Done
                } else {
                    JobStatus::Failed
                },
                exit_code: Some(ec),
                finished_at: Some(finished_at),
                ..job.clone()
            })?;

            eprintln!("[Job done: exit {}]", ec);
        }
        // Nothing new from the worker: fall back to the stored exit code for any
        // job that is not (still) considered running.
        None if job.status != JobStatus::Running => {
            if let Some(ec) = job.exit_code {
                eprintln!("[Job done: exit {}]", ec);
            }
        }
        // Still running and no exit code yet — nothing to report.
        None => {}
    }

    Ok(())
}
-1
View File
@@ -1,4 +1,3 @@
pub mod attach;
pub mod logs; pub mod logs;
pub mod ls; pub mod ls;
pub mod prune; pub mod prune;
+7 -7
View File
@@ -16,14 +16,14 @@ pub fn execute(job_id: &str, force: bool) -> Result<()> {
sid sid
); );
} }
// Kill the tmux session before wiping the files. // Kill the job via PID before wiping the files.
let session = format!("p-{}", sid);
let worker = cfg.resolve_worker(Some(&job.worker))?; let worker = cfg.resolve_worker(Some(&job.worker))?;
ssh::run_capture( let pid_path = format!("~/.p/jobs/{}/pid", job.id);
worker, let kill_cmd = format!(
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session), "PID=$(cat {} 2>/dev/null) && kill \"$PID\" 2>/dev/null",
) pid_path
.ok(); );
ssh::run_capture(worker, &kill_cmd).ok();
} }
// Remove remote job dir and work dir (best-effort — warn if unreachable). // Remove remote job dir and work dir (best-effort — warn if unreachable).
+55 -161
View File
@@ -1,5 +1,4 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use base64::{engine::general_purpose::STANDARD as B64, Engine};
use chrono::Utc; use chrono::Utc;
use std::io::IsTerminal; use std::io::IsTerminal;
use uuid::Uuid; use uuid::Uuid;
@@ -25,7 +24,6 @@ pub fn execute(
let id = Uuid::new_v4().to_string(); let id = Uuid::new_v4().to_string();
let short_id = &id[..8]; let short_id = &id[..8];
let session = format!("p-{}", short_id);
// Resolve the remote home directory once using an absolute path so that // Resolve the remote home directory once using an absolute path so that
// every subsequent call — SSH, rsync, and scripts on the worker — all // every subsequent call — SSH, rsync, and scripts on the worker — all
@@ -78,27 +76,31 @@ pub fn execute(
}; };
db::save(&job)?; db::save(&job)?;
// ── 4. Write files and start the job on the worker ──────────────────────── // ── 4. Write metadata and start the job on the worker ─────────────────────
let cmd_b64 = B64.encode(&cmd_str);
let run_sh = build_run_sh(&session, &job_dir, &work_dir, &worker.name);
let run_sh_b64 = B64.encode(&run_sh);
// Write cmd, started_at, and the run script
let run_sh = build_run_sh(&job_dir, &work_dir, &cmd_str);
let setup = format!( let setup = format!(
"printf '%s' '{cmd_b64}' > {job_dir}/cmd && \ "printf '%s' {} > {job_dir}/cmd && \
printf '%s' '{run_sh_b64}' | base64 -d > {job_dir}/run.sh && \
chmod +x {job_dir}/run.sh && \
date +%s > {job_dir}/started_at && \ date +%s > {job_dir}/started_at && \
tmux new-session -d -s '{session}' {job_dir}/run.sh", touch {job_dir}/output.log && \
job_dir = job_dir, printf '%s' {} > {job_dir}/run.sh && \
cmd_b64 = cmd_b64, chmod +x {job_dir}/run.sh",
run_sh_b64 = run_sh_b64, shell_words::quote(&cmd_str),
session = session, shell_words::quote(&run_sh),
job_dir = job_dir
); );
ssh::run_capture(&worker, &setup).context("failed to write job metadata")?;
ssh::run_capture(&worker, &setup).context("failed to set up job on worker")?; // Start the run script with nohup
// The PID is saved to a file for later termination via `p stop`
let launch_cmd = format!(
"cd {} && nohup {}/run.sh & echo $! > {}/pid",
work_dir, job_dir, job_dir
);
ssh::run_capture(&worker, &launch_cmd).context("failed to start job on worker")?;
// ── 4. Attach or detach ─────────────────────────────────────────────────────── // ── 5. Detach or stream output ────────────────────────────────────────────
if detach { if detach {
// Non-interactive mode: print only the job UUID to stdout and exit. // Non-interactive mode: print only the job UUID to stdout and exit.
@@ -108,41 +110,43 @@ pub fn execute(
return Ok(()); return Ok(());
} }
// Interactive mode: attach to the tmux session. // Interactive mode: stream output.log in real-time.
// run.sh keeps the session alive after the job finishes (via `read`), // Ctrl+C detaches from the stream — the job keeps running.
// so there is no race between job completion and our attach call.
//
// Ctrl+B D detaches cleanly mid-run; the job keeps going in the background.
// Any other key on the "press any key" screen triggers `tmux detach-client`
// from within run.sh — no [exited] or [detached] flash.
let sid = short_id.to_string(); let sid = short_id.to_string();
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
eprintln!( eprintln!(
"\nDetached. Use 'p attach {sid}' to re-attach or 'p logs {sid}' to view output." "\nDetached. Use 'p logs -f {}' to resume watching or 'p stop {}' to kill.",
sid, sid
); );
std::process::exit(0); std::process::exit(0);
}) })
.ok(); .ok();
eprintln!("Job {} started on {}.", short_id, worker.name); eprintln!("Job {} started on {}.", short_id, worker.name);
ssh::run_interactive(&worker, &format!("tmux attach -t '{}'", session))?;
// ── 5. Reconcile status after returning ─────────────────────────────────── // Stream the log file, exiting when the job finishes
// Use -F (--follow=name) to handle the file not existing yet
let log_path = format!("{}/output.log", job_dir);
let exitcode_path = format!("{}/exitcode", job_dir);
let stream_cmd = format!(
"tail -n +1 -F {} & \
TAIL_PID=$!; \
while ! [ -f {} ]; do sleep 1; done; \
sleep 0.5; \
kill \"$TAIL_PID\" 2>/dev/null; \
wait \"$TAIL_PID\" 2>/dev/null",
log_path, exitcode_path
);
ssh::run_output(&worker, &stream_cmd)?;
// ── 6. Reconcile status after returning ───────────────────────────────────
let exit_code = ssh::read_job_exitcode(&worker, &id); let exit_code = ssh::read_job_exitcode(&worker, &id);
let finished_at = let finished_at =
ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp()); ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp());
if let Some(ec) = exit_code { if let Some(ec) = exit_code {
// Job is done. The session might still be alive if the user Ctrl+B D'd
// from the "press any key" screen — kill it to clean up the lingering `read`.
ssh::run_capture(
&worker,
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
)
.ok();
db::save(&Job { db::save(&Job {
status: if ec == 0 { status: if ec == 0 {
JobStatus::Done JobStatus::Done
@@ -154,145 +158,35 @@ pub fn execute(
..job ..job
})?; })?;
eprintln!("Job {} finished with exit code {}.", short_id, ec); eprintln!("[Job done: exit {}]", ec);
} else { } else {
// User detached mid-run (Ctrl+B D). Job is still running on the worker. // Connection dropped or job lost
db::save(&Job { db::save(&Job {
status: JobStatus::Running, status: JobStatus::Unknown,
..job ..job
})?; })?;
eprintln!( eprintln!(
"Detached from job {}. Use 'p attach {}' to re-attach or 'p logs {}' to view output.", "Connection lost. Job {} may still be running. Use 'p logs -f {}' to resume.",
short_id, short_id, short_id short_id, short_id
); );
} }
Ok(()) Ok(())
} }
/// Build the shell script that runs inside the tmux pane. /// Build the shell script that runs on the worker.
/// fn build_run_sh(job_dir: &str, work_dir: &str, cmd_str: &str) -> String {
/// `pub(crate)` so it can be tested.
pub(crate) fn build_run_sh(session: &str, job_dir: &str, work_dir: &str, worker: &str) -> String {
// Truncate worker name for display to avoid overflowing the status bar.
let worker_display = if worker.len() > 20 {
&worker[..20]
} else {
worker
};
format!( format!(
// Shebang + status bar setup r#"#!/bin/bash
"#!/bin/bash\n\ cd {work_dir}
tmux set-option -t '{session}' status on 2>/dev/null\n\ {cmd} 2>&1 | tee {job_dir}/output.log
tmux set-option -t '{session}' status-style 'fg=colour250,bg=colour235' 2>/dev/null\n\ EXIT_CODE=${{PIPESTATUS[0]}}
tmux set-option -t '{session}' status-left-length 60 2>/dev/null\n\ echo "$EXIT_CODE" > {job_dir}/exitcode
tmux set-option -t '{session}' status-right-length 20 2>/dev/null\n\ date +%s > {job_dir}/finished_at
tmux set-option -t '{session}' status-left \ "#,
'#[fg=colour39,bold] {session} #[fg=colour250,nobold]| {worker_display} ' 2>/dev/null\n\
tmux set-option -t '{session}' status-right \
'#[fg=colour250] running ' 2>/dev/null\n\
\n\
# Run the command, tee output to log\n\
cd {work_dir}\n\
cmd=$(base64 -d < {job_dir}/cmd)\n\
bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\
EXIT_CODE=${{PIPESTATUS[0]}}\n\
echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\
date +%s > {job_dir}/finished_at\n\
\n\
# Update status bar to show result\n\
if [ \"$EXIT_CODE\" -eq 0 ]; then\n\
tmux set-option -t '{session}' status-right \
'#[fg=colour2,bold] done [0] ' 2>/dev/null\n\
else\n\
tmux set-option -t '{session}' status-right \
\"#[fg=colour1,bold] done [$EXIT_CODE] \" 2>/dev/null\n\
fi\n\
\n\
# Keep the session open so the user can read final output\n\
printf '\\n\\033[2m--- done [exit %d] - press any key to detach ---\\033[0m\\n' \
\"$EXIT_CODE\"\n\
read -rn 1 -s\n\
\n\
# Detach cleanly — no [exited] / [detached] flash\n\
tmux detach-client -s '{session}' 2>/dev/null || true\n",
session = session,
worker_display = worker_display,
job_dir = job_dir,
work_dir = work_dir, work_dir = work_dir,
cmd = cmd_str,
job_dir = job_dir
) )
} }
// ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    // Fixture values shared by the tests below.
    const SESSION: &str = "p-a3f2b091";
    const JOB_DIR: &str = "/home/ubuntu/.p/jobs/a3f2b091-0000-0000-0000-000000000000";
    const WORK_DIR: &str = "/home/ubuntu/.p/workdirs/a3f2b091-0000-0000-0000-000000000000";
    const WORKER: &str = "beefy";

    /// Render the run script for the fixture job.
    fn sample_script() -> String {
        build_run_sh(SESSION, JOB_DIR, WORK_DIR, WORKER)
    }

    #[test]
    fn script_starts_with_shebang() {
        let script = sample_script();
        assert!(script.starts_with("#!/bin/bash\n"));
    }

    #[test]
    fn script_contains_session_name() {
        let script = sample_script();
        assert!(script.contains("p-a3f2b091"));
    }

    #[test]
    fn script_contains_job_and_work_dirs() {
        let script = sample_script();
        for prefix in [
            "/home/ubuntu/.p/jobs/a3f2b091",
            "/home/ubuntu/.p/workdirs/a3f2b091",
        ] {
            assert!(script.contains(prefix));
        }
    }

    #[test]
    fn script_captures_exit_code_via_pipestatus() {
        // The command's own exit code lives in PIPESTATUS[0]; `$?` would be tee's.
        let script = sample_script();
        assert!(script.contains("PIPESTATUS[0]"));
    }

    #[test]
    fn script_writes_exitcode_file() {
        let script = sample_script();
        assert!(script.contains("exitcode"));
    }

    #[test]
    fn script_tees_output_to_log() {
        let script = sample_script();
        assert!(script.contains("tee"));
        assert!(script.contains("output.log"));
    }

    #[test]
    fn script_waits_for_keypress_before_detach() {
        let script = sample_script();
        for needle in ["read -rn 1 -s", "tmux detach-client"] {
            assert!(script.contains(needle));
        }
    }

    #[test]
    fn script_sets_up_status_bar() {
        let script = sample_script();
        for needle in ["status on", "beefy"] {
            assert!(script.contains(needle));
        }
    }

    #[test]
    fn worker_name_truncated_at_20_chars() {
        // The worker name is 28 chars long; only its first 20 may appear.
        let long_name = "a-very-long-worker-name-here";
        let script = build_run_sh("p-test", "/j", "/w", long_name);
        assert!(script.contains("a-very-long-worker-n"));
        assert!(!script.contains(long_name));
    }
}
+14 -6
View File
@@ -20,14 +20,22 @@ pub fn execute(job_id: &str) -> Result<()> {
} }
let worker = cfg.resolve_worker(Some(&job.worker))?; let worker = cfg.resolve_worker(Some(&job.worker))?;
let session = format!("p-{}", job.short_id());
let sid = job.short_id().to_string(); let sid = job.short_id().to_string();
// Kill the tmux session. This terminates the job process and run.sh. // Read the PID file and kill the process
ssh::run_capture( // Use a subshell that always succeeds to avoid SSH errors
worker, let kill_cmd = format!(
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session), "PID=$(cat ~/.p/jobs/{}/pid 2>/dev/null); \
)?; if [ -n \"$PID\" ]; then \
kill \"$PID\" 2>/dev/null; \
sleep 1; \
kill -9 \"$PID\" 2>/dev/null; \
fi; \
echo 15 > ~/.p/jobs/{}/exitcode; \
date +%s > ~/.p/jobs/{}/finished_at",
job.id, job.id, job.id
);
ssh::run_capture(worker, &kill_cmd)?;
db::save(&Job { db::save(&Job {
status: JobStatus::Stopped, status: JobStatus::Stopped,
+11 -24
View File
@@ -33,6 +33,17 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> {
config::save(&cfg)?; config::save(&cfg)?;
// Check that the worker is reachable
let worker = cfg.get_worker(&name).unwrap();
print!("Checking worker connectivity... ");
if ssh::is_reachable(worker) {
println!("ok");
} else {
println!();
println!("note: could not reach '{}' (worker may be offline)", name);
println!(" run 'p worker ls --check' once it's available");
}
if first { if first {
println!("Registered '{}' and set as default worker.", name); println!("Registered '{}' and set as default worker.", name);
} else { } else {
@@ -40,29 +51,5 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> {
println!("Run 'p worker default {}' to make it the default.", name); println!("Run 'p worker default {}' to make it the default.", name);
} }
// Check that the worker has the tools p needs.
let worker = cfg.get_worker(&name).unwrap();
print!("Checking worker dependencies... ");
match ssh::check_dependencies(worker) {
Ok(ref missing) if missing.is_empty() => println!("ok"),
Ok(missing) => {
println!();
println!(
"warning: '{}' is missing required tools: {}",
name,
missing.join(", ")
);
println!(" install them and re-register, or jobs will fail at launch");
}
Err(_) => {
println!();
println!(
"note: could not reach '{}' to check dependencies (worker may be offline)",
name
);
println!(" run 'p worker ls --check' once it's available");
}
}
Ok(()) Ok(())
} }
-1
View File
@@ -56,7 +56,6 @@ fn main() -> Result<()> {
let cli = cli::Cli::parse(); let cli = cli::Cli::parse();
match cli.command { match cli.command {
cli::Command::Ls { all } => commands::ls::execute(all), cli::Command::Ls { all } => commands::ls::execute(all),
cli::Command::Attach { job_id } => commands::attach::execute(&job_id),
cli::Command::Logs { job_id, follow } => commands::logs::execute(&job_id, follow), cli::Command::Logs { job_id, follow } => commands::logs::execute(&job_id, follow),
cli::Command::Stop { job_id } => commands::stop::execute(&job_id), cli::Command::Stop { job_id } => commands::stop::execute(&job_id),
cli::Command::Pull { cli::Command::Pull {
+4 -37
View File
@@ -30,8 +30,8 @@ pub fn parse_connection(conn: &str) -> (String, Option<u16>) {
/// Extract the bare hostname from a connection string (used as a default worker name). /// Extract the bare hostname from a connection string (used as a default worker name).
pub fn hostname_from_connection(conn: &str) -> String { pub fn hostname_from_connection(conn: &str) -> String {
let (user_host, _port) = parse_connection(conn); let (user_host, _port) = parse_connection(conn);
match user_host.rsplit_once('@') { match user_host.rfind('@') {
Some((_user, host)) => host.to_string(), Some(i) => user_host[i + 1..].to_string(),
None => user_host, None => user_host,
} }
} }
@@ -74,26 +74,6 @@ fn run_with_timeout(worker: &WorkerConfig, remote_cmd: &str, timeout_secs: u32)
Ok(String::from_utf8_lossy(&out.stdout).into_owned()) Ok(String::from_utf8_lossy(&out.stdout).into_owned())
} }
/// Run `remote_cmd` on `worker` through `ssh -t`, waiting for it to finish and
/// returning its exit status.
///
/// The child is started with `Command::status`, so it shares this process's
/// stdin, stdout and stderr.
///
/// The remote command is prefixed with `TERM=xterm-256color` to avoid failures
/// with terminal emulators whose terminfo entry is not installed on the worker
/// (e.g. ghostty, kitty). Only the terminfo lookup on the remote side changes;
/// the capabilities of the user's actual emulator are unaffected.
///
/// # Errors
///
/// Returns an error with the context `failed to spawn ssh` when the `ssh`
/// process cannot be started.
pub fn run_interactive(
    worker: &WorkerConfig,
    remote_cmd: &str,
) -> Result<std::process::ExitStatus> {
    let remote = format!("TERM=xterm-256color {}", remote_cmd);

    // Argument order: `-t`, then the per-worker connection arguments, then the
    // remote command line.
    Command::new("ssh")
        .arg("-t")
        .args(ssh_args(worker))
        .arg(remote)
        .status()
        .context("failed to spawn ssh")
}
/// Run a remote command and stream its output to the local terminal via a plain /// Run a remote command and stream its output to the local terminal via a plain
/// SSH pipe (no PTY). Use this for output streaming where interactive terminal /// SSH pipe (no PTY). Use this for output streaming where interactive terminal
/// features are not needed. Ctrl+C on the client kills ssh and the remote process. /// features are not needed. Ctrl+C on the client kills ssh and the remote process.
@@ -147,7 +127,7 @@ pub fn is_reachable(worker: &WorkerConfig) -> bool {
// ── Job-status helpers ──────────────────────────────────────────────────────── // ── Job-status helpers ────────────────────────────────────────────────────────
/// Read the exit code written by run.sh on the worker, if the job has finished. /// Read the exit code written by the job on the worker, if the job has finished.
pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> { pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
let cmd = format!("cat ~/.p/jobs/{}/exitcode 2>/dev/null", job_id); let cmd = format!("cat ~/.p/jobs/{}/exitcode 2>/dev/null", job_id);
run_capture(worker, &cmd) run_capture(worker, &cmd)
@@ -155,7 +135,7 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
.and_then(|s| s.trim().parse().ok()) .and_then(|s| s.trim().parse().ok())
} }
/// Read the finish timestamp written by run.sh on the worker, if the job has finished. /// Read the finish timestamp written by the job on the worker, if the job has finished.
pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> { pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> {
let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id); let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id);
run_capture(worker, &cmd) run_capture(worker, &cmd)
@@ -214,19 +194,6 @@ pub fn poll_jobs(
Ok(map) Ok(map)
} }
/// Probe the worker for the tools it must provide (tmux, base64).
///
/// Returns the names of the tools that were not found; the list is empty when
/// both are installed. The probe runs with a 5-second timeout so that
/// `p worker register` doesn't hang on offline workers.
///
/// # Errors
///
/// Propagates any error returned by `run_with_timeout`.
pub fn check_dependencies(worker: &WorkerConfig) -> Result<Vec<String>> {
    // Shell one-liner: append each tool that `command -v` cannot find to
    // `$missing`, then print the accumulated list.
    const PROBE: &str = concat!(
        "missing=''; ",
        "command -v tmux >/dev/null 2>&1 || missing=\"$missing tmux\"; ",
        "command -v base64 >/dev/null 2>&1 || missing=\"$missing base64\"; ",
        "printf '%s' \"$missing\"",
    );

    let stdout = run_with_timeout(worker, PROBE, 5)?;
    let missing = stdout.split_whitespace().map(str::to_owned).collect();
    Ok(missing)
}
// ── Tests ───────────────────────────────────────────────────────────────────── // ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)] #[cfg(test)]