//! Metrics collection using sysinfo for socktop_agent.

use crate::gpu::collect_all_gpus;
use crate::state::AppState;
use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo, ProcessesPayload};
use once_cell::sync::OnceCell;
#[cfg(target_os = "linux")]
use std::collections::HashMap;
#[cfg(target_os = "linux")]
use std::fs;
#[cfg(target_os = "linux")]
use std::io;
use std::sync::Mutex;
use std::time::Duration as StdDuration;
use std::time::{Duration, Instant};
use sysinfo::{ProcessRefreshKind, ProcessesToUpdate};
use tracing::warn;

// NOTE: CPU normalization env removed; the non-Linux path now always reports per-process
// CPU as a share of total machine capacity (0..100).

// Runtime toggles (read once)
fn gpu_enabled() -> bool {
    static ON: OnceCell<bool> = OnceCell::new();
    *ON.get_or_init(|| {
        std::env::var("SOCKTOP_AGENT_GPU")
            .map(|v| v != "0")
            .unwrap_or(true)
    })
}

fn temp_enabled() -> bool {
    static ON: OnceCell<bool> = OnceCell::new();
    *ON.get_or_init(|| {
        std::env::var("SOCKTOP_AGENT_TEMP")
            .map(|v| v != "0")
            .unwrap_or(true)
    })
}

// Tiny TTL caches to avoid rescanning sensors every 500ms
const TTL: Duration = Duration::from_millis(1500);

struct TempCache {
    at: Option<Instant>,
    v: Option<f32>,
}
static TEMP: OnceCell<Mutex<TempCache>> = OnceCell::new();

// NOTE: the GPU sample element type is assumed here to be `crate::gpu::GpuInfo`,
// i.e. the item type of the Vec returned by collect_all_gpus().
struct GpuCache {
    at: Option<Instant>,
    v: Option<Vec<crate::gpu::GpuInfo>>,
}
static GPUC: OnceCell<Mutex<GpuCache>> = OnceCell::new();

// Static caches for unchanging data
static HOSTNAME: OnceCell<String> = OnceCell::new();

struct NetworkNameCache {
    names: Vec<String>,
    infos: Vec<NetworkInfo>,
}
static NETWORK_CACHE: OnceCell<Mutex<NetworkNameCache>> = OnceCell::new();
static CPU_VEC: OnceCell<Mutex<Vec<f32>>> = OnceCell::new();

fn cached_temp() -> Option<f32> {
    if !temp_enabled() {
        return None;
    }
    let now = Instant::now();
    let lock = TEMP.get_or_init(|| Mutex::new(TempCache { at: None, v: None }));
    let mut c = lock.lock().ok()?;
    if c.at.is_none_or(|t| now.duration_since(t) >= TTL) {
        c.at = Some(now); // caller will fill this; we just hold a slot
        c.v = None;
    }
    c.v
}

fn set_temp(v: Option<f32>) {
    if let Some(lock) = TEMP.get() {
        if let Ok(mut c) = lock.lock() {
            c.v = v;
            c.at = Some(Instant::now());
        }
    }
}

fn cached_gpus() -> Option<Vec<crate::gpu::GpuInfo>> {
    if !gpu_enabled() {
        return None;
    }
    let now = Instant::now();
    let lock = GPUC.get_or_init(|| Mutex::new(GpuCache { at: None, v: None }));
    let mut c = lock.lock().ok()?;
    if c.at.is_none_or(|t| now.duration_since(t) >= TTL) {
        // mark stale; caller will refresh
        c.at = Some(now);
        c.v = None;
    }
    c.v.clone()
}

fn set_gpus(v: Option<Vec<crate::gpu::GpuInfo>>) {
    if let Some(lock) = GPUC.get() {
        if let Ok(mut c) = lock.lock() {
            c.v = v.clone();
            c.at = Some(Instant::now());
        }
    }
}
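// Illustrative sketch (not part of the original flow): the intended call pattern for the
// TTL caches above. `cached_*()` returns Some(..) while the entry is fresh; when it is
// stale or empty it returns None and the caller performs the expensive probe, then stores
// the result via `set_*()`. `probe_sensors()` below is a hypothetical placeholder for the
// sensor refresh that collect_fast_metrics actually performs.
//
//     let temp = match cached_temp() {
//         Some(t) => Some(t),
//         None if temp_enabled() => {
//             let t = probe_sensors();
//             set_temp(t);
//             t
//         }
//         None => None,
//     };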
// Collect only fast-changing metrics (CPU/mem/net + optional temps/gpus).
pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
    // TTL (ms) overridable via env, default 250ms
    let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_METRICS_TTL_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(250);
    let ttl = StdDuration::from_millis(ttl_ms);
    {
        let cache = state.cache_metrics.lock().await;
        if cache.is_fresh(ttl) {
            if let Some(c) = cache.get() {
                return c.clone();
            }
        }
    }

    let mut sys = state.sys.lock().await;
    if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        sys.refresh_cpu_usage();
        sys.refresh_memory();
    })) {
        warn!("sysinfo selective refresh panicked: {e:?}");
    }

    // Get or initialize hostname once
    let hostname = HOSTNAME.get_or_init(|| state.hostname.clone()).clone();

    // Reuse CPU vector to avoid allocation
    let cpu_total = sys.global_cpu_usage();
    let cpu_per_core = {
        let vec_lock = CPU_VEC.get_or_init(|| Mutex::new(Vec::with_capacity(32)));
        let mut vec = vec_lock.lock().unwrap();
        vec.clear();
        vec.extend(sys.cpus().iter().map(|c| c.cpu_usage()));
        vec.clone() // Still need to clone, but the allocation is reused
    };
    let mem_total = sys.total_memory();
    let mem_used = mem_total.saturating_sub(sys.available_memory());
    let swap_total = sys.total_swap();
    let swap_used = sys.used_swap();
    drop(sys);

    // CPU temperature: only refresh sensors if cache is stale
    let cpu_temp_c = if cached_temp().is_some() {
        cached_temp()
    } else if temp_enabled() {
        let val = {
            let mut components = state.components.lock().await;
            components.refresh(false);
            components.iter().find_map(|c| {
                let l = c.label().to_ascii_lowercase();
                if l.contains("cpu")
                    || l.contains("package")
                    || l.contains("tctl")
                    || l.contains("tdie")
                {
                    c.temperature()
                } else {
                    None
                }
            })
        };
        set_temp(val);
        val
    } else {
        None
    };

    // Networks with reusable name cache
    let networks = {
        let mut nets = state.networks.lock().await;
        nets.refresh(false);

        // Get or initialize network cache
        let cache = NETWORK_CACHE.get_or_init(|| {
            Mutex::new(NetworkNameCache {
                names: Vec::new(),
                infos: Vec::with_capacity(4), // Most systems have few network interfaces
            })
        });
        let mut cache = cache.lock().unwrap();

        // Collect current network names
        let current_names: Vec<_> = nets.keys().map(|name| name.to_string()).collect();

        // Update cached network names if they changed
        if cache.names != current_names {
            cache.names = current_names;
        }

        // Reuse NetworkInfo objects
        cache.infos.clear();
        for (name, data) in nets.iter() {
            cache.infos.push(NetworkInfo {
                name: name.to_string(), // We still clone the name, but avoid Vec reallocation
                received: data.total_received(),
                transmitted: data.total_transmitted(),
            });
        }
        cache.infos.clone()
    };

    // GPUs: if we already determined none exist, short-circuit (no repeated probing)
    let gpus = if gpu_enabled() {
        if state.gpu_checked.load(std::sync::atomic::Ordering::Acquire)
            && !state.gpu_present.load(std::sync::atomic::Ordering::Relaxed)
        {
            None
        } else if cached_gpus().is_some() {
            cached_gpus()
        } else {
            let v = match collect_all_gpus() {
                Ok(v) if !v.is_empty() => Some(v),
                Ok(_) => None,
                Err(e) => {
                    warn!("gpu collection failed: {e}");
                    None
                }
            };
            // First probe records presence; subsequent calls rely on cache flags.
            if !state
                .gpu_checked
                .swap(true, std::sync::atomic::Ordering::AcqRel)
            {
                if v.is_some() {
                    state
                        .gpu_present
                        .store(true, std::sync::atomic::Ordering::Release);
                } else {
                    state
                        .gpu_present
                        .store(false, std::sync::atomic::Ordering::Release);
                }
            }
            set_gpus(v.clone());
            v
        }
    } else {
        None
    };

    let metrics = Metrics {
        cpu_total,
        cpu_per_core,
        mem_total,
        mem_used,
        swap_total,
        swap_used,
        hostname,
        cpu_temp_c,
        disks: Vec::new(),
        networks,
        top_processes: Vec::new(),
        gpus,
    };
    {
        let mut cache = state.cache_metrics.lock().await;
        cache.set(metrics.clone());
    }
    metrics
}

// Cached disks
pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> {
    let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_DISKS_TTL_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1_000);
    let ttl = StdDuration::from_millis(ttl_ms);
    {
        let cache = state.cache_disks.lock().await;
        if cache.is_fresh(ttl) {
            if let Some(v) = cache.get() {
                return v.clone();
            }
        }
    }
    let mut disks_list = state.disks.lock().await;
    disks_list.refresh(false); // don't drop missing disks
    let disks: Vec<DiskInfo> = disks_list
        .iter()
        .map(|d| DiskInfo {
            name: d.name().to_string_lossy().into_owned(),
            total: d.total_space(),
            available: d.available_space(),
        })
        .collect();
    {
        let mut cache = state.cache_disks.lock().await;
        cache.set(disks.clone());
    }
    disks
}

// Linux-only helpers and implementation using /proc deltas for accurate CPU%.
#[cfg(target_os = "linux")]
#[inline]
fn read_total_jiffies() -> io::Result<u64> {
    // /proc/stat first line: "cpu user nice system idle iowait irq softirq steal ..."
    let s = fs::read_to_string("/proc/stat")?;
    if let Some(line) = s.lines().next() {
        let mut it = line.split_whitespace();
        let _cpu = it.next(); // "cpu"
        let mut sum: u64 = 0;
        for tok in it.take(8) {
            if let Ok(v) = tok.parse::<u64>() {
                sum = sum.saturating_add(v);
            }
        }
        return Ok(sum);
    }
    Err(io::Error::other("no cpu line"))
}

#[cfg(target_os = "linux")]
#[inline]
fn read_proc_jiffies(pid: u32) -> Option<u64> {
    let path = format!("/proc/{pid}/stat");
    let s = fs::read_to_string(path).ok()?;
    // Find the right parenthesis that terminates comm; everything after is
    // space-separated fields starting at "state".
    let rpar = s.rfind(')')?;
    let after = s.get(rpar + 2..)?; // skip ") "
    let mut it = after.split_whitespace();
    // utime (14th field) is offset 11 from "state"; stime (15th) is next
    let utime = it.nth(11)?.parse::<u64>().ok()?;
    let stime = it.next()?.parse::<u64>().ok()?;
    Some(utime.saturating_add(stime))
}

/// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client.
#[cfg(target_os = "linux")]
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
    let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_PROCESSES_TTL_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        // Default 1_500ms here; the non-Linux path below uses its own fixed 2_000ms TTL.
        .unwrap_or(1_500);
    let ttl = StdDuration::from_millis(ttl_ms);
    {
        let cache = state.cache_processes.lock().await;
        if cache.is_fresh(ttl) {
            if let Some(c) = cache.get() {
                return c.clone();
            }
        }
    }

    // Reuse shared System to avoid reallocation; refresh processes fully.
    let mut sys_guard = state.sys.lock().await;
    let sys = &mut *sys_guard;
    sys.refresh_processes_specifics(
        ProcessesToUpdate::All,
        false,
        ProcessRefreshKind::everything().without_tasks(),
    );

    let total_count = sys.processes().len();

    // Snapshot current per-pid jiffies
    let mut current: HashMap<u32, u64> = HashMap::with_capacity(total_count);
    for p in sys.processes().values() {
        let pid = p.pid().as_u32();
        if let Some(j) = read_proc_jiffies(pid) {
            current.insert(pid, j);
        }
    }
    let total_now = read_total_jiffies().unwrap_or(0);

    // Compute deltas vs last sample
    let (last_total, mut last_map) = {
        #[cfg(target_os = "linux")]
        {
            let mut t = state.proc_cpu.lock().await;
            let lt = t.last_total;
            let lm = std::mem::take(&mut t.last_per_pid);
            t.last_total = total_now;
            t.last_per_pid = current.clone();
            (lt, lm)
        }
        #[cfg(not(target_os = "linux"))]
        {
            let _: u64 = total_now; // silence unused warning
            (0u64, HashMap::new())
        }
    };

    // On the first run, or if total jiffies did not advance, report zero CPU%
    if last_total == 0 || total_now <= last_total {
        let procs: Vec<ProcessInfo> = sys
            .processes()
            .values()
            .map(|p| ProcessInfo {
                pid: p.pid().as_u32(),
                name: p.name().to_string_lossy().into_owned(),
                cpu_usage: 0.0,
                mem_bytes: p.memory(),
            })
            .collect();
        return ProcessesPayload {
            process_count: total_count,
            top_processes: procs,
        };
    }

    let dt = total_now.saturating_sub(last_total).max(1) as f32;
    let procs: Vec<ProcessInfo> = sys
        .processes()
        .values()
        .map(|p| {
            let pid = p.pid().as_u32();
            let now = current.get(&pid).copied().unwrap_or(0);
            let prev = last_map.remove(&pid).unwrap_or(0);
            let du = now.saturating_sub(prev) as f32;
            let cpu = ((du / dt) * 100.0).clamp(0.0, 100.0);
            ProcessInfo {
                pid,
                name: p.name().to_string_lossy().into_owned(),
                cpu_usage: cpu,
                mem_bytes: p.memory(),
            }
        })
        .collect();

    let payload = ProcessesPayload {
        process_count: total_count,
        top_processes: procs,
    };
    {
        let mut cache = state.cache_processes.lock().await;
        cache.set(payload.clone());
    }
    payload
}

/// Collect all processes (non-Linux): optimized for reduced allocations and selective updates.
#[cfg(not(target_os = "linux"))]
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
    // Serve from cache if fresh
    {
        let cache = state.cache_processes.lock().await;
        if cache.is_fresh(StdDuration::from_millis(2_000)) {
            // Use fixed TTL for cache check
            if let Some(c) = cache.get() {
                return c.clone();
            }
        }
    }

    // Single efficient refresh with optimized CPU collection
    let (total_count, procs) = {
        let mut sys = state.sys.lock().await;
        let kind = ProcessRefreshKind::nothing().with_memory();

        // Optimize refresh strategy based on system load
        //if load > 5.0 { //JW too complicated. simplify to remove strange behavior
        // For active systems, get accurate CPU metrics
        sys.refresh_processes_specifics(ProcessesToUpdate::All, false, kind.with_cpu());
        // } else {
        //     // For idle systems, just get basic process info
        //     sys.refresh_processes_specifics(ProcessesToUpdate::All, false, kind);
        //     sys.refresh_cpu_usage();
        // }

        let total_count = sys.processes().len();
        let cpu_count = sys.cpus().len() as f32;

        // Reuse allocations via process cache
        let mut proc_cache = state.proc_cache.lock().await;
        proc_cache.reusable_vec.clear();

        // Collect all processes; the client does any sorting
        for p in sys.processes().values() {
            let pid = p.pid().as_u32();

            // Reuse cached name if available
            let name = if let Some(cached) = proc_cache.names.get(&pid) {
                cached.clone()
            } else {
                let new_name = p.name().to_string_lossy().into_owned();
                proc_cache.names.insert(pid, new_name.clone());
                new_name
            };

            // Convert to percentage of total CPU capacity,
            // e.g. 100% on 2 cores of an 8-core system = 25% total CPU.
            // sysinfo's per-process cpu_usage() can exceed 100 on multi-core machines,
            // so divide by the core count first, then clamp.
            let raw = p.cpu_usage();
            let total_cpu = (raw / cpu_count).clamp(0.0, 100.0);

            proc_cache.reusable_vec.push(ProcessInfo {
                pid,
                name,
                cpu_usage: total_cpu,
                mem_bytes: p.memory(),
            });
        }

        //JW no need to sort here; client does the sorting
        // // Sort by CPU usage
        // proc_cache.reusable_vec.sort_by(|a, b| {
        //     b.cpu_usage
        //         .partial_cmp(&a.cpu_usage)
        //         .unwrap_or(std::cmp::Ordering::Equal)
        // });

        // Clean up the process-name cache when it holds many more entries than there
        // are live processes
        let cache_cleanup_threshold = std::env::var("SOCKTOP_AGENT_NAME_CACHE_CLEANUP_THRESHOLD")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(1000); // Default: most modern systems have 400-700 processes
        if proc_cache.names.len() > total_count + cache_cleanup_threshold {
            let now = std::time::Instant::now();
            let before = proc_cache.names.len();
            proc_cache
                .names
                .retain(|pid, _| sys.processes().contains_key(&sysinfo::Pid::from_u32(*pid)));
            tracing::debug!(
                "Cleaned up {} stale process names in {}ms",
                before - proc_cache.names.len(),
                now.elapsed().as_millis()
            );
        }

        // Take ownership of the reusable vec (it is replaced with an empty one)
        (total_count, std::mem::take(&mut proc_cache.reusable_vec))
    };

    let payload = ProcessesPayload {
        process_count: total_count,
        top_processes: procs,
    };
    {
        let mut cache = state.cache_processes.lock().await;
        cache.set(payload.clone());
    }
    payload
}
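// A minimal test sketch (not part of the original module): it mirrors the jiffies-delta
// arithmetic used in the Linux collect_processes_all above, to show how per-process CPU%
// falls out of two /proc samples. `cpu_percent_from_deltas` is a hypothetical local helper
// introduced only for this example.
#[cfg(all(test, target_os = "linux"))]
mod cpu_percent_tests {
    /// Share of total jiffies consumed between two samples, as a percentage clamped to 0..100.
    fn cpu_percent_from_deltas(proc_now: u64, proc_prev: u64, total_now: u64, total_prev: u64) -> f32 {
        let dt = total_now.saturating_sub(total_prev).max(1) as f32;
        let du = proc_now.saturating_sub(proc_prev) as f32;
        ((du / dt) * 100.0).clamp(0.0, 100.0)
    }

    #[test]
    fn cpu_percent_matches_jiffies_share() {
        // The process consumed 50 of 400 total jiffies between samples => 12.5%.
        assert_eq!(cpu_percent_from_deltas(150, 100, 1_400, 1_000), 12.5);
        // Counter wrap or missing previous sample: saturating_sub keeps the result at 0.
        assert_eq!(cpu_percent_from_deltas(100, 150, 1_000, 1_400), 0.0);
    }
}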