diff --git a/socktop_agent/src/metrics.rs b/socktop_agent/src/metrics.rs
index e1e8c4e..85e0d00 100644
--- a/socktop_agent/src/metrics.rs
+++ b/socktop_agent/src/metrics.rs
@@ -12,7 +12,8 @@ use std::fs;
 use std::io;
 use std::sync::Mutex;
 use std::time::{Duration, Instant};
-use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System};
+use sysinfo::{ProcessRefreshKind, ProcessesToUpdate};
+use std::time::Duration as StdDuration;
 use tracing::warn;
 
 // Runtime toggles (read once)
@@ -97,6 +98,20 @@ fn set_gpus(v: Option<Vec<GpuInfo>>) {
 
 // Collect only fast-changing metrics (CPU/mem/net + optional temps/gpus).
 pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
+    // TTL (ms) overridable via env, default 250ms
+    let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_METRICS_TTL_MS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(250);
+    let ttl = StdDuration::from_millis(ttl_ms);
+    {
+        let cache = state.cache_metrics.lock().await;
+        if cache.is_fresh(ttl) {
+            if let Some(c) = cache.take_clone() {
+                return c;
+            }
+        }
+    }
     let mut sys = state.sys.lock().await;
     if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
         sys.refresh_cpu_usage();
@@ -105,7 +120,7 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
         warn!("sysinfo selective refresh panicked: {e:?}");
     }
 
-    let hostname = System::host_name().unwrap_or_else(|| "unknown".to_string());
+    let hostname = state.hostname.clone();
     let cpu_total = sys.global_cpu_usage();
     let cpu_per_core: Vec<f32> = sys.cpus().iter().map(|c| c.cpu_usage()).collect();
     let mem_total = sys.total_memory();
@@ -192,7 +207,7 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
         None
     };
 
-    Metrics {
+    let metrics = Metrics {
         cpu_total,
         cpu_per_core,
         mem_total,
@@ -205,21 +220,44 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
         networks,
         top_processes: Vec::new(),
         gpus,
+    };
+    {
+        let mut cache = state.cache_metrics.lock().await;
+        cache.set(metrics.clone());
     }
+    metrics
 }
 
 // Cached disks
 pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> {
+    let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_DISKS_TTL_MS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(1_000);
+    let ttl = StdDuration::from_millis(ttl_ms);
+    {
+        let cache = state.cache_disks.lock().await;
+        if cache.is_fresh(ttl) {
+            if let Some(v) = cache.take_clone() {
+                return v;
+            }
+        }
+    }
     let mut disks_list = state.disks.lock().await;
     disks_list.refresh(false); // don't drop missing disks
-    disks_list
+    let disks: Vec<DiskInfo> = disks_list
         .iter()
         .map(|d| DiskInfo {
             name: d.name().to_string_lossy().into_owned(),
             total: d.total_space(),
             available: d.available_space(),
         })
-        .collect()
+        .collect();
+    {
+        let mut cache = state.cache_disks.lock().await;
+        cache.set(disks.clone());
+    }
+    disks
 }
 
 // Linux-only helpers and implementation using /proc deltas for accurate CPU%.
@@ -260,8 +298,22 @@ fn read_proc_jiffies(pid: u32) -> Option<u64> {
 /// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client.
 #[cfg(target_os = "linux")]
 pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
-    // Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows).
-    let mut sys = System::new();
+    let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_PROCESSES_TTL_MS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(1_000);
+    let ttl = StdDuration::from_millis(ttl_ms);
+    {
+        let cache = state.cache_processes.lock().await;
+        if cache.is_fresh(ttl) {
+            if let Some(v) = cache.take_clone() {
+                return v;
+            }
+        }
+    }
+    // Reuse shared System to avoid reallocation; refresh processes fully.
+    let mut sys_guard = state.sys.lock().await;
+    let sys = &mut *sys_guard;
     sys.refresh_processes_specifics(
         ProcessesToUpdate::All,
         false,
@@ -336,50 +388,70 @@ pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
         })
         .collect();
 
-    ProcessesPayload {
+    let payload = ProcessesPayload {
         process_count: total_count,
         top_processes: procs,
+    };
+    {
+        let mut cache = state.cache_processes.lock().await;
+        cache.set(payload.clone());
     }
+    payload
 }
 
 /// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
 #[cfg(not(target_os = "linux"))]
 pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
     use tokio::time::sleep;
-
-    let mut sys = state.sys.lock().await;
-
-    // First refresh to set baseline
-    sys.refresh_processes_specifics(
-        ProcessesToUpdate::All,
-        false,
-        ProcessRefreshKind::everything().without_tasks(),
-    );
-    // Small delay so sysinfo can compute CPU deltas on next refresh
+    let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_PROCESSES_TTL_MS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(1_000);
+    let ttl = StdDuration::from_millis(ttl_ms);
+    {
+        let cache = state.cache_processes.lock().await;
+        if cache.is_fresh(ttl) {
+            if let Some(v) = cache.take_clone() {
+                return v;
+            }
+        }
+    }
+    {
+        let mut sys = state.sys.lock().await;
+        sys.refresh_processes_specifics(
+            ProcessesToUpdate::All,
+            false,
+            ProcessRefreshKind::everything().without_tasks(),
+        );
+    }
+    // Release lock during sleep interval
     sleep(Duration::from_millis(250)).await;
-    sys.refresh_processes_specifics(
-        ProcessesToUpdate::All,
-        false,
-        ProcessRefreshKind::everything().without_tasks(),
-    );
-
-    let total_count = sys.processes().len();
-
-    let procs: Vec<ProcessInfo> = sys
-        .processes()
-        .values()
-        .map(|p| ProcessInfo {
-            pid: p.pid().as_u32(),
-            name: p.name().to_string_lossy().into_owned(),
-            cpu_usage: p.cpu_usage(),
-            mem_bytes: p.memory(),
-        })
-        .collect();
-    ProcessesPayload {
-        process_count: total_count,
-        top_processes: procs,
+    {
+        let mut sys = state.sys.lock().await;
+        sys.refresh_processes_specifics(
+            ProcessesToUpdate::All,
+            false,
+            ProcessRefreshKind::everything().without_tasks(),
+        );
+        let total_count = sys.processes().len();
+        let procs: Vec<ProcessInfo> = sys
+            .processes()
+            .values()
+            .map(|p| ProcessInfo {
+                pid: p.pid().as_u32(),
+                name: p.name().to_string_lossy().into_owned(),
+                cpu_usage: p.cpu_usage(),
+                mem_bytes: p.memory(),
+            })
+            .collect();
+        let payload = ProcessesPayload {
+            process_count: total_count,
+            top_processes: procs,
+        };
+        {
+            let mut cache = state.cache_processes.lock().await;
+            cache.set(payload.clone());
        }
        return payload;
    }
 }
-
-// Small helper to select and sort top-k by cpu
-// Client now handles sorting/pagination.
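
All three collectors above share the same read-through shape: take the cache lock briefly, serve a clone if the entry is younger than the TTL, and otherwise fall through to a sysinfo refresh and publish the result before returning it. A minimal standalone sketch of that pattern, assuming tokio and a CacheEntry like the one added in state.rs below (the cached_or helper and the compute closures are hypothetical illustrations, not part of this patch):

    use std::time::{Duration, Instant};
    use tokio::sync::Mutex;

    // Same shape as the CacheEntry introduced in state.rs below.
    struct CacheEntry<T> {
        at: Option<Instant>,
        value: Option<T>,
    }

    impl<T: Clone> CacheEntry<T> {
        fn is_fresh(&self, ttl: Duration) -> bool {
            self.at.is_some_and(|t| t.elapsed() < ttl) && self.value.is_some()
        }
        fn set(&mut self, v: T) {
            self.value = Some(v);
            self.at = Some(Instant::now());
        }
    }

    // Hypothetical helper: serve a fresh clone, else recompute and cache.
    async fn cached_or<T: Clone>(
        cache: &Mutex<CacheEntry<T>>,
        ttl: Duration,
        compute: impl FnOnce() -> T,
    ) -> T {
        {
            // Fast path: short lock, clone out while still fresh.
            let c = cache.lock().await;
            if c.is_fresh(ttl) {
                if let Some(v) = c.value.clone() {
                    return v;
                }
            }
        } // cache lock released before the (possibly slow) recompute
        let v = compute();
        cache.lock().await.set(v.clone());
        v
    }

    #[tokio::main]
    async fn main() {
        let cache = Mutex::new(CacheEntry { at: None, value: None });
        let ttl = Duration::from_millis(250);
        let a = cached_or(&cache, ttl, || "expensive refresh").await; // miss: computes
        let b = cached_or(&cache, ttl, || unreachable!()).await; // hit: served from cache
        assert_eq!(a, b);
    }

The trade-off is deliberate: two tasks that miss at the same instant both recompute, but in the agent the shared System/Disks mutexes already serialize those refreshes, so each collector still performs roughly one sysinfo refresh per TTL window at most.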
diff --git a/socktop_agent/src/state.rs b/socktop_agent/src/state.rs
index 9b8a6ce..099aa8e 100644
--- a/socktop_agent/src/state.rs
+++ b/socktop_agent/src/state.rs
@@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
 use std::sync::Arc;
 use sysinfo::{Components, Disks, Networks, System};
 use tokio::sync::Mutex;
+use std::time::{Duration, Instant};
 
 pub type SharedSystem = Arc<Mutex<System>>;
 pub type SharedComponents = Arc<Mutex<Components>>;
@@ -25,6 +26,7 @@ pub struct AppState {
     pub components: SharedComponents,
     pub disks: SharedDisks,
     pub networks: SharedNetworks,
+    pub hostname: String,
 
     // For correct per-process CPU% using /proc deltas (Linux only path uses this tracker)
     #[cfg(target_os = "linux")]
@@ -37,6 +39,36 @@ pub struct AppState {
     // GPU negative cache (probe once). gpu_checked=true after first attempt; gpu_present reflects result.
     pub gpu_checked: Arc<AtomicBool>,
     pub gpu_present: Arc<AtomicBool>,
+
+    // Lightweight on-demand caches (TTL based) to cap CPU under bursty polling.
+    pub cache_metrics: Arc<Mutex<CacheEntry<Metrics>>>,
+    pub cache_disks: Arc<Mutex<CacheEntry<Vec<DiskInfo>>>>,
+    pub cache_processes: Arc<Mutex<CacheEntry<ProcessesPayload>>>,
+}
+
+#[derive(Clone, Debug)]
+pub struct CacheEntry<T> {
+    pub at: Option<Instant>,
+    pub value: Option<T>,
+}
+
+impl<T> CacheEntry<T> {
+    pub fn new() -> Self {
+        Self { at: None, value: None }
+    }
+    pub fn is_fresh(&self, ttl: Duration) -> bool {
+        self.at.is_some_and(|t| t.elapsed() < ttl) && self.value.is_some()
+    }
+    pub fn set(&mut self, v: T) {
+        self.value = Some(v);
+        self.at = Some(Instant::now());
+    }
+    pub fn take_clone(&self) -> Option<T>
+    where
+        T: Clone,
+    {
+        self.value.clone()
+    }
 }
 
 impl AppState {
@@ -51,6 +83,7 @@ impl AppState {
             components: Arc::new(Mutex::new(components)),
             disks: Arc::new(Mutex::new(disks)),
             networks: Arc::new(Mutex::new(networks)),
+            hostname: System::host_name().unwrap_or_else(|| "unknown".into()),
             #[cfg(target_os = "linux")]
             proc_cpu: Arc::new(Mutex::new(ProcCpuTracker::default())),
             client_count: Arc::new(AtomicUsize::new(0)),
@@ -59,6 +92,9 @@
                 .filter(|s| !s.is_empty()),
             gpu_checked: Arc::new(AtomicBool::new(false)),
             gpu_present: Arc::new(AtomicBool::new(false)),
+            cache_metrics: Arc::new(Mutex::new(CacheEntry::new())),
+            cache_disks: Arc::new(Mutex::new(CacheEntry::new())),
+            cache_processes: Arc::new(Mutex::new(CacheEntry::new())),
         }
     }
 }
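
The CacheEntry added here is deliberately minimal: set() stamps the value with Instant::now(), is_fresh() compares that stamp against a caller-supplied TTL, and take_clone() never invalidates, so staleness is decided entirely by each reader. A small self-contained check of those semantics (the 50 ms TTL and string payload are arbitrary test values):

    use std::thread::sleep;
    use std::time::{Duration, Instant};

    // Mirrors the CacheEntry added to state.rs above.
    #[derive(Clone, Debug)]
    pub struct CacheEntry<T> {
        pub at: Option<Instant>,
        pub value: Option<T>,
    }

    impl<T> CacheEntry<T> {
        pub fn new() -> Self {
            Self { at: None, value: None }
        }
        pub fn is_fresh(&self, ttl: Duration) -> bool {
            self.at.is_some_and(|t| t.elapsed() < ttl) && self.value.is_some()
        }
        pub fn set(&mut self, v: T) {
            self.value = Some(v);
            self.at = Some(Instant::now());
        }
        pub fn take_clone(&self) -> Option<T>
        where
            T: Clone,
        {
            self.value.clone()
        }
    }

    fn main() {
        let ttl = Duration::from_millis(50); // arbitrary test TTL
        let mut e: CacheEntry<String> = CacheEntry::new();
        assert!(!e.is_fresh(ttl)); // empty entry is never fresh

        e.set("payload".to_string());
        assert!(e.is_fresh(ttl)); // fresh immediately after set
        assert_eq!(e.take_clone().as_deref(), Some("payload"));

        sleep(Duration::from_millis(60));
        assert!(!e.is_fresh(ttl)); // stale once the TTL elapses...
        assert!(e.value.is_some()); // ...but the value itself is retained
    }

Because freshness is evaluated per read, the three env knobs (SOCKTOP_AGENT_METRICS_TTL_MS, SOCKTOP_AGENT_DISKS_TTL_MS, SOCKTOP_AGENT_PROCESSES_TTL_MS) bound refresh work without any background task: with the defaults, bursty pollers cost at most one fast-metrics refresh per 250 ms and one disks/processes refresh per second.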