perf(agent): add hostname + TTL caches (metrics/disks/processes) and reuse sys for processes

This commit is contained in:
jasonwitty 2025-08-24 12:38:32 -07:00
parent b2468a5936
commit 85f9a44e46
2 changed files with 150 additions and 42 deletions

View File

@ -12,7 +12,8 @@ use std::fs;
use std::io; use std::io;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System}; use sysinfo::{ProcessRefreshKind, ProcessesToUpdate};
use std::time::Duration as StdDuration;
use tracing::warn; use tracing::warn;
// Runtime toggles (read once) // Runtime toggles (read once)
@ -97,6 +98,20 @@ fn set_gpus(v: Option<Vec<crate::gpu::GpuMetrics>>) {
// Collect only fast-changing metrics (CPU/mem/net + optional temps/gpus). // Collect only fast-changing metrics (CPU/mem/net + optional temps/gpus).
pub async fn collect_fast_metrics(state: &AppState) -> Metrics { pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
// TTL (ms) overridable via env, default 250ms
let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_METRICS_TTL_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(250);
let ttl = StdDuration::from_millis(ttl_ms);
{
let cache = state.cache_metrics.lock().await;
if cache.is_fresh(ttl) {
if let Some(c) = cache.take_clone() {
return c;
}
}
}
let mut sys = state.sys.lock().await; let mut sys = state.sys.lock().await;
if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
sys.refresh_cpu_usage(); sys.refresh_cpu_usage();
@ -105,7 +120,7 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
warn!("sysinfo selective refresh panicked: {e:?}"); warn!("sysinfo selective refresh panicked: {e:?}");
} }
let hostname = System::host_name().unwrap_or_else(|| "unknown".to_string()); let hostname = state.hostname.clone();
let cpu_total = sys.global_cpu_usage(); let cpu_total = sys.global_cpu_usage();
let cpu_per_core: Vec<f32> = sys.cpus().iter().map(|c| c.cpu_usage()).collect(); let cpu_per_core: Vec<f32> = sys.cpus().iter().map(|c| c.cpu_usage()).collect();
let mem_total = sys.total_memory(); let mem_total = sys.total_memory();
@ -192,7 +207,7 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
None None
}; };
Metrics { let metrics = Metrics {
cpu_total, cpu_total,
cpu_per_core, cpu_per_core,
mem_total, mem_total,
@ -205,21 +220,44 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
networks, networks,
top_processes: Vec::new(), top_processes: Vec::new(),
gpus, gpus,
};
{
let mut cache = state.cache_metrics.lock().await;
cache.set(metrics.clone());
} }
metrics
} }
// Cached disks // Cached disks
pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> { pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> {
let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_DISKS_TTL_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(1_000);
let ttl = StdDuration::from_millis(ttl_ms);
{
let cache = state.cache_disks.lock().await;
if cache.is_fresh(ttl) {
if let Some(v) = cache.take_clone() {
return v;
}
}
}
let mut disks_list = state.disks.lock().await; let mut disks_list = state.disks.lock().await;
disks_list.refresh(false); // don't drop missing disks disks_list.refresh(false); // don't drop missing disks
disks_list let disks: Vec<DiskInfo> = disks_list
.iter() .iter()
.map(|d| DiskInfo { .map(|d| DiskInfo {
name: d.name().to_string_lossy().into_owned(), name: d.name().to_string_lossy().into_owned(),
total: d.total_space(), total: d.total_space(),
available: d.available_space(), available: d.available_space(),
}) })
.collect() .collect();
{
let mut cache = state.cache_disks.lock().await;
cache.set(disks.clone());
}
disks
} }
// Linux-only helpers and implementation using /proc deltas for accurate CPU%. // Linux-only helpers and implementation using /proc deltas for accurate CPU%.
@ -260,8 +298,22 @@ fn read_proc_jiffies(pid: u32) -> Option<u64> {
/// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client. /// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
// Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows). let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_PROCESSES_TTL_MS")
let mut sys = System::new(); .ok()
.and_then(|v| v.parse().ok())
.unwrap_or(1_000);
let ttl = StdDuration::from_millis(ttl_ms);
{
let cache = state.cache_processes.lock().await;
if cache.is_fresh(ttl) {
if let Some(v) = cache.take_clone() {
return v;
}
}
}
// Reuse shared System to avoid reallocation; refresh processes fully.
let mut sys_guard = state.sys.lock().await;
let sys = &mut *sys_guard;
sys.refresh_processes_specifics( sys.refresh_processes_specifics(
ProcessesToUpdate::All, ProcessesToUpdate::All,
false, false,
@ -336,50 +388,70 @@ pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
}) })
.collect(); .collect();
ProcessesPayload { let payload = ProcessesPayload {
process_count: total_count, process_count: total_count,
top_processes: procs, top_processes: procs,
};
{
let mut cache = state.cache_processes.lock().await;
cache.set(payload.clone());
} }
payload
} }
/// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. /// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
use tokio::time::sleep; use tokio::time::sleep;
let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_PROCESSES_TTL_MS")
let mut sys = state.sys.lock().await; .ok()
.and_then(|v| v.parse().ok())
// First refresh to set baseline .unwrap_or(1_000);
sys.refresh_processes_specifics( let ttl = StdDuration::from_millis(ttl_ms);
ProcessesToUpdate::All, {
false, let cache = state.cache_processes.lock().await;
ProcessRefreshKind::everything().without_tasks(), if cache.is_fresh(ttl) {
); if let Some(v) = cache.take_clone() {
// Small delay so sysinfo can compute CPU deltas on next refresh return v;
}
}
}
{
let mut sys = state.sys.lock().await;
sys.refresh_processes_specifics(
ProcessesToUpdate::All,
false,
ProcessRefreshKind::everything().without_tasks(),
);
}
// Release lock during sleep interval
sleep(Duration::from_millis(250)).await; sleep(Duration::from_millis(250)).await;
sys.refresh_processes_specifics( {
ProcessesToUpdate::All, let mut sys = state.sys.lock().await;
false, sys.refresh_processes_specifics(
ProcessRefreshKind::everything().without_tasks(), ProcessesToUpdate::All,
); false,
ProcessRefreshKind::everything().without_tasks(),
let total_count = sys.processes().len(); );
let total_count = sys.processes().len();
let procs: Vec<ProcessInfo> = sys let procs: Vec<ProcessInfo> = sys
.processes() .processes()
.values() .values()
.map(|p| ProcessInfo { .map(|p| ProcessInfo {
pid: p.pid().as_u32(), pid: p.pid().as_u32(),
name: p.name().to_string_lossy().into_owned(), name: p.name().to_string_lossy().into_owned(),
cpu_usage: p.cpu_usage(), cpu_usage: p.cpu_usage(),
mem_bytes: p.memory(), mem_bytes: p.memory(),
}) })
.collect(); .collect();
ProcessesPayload { let payload = ProcessesPayload {
process_count: total_count, process_count: total_count,
top_processes: procs, top_processes: procs,
};
{
let mut cache = state.cache_processes.lock().await;
cache.set(payload.clone());
}
return payload;
} }
} }
// Small helper to select and sort top-k by cpu
// Client now handles sorting/pagination.

