From c6b8c9c905b005c9f0c6a84639863857e11bd5e1 Mon Sep 17 00:00:00 2001 From: jasonwitty Date: Fri, 15 Aug 2025 19:21:34 -0700 Subject: [PATCH] patch for macbook compatibility issues. --- .github/workflows/ci.yml | 53 ++++++++ README.md | 2 +- rust-toolchain.toml | 3 + socktop/src/app.rs | 19 ++- socktop/src/lib.rs | 4 + socktop/src/ws.rs | 231 +++++++++++++++++++++++++++++------ socktop/tests/ws_probe.rs | 27 ++++ socktop_agent/src/metrics.rs | 51 +++++++- socktop_agent/src/state.rs | 2 +- 9 files changed, 343 insertions(+), 49 deletions(-) create mode 100644 rust-toolchain.toml create mode 100644 socktop/src/lib.rs create mode 100644 socktop/tests/ws_probe.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 145ef56..0bbaaf8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,6 +23,59 @@ jobs: run: cargo clippy --all-targets --all-features -- -D warnings - name: Build (release) run: cargo build --release --workspace + - name: Start agent (Ubuntu) + if: matrix.os == 'ubuntu-latest' + shell: bash + run: | + set -euo pipefail + # Use debug build for faster startup in CI + RUST_LOG=info cargo run -p socktop_agent -- -p 3000 & + AGENT_PID=$! 
+ echo "AGENT_PID=$AGENT_PID" >> $GITHUB_ENV + # Wait for port 3000 to accept connections (30s max) + for i in {1..60}; do + if bash -lc "</dev/tcp/127.0.0.1/3000" 2>/dev/null; then + echo "agent is ready" + break + fi + sleep 0.5 + done + - name: Run WS probe test (Ubuntu) + if: matrix.os == 'ubuntu-latest' + shell: bash + env: + SOCKTOP_WS: ws://127.0.0.1:3000/ws + run: | + set -euo pipefail + cargo test -p socktop --test ws_probe -- --nocapture + - name: Stop agent (Ubuntu) + if: always() && matrix.os == 'ubuntu-latest' + shell: bash + run: | + if [ -n "${AGENT_PID:-}" ]; then kill $AGENT_PID || true; fi + - name: Start agent (Windows) + if: matrix.os == 'windows-latest' + shell: pwsh + run: | + $p = Start-Process -FilePath "cargo" -ArgumentList "run -p socktop_agent -- -p 3000" -PassThru + echo "AGENT_PID=$($p.Id)" | Out-File -FilePath $env:GITHUB_ENV -Append + $ready = $false + for ($i = 0; $i -lt 60; $i++) { + if (Test-NetConnection -ComputerName 127.0.0.1 -Port 3000 -InformationLevel Quiet) { $ready = $true; break } + Start-Sleep -Milliseconds 500 + } + if (-not $ready) { Write-Error "agent did not become ready" } + - name: Run WS probe test (Windows) + if: matrix.os == 'windows-latest' + shell: pwsh + run: | + $env:SOCKTOP_WS = "ws://127.0.0.1:3000/ws" + cargo test -p socktop --test ws_probe -- --nocapture + - name: Stop agent (Windows) + if: always() && matrix.os == 'windows-latest' + shell: pwsh + run: | + if ($env:AGENT_PID) { Stop-Process -Id $env:AGENT_PID -Force -ErrorAction SilentlyContinue } - name: Smoke test (client --help) run: cargo run -p socktop -- --help - name: Package artifacts diff --git a/README.md b/README.md index 8d61633..b9d2037 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ exec bash # or: exec zsh / exec fish Windows (for the brave): install from https://rustup.rs with the MSVC toolchain. Yes, you’ll need Visual Studio Build Tools. You chose Windows — enjoy the ride. 
-### Raspberry Pi / Ubuntu (required) +### Raspberry Pi / Ubuntu / PopOS (required) Install GPU support with apt command below diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..d0ead5e --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +components = ["clippy", "rustfmt"] diff --git a/socktop/src/app.rs b/socktop/src/app.rs index 4974ca9..4f02306 100644 --- a/socktop/src/app.rs +++ b/socktop/src/app.rs @@ -63,6 +63,9 @@ pub struct App { last_disks_poll: Instant, procs_interval: Duration, disks_interval: Duration, + + // For reconnects + ws_url: String, } impl App { @@ -91,11 +94,13 @@ impl App { .unwrap_or_else(Instant::now), procs_interval: Duration::from_secs(2), disks_interval: Duration::from_secs(5), + ws_url: String::new(), } } pub async fn run(&mut self, url: &str) -> Result<(), Box> { // Connect to agent + self.ws_url = url.to_string(); let mut ws = connect(url).await?; // Terminal setup @@ -244,7 +249,10 @@ impl App { break; } - // Fetch and update + // Draw current frame first so the UI never feels blocked + terminal.draw(|f| self.draw(f))?; + + // Then fetch and update if let Some(m) = request_metrics(ws).await { self.update_with_metrics(m); @@ -268,11 +276,13 @@ impl App { } self.last_disks_poll = Instant::now(); } + } else { + // If we couldn't get metrics, try to reconnect once + if let Ok(new_ws) = connect(&self.ws_url).await { + *ws = new_ws; + } } - // Draw - terminal.draw(|f| self.draw(f))?; - // Tick rate sleep(Duration::from_millis(500)).await; } @@ -461,6 +471,7 @@ impl Default for App { .unwrap_or_else(Instant::now), procs_interval: Duration::from_secs(2), disks_interval: Duration::from_secs(5), + ws_url: String::new(), } } } diff --git a/socktop/src/lib.rs b/socktop/src/lib.rs new file mode 100644 index 0000000..b9d64fc --- /dev/null +++ b/socktop/src/lib.rs @@ -0,0 +1,4 @@ +//! Library surface for integration tests and reuse. 
+ +pub mod types; +pub mod ws; diff --git a/socktop/src/ws.rs b/socktop/src/ws.rs index 6fafc23..0298773 100644 --- a/socktop/src/ws.rs +++ b/socktop/src/ws.rs @@ -1,44 +1,82 @@ //! Minimal WebSocket client helpers for requesting metrics from the agent. -use flate2::bufread::GzDecoder; +use flate2::read::GzDecoder; use futures_util::{SinkExt, StreamExt}; -use std::io::Read; +use std::io::{Cursor, Read}; +use std::sync::OnceLock; use tokio::net::TcpStream; -use tokio::time::{interval, Duration}; +use tokio::time::{interval, timeout, Duration}; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; use crate::types::{DiskInfo, Metrics, ProcessesPayload}; pub type WsStream = WebSocketStream>; -// Connect to the agent and return the WS stream -pub async fn connect(url: &str) -> Result> { - let (ws, _) = connect_async(url).await?; - Ok(ws) +#[inline] +fn debug_on() -> bool { + static ON: OnceLock = OnceLock::new(); + *ON.get_or_init(|| { + std::env::var("SOCKTOP_DEBUG") + .map(|v| v != "0") + .unwrap_or(false) + }) } -// Send a "get_metrics" request and await a single JSON reply -pub async fn request_metrics(ws: &mut WsStream) -> Option { - if ws.send(Message::Text("get_metrics".into())).await.is_err() { - return None; +fn log_msg(msg: &Message) { + match msg { + Message::Binary(b) => eprintln!("ws: Binary {} bytes", b.len()), + Message::Text(s) => eprintln!("ws: Text {} bytes", s.len()), + Message::Close(_) => eprintln!("ws: Close"), + _ => eprintln!("ws: Other frame"), } - match ws.next().await { - Some(Ok(Message::Binary(b))) => { - gunzip_to_string(&b).and_then(|s| serde_json::from_str::(&s).ok()) - } - Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), - _ => None, +} + +// Connect to the agent and return the WS stream +pub async fn connect(url: &str) -> Result> { + if debug_on() { + eprintln!("ws: connecting to {url}"); } + let (ws, _) = connect_async(url).await?; + if debug_on() { + eprintln!("ws: 
connected"); + } + Ok(ws) } // Decompress a gzip-compressed binary frame into a String. fn gunzip_to_string(bytes: &[u8]) -> Option { - let mut dec = GzDecoder::new(bytes); + let cursor = Cursor::new(bytes); + let mut dec = GzDecoder::new(cursor); let mut out = String::new(); dec.read_to_string(&mut out).ok()?; + if debug_on() { + eprintln!("ws: gunzip decoded {} bytes", out.len()); + } Some(out) } +fn message_to_json(msg: &Message) -> Option { + match msg { + Message::Binary(b) => { + if debug_on() { + eprintln!("ws: <- Binary frame {} bytes", b.len()); + } + if let Some(s) = gunzip_to_string(b) { + return Some(s); + } + // Fallback: try interpreting as UTF-8 JSON in a binary frame + String::from_utf8(b.clone()).ok() + } + Message::Text(s) => { + if debug_on() { + eprintln!("ws: <- Text frame {} bytes", s.len()); + } + Some(s.clone()) + } + _ => None, + } +} + // Suppress dead_code until these are wired into the app #[allow(dead_code)] pub enum Payload { @@ -64,22 +102,108 @@ fn parse_any_payload(json: &str) -> Result { ))) } +// Send a "get_metrics" request and await a single JSON reply +pub async fn request_metrics(ws: &mut WsStream) -> Option { + if debug_on() { + eprintln!("ws: -> get_metrics"); + } + if ws.send(Message::Text("get_metrics".into())).await.is_err() { + return None; + } + // Drain a few messages until we find Metrics (handle out-of-order replies) + for _ in 0..8 { + match timeout(Duration::from_millis(800), ws.next()).await { + Ok(Some(Ok(msg))) => { + if debug_on() { + log_msg(&msg); + } + if let Some(json) = message_to_json(&msg) { + match parse_any_payload(&json) { + Ok(Payload::Metrics(m)) => return Some(m), + Ok(Payload::Disks(_)) => { + if debug_on() { + eprintln!("ws: got Disks while waiting for Metrics"); + } + } + Ok(Payload::Processes(_)) => { + if debug_on() { + eprintln!("ws: got Processes while waiting for Metrics"); + } + } + Err(_e) => { + if debug_on() { + eprintln!( + "ws: unknown payload while waiting for Metrics (len={})", + 
json.len() + ); + } + } + } + } else if debug_on() { + eprintln!("ws: non-json frame while waiting for Metrics"); + } + } + Ok(Some(Err(_e))) => continue, + Ok(None) => return None, + Err(_elapsed) => continue, + } + } + None +} + // Send a "get_disks" request and await a JSON Vec pub async fn request_disks(ws: &mut WsStream) -> Option> { + if debug_on() { + eprintln!("ws: -> get_disks"); + } if ws.send(Message::Text("get_disks".into())).await.is_err() { return None; } - match ws.next().await { - Some(Ok(Message::Binary(b))) => { - gunzip_to_string(&b).and_then(|s| serde_json::from_str::>(&s).ok()) + for _ in 0..8 { + match timeout(Duration::from_millis(800), ws.next()).await { + Ok(Some(Ok(msg))) => { + if debug_on() { + log_msg(&msg); + } + if let Some(json) = message_to_json(&msg) { + match parse_any_payload(&json) { + Ok(Payload::Disks(d)) => return Some(d), + Ok(Payload::Metrics(_)) => { + if debug_on() { + eprintln!("ws: got Metrics while waiting for Disks"); + } + } + Ok(Payload::Processes(_)) => { + if debug_on() { + eprintln!("ws: got Processes while waiting for Disks"); + } + } + Err(_e) => { + if debug_on() { + eprintln!( + "ws: unknown payload while waiting for Disks (len={})", + json.len() + ); + } + } + } + } else if debug_on() { + eprintln!("ws: non-json frame while waiting for Disks"); + } + } + Ok(Some(Err(_e))) => continue, + Ok(None) => return None, + Err(_elapsed) => continue, } - Some(Ok(Message::Text(json))) => serde_json::from_str::>(&json).ok(), - _ => None, } + None } // Send a "get_processes" request and await a JSON ProcessesPayload pub async fn request_processes(ws: &mut WsStream) -> Option { + if debug_on() { + eprintln!("ws: -> get_processes"); + } if ws .send(Message::Text("get_processes".into())) .await @@ -87,13 +211,45 @@ pub async fn request_processes(ws: &mut WsStream) -> Option { { return None; } - match ws.next().await { - Some(Ok(Message::Binary(b))) => { - gunzip_to_string(&b).and_then(|s| serde_json::from_str::(&s).ok()) + 
for _ in 0..16 { + // allow a few more cycles due to gzip size + match timeout(Duration::from_millis(1200), ws.next()).await { + Ok(Some(Ok(msg))) => { + if debug_on() { + log_msg(&msg); + } + if let Some(json) = message_to_json(&msg) { + match parse_any_payload(&json) { + Ok(Payload::Processes(p)) => return Some(p), + Ok(Payload::Metrics(_)) => { + if debug_on() { + eprintln!("ws: got Metrics while waiting for Processes"); + } + } + Ok(Payload::Disks(_)) => { + if debug_on() { + eprintln!("ws: got Disks while waiting for Processes"); + } + } + Err(_e) => { + if debug_on() { + eprintln!( + "ws: unknown payload while waiting for Processes (len={})", + json.len() + ); + } + } + } + } else if debug_on() { + eprintln!("ws: non-json frame while waiting for Processes"); + } + } + Ok(Some(Err(_e))) => continue, + Ok(None) => return None, + Err(_elapsed) => continue, } - Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), - _ => None, } + None } #[allow(dead_code)] @@ -120,20 +276,15 @@ pub async fn start_ws_polling(mut ws: WsStream) { maybe = ws.next() => { let Some(result) = maybe else { break; }; let Ok(msg) = result else { break; }; + if debug_on() { log_msg(&msg); } match msg { Message::Binary(b) => { if let Some(json) = gunzip_to_string(&b) { if let Ok(payload) = parse_any_payload(&json) { match payload { - Payload::Metrics(_m) => { - // update your app state with fast metrics - } - Payload::Disks(_d) => { - // update your app state with disks - } - Payload::Processes(_p) => { - // update your app state with processes - } + Payload::Metrics(_m) => {}, + Payload::Disks(_d) => {}, + Payload::Processes(_p) => {}, } } } @@ -141,9 +292,9 @@ pub async fn start_ws_polling(mut ws: WsStream) { Message::Text(s) => { if let Ok(payload) = parse_any_payload(&s) { match payload { - Payload::Metrics(_m) => {} - Payload::Disks(_d) => {} - Payload::Processes(_p) => {} + Payload::Metrics(_m) => {}, + Payload::Disks(_d) => {}, + Payload::Processes(_p) => {}, } } } diff 
--git a/socktop/tests/ws_probe.rs b/socktop/tests/ws_probe.rs new file mode 100644 index 0000000..93bd6d8 --- /dev/null +++ b/socktop/tests/ws_probe.rs @@ -0,0 +1,27 @@ +use socktop::ws::{connect, request_metrics, request_processes}; + +// Integration probe: only runs when SOCKTOP_WS is set to an agent WebSocket URL. +// Example: SOCKTOP_WS=ws://127.0.0.1:3000/ws cargo test -p socktop --test ws_probe -- --nocapture +#[tokio::test] +async fn probe_ws_endpoints() { + // Gate the test to avoid CI failures when no agent is running. + let url = match std::env::var("SOCKTOP_WS") { + Ok(v) if !v.is_empty() => v, + _ => { + eprintln!( + "skipping ws_probe: set SOCKTOP_WS=ws://host:port/ws to run this integration test" + ); + return; + } + }; + + let mut ws = connect(&url).await.expect("connect ws"); + + // Should get fast metrics quickly + let m = request_metrics(&mut ws).await; + assert!(m.is_some(), "expected Metrics payload within timeout"); + + // Processes may be gzipped and a bit slower, but should arrive + let p = request_processes(&mut ws).await; + assert!(p.is_some(), "expected Processes payload within timeout"); +} diff --git a/socktop_agent/src/metrics.rs b/socktop_agent/src/metrics.rs index bee125f..07a93f1 100644 --- a/socktop_agent/src/metrics.rs +++ b/socktop_agent/src/metrics.rs @@ -5,7 +5,9 @@ use crate::state::AppState; use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo, ProcessesPayload}; use once_cell::sync::OnceCell; use std::collections::HashMap; +#[cfg(target_os = "linux")] use std::fs; +#[cfg(target_os = "linux")] use std::io; use std::sync::Mutex; use std::time::{Duration, Instant}; @@ -198,6 +200,8 @@ pub async fn collect_disks(state: &AppState) -> Vec { .collect() } +// Linux-only helpers and implementation using /proc deltas for accurate CPU%. +#[cfg(target_os = "linux")] #[inline] fn read_total_jiffies() -> io::Result { // /proc/stat first line: "cpu user nice system idle iowait irq softirq steal ..." 
@@ -216,6 +220,7 @@ fn read_total_jiffies() -> io::Result { Err(io::Error::other("no cpu line")) } +#[cfg(target_os = "linux")] #[inline] fn read_proc_jiffies(pid: u32) -> Option { let path = format!("/proc/{pid}/stat"); @@ -230,11 +235,10 @@ fn read_proc_jiffies(pid: u32) -> Option { Some(utime.saturating_add(stime)) } -// Replace the body of collect_processes_top_k to use /proc deltas. -// This makes CPU% = (delta_proc / delta_total) * 100 over the 2s interval. +/// Collect top processes (Linux variant): compute CPU% via /proc jiffies delta. +#[cfg(target_os = "linux")] pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { // Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows). - // Only processes, no per-thread entries. let mut sys = System::new(); sys.refresh_processes_specifics( ProcessesToUpdate::All, @@ -308,6 +312,47 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay } } +/// Collect top processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. 
+#[cfg(not(target_os = "linux"))] +pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { + use tokio::time::sleep; + + let mut sys = state.sys.lock().await; + + // First refresh to set baseline + sys.refresh_processes_specifics( + ProcessesToUpdate::All, + false, + ProcessRefreshKind::everything().without_tasks(), + ); + // Small delay so sysinfo can compute CPU deltas on next refresh + sleep(Duration::from_millis(250)).await; + sys.refresh_processes_specifics( + ProcessesToUpdate::All, + false, + ProcessRefreshKind::everything().without_tasks(), + ); + + let total_count = sys.processes().len(); + + let mut procs: Vec = sys + .processes() + .values() + .map(|p| ProcessInfo { + pid: p.pid().as_u32(), + name: p.name().to_string_lossy().into_owned(), + cpu_usage: p.cpu_usage(), + mem_bytes: p.memory(), + }) + .collect(); + + procs = top_k_sorted(procs, k); + ProcessesPayload { + process_count: total_count, + top_processes: procs, + } +} + // Small helper to select and sort top-k by cpu fn top_k_sorted(mut v: Vec, k: usize) -> Vec { if v.len() > k { diff --git a/socktop_agent/src/state.rs b/socktop_agent/src/state.rs index 1f84a43..6af8960 100644 --- a/socktop_agent/src/state.rs +++ b/socktop_agent/src/state.rs @@ -24,7 +24,7 @@ pub struct AppState { pub disks: SharedDisks, pub networks: SharedNetworks, - // For correct per-process CPU% using /proc deltas + // For correct per-process CPU% using /proc deltas (Linux only path uses this tracker) pub proc_cpu: Arc>, // Connection tracking (to allow future idle sleeps if desired)