diff --git a/Cargo.lock b/Cargo.lock index 0e11ba6..324d00e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,7 +158,7 @@ dependencies = [ "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.24.0", "tower 0.5.2", "tower-layer", "tower-service", @@ -2181,13 +2181,13 @@ dependencies = [ "sysinfo", "tempfile", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.24.0", "url", ] [[package]] name = "socktop_agent" -version = "1.40.66" +version = "1.40.67" dependencies = [ "anyhow", "assert_cmd", @@ -2210,6 +2210,7 @@ dependencies = [ "tempfile", "time", "tokio", + "tokio-tungstenite 0.21.0", "tonic-build", "tracing", "tracing-subscriber", @@ -2463,6 +2464,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-tungstenite" version = "0.24.0" @@ -2475,7 +2488,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls 0.26.2", - "tungstenite", + "tungstenite 0.24.0", ] [[package]] @@ -2608,6 +2621,25 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.24.0" diff --git a/socktop_agent/Cargo.toml b/socktop_agent/Cargo.toml index 8bda2b1..2b553f6 100644 --- a/socktop_agent/Cargo.toml +++ b/socktop_agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "socktop_agent" -version = "1.40.66" +version = "1.40.67" authors = ["Jason Witty "] description = "Remote system monitor over WebSocket, TUI like top" edition = "2021" @@ -36,3 +36,4 @@ protoc-bin-vendored = "3" [dev-dependencies] assert_cmd = "2.0" tempfile = "3.10" +tokio-tungstenite = "0.21" diff --git a/socktop_agent/src/metrics.rs b/socktop_agent/src/metrics.rs index 9718236..91f8d9a 100644 --- a/socktop_agent/src/metrics.rs +++ b/socktop_agent/src/metrics.rs @@ -49,6 +49,15 @@ struct GpuCache { } static GPUC: OnceCell> = OnceCell::new(); +// Static caches for unchanging data +static HOSTNAME: OnceCell = OnceCell::new(); +struct NetworkNameCache { + names: Vec, + infos: Vec, +} +static NETWORK_CACHE: OnceCell> = OnceCell::new(); +static CPU_VEC: OnceCell>> = OnceCell::new(); + fn cached_temp() -> Option { if !temp_enabled() { return None; @@ -121,9 +130,19 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics { warn!("sysinfo selective refresh panicked: {e:?}"); } - let hostname = state.hostname.clone(); + // Get or initialize hostname once + let hostname = HOSTNAME.get_or_init(|| state.hostname.clone()).clone(); + + // Reuse CPU vector to avoid allocation let cpu_total = sys.global_cpu_usage(); - let cpu_per_core: Vec = sys.cpus().iter().map(|c| c.cpu_usage()).collect(); + let cpu_per_core = { + let vec_lock = CPU_VEC.get_or_init(|| Mutex::new(Vec::with_capacity(32))); + let mut vec = vec_lock.lock().unwrap(); + vec.clear(); + vec.extend(sys.cpus().iter().map(|c| c.cpu_usage())); + vec.clone() // Still need to clone but the allocation is reused + }; + let mem_total = sys.total_memory(); let mem_used = mem_total.saturating_sub(sys.available_memory()); let swap_total = sys.total_swap(); @@ -156,17 +175,38 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics { None }; - // Networks - let networks: Vec = { + // Networks with reusable name cache + let networks = { let mut nets = state.networks.lock().await; nets.refresh(false); - nets.iter() - .map(|(name, data)| NetworkInfo { - name: name.to_string(), + + // Get or initialize network cache + let cache = NETWORK_CACHE.get_or_init(|| { + Mutex::new(NetworkNameCache { + names: Vec::new(), + infos: Vec::with_capacity(4), // Most systems have few network interfaces + }) + }); + let mut cache = cache.lock().unwrap(); + + // Collect current network names + let current_names: Vec<_> = nets.keys().map(|name| name.to_string()).collect(); + + // Update cached network names if they changed + if cache.names != current_names { + cache.names = current_names; + } + + // Reuse NetworkInfo objects + cache.infos.clear(); + for (name, data) in nets.iter() { + cache.infos.push(NetworkInfo { + name: name.to_string(), // We'll still clone but avoid Vec reallocation received: data.total_received(), transmitted: data.total_transmitted(), - }) - .collect() + }); + } + cache.infos.clone() }; // GPUs: if we already determined none exist, short-circuit (no repeated probing) diff --git a/socktop_agent/src/ws.rs b/socktop_agent/src/ws.rs index 07cee80..9114d55 100644 --- a/socktop_agent/src/ws.rs +++ b/socktop_agent/src/ws.rs @@ -7,13 +7,33 @@ use axum::{ }; use flate2::{write::GzEncoder, Compression}; use futures_util::StreamExt; +use once_cell::sync::OnceCell; use std::collections::HashMap; use std::io::Write; +use tokio::sync::Mutex; use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_all}; use crate::proto::pb; use crate::state::AppState; +// Compression threshold based on typical payload size +const COMPRESSION_THRESHOLD: usize = 768; + +// Reusable buffer for compression to avoid allocations +struct CompressionCache { + processes_vec: Vec, +} + +impl CompressionCache { + fn new() -> Self { + Self { + processes_vec: Vec::with_capacity(512), // Typical process count + } + } +} + +static COMPRESSION_CACHE: OnceCell> = OnceCell::new(); + pub async fn ws_handler( ws: WebSocketUpgrade, State(state): State, @@ -46,38 +66,50 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { } Message::Text(ref text) if text == "get_processes" => { let payload = collect_processes_all(&state).await; + // Map to protobuf message - let rows: Vec = payload - .top_processes - .into_iter() - .map(|p| pb::Process { + // Get cached buffers + let cache = COMPRESSION_CACHE.get_or_init(|| Mutex::new(CompressionCache::new())); + let mut cache = cache.lock().await; + + // Reuse process vector to build the list + cache.processes_vec.clear(); + cache + .processes_vec + .extend(payload.top_processes.into_iter().map(|p| pb::Process { pid: p.pid, name: p.name, cpu_usage: p.cpu_usage, mem_bytes: p.mem_bytes, - }) - .collect(); + })); + let pb = pb::Processes { process_count: payload.process_count as u64, - rows, + rows: std::mem::take(&mut cache.processes_vec), }; + let mut buf = Vec::with_capacity(8 * 1024); if prost::Message::encode(&pb, &mut buf).is_err() { let _ = socket.send(Message::Close(None)).await; } else { // compress if large - if buf.len() <= 768 { + if buf.len() <= COMPRESSION_THRESHOLD { let _ = socket.send(Message::Binary(buf)).await; } else { - let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); - if enc.write_all(&buf).is_ok() { - let bin = enc.finish().unwrap_or(buf); - let _ = socket.send(Message::Binary(bin)).await; - } else { - let _ = socket.send(Message::Binary(buf)).await; + // Create a new encoder for each message to ensure proper gzip headers + let mut encoder = + GzEncoder::new(Vec::with_capacity(buf.len()), Compression::fast()); + match encoder.write_all(&buf).and_then(|_| encoder.finish()) { + Ok(compressed) => { + let _ = socket.send(Message::Binary(compressed)).await; + } + Err(_) => { + let _ = socket.send(Message::Binary(buf)).await; + } } } } + drop(cache); // Explicit drop to release mutex early } Message::Close(_) => break, _ => {} @@ -91,7 +123,7 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { // Small, cheap gzip for larger payloads; send text for small. async fn send_json(ws: &mut WebSocket, value: &T) -> Result<(), axum::Error> { let json = serde_json::to_string(value).expect("serialize"); - if json.len() <= 768 { + if json.len() <= COMPRESSION_THRESHOLD { return ws.send(Message::Text(json)).await; } let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); @@ -99,3 +131,83 @@ async fn send_json(ws: &mut WebSocket, value: &T) -> Result let bin = enc.finish().unwrap_or_else(|_| json.into_bytes()); ws.send(Message::Binary(bin)).await } + +#[cfg(test)] +mod tests { + use super::*; + use prost::Message as ProstMessage; + use sysinfo::System; + + #[tokio::test] + async fn test_process_list_not_empty() { + // Initialize system data first to ensure we have processes + let mut sys = System::new_all(); + sys.refresh_all(); + + // Create state and put the refreshed system in it + let state = AppState::new(); + { + let mut sys_lock = state.sys.lock().await; + *sys_lock = sys; + } + + // Get processes directly using the collection function + let processes = collect_processes_all(&state).await; + + // Convert to protobuf message format + let cache = COMPRESSION_CACHE.get_or_init(|| Mutex::new(CompressionCache::new())); + let mut cache = cache.lock().await; + + // Reuse process vector to build the list + cache.processes_vec.clear(); + cache + .processes_vec + .extend(processes.top_processes.into_iter().map(|p| pb::Process { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + })); + + // Create the protobuf message + let pb = pb::Processes { + process_count: processes.process_count as u64, + rows: cache.processes_vec.clone(), + }; + + // Test protobuf encoding/decoding + let mut buf = Vec::new(); + prost::Message::encode(&pb, &mut buf).expect("Failed to encode protobuf"); + let decoded = pb::Processes::decode(buf.as_slice()).expect("Failed to decode protobuf"); + + // Print debug info + println!("Process count: {}", pb.process_count); + println!("Process vector length: {}", pb.rows.len()); + println!("Encoded size: {} bytes", buf.len()); + println!("Decoded process count: {}", decoded.rows.len()); + + // Print first few processes if available + for (i, process) in pb.rows.iter().take(5).enumerate() { + println!( + "Process {}: {} (PID: {}) CPU: {:.1}% MEM: {} bytes", + i + 1, + process.name, + process.pid, + process.cpu_usage, + process.mem_bytes + ); + } + + // Validate + assert!(!pb.rows.is_empty(), "Process list should not be empty"); + assert!( + pb.process_count > 0, + "Process count should be greater than 0" + ); + assert_eq!( + pb.process_count as usize, + pb.rows.len(), + "Process count mismatch with actual rows" + ); + } +}