View File

@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc; use std::sync::Arc;
use sysinfo::{Components, Disks, Networks, System}; use sysinfo::{Components, Disks, Networks, System};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use std::time::{Duration, Instant};
pub type SharedSystem = Arc<Mutex<System>>; pub type SharedSystem = Arc<Mutex<System>>;
pub type SharedComponents = Arc<Mutex<Components>>; pub type SharedComponents = Arc<Mutex<Components>>;
@ -25,6 +26,7 @@ pub struct AppState {
pub components: SharedComponents, pub components: SharedComponents,
pub disks: SharedDisks, pub disks: SharedDisks,
pub networks: SharedNetworks, pub networks: SharedNetworks,
pub hostname: String,
// For correct per-process CPU% using /proc deltas (Linux only path uses this tracker) // For correct per-process CPU% using /proc deltas (Linux only path uses this tracker)
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@ -37,6 +39,36 @@ pub struct AppState {
// GPU negative cache (probe once). gpu_checked=true after first attempt; gpu_present reflects result. // GPU negative cache (probe once). gpu_checked=true after first attempt; gpu_present reflects result.
pub gpu_checked: Arc<AtomicBool>, pub gpu_checked: Arc<AtomicBool>,
pub gpu_present: Arc<AtomicBool>, pub gpu_present: Arc<AtomicBool>,
// Lightweight on-demand caches (TTL based) to cap CPU under bursty polling.
pub cache_metrics: Arc<Mutex<CacheEntry<crate::types::Metrics>>>,
pub cache_disks: Arc<Mutex<CacheEntry<Vec<crate::types::DiskInfo>>>>,
pub cache_processes: Arc<Mutex<CacheEntry<crate::types::ProcessesPayload>>>,
}
/// A single TTL-guarded cache slot used by the on-demand collectors
/// (metrics / disks / processes) to cap CPU under bursty polling.
///
/// A value is considered fresh only while `Instant::elapsed()` on the
/// stored timestamp is strictly below the caller-supplied TTL.
#[derive(Clone, Debug)]
pub struct CacheEntry<T> {
    /// When `value` was last stored; `None` until the first `set`.
    pub at: Option<Instant>,
    /// The cached payload; `None` until the first `set`.
    pub value: Option<T>,
}

impl<T> CacheEntry<T> {
    /// Creates an empty (never-populated, therefore never-fresh) entry.
    pub fn new() -> Self {
        Self { at: None, value: None }
    }

    /// Returns `true` when a value is present and was stored less than
    /// `ttl` ago. An empty entry is never fresh.
    pub fn is_fresh(&self, ttl: Duration) -> bool {
        self.at.is_some_and(|t| t.elapsed() < ttl) && self.value.is_some()
    }

    /// Stores `v` and stamps the entry with the current instant.
    pub fn set(&mut self, v: T) {
        self.value = Some(v);
        self.at = Some(Instant::now());
    }

    /// Returns a clone of the cached value, if any.
    ///
    /// NOTE: despite the name, this does NOT take (remove) the value —
    /// it only clones it; the cache remains populated. The name is kept
    /// for compatibility with existing call sites.
    pub fn take_clone(&self) -> Option<T>
    where
        T: Clone,
    {
        self.value.clone()
    }
}

// Additive impl so `CacheEntry` plays well with `Default`-based
// construction and satisfies clippy's `new_without_default` lint.
impl<T> Default for CacheEntry<T> {
    fn default() -> Self {
        Self::new()
    }
}
} }
impl AppState { impl AppState {
@ -51,6 +83,7 @@ impl AppState {
components: Arc::new(Mutex::new(components)), components: Arc::new(Mutex::new(components)),
disks: Arc::new(Mutex::new(disks)), disks: Arc::new(Mutex::new(disks)),
networks: Arc::new(Mutex::new(networks)), networks: Arc::new(Mutex::new(networks)),
hostname: System::host_name().unwrap_or_else(|| "unknown".into()),
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
proc_cpu: Arc::new(Mutex::new(ProcCpuTracker::default())), proc_cpu: Arc::new(Mutex::new(ProcCpuTracker::default())),
client_count: Arc::new(AtomicUsize::new(0)), client_count: Arc::new(AtomicUsize::new(0)),
@ -59,6 +92,9 @@ impl AppState {
.filter(|s| !s.is_empty()), .filter(|s| !s.is_empty()),
gpu_checked: Arc::new(AtomicBool::new(false)), gpu_checked: Arc::new(AtomicBool::new(false)),
gpu_present: Arc::new(AtomicBool::new(false)), gpu_present: Arc::new(AtomicBool::new(false)),
cache_metrics: Arc::new(Mutex::new(CacheEntry::new())),
cache_disks: Arc::new(Mutex::new(CacheEntry::new())),
cache_processes: Arc::new(Mutex::new(CacheEntry::new())),
} }
} }
} }