Compare commits
2 Commits
883114e2a3
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
06c1d81190
|
|||
| 92132bc37a |
Generated
+7
-14
@@ -73,12 +73,6 @@ version = "1.5.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "base64"
|
|
||||||
version = "0.22.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bitflags"
|
name = "bitflags"
|
||||||
version = "2.11.1"
|
version = "2.11.1"
|
||||||
@@ -476,7 +470,6 @@ name = "p"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"base64",
|
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"ctrlc",
|
"ctrlc",
|
||||||
@@ -484,16 +477,10 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
|
"shell-words",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "p-agent"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"anyhow",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.17"
|
version = "0.2.17"
|
||||||
@@ -619,6 +606,12 @@ dependencies = [
|
|||||||
"unsafe-libyaml",
|
"unsafe-libyaml",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "shell-words"
|
||||||
|
version = "1.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "dc6fe69c597f9c37bfeeeeeb33da3530379845f10be461a66d16d03eca2ded77"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "shlex"
|
name = "shlex"
|
||||||
version = "1.3.0"
|
version = "1.3.0"
|
||||||
|
|||||||
+1
-1
@@ -1,3 +1,3 @@
|
|||||||
[workspace]
|
[workspace]
|
||||||
members = ["p", "agent"]
|
members = ["p"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
# `p` — push jobs to worker
|
# `p` — push jobs to worker
|
||||||
|
|
||||||
A small Rust CLI utility to push command-line jobs to remote worker machines,
|
A small Rust CLI utility to push command-line jobs to remote worker machines,
|
||||||
with directory sync, job management, and attach/detach support.
|
with directory sync, job management, and log streaming.
|
||||||
|
|
||||||
## Motivation
|
## Motivation
|
||||||
|
|
||||||
The common developer workflow of "run this build/test/script on a more powerful
|
The common developer workflow of "run this build/test/script on a more powerful
|
||||||
remote machine" currently requires manually chaining `rsync`, `ssh`, and `tmux`.
|
remote machine" currently requires manually chaining `rsync` and `ssh` with
|
||||||
|
a way to keep the job alive in the background (e.g. `nohup`, `tmux`).
|
||||||
`p` wraps that entire flow into a single ergonomic command, while adding proper
|
`p` wraps that entire flow into a single ergonomic command, while adding proper
|
||||||
job tracking, log capture, and attach/detach mechanics.
|
job tracking and log capture.
|
||||||
|
|
||||||
## Core Concepts
|
## Core Concepts
|
||||||
|
|
||||||
@@ -26,13 +27,6 @@ directory. Each job has:
|
|||||||
- Start time, end time, exit code
|
- Start time, end time, exit code
|
||||||
- Captured output log
|
- Captured output log
|
||||||
|
|
||||||
### p-agent
|
|
||||||
A small Rust (?) binary, automatically uploaded and started by `p` on first use
|
|
||||||
of a worker. It manages job lifecycle, log capture, and status tracking on
|
|
||||||
the worker side. The user never manually installs or configures it.
|
|
||||||
`p` checks the agent version on each connection and re-uploads if outdated.
|
|
||||||
Communication happens over SSH port forwarding — no extra open ports needed.
|
|
||||||
|
|
||||||
## CLI Reference
|
## CLI Reference
|
||||||
|
|
||||||
### Running jobs
|
### Running jobs
|
||||||
@@ -41,14 +35,19 @@ Communication happens over SSH port forwarding — no extra open ports needed.
|
|||||||
p -- <command>
|
p -- <command>
|
||||||
```
|
```
|
||||||
Sync the current directory to the default worker and run `<command>` on it.
|
Sync the current directory to the default worker and run `<command>` on it.
|
||||||
Attaches to the job's tmux session immediately. `Ctrl+B D` detaches without
|
Streams the job's output directly to the terminal (like `tail -f`). This feels
|
||||||
killing the job. `Ctrl+C` sends SIGINT to the running process (standard behavior).
|
like running the command locally.
|
||||||
|
|
||||||
When the job finishes, the session stays open and displays:
|
- `Ctrl+C` detaches from the log stream — the job keeps running on the worker.
|
||||||
|
Use `p logs -f <job-id>` to resume watching.
|
||||||
|
- Use `p stop <job-id>` to kill a running job.
|
||||||
|
- If the network connection drops, the job keeps running on the worker.
|
||||||
|
Use `p logs -f <job-id>` to resume watching.
|
||||||
|
|
||||||
|
When the job finishes, `p` prints the exit code and exits:
|
||||||
```
|
```
|
||||||
--- Job done [exit 0]. Press any key to detach. ---
|
[Job done: exit 0]
|
||||||
```
|
```
|
||||||
This lets the user read final output before returning to their shell.
|
|
||||||
|
|
||||||
```
|
```
|
||||||
p <worker> -- <command>
|
p <worker> -- <command>
|
||||||
@@ -59,7 +58,14 @@ Same, but targets a specific named worker.
|
|||||||
p [-n | --no-sync] -- <command>
|
p [-n | --no-sync] -- <command>
|
||||||
```
|
```
|
||||||
Run `<command>` on the worker without syncing the current directory first.
|
Run `<command>` on the worker without syncing the current directory first.
|
||||||
Useful for commands that need no local files (e.g. `p -n -- htop`).
|
Useful for commands that need no local files.
|
||||||
|
|
||||||
|
```
|
||||||
|
p [-d | --detach] -- <command>
|
||||||
|
```
|
||||||
|
Run `<command>` and immediately detach — do not stream output to the terminal.
|
||||||
|
The job starts on the worker and `p` prints the job ID. Useful for fire-and-forget
|
||||||
|
jobs. Use `p logs -f <job-id>` to watch later.
|
||||||
|
|
||||||
### Job management
|
### Job management
|
||||||
|
|
||||||
@@ -71,19 +77,12 @@ completed jobs (done, failed, stopped).
|
|||||||
Shows: ID (short), worker, original CWD, command, status, duration.
|
Shows: ID (short), worker, original CWD, command, status, duration.
|
||||||
Style inspired by `docker ps` / `lxc list`.
|
Style inspired by `docker ps` / `lxc list`.
|
||||||
|
|
||||||
```
|
|
||||||
p attach <job-id>
|
|
||||||
```
|
|
||||||
Re-attach to the tmux session of a running job. Supports partial IDs.
|
|
||||||
Behaves identically to the initial attach: `Ctrl+B D` detaches, and if the job
|
|
||||||
has already finished the "press any key" screen is shown.
|
|
||||||
Only works on **running** jobs. For finished jobs, use `p logs`.
|
|
||||||
|
|
||||||
```
|
```
|
||||||
p logs <job-id>
|
p logs <job-id>
|
||||||
```
|
```
|
||||||
Print the captured output of a job (running or finished). Supports `-f` to
|
Print the captured output of a job (running or finished). Supports `-f` to
|
||||||
follow a running job's output without attaching to its TTY.
|
follow a running job's output in real-time. `Ctrl+C` detaches without stopping
|
||||||
|
the job.
|
||||||
|
|
||||||
```
|
```
|
||||||
p stop <job-id>
|
p stop <job-id>
|
||||||
@@ -148,62 +147,40 @@ Set the default worker.
|
|||||||
- No automatic sync-back after job completion. Use `p pull` to retrieve
|
- No automatic sync-back after job completion. Use `p pull` to retrieve
|
||||||
specific artifacts.
|
specific artifacts.
|
||||||
|
|
||||||
## Attach / Detach Mechanics
|
## Execution Model
|
||||||
|
|
||||||
Jobs run inside a `tmux` session on the worker. `p` attaches to the session
|
No persistent agent daemon is needed. Jobs are launched and managed via
|
||||||
immediately after starting the job.
|
ad-hoc SSH commands:
|
||||||
|
|
||||||
### Status bar
|
1. `p -- <command>` syncs the directory, then runs via SSH:
|
||||||
The tmux session has a custom status bar showing:
|
|
||||||
```
|
```
|
||||||
p-<short-id> beefy make [running] 0:02:14
|
nohup sh -c '<command> 2>&1 | tee output.log; echo $? > exitcode' & echo $! > pid
|
||||||
```
|
```
|
||||||
Fields: job short-ID, worker name, command (truncated), status, elapsed time.
|
2. The client streams `output.log` in real-time over a separate SSH connection.
|
||||||
|
3. `Ctrl+C` closes the SSH stream — the job keeps running.
|
||||||
|
4. `p stop <job-id>` runs `kill $(cat pid)` over SSH.
|
||||||
|
5. `p logs -f <job-id>` tails the log file over SSH.
|
||||||
|
6. `p ls` reads the local job DB and SSH-polls to reconcile state when needed.
|
||||||
|
|
||||||
### Key bindings while attached
|
> **Worker requirements:** `rsync` must be available on the worker.
|
||||||
| Key | Effect |
|
|
||||||
|---|---|
|
|
||||||
| `Ctrl+B D` | Detach from session. Job keeps running. |
|
|
||||||
| `Ctrl+C` | Sends SIGINT to the foreground process (standard terminal behavior). |
|
|
||||||
|
|
||||||
### On job completion
|
|
||||||
When the job's process exits, `run.sh` writes the exit code and then displays:
|
|
||||||
```
|
|
||||||
--- Job done [exit 0]. Press any key to detach. ---
|
|
||||||
```
|
|
||||||
The tmux session stays open (`remain-on-exit on` for the window) so the user
|
|
||||||
can scroll through final output. Pressing any key detaches the client and
|
|
||||||
returns to the local shell. `p` then reads the exit code and prints a summary.
|
|
||||||
|
|
||||||
### `p attach` on a finished job
|
|
||||||
If the job has already finished and the tmux session is still open (user has
|
|
||||||
not yet pressed a key), `p attach` reconnects to the "press any key" screen.
|
|
||||||
Once the key is pressed, the session closes. For a fully-closed session, use
|
|
||||||
`p logs` instead.
|
|
||||||
|
|
||||||
> **Worker requirements:** `tmux` and `rsync` must be available on the worker
|
|
||||||
> (standard on most Linux systems). The `p-agent` binary is auto-uploaded by `p`.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Job Status & Notification
|
## Job Status & Tracking
|
||||||
|
|
||||||
The **p-agent** runs as a lightweight background process on the worker
|
The client maintains a local job database (`~/.local/share/p/jobs/<uuid>.json`).
|
||||||
(started automatically, not a system service). It:
|
`p ls` reads from this local store for fast output.
|
||||||
|
|
||||||
- Manages job launch and tmux session creation
|
### State reconciliation
|
||||||
- Tees output to `output.log`
|
When a job is running, the client periodically checks if `exitcode` exists on
|
||||||
- Writes `exitcode` on completion
|
the worker. If the client was offline or the connection dropped, the next
|
||||||
- Notifies the client over the SSH reverse tunnel when a job finishes
|
`p ls` SSH-polls workers to reconcile state. Jobs whose state cannot be
|
||||||
|
determined are marked `unknown`.
|
||||||
|
|
||||||
The client maintains a local job database (`~/.local/share/p/jobs/<uuid>.json`)
|
### Connection drops during streaming
|
||||||
mirroring job state. `p ls` reads from this local store (fast, no SSH),
|
If the SSH connection drops while `p` is streaming output, `p` exits with an
|
||||||
updated in real time while attached, and via agent notifications otherwise.
|
error message showing the job ID. The job continues running on the worker.
|
||||||
|
Resume watching with `p logs -f <job-id>`.
|
||||||
### Degraded mode (agent unreachable / client was offline)
|
|
||||||
If the client missed a completion notification, `p ls` marks affected jobs as
|
|
||||||
`unknown`. The next `p ls` SSH-polls all workers with known-running jobs to
|
|
||||||
reconcile state.
|
|
||||||
|
|
||||||
## Worker-side Layout
|
## Worker-side Layout
|
||||||
|
|
||||||
@@ -211,8 +188,6 @@ All data lives under `~/.p/` on the worker (no root access required).
|
|||||||
|
|
||||||
```
|
```
|
||||||
~/.p/
|
~/.p/
|
||||||
bin/
|
|
||||||
p-agent # auto-uploaded by p, versioned
|
|
||||||
jobs/
|
jobs/
|
||||||
<uuid>/
|
<uuid>/
|
||||||
cmd # command string
|
cmd # command string
|
||||||
@@ -221,7 +196,8 @@ All data lives under `~/.p/` on the worker (no root access required).
|
|||||||
started_at # unix timestamp
|
started_at # unix timestamp
|
||||||
output.log # combined stdout+stderr, always captured
|
output.log # combined stdout+stderr, always captured
|
||||||
exitcode # written on completion; absent = still running
|
exitcode # written on completion; absent = still running
|
||||||
tmux_session # tmux session name (e.g. "p-<short-uuid>")
|
pid # process ID of the running job
|
||||||
|
|
||||||
workdirs/
|
workdirs/
|
||||||
<uuid>/ # rsync'd copy of client CWD for this job
|
<uuid>/ # rsync'd copy of client CWD for this job
|
||||||
```
|
```
|
||||||
@@ -252,23 +228,9 @@ b004f123 cloud ~/scripts ./bench.sh done [1] 0:00:47
|
|||||||
|
|
||||||
## Open Questions
|
## Open Questions
|
||||||
|
|
||||||
- **Worker arch detection**: `p-agent` must be compiled for the worker's
|
|
||||||
architecture. Options: (a) ship common targets and detect via SSH, (b)
|
|
||||||
compile on the worker if a Rust toolchain is present, (c) require user to
|
|
||||||
specify arch in worker config.
|
|
||||||
|
|
||||||
Maybe we could also implement the agent core in the form of a shell script?
|
|
||||||
At least the entry point, which could do some detection and install or something.
|
|
||||||
We will see on implementation what works... Small rust binary seems nice,
|
|
||||||
but we want support for amd64, aarch64 and riscv64.
|
|
||||||
|
|
||||||
- **Multiple jobs from the same CWD**: each gets its own `workdirs/<uuid>/`,
|
- **Multiple jobs from the same CWD**: each gets its own `workdirs/<uuid>/`,
|
||||||
so they're fully isolated. This may use significant disk space — `p rm`
|
so they're fully isolated. This may use significant disk space — `p rm`
|
||||||
should prompt to clean up.
|
should prompt to clean up.
|
||||||
|
|
||||||
- **Non-Linux workers**: tmux availability and path conventions may differ on
|
- **Non-Linux workers**: path conventions may differ on macOS workers. Out of
|
||||||
macOS workers. Out of scope for now.
|
scope for now.
|
||||||
|
|
||||||
- **Ctrl+C → detach** (future): it would be nicer if Ctrl+C detached the
|
|
||||||
session instead of sending SIGINT to the job, matching the spirit of the
|
|
||||||
tool. This requires per-session tmux key table configuration and is deferred.
|
|
||||||
|
|||||||
@@ -1,11 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "p-agent"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[[bin]]
|
|
||||||
name = "p-agent"
|
|
||||||
path = "src/main.rs"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
anyhow = "1"
|
|
||||||
@@ -1,4 +0,0 @@
|
|||||||
fn main() {
|
|
||||||
println!("p-agent — worker-side agent for p");
|
|
||||||
println!("(not yet implemented)");
|
|
||||||
}
|
|
||||||
+2
-2
@@ -22,7 +22,7 @@ dirs = "5"
|
|||||||
uuid = { version = "1", features = ["v4"] }
|
uuid = { version = "1", features = ["v4"] }
|
||||||
# Timestamps and duration display
|
# Timestamps and duration display
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
# Encoding commands passed through SSH → tmux → bash
|
# Shell escaping for commands
|
||||||
base64 = "0.22"
|
shell-words = "1"
|
||||||
# Ctrl+C handling (detach message)
|
# Ctrl+C handling (detach message)
|
||||||
ctrlc = "3"
|
ctrlc = "3"
|
||||||
|
|||||||
+1
-7
@@ -5,7 +5,7 @@ use clap::{Parser, Subcommand};
|
|||||||
name = "p",
|
name = "p",
|
||||||
about = "Push jobs to remote worker machines",
|
about = "Push jobs to remote worker machines",
|
||||||
long_about = "\
|
long_about = "\
|
||||||
Push jobs to remote worker machines with directory sync and attach/detach support.
|
Push jobs to remote worker machines with directory sync and log streaming.
|
||||||
|
|
||||||
Run a command on the default worker:
|
Run a command on the default worker:
|
||||||
p -- <command>
|
p -- <command>
|
||||||
@@ -33,12 +33,6 @@ pub enum Command {
|
|||||||
all: bool,
|
all: bool,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Re-attach to the tmux session of a running job
|
|
||||||
Attach {
|
|
||||||
/// Job ID or unambiguous prefix
|
|
||||||
job_id: String,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// Print captured output of a job (running or finished)
|
/// Print captured output of a job (running or finished)
|
||||||
Logs {
|
Logs {
|
||||||
/// Job ID or unambiguous prefix
|
/// Job ID or unambiguous prefix
|
||||||
|
|||||||
@@ -1,97 +0,0 @@
|
|||||||
use anyhow::{Context, Result};
|
|
||||||
use chrono::Utc;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
config, db,
|
|
||||||
job::{Job, JobStatus},
|
|
||||||
ssh,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub fn execute(job_id: &str) -> Result<()> {
|
|
||||||
let cfg = config::load()?;
|
|
||||||
let job = db::find(job_id)?.with_context(|| format!("job '{}' not found", job_id))?;
|
|
||||||
|
|
||||||
// Allow attach on Running (normal) and Unknown (worker was temporarily
|
|
||||||
// unreachable — the job may still be live).
|
|
||||||
match job.status {
|
|
||||||
JobStatus::Running | JobStatus::Unknown => {}
|
|
||||||
_ => anyhow::bail!(
|
|
||||||
"job {} is not running (status: {})\n\
|
|
||||||
use 'p logs {}' to view its output",
|
|
||||||
job.short_id(),
|
|
||||||
job.status.as_str(),
|
|
||||||
job.short_id(),
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
let worker = cfg.resolve_worker(Some(&job.worker))?.clone();
|
|
||||||
let session = format!("p-{}", job.short_id());
|
|
||||||
let sid = job.short_id().to_string();
|
|
||||||
|
|
||||||
{
|
|
||||||
let sid = sid.clone();
|
|
||||||
ctrlc::set_handler(move || {
|
|
||||||
eprintln!(
|
|
||||||
"\nDetached. Use 'p attach {sid}' to re-attach or 'p logs {sid}' to view output."
|
|
||||||
);
|
|
||||||
std::process::exit(0);
|
|
||||||
})
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
let attach_status = ssh::run_interactive(&worker, &format!("tmux attach -t '{}'", session))?;
|
|
||||||
|
|
||||||
// If tmux exited non-zero the session doesn't exist on the worker.
|
|
||||||
if !attach_status.success() {
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &job.id);
|
|
||||||
if exit_code.is_none() {
|
|
||||||
// No exit code either — the job was lost without completing
|
|
||||||
// (e.g. worker restarted, tmux server killed).
|
|
||||||
eprintln!(
|
|
||||||
"error: tmux session for job {} no longer exists on '{}'.",
|
|
||||||
sid, worker.name
|
|
||||||
);
|
|
||||||
eprintln!("The job was likely interrupted (worker restart or tmux server exit).");
|
|
||||||
eprintln!("Use 'p logs {}' to see whatever output was captured.", sid);
|
|
||||||
db::save(&Job {
|
|
||||||
status: JobStatus::Unknown,
|
|
||||||
..job
|
|
||||||
})?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
// Exit code exists — job finished cleanly but the session was already
|
|
||||||
// cleaned up. Fall through to normal reconciliation.
|
|
||||||
}
|
|
||||||
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &job.id);
|
|
||||||
let finished_at =
|
|
||||||
ssh::read_job_finished_at(&worker, &job.id).unwrap_or_else(|| Utc::now().timestamp());
|
|
||||||
|
|
||||||
if let Some(ec) = exit_code {
|
|
||||||
ssh::run_capture(
|
|
||||||
&worker,
|
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
|
||||||
)
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
db::save(&Job {
|
|
||||||
status: if ec == 0 {
|
|
||||||
JobStatus::Done
|
|
||||||
} else {
|
|
||||||
JobStatus::Failed
|
|
||||||
},
|
|
||||||
exit_code: Some(ec),
|
|
||||||
finished_at: Some(finished_at),
|
|
||||||
..job
|
|
||||||
})?;
|
|
||||||
|
|
||||||
eprintln!("Job {} finished with exit code {}.", sid, ec);
|
|
||||||
} else {
|
|
||||||
eprintln!(
|
|
||||||
"Detached from job {}. Use 'p attach {}' to re-attach.",
|
|
||||||
sid, sid
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
+49
-3
@@ -1,6 +1,11 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
use chrono::Utc;
|
||||||
|
|
||||||
use crate::{config, db, job::JobStatus, ssh};
|
use crate::{
|
||||||
|
config, db,
|
||||||
|
job::{Job, JobStatus},
|
||||||
|
ssh,
|
||||||
|
};
|
||||||
|
|
||||||
pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
||||||
let cfg = config::load()?;
|
let cfg = config::load()?;
|
||||||
@@ -19,10 +24,12 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
|||||||
job.short_id()
|
job.short_id()
|
||||||
);
|
);
|
||||||
print_log(worker, &log)?;
|
print_log(worker, &log)?;
|
||||||
|
reconcile_and_print_exitcode(worker, &job)?;
|
||||||
} else {
|
} else {
|
||||||
// Stream live output and exit automatically when the job finishes.
|
// Stream live output and exit automatically when the job finishes.
|
||||||
//
|
//
|
||||||
// The script backgrounds `tail -f` (so its stdout still flows to the
|
// Use -F (--follow=name --retry) so tail keeps retrying if the file does not exist yet.
|
||||||
|
// The script backgrounds `tail -F` (so its stdout still flows to the
|
||||||
// client through the SSH pipe), then polls for the exitcode file that
|
// client through the SSH pipe), then polls for the exitcode file that
|
||||||
// run.sh writes the moment the job exits. A 1-second sleep after
|
// run.sh writes the moment the job exits. A 1-second sleep after
|
||||||
// detecting the exitcode file gives tail time to drain any bytes that
|
// detecting the exitcode file gives tail time to drain any bytes that
|
||||||
@@ -33,7 +40,7 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
|||||||
// shell, terminating the whole script including the tail background job.
|
// shell, terminating the whole script including the tail background job.
|
||||||
let exitcode_path = format!("~/.p/jobs/{}/exitcode", job.id);
|
let exitcode_path = format!("~/.p/jobs/{}/exitcode", job.id);
|
||||||
let follow_cmd = format!(
|
let follow_cmd = format!(
|
||||||
"tail -n +1 -f {log} & \
|
"tail -n +1 -F {log} & \
|
||||||
TAIL_PID=$!; \
|
TAIL_PID=$!; \
|
||||||
while ! [ -f {exitcode} ]; do sleep 1; done; \
|
while ! [ -f {exitcode} ]; do sleep 1; done; \
|
||||||
sleep 1; \
|
sleep 1; \
|
||||||
@@ -43,9 +50,16 @@ pub fn execute(job_id: &str, follow: bool) -> Result<()> {
|
|||||||
exitcode = exitcode_path,
|
exitcode = exitcode_path,
|
||||||
);
|
);
|
||||||
ssh::run_output(worker, &follow_cmd)?;
|
ssh::run_output(worker, &follow_cmd)?;
|
||||||
|
|
||||||
|
// Reconcile state after streaming ends
|
||||||
|
reconcile_and_print_exitcode(worker, &job)?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
print_log(worker, &log)?;
|
print_log(worker, &log)?;
|
||||||
|
if job.status == JobStatus::Running {
|
||||||
|
// Check if job has finished
|
||||||
|
reconcile_and_print_exitcode(worker, &job)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -59,3 +73,35 @@ fn print_log(worker: &crate::config::WorkerConfig, log: &str) -> Result<()> {
|
|||||||
print!("{}", out);
|
print!("{}", out);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if the job has finished and update local state. Print exit code if done.
|
||||||
|
fn reconcile_and_print_exitcode(worker: &crate::config::WorkerConfig, job: &Job) -> Result<()> {
|
||||||
|
if job.status != JobStatus::Running {
|
||||||
|
// Already finished
|
||||||
|
if let Some(ec) = job.exit_code {
|
||||||
|
eprintln!("[Job done: exit {}]", ec);
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let exit_code = ssh::read_job_exitcode(worker, &job.id);
|
||||||
|
if let Some(ec) = exit_code {
|
||||||
|
let finished_at =
|
||||||
|
ssh::read_job_finished_at(worker, &job.id).unwrap_or_else(|| Utc::now().timestamp());
|
||||||
|
|
||||||
|
db::save(&Job {
|
||||||
|
status: if ec == 0 {
|
||||||
|
JobStatus::Done
|
||||||
|
} else {
|
||||||
|
JobStatus::Failed
|
||||||
|
},
|
||||||
|
exit_code: Some(ec),
|
||||||
|
finished_at: Some(finished_at),
|
||||||
|
..job.clone()
|
||||||
|
})?;
|
||||||
|
|
||||||
|
eprintln!("[Job done: exit {}]", ec);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
pub mod attach;
|
|
||||||
pub mod logs;
|
pub mod logs;
|
||||||
pub mod ls;
|
pub mod ls;
|
||||||
pub mod prune;
|
pub mod prune;
|
||||||
|
|||||||
@@ -16,14 +16,14 @@ pub fn execute(job_id: &str, force: bool) -> Result<()> {
|
|||||||
sid
|
sid
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// Kill the tmux session before wiping the files.
|
// Kill the job via PID before wiping the files.
|
||||||
let session = format!("p-{}", sid);
|
|
||||||
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
||||||
ssh::run_capture(
|
let pid_path = format!("~/.p/jobs/{}/pid", job.id);
|
||||||
worker,
|
let kill_cmd = format!(
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
"PID=$(cat {} 2>/dev/null) && kill \"$PID\" 2>/dev/null",
|
||||||
)
|
pid_path
|
||||||
.ok();
|
);
|
||||||
|
ssh::run_capture(worker, &kill_cmd).ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove remote job dir and work dir (best-effort — warn if unreachable).
|
// Remove remote job dir and work dir (best-effort — warn if unreachable).
|
||||||
|
|||||||
+55
-161
@@ -1,5 +1,4 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use base64::{engine::general_purpose::STANDARD as B64, Engine};
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use std::io::IsTerminal;
|
use std::io::IsTerminal;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -25,7 +24,6 @@ pub fn execute(
|
|||||||
|
|
||||||
let id = Uuid::new_v4().to_string();
|
let id = Uuid::new_v4().to_string();
|
||||||
let short_id = &id[..8];
|
let short_id = &id[..8];
|
||||||
let session = format!("p-{}", short_id);
|
|
||||||
|
|
||||||
// Resolve the remote home directory once using an absolute path so that
|
// Resolve the remote home directory once using an absolute path so that
|
||||||
// every subsequent call — SSH, rsync, and scripts on the worker — all
|
// every subsequent call — SSH, rsync, and scripts on the worker — all
|
||||||
@@ -78,27 +76,31 @@ pub fn execute(
|
|||||||
};
|
};
|
||||||
db::save(&job)?;
|
db::save(&job)?;
|
||||||
|
|
||||||
// ── 4. Write files and start the job on the worker ────────────────────────
|
// ── 4. Write metadata and start the job on the worker ─────────────────────
|
||||||
|
|
||||||
let cmd_b64 = B64.encode(&cmd_str);
|
|
||||||
let run_sh = build_run_sh(&session, &job_dir, &work_dir, &worker.name);
|
|
||||||
let run_sh_b64 = B64.encode(&run_sh);
|
|
||||||
|
|
||||||
|
// Write cmd, started_at, and the run script
|
||||||
|
let run_sh = build_run_sh(&job_dir, &work_dir, &cmd_str);
|
||||||
let setup = format!(
|
let setup = format!(
|
||||||
"printf '%s' '{cmd_b64}' > {job_dir}/cmd && \
|
"printf '%s' {} > {job_dir}/cmd && \
|
||||||
printf '%s' '{run_sh_b64}' | base64 -d > {job_dir}/run.sh && \
|
|
||||||
chmod +x {job_dir}/run.sh && \
|
|
||||||
date +%s > {job_dir}/started_at && \
|
date +%s > {job_dir}/started_at && \
|
||||||
tmux new-session -d -s '{session}' {job_dir}/run.sh",
|
touch {job_dir}/output.log && \
|
||||||
job_dir = job_dir,
|
printf '%s' {} > {job_dir}/run.sh && \
|
||||||
cmd_b64 = cmd_b64,
|
chmod +x {job_dir}/run.sh",
|
||||||
run_sh_b64 = run_sh_b64,
|
shell_words::quote(&cmd_str),
|
||||||
session = session,
|
shell_words::quote(&run_sh),
|
||||||
|
job_dir = job_dir
|
||||||
);
|
);
|
||||||
|
ssh::run_capture(&worker, &setup).context("failed to write job metadata")?;
|
||||||
|
|
||||||
ssh::run_capture(&worker, &setup).context("failed to set up job on worker")?;
|
// Start the run script with nohup
|
||||||
|
// The PID is saved to a file for later termination via `p stop`
|
||||||
|
let launch_cmd = format!(
|
||||||
|
"cd {} && nohup {}/run.sh & echo $! > {}/pid",
|
||||||
|
work_dir, job_dir, job_dir
|
||||||
|
);
|
||||||
|
ssh::run_capture(&worker, &launch_cmd).context("failed to start job on worker")?;
|
||||||
|
|
||||||
// ── 4. Attach or detach ───────────────────────────────────────────────────────
|
// ── 5. Detach or stream output ────────────────────────────────────────────
|
||||||
|
|
||||||
if detach {
|
if detach {
|
||||||
// Non-interactive mode: print only the job UUID to stdout and exit.
|
// Non-interactive mode: print only the job UUID to stdout and exit.
|
||||||
@@ -108,41 +110,43 @@ pub fn execute(
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Interactive mode: attach to the tmux session.
|
// Interactive mode: stream output.log in real-time.
|
||||||
// run.sh keeps the session alive after the job finishes (via `read`),
|
// Ctrl+C detaches from the stream — the job keeps running.
|
||||||
// so there is no race between job completion and our attach call.
|
|
||||||
//
|
|
||||||
// Ctrl+B D detaches cleanly mid-run; the job keeps going in the background.
|
|
||||||
// Any other key on the "press any key" screen triggers `tmux detach-client`
|
|
||||||
// from within run.sh — no [exited] or [detached] flash.
|
|
||||||
|
|
||||||
let sid = short_id.to_string();
|
let sid = short_id.to_string();
|
||||||
ctrlc::set_handler(move || {
|
ctrlc::set_handler(move || {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"\nDetached. Use 'p attach {sid}' to re-attach or 'p logs {sid}' to view output."
|
"\nDetached. Use 'p logs -f {}' to resume watching or 'p stop {}' to kill.",
|
||||||
|
sid, sid
|
||||||
);
|
);
|
||||||
std::process::exit(0);
|
std::process::exit(0);
|
||||||
})
|
})
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
eprintln!("Job {} started on {}.", short_id, worker.name);
|
eprintln!("Job {} started on {}.", short_id, worker.name);
|
||||||
ssh::run_interactive(&worker, &format!("tmux attach -t '{}'", session))?;
|
|
||||||
|
|
||||||
// ── 5. Reconcile status after returning ───────────────────────────────────
|
// Stream the log file, exiting when the job finishes
|
||||||
|
// Use -F (--follow=name --retry) so tail keeps retrying if the file does not exist yet
|
||||||
|
let log_path = format!("{}/output.log", job_dir);
|
||||||
|
let exitcode_path = format!("{}/exitcode", job_dir);
|
||||||
|
let stream_cmd = format!(
|
||||||
|
"tail -n +1 -F {} & \
|
||||||
|
TAIL_PID=$!; \
|
||||||
|
while ! [ -f {} ]; do sleep 1; done; \
|
||||||
|
sleep 0.5; \
|
||||||
|
kill \"$TAIL_PID\" 2>/dev/null; \
|
||||||
|
wait \"$TAIL_PID\" 2>/dev/null",
|
||||||
|
log_path, exitcode_path
|
||||||
|
);
|
||||||
|
ssh::run_output(&worker, &stream_cmd)?;
|
||||||
|
|
||||||
|
// ── 6. Reconcile status after returning ───────────────────────────────────
|
||||||
|
|
||||||
let exit_code = ssh::read_job_exitcode(&worker, &id);
|
let exit_code = ssh::read_job_exitcode(&worker, &id);
|
||||||
let finished_at =
|
let finished_at =
|
||||||
ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp());
|
ssh::read_job_finished_at(&worker, &id).unwrap_or_else(|| Utc::now().timestamp());
|
||||||
|
|
||||||
if let Some(ec) = exit_code {
|
if let Some(ec) = exit_code {
|
||||||
// Job is done. The session might still be alive if the user Ctrl+B D'd
|
|
||||||
// from the "press any key" screen — kill it to clean up the lingering `read`.
|
|
||||||
ssh::run_capture(
|
|
||||||
&worker,
|
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
|
||||||
)
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
db::save(&Job {
|
db::save(&Job {
|
||||||
status: if ec == 0 {
|
status: if ec == 0 {
|
||||||
JobStatus::Done
|
JobStatus::Done
|
||||||
@@ -154,145 +158,35 @@ pub fn execute(
|
|||||||
..job
|
..job
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
eprintln!("Job {} finished with exit code {}.", short_id, ec);
|
eprintln!("[Job done: exit {}]", ec);
|
||||||
} else {
|
} else {
|
||||||
// User detached mid-run (Ctrl+B D). Job is still running on the worker.
|
// Connection dropped or job lost
|
||||||
db::save(&Job {
|
db::save(&Job {
|
||||||
status: JobStatus::Running,
|
status: JobStatus::Unknown,
|
||||||
..job
|
..job
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Detached from job {}. Use 'p attach {}' to re-attach or 'p logs {}' to view output.",
|
"Connection lost. Job {} may still be running. Use 'p logs -f {}' to resume.",
|
||||||
short_id, short_id, short_id
|
short_id, short_id
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build the shell script that runs inside the tmux pane.
|
/// Build the shell script that runs on the worker.
|
||||||
///
|
fn build_run_sh(job_dir: &str, work_dir: &str, cmd_str: &str) -> String {
|
||||||
/// `pub(crate)` so it can be tested.
|
|
||||||
pub(crate) fn build_run_sh(session: &str, job_dir: &str, work_dir: &str, worker: &str) -> String {
|
|
||||||
// Truncate worker name for display to avoid overflowing the status bar.
|
|
||||||
let worker_display = if worker.len() > 20 {
|
|
||||||
&worker[..20]
|
|
||||||
} else {
|
|
||||||
worker
|
|
||||||
};
|
|
||||||
|
|
||||||
format!(
|
format!(
|
||||||
// Shebang + status bar setup
|
r#"#!/bin/bash
|
||||||
"#!/bin/bash\n\
|
cd {work_dir}
|
||||||
tmux set-option -t '{session}' status on 2>/dev/null\n\
|
{cmd} 2>&1 | tee {job_dir}/output.log
|
||||||
tmux set-option -t '{session}' status-style 'fg=colour250,bg=colour235' 2>/dev/null\n\
|
EXIT_CODE=${{PIPESTATUS[0]}}
|
||||||
tmux set-option -t '{session}' status-left-length 60 2>/dev/null\n\
|
echo "$EXIT_CODE" > {job_dir}/exitcode
|
||||||
tmux set-option -t '{session}' status-right-length 20 2>/dev/null\n\
|
date +%s > {job_dir}/finished_at
|
||||||
tmux set-option -t '{session}' status-left \
|
"#,
|
||||||
'#[fg=colour39,bold] {session} #[fg=colour250,nobold]| {worker_display} ' 2>/dev/null\n\
|
|
||||||
tmux set-option -t '{session}' status-right \
|
|
||||||
'#[fg=colour250] running ' 2>/dev/null\n\
|
|
||||||
\n\
|
|
||||||
# Run the command, tee output to log\n\
|
|
||||||
cd {work_dir}\n\
|
|
||||||
cmd=$(base64 -d < {job_dir}/cmd)\n\
|
|
||||||
bash -c \"$cmd\" 2>&1 | tee {job_dir}/output.log\n\
|
|
||||||
EXIT_CODE=${{PIPESTATUS[0]}}\n\
|
|
||||||
echo \"$EXIT_CODE\" > {job_dir}/exitcode\n\
|
|
||||||
date +%s > {job_dir}/finished_at\n\
|
|
||||||
\n\
|
|
||||||
# Update status bar to show result\n\
|
|
||||||
if [ \"$EXIT_CODE\" -eq 0 ]; then\n\
|
|
||||||
tmux set-option -t '{session}' status-right \
|
|
||||||
'#[fg=colour2,bold] done [0] ' 2>/dev/null\n\
|
|
||||||
else\n\
|
|
||||||
tmux set-option -t '{session}' status-right \
|
|
||||||
\"#[fg=colour1,bold] done [$EXIT_CODE] \" 2>/dev/null\n\
|
|
||||||
fi\n\
|
|
||||||
\n\
|
|
||||||
# Keep the session open so the user can read final output\n\
|
|
||||||
printf '\\n\\033[2m--- done [exit %d] - press any key to detach ---\\033[0m\\n' \
|
|
||||||
\"$EXIT_CODE\"\n\
|
|
||||||
read -rn 1 -s\n\
|
|
||||||
\n\
|
|
||||||
# Detach cleanly — no [exited] / [detached] flash\n\
|
|
||||||
tmux detach-client -s '{session}' 2>/dev/null || true\n",
|
|
||||||
session = session,
|
|
||||||
worker_display = worker_display,
|
|
||||||
job_dir = job_dir,
|
|
||||||
work_dir = work_dir,
|
work_dir = work_dir,
|
||||||
|
cmd = cmd_str,
|
||||||
|
job_dir = job_dir
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn sample_script() -> String {
|
|
||||||
build_run_sh(
|
|
||||||
"p-a3f2b091",
|
|
||||||
"/home/ubuntu/.p/jobs/a3f2b091-0000-0000-0000-000000000000",
|
|
||||||
"/home/ubuntu/.p/workdirs/a3f2b091-0000-0000-0000-000000000000",
|
|
||||||
"beefy",
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_starts_with_shebang() {
|
|
||||||
assert!(sample_script().starts_with("#!/bin/bash\n"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_contains_session_name() {
|
|
||||||
assert!(sample_script().contains("p-a3f2b091"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_contains_job_and_work_dirs() {
|
|
||||||
let s = sample_script();
|
|
||||||
assert!(s.contains("/home/ubuntu/.p/jobs/a3f2b091"));
|
|
||||||
assert!(s.contains("/home/ubuntu/.p/workdirs/a3f2b091"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_captures_exit_code_via_pipestatus() {
|
|
||||||
// Must use PIPESTATUS[0] to get the command's exit code, not tee's.
|
|
||||||
assert!(sample_script().contains("PIPESTATUS[0]"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_writes_exitcode_file() {
|
|
||||||
assert!(sample_script().contains("exitcode"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_tees_output_to_log() {
|
|
||||||
assert!(sample_script().contains("tee"));
|
|
||||||
assert!(sample_script().contains("output.log"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_waits_for_keypress_before_detach() {
|
|
||||||
let s = sample_script();
|
|
||||||
assert!(s.contains("read -rn 1 -s"));
|
|
||||||
assert!(s.contains("tmux detach-client"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn script_sets_up_status_bar() {
|
|
||||||
let s = sample_script();
|
|
||||||
assert!(s.contains("status on"));
|
|
||||||
assert!(s.contains("beefy"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn worker_name_truncated_at_20_chars() {
|
|
||||||
let s = build_run_sh("p-test", "/j", "/w", "a-very-long-worker-name-here");
|
|
||||||
// "a-very-long-worker-name-here" is 28 chars; only first 20 should appear.
|
|
||||||
assert!(s.contains("a-very-long-worker-n"));
|
|
||||||
assert!(!s.contains("a-very-long-worker-name-here"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
+14
-6
@@ -20,14 +20,22 @@ pub fn execute(job_id: &str) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
let worker = cfg.resolve_worker(Some(&job.worker))?;
|
||||||
let session = format!("p-{}", job.short_id());
|
|
||||||
let sid = job.short_id().to_string();
|
let sid = job.short_id().to_string();
|
||||||
|
|
||||||
// Kill the tmux session. This terminates the job process and run.sh.
|
// Read the PID file and kill the process
|
||||||
ssh::run_capture(
|
// Use a subshell that always succeeds to avoid SSH errors
|
||||||
worker,
|
let kill_cmd = format!(
|
||||||
&format!("tmux kill-session -t '{}' 2>/dev/null || true", session),
|
"PID=$(cat ~/.p/jobs/{}/pid 2>/dev/null); \
|
||||||
)?;
|
if [ -n \"$PID\" ]; then \
|
||||||
|
kill \"$PID\" 2>/dev/null; \
|
||||||
|
sleep 1; \
|
||||||
|
kill -9 \"$PID\" 2>/dev/null; \
|
||||||
|
fi; \
|
||||||
|
echo 15 > ~/.p/jobs/{}/exitcode; \
|
||||||
|
date +%s > ~/.p/jobs/{}/finished_at",
|
||||||
|
job.id, job.id, job.id
|
||||||
|
);
|
||||||
|
ssh::run_capture(worker, &kill_cmd)?;
|
||||||
|
|
||||||
db::save(&Job {
|
db::save(&Job {
|
||||||
status: JobStatus::Stopped,
|
status: JobStatus::Stopped,
|
||||||
|
|||||||
@@ -33,6 +33,17 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> {
|
|||||||
|
|
||||||
config::save(&cfg)?;
|
config::save(&cfg)?;
|
||||||
|
|
||||||
|
// Check that the worker is reachable
|
||||||
|
let worker = cfg.get_worker(&name).unwrap();
|
||||||
|
print!("Checking worker connectivity... ");
|
||||||
|
if ssh::is_reachable(worker) {
|
||||||
|
println!("ok");
|
||||||
|
} else {
|
||||||
|
println!();
|
||||||
|
println!("note: could not reach '{}' (worker may be offline)", name);
|
||||||
|
println!(" run 'p worker ls --check' once it's available");
|
||||||
|
}
|
||||||
|
|
||||||
if first {
|
if first {
|
||||||
println!("Registered '{}' and set as default worker.", name);
|
println!("Registered '{}' and set as default worker.", name);
|
||||||
} else {
|
} else {
|
||||||
@@ -40,29 +51,5 @@ pub fn execute(connection: &str, name: Option<&str>) -> Result<()> {
|
|||||||
println!("Run 'p worker default {}' to make it the default.", name);
|
println!("Run 'p worker default {}' to make it the default.", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the worker has the tools p needs.
|
|
||||||
let worker = cfg.get_worker(&name).unwrap();
|
|
||||||
print!("Checking worker dependencies... ");
|
|
||||||
match ssh::check_dependencies(worker) {
|
|
||||||
Ok(ref missing) if missing.is_empty() => println!("ok"),
|
|
||||||
Ok(missing) => {
|
|
||||||
println!();
|
|
||||||
println!(
|
|
||||||
"warning: '{}' is missing required tools: {}",
|
|
||||||
name,
|
|
||||||
missing.join(", ")
|
|
||||||
);
|
|
||||||
println!(" install them and re-register, or jobs will fail at launch");
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
println!();
|
|
||||||
println!(
|
|
||||||
"note: could not reach '{}' to check dependencies (worker may be offline)",
|
|
||||||
name
|
|
||||||
);
|
|
||||||
println!(" run 'p worker ls --check' once it's available");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,7 +56,6 @@ fn main() -> Result<()> {
|
|||||||
let cli = cli::Cli::parse();
|
let cli = cli::Cli::parse();
|
||||||
match cli.command {
|
match cli.command {
|
||||||
cli::Command::Ls { all } => commands::ls::execute(all),
|
cli::Command::Ls { all } => commands::ls::execute(all),
|
||||||
cli::Command::Attach { job_id } => commands::attach::execute(&job_id),
|
|
||||||
cli::Command::Logs { job_id, follow } => commands::logs::execute(&job_id, follow),
|
cli::Command::Logs { job_id, follow } => commands::logs::execute(&job_id, follow),
|
||||||
cli::Command::Stop { job_id } => commands::stop::execute(&job_id),
|
cli::Command::Stop { job_id } => commands::stop::execute(&job_id),
|
||||||
cli::Command::Pull {
|
cli::Command::Pull {
|
||||||
|
|||||||
+4
-37
@@ -30,8 +30,8 @@ pub fn parse_connection(conn: &str) -> (String, Option<u16>) {
|
|||||||
/// Extract the bare hostname from a connection string (used as a default worker name).
|
/// Extract the bare hostname from a connection string (used as a default worker name).
|
||||||
pub fn hostname_from_connection(conn: &str) -> String {
|
pub fn hostname_from_connection(conn: &str) -> String {
|
||||||
let (user_host, _port) = parse_connection(conn);
|
let (user_host, _port) = parse_connection(conn);
|
||||||
match user_host.rsplit_once('@') {
|
match user_host.rfind('@') {
|
||||||
Some((_user, host)) => host.to_string(),
|
Some(i) => user_host[i + 1..].to_string(),
|
||||||
None => user_host,
|
None => user_host,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -74,26 +74,6 @@ fn run_with_timeout(worker: &WorkerConfig, remote_cmd: &str, timeout_secs: u32)
|
|||||||
Ok(String::from_utf8_lossy(&out.stdout).into_owned())
|
Ok(String::from_utf8_lossy(&out.stdout).into_owned())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run a command over SSH with an interactive terminal (inherits stdin/stdout/stderr).
|
|
||||||
///
|
|
||||||
/// Forces `TERM=xterm-256color` on the remote to avoid failures with terminal
|
|
||||||
/// emulators whose terminfo entry is not installed on the worker (e.g. ghostty,
|
|
||||||
/// kitty). The user's actual emulator capabilities are unaffected — only the
|
|
||||||
/// terminfo lookup on the remote side changes.
|
|
||||||
pub fn run_interactive(
|
|
||||||
worker: &WorkerConfig,
|
|
||||||
remote_cmd: &str,
|
|
||||||
) -> Result<std::process::ExitStatus> {
|
|
||||||
let mut args = vec!["-t".to_string()];
|
|
||||||
args.extend(ssh_args(worker));
|
|
||||||
args.push(format!("TERM=xterm-256color {}", remote_cmd));
|
|
||||||
|
|
||||||
Command::new("ssh")
|
|
||||||
.args(&args)
|
|
||||||
.status()
|
|
||||||
.context("failed to spawn ssh")
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run a remote command and stream its output to the local terminal via a plain
|
/// Run a remote command and stream its output to the local terminal via a plain
|
||||||
/// SSH pipe (no PTY). Use this for output streaming where interactive terminal
|
/// SSH pipe (no PTY). Use this for output streaming where interactive terminal
|
||||||
/// features are not needed. Ctrl+C on the client kills ssh and the remote process.
|
/// features are not needed. Ctrl+C on the client kills ssh and the remote process.
|
||||||
@@ -147,7 +127,7 @@ pub fn is_reachable(worker: &WorkerConfig) -> bool {
|
|||||||
|
|
||||||
// ── Job-status helpers ────────────────────────────────────────────────────────
|
// ── Job-status helpers ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/// Read the exit code written by run.sh on the worker, if the job has finished.
|
/// Read the exit code written by the job on the worker, if the job has finished.
|
||||||
pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
|
pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
|
||||||
let cmd = format!("cat ~/.p/jobs/{}/exitcode 2>/dev/null", job_id);
|
let cmd = format!("cat ~/.p/jobs/{}/exitcode 2>/dev/null", job_id);
|
||||||
run_capture(worker, &cmd)
|
run_capture(worker, &cmd)
|
||||||
@@ -155,7 +135,7 @@ pub fn read_job_exitcode(worker: &WorkerConfig, job_id: &str) -> Option<i32> {
|
|||||||
.and_then(|s| s.trim().parse().ok())
|
.and_then(|s| s.trim().parse().ok())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read the finish timestamp written by run.sh on the worker, if the job has finished.
|
/// Read the finish timestamp written by the job on the worker, if the job has finished.
|
||||||
pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> {
|
pub fn read_job_finished_at(worker: &WorkerConfig, job_id: &str) -> Option<i64> {
|
||||||
let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id);
|
let cmd = format!("cat ~/.p/jobs/{}/finished_at 2>/dev/null", job_id);
|
||||||
run_capture(worker, &cmd)
|
run_capture(worker, &cmd)
|
||||||
@@ -214,19 +194,6 @@ pub fn poll_jobs(
|
|||||||
Ok(map)
|
Ok(map)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check that the required worker-side tools (tmux, base64) are installed.
|
|
||||||
/// Returns the names of any missing tools. Uses a 5-second timeout so
|
|
||||||
/// `p worker register` doesn't hang on offline workers.
|
|
||||||
pub fn check_dependencies(worker: &WorkerConfig) -> Result<Vec<String>> {
|
|
||||||
let script = "missing=''; \
|
|
||||||
command -v tmux >/dev/null 2>&1 || missing=\"$missing tmux\"; \
|
|
||||||
command -v base64 >/dev/null 2>&1 || missing=\"$missing base64\"; \
|
|
||||||
printf '%s' \"$missing\"";
|
|
||||||
|
|
||||||
let out = run_with_timeout(worker, script, 5)?;
|
|
||||||
Ok(out.split_whitespace().map(|s| s.to_string()).collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
// ── Tests ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user