Merge pull request #5 from jasonwitty/feature/housekeeping

Feature/housekeeping
This commit is contained in:
jasonwitty 2025-08-24 17:46:26 -07:00 committed by GitHub
commit 1043fffc8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 230 additions and 215 deletions

71
Cargo.lock generated
View File

@ -357,10 +357,8 @@ checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
dependencies = [ dependencies = [
"android-tzdata", "android-tzdata",
"iana-time-zone", "iana-time-zone",
"js-sys",
"num-traits", "num-traits",
"serde", "serde",
"wasm-bindgen",
"windows-link", "windows-link",
] ]
@ -1761,18 +1759,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [ dependencies = [
"libc", "libc",
"rand_chacha 0.3.1", "rand_chacha",
"rand_core 0.6.4", "rand_core",
]
[[package]]
name = "rand"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
] ]
[[package]] [[package]]
@ -1782,17 +1770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [ dependencies = [
"ppv-lite86", "ppv-lite86",
"rand_core 0.6.4", "rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.3",
] ]
[[package]] [[package]]
@ -1804,15 +1782,6 @@ dependencies = [
"getrandom 0.2.16", "getrandom 0.2.16",
] ]
[[package]]
name = "rand_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.3",
]
[[package]] [[package]]
name = "ratatui" name = "ratatui"
version = "0.28.1" version = "0.28.1"
@ -2193,16 +2162,13 @@ dependencies = [
[[package]] [[package]]
name = "socktop" name = "socktop"
version = "0.1.25" version = "0.1.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"assert_cmd", "assert_cmd",
"bytes",
"chrono",
"crossterm 0.27.0", "crossterm 0.27.0",
"dirs-next", "dirs-next",
"flate2", "flate2",
"futures",
"futures-util", "futures-util",
"prost", "prost",
"prost-build", "prost-build",
@ -2212,6 +2178,7 @@ dependencies = [
"rustls-pemfile", "rustls-pemfile",
"serde", "serde",
"serde_json", "serde_json",
"sysinfo",
"tempfile", "tempfile",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
@ -2220,23 +2187,19 @@ dependencies = [
[[package]] [[package]]
name = "socktop_agent" name = "socktop_agent"
version = "0.1.25" version = "0.1.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"assert_cmd", "assert_cmd",
"axum", "axum",
"axum-server", "axum-server",
"bytes",
"flate2", "flate2",
"futures",
"futures-util", "futures-util",
"gfxinfo", "gfxinfo",
"hostname", "hostname",
"nvml-wrapper",
"once_cell", "once_cell",
"prost", "prost",
"prost-build", "prost-build",
"prost-types",
"protoc-bin-vendored", "protoc-bin-vendored",
"rcgen", "rcgen",
"rustls 0.23.31", "rustls 0.23.31",
@ -2250,7 +2213,6 @@ dependencies = [
"tonic-build", "tonic-build",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"tungstenite 0.27.0",
] ]
[[package]] [[package]]
@ -2513,7 +2475,7 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
"tokio", "tokio",
"tokio-rustls 0.26.2", "tokio-rustls 0.26.2",
"tungstenite 0.24.0", "tungstenite",
] ]
[[package]] [[package]]
@ -2658,7 +2620,7 @@ dependencies = [
"http", "http",
"httparse", "httparse",
"log", "log",
"rand 0.8.5", "rand",
"rustls 0.23.31", "rustls 0.23.31",
"rustls-pki-types", "rustls-pki-types",
"sha1", "sha1",
@ -2666,23 +2628,6 @@ dependencies = [
"utf-8", "utf-8",
] ]
[[package]]
name = "tungstenite"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d"
dependencies = [
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand 0.9.2",
"sha1",
"thiserror 2.0.12",
"utf-8",
]
[[package]] [[package]]
name = "typenum" name = "typenum"
version = "1.18.0" version = "1.18.0"

View File

@ -8,37 +8,30 @@ members = [
[workspace.dependencies] [workspace.dependencies]
# async + streams # async + streams
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
anyhow = "1.0" anyhow = "1.0"
# websocket # websocket
tokio-tungstenite = { version = "0.24", features = ["__rustls-tls", "connect"] } tokio-tungstenite = { version = "0.24", features = ["__rustls-tls", "connect"] }
tungstenite = "0.24"
url = "2.5" url = "2.5"
# JSON + error handling # JSON + error handling
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
thiserror = "1.0"
# system stats # system stats (align across crates)
sysinfo = "0.32" sysinfo = "0.37"
# CLI UI # CLI UI
ratatui = "0.28" ratatui = "0.28"
crossterm = "0.27" crossterm = "0.27"
# date/time
chrono = { version = "0.4", features = ["serde"] }
# web server (remote-agent) # web server (remote-agent)
axum = { version = "0.7", features = ["ws"] } axum = { version = "0.7", features = ["ws"] }
# protobuf # protobuf
prost = "0.13" prost = "0.13"
prost-types = "0.13"
bytes = "1"
dirs-next = "2" dirs-next = "2"
[profile.release] [profile.release]

View File

@ -66,7 +66,7 @@ sudo apt-get install libdrm-dev libdrm-amdgpu1
Two components: Two components:
1) Agent (remote): small Rust WS server using sysinfo + /proc. It collects on demand when the client asks (fast metrics ~500 ms, processes ~2 s, disks ~5 s). No background loop when nobody is connected. 1) Agent (remote): small Rust WS server using sysinfo + /proc. It collects metrics only when the client requests them over the WebSocket (request-driven). No background sampling loop.
2) Client (local): TUI that connects to ws://HOST:PORT/ws (or wss://HOST:PORT/ws when TLS is enabled) and renders updates. 2) Client (local): TUI that connects to ws://HOST:PORT/ws (or wss://HOST:PORT/ws when TLS is enabled) and renders updates.

View File

@ -1,6 +1,6 @@
[package] [package]
name = "socktop" name = "socktop"
version = "0.1.25" version = "0.1.3"
authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"] authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
description = "Remote system monitor over WebSocket, TUI like top" description = "Remote system monitor over WebSocket, TUI like top"
edition = "2021" edition = "2021"
@ -9,21 +9,19 @@ license = "MIT"
[dependencies] [dependencies]
tokio = { workspace = true } tokio = { workspace = true }
tokio-tungstenite = { workspace = true } tokio-tungstenite = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true } futures-util = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
url = { workspace = true } url = { workspace = true }
ratatui = { workspace = true } ratatui = { workspace = true }
crossterm = { workspace = true } crossterm = { workspace = true }
chrono = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
flate2 = { version = "1", default-features = false, features = ["rust_backend"] } flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
dirs-next = { workspace = true } dirs-next = { workspace = true }
sysinfo = { workspace = true }
rustls = "0.23" rustls = "0.23"
rustls-pemfile = "2.1" rustls-pemfile = "2.1"
prost = { workspace = true } prost = { workspace = true }
bytes = { workspace = true }
[dev-dependencies] [dev-dependencies]
assert_cmd = "2.0" assert_cmd = "2.0"

View File

@ -217,7 +217,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
(u, t, entry.metrics_interval_ms, entry.processes_interval_ms) (u, t, entry.metrics_interval_ms, entry.processes_interval_ms)
} }
ResolveProfile::PromptSelect(mut names) => { ResolveProfile::PromptSelect(mut names) => {
if !names.iter().any(|n| n == "demo") { if !names.iter().any(|n: &String| n == "demo") {
names.push("demo".into()); names.push("demo".into());
} }
eprintln!("Select profile:"); eprintln!("Select profile:");
@ -281,10 +281,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
(url.trim().to_string(), ca_opt, mi, pi) (url.trim().to_string(), ca_opt, mi, pi)
} }
ResolveProfile::None => { ResolveProfile::None => {
eprintln!("No URL provided and no profiles to select."); //eprintln!("No URL provided and no profiles to select.");
//first run, no args, no profiles: show welcome message and offer demo mode
if profiles_mut.profiles.is_empty() && parsed.url.is_none() {
eprintln!("Welcome to socktop!");
eprintln!("It looks like this is your first time running the application.");
eprintln!("You can connect to a socktop_agent instance to monitor system metrics and processes.");
eprintln!("If you don't have an agent running, you can try the demo mode.");
if prompt_yes_no("Would you like to start the demo mode now? [Y/n]: ") {
return run_demo_mode(parsed.tls_ca.as_deref()).await;
} else {
eprintln!("Aborting. You can run 'socktop --help' for usage information.");
return Ok(()); return Ok(());
} }
}
return Err("No URL provided and no profiles to select.".into());
}
}; };
let is_tls = url.starts_with("wss://"); let is_tls = url.starts_with("wss://");
let has_token = url.contains("token="); let has_token = url.contains("token=");
let mut app = App::new() let mut app = App::new()

View File

@ -1,6 +1,6 @@
[package] [package]
name = "socktop_agent" name = "socktop_agent"
version = "0.1.25" version = "0.1.3"
authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"] authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
description = "Remote system monitor over WebSocket, TUI like top" description = "Remote system monitor over WebSocket, TUI like top"
edition = "2021" edition = "2021"
@ -13,13 +13,11 @@ sysinfo = { version = "0.37", features = ["network", "disk", "component"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
flate2 = { version = "1", default-features = false, features = ["rust_backend"] } flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
futures = "0.3"
futures-util = "0.3.31" futures-util = "0.3.31"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
nvml-wrapper = "0.10" # nvml-wrapper removed (unused; GPU metrics via gfxinfo only now)
gfxinfo = "0.1.2" gfxinfo = "0.1.2"
tungstenite = "0.27.0"
once_cell = "1.19" once_cell = "1.19"
axum-server = { version = "0.6", features = ["tls-rustls"] } axum-server = { version = "0.6", features = ["tls-rustls"] }
rustls = "0.23" rustls = "0.23"
@ -27,13 +25,11 @@ rustls-pemfile = "2.1"
rcgen = "0.13" # pure-Rust self-signed cert generation (replaces openssl vendored build) rcgen = "0.13" # pure-Rust self-signed cert generation (replaces openssl vendored build)
anyhow = "1" anyhow = "1"
hostname = "0.3" hostname = "0.3"
bytes = { workspace = true }
prost = { workspace = true } prost = { workspace = true }
time = { version = "0.3", default-features = false, features = ["formatting", "macros", "parsing" ] } time = { version = "0.3", default-features = false, features = ["formatting", "macros", "parsing" ] }
[build-dependencies] [build-dependencies]
prost-build = "0.13" prost-build = "0.13"
prost-types = { workspace = true }
tonic-build = { version = "0.12", default-features = false, optional = true } tonic-build = { version = "0.12", default-features = false, optional = true }
protoc-bin-vendored = "3" protoc-bin-vendored = "3"
[dev-dependencies] [dev-dependencies]

View File

@ -1,10 +1,9 @@
//! socktop agent entrypoint: sets up sysinfo handles, launches a sampler, //! socktop agent entrypoint: sets up sysinfo handles and serves a WebSocket endpoint at /ws.
//! and serves a WebSocket endpoint at /ws.
mod gpu; mod gpu;
mod metrics; mod metrics;
mod proto; mod proto;
mod sampler; // sampler module removed (metrics now purely request-driven)
mod state; mod state;
mod types; mod types;
mod ws; mod ws;
@ -15,7 +14,6 @@ use std::str::FromStr;
mod tls; mod tls;
use crate::sampler::{spawn_disks_sampler, spawn_process_sampler, spawn_sampler};
use state::AppState; use state::AppState;
fn arg_flag(name: &str) -> bool { fn arg_flag(name: &str) -> bool {
@ -45,13 +43,7 @@ async fn main() -> anyhow::Result<()> {
let state = AppState::new(); let state = AppState::new();
// Start background sampler (adjust cadence as needed) // No background samplers: metrics collected on-demand per websocket request.
// 500ms fast metrics
let _h_fast = spawn_sampler(state.clone(), std::time::Duration::from_millis(500));
// 2s processes (top 50)
let _h_procs = spawn_process_sampler(state.clone(), std::time::Duration::from_secs(2), 50);
// 5s disks
let _h_disks = spawn_disks_sampler(state.clone(), std::time::Duration::from_secs(5));
// Web app: route /ws to the websocket handler // Web app: route /ws to the websocket handler
async fn healthz() -> StatusCode { async fn healthz() -> StatusCode {
@ -98,45 +90,4 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
#[cfg(test)] // Unit tests for CLI parsing moved to `tests/port_parse.rs`.
mod tests_cli_agent {
// Local helper for testing port parsing
fn parse_port<I: IntoIterator<Item = String>>(args: I, default_port: u16) -> u16 {
let mut it = args.into_iter();
let _ = it.next(); // prog
let mut long: Option<String> = None;
let mut short: Option<String> = None;
while let Some(a) = it.next() {
match a.as_str() {
"--port" => long = it.next(),
"-p" => short = it.next(),
_ if a.starts_with("--port=") => {
if let Some((_, v)) = a.split_once('=') {
long = Some(v.to_string());
}
}
_ => {}
}
}
long.or(short)
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(default_port)
}
#[test]
fn port_long_short_and_assign() {
assert_eq!(
parse_port(vec!["agent".into(), "--port".into(), "9001".into()], 8443),
9001
);
assert_eq!(
parse_port(vec!["agent".into(), "-p".into(), "9002".into()], 8443),
9002
);
assert_eq!(
parse_port(vec!["agent".into(), "--port=9003".into()], 8443),
9003
);
assert_eq!(parse_port(vec!["agent".into()], 8443), 8443);
}
}

View File

@ -11,8 +11,9 @@ use std::fs;
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
use std::io; use std::io;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration as StdDuration;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System}; use sysinfo::{ProcessRefreshKind, ProcessesToUpdate};
use tracing::warn; use tracing::warn;
// Runtime toggles (read once) // Runtime toggles (read once)
@ -97,6 +98,20 @@ fn set_gpus(v: Option<Vec<crate::gpu::GpuMetrics>>) {
// Collect only fast-changing metrics (CPU/mem/net + optional temps/gpus). // Collect only fast-changing metrics (CPU/mem/net + optional temps/gpus).
pub async fn collect_fast_metrics(state: &AppState) -> Metrics { pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
// TTL (ms) overridable via env, default 250ms
let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_METRICS_TTL_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(250);
let ttl = StdDuration::from_millis(ttl_ms);
{
let cache = state.cache_metrics.lock().await;
if cache.is_fresh(ttl) {
if let Some(c) = cache.take_clone() {
return c;
}
}
}
let mut sys = state.sys.lock().await; let mut sys = state.sys.lock().await;
if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
sys.refresh_cpu_usage(); sys.refresh_cpu_usage();
@ -105,7 +120,7 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
warn!("sysinfo selective refresh panicked: {e:?}"); warn!("sysinfo selective refresh panicked: {e:?}");
} }
let hostname = System::host_name().unwrap_or_else(|| "unknown".to_string()); let hostname = state.hostname.clone();
let cpu_total = sys.global_cpu_usage(); let cpu_total = sys.global_cpu_usage();
let cpu_per_core: Vec<f32> = sys.cpus().iter().map(|c| c.cpu_usage()).collect(); let cpu_per_core: Vec<f32> = sys.cpus().iter().map(|c| c.cpu_usage()).collect();
let mem_total = sys.total_memory(); let mem_total = sys.total_memory();
@ -192,7 +207,7 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
None None
}; };
Metrics { let metrics = Metrics {
cpu_total, cpu_total,
cpu_per_core, cpu_per_core,
mem_total, mem_total,
@ -205,21 +220,44 @@ pub async fn collect_fast_metrics(state: &AppState) -> Metrics {
networks, networks,
top_processes: Vec::new(), top_processes: Vec::new(),
gpus, gpus,
};
{
let mut cache = state.cache_metrics.lock().await;
cache.set(metrics.clone());
} }
metrics
} }
// Cached disks // Cached disks
pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> { pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> {
let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_DISKS_TTL_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(1_000);
let ttl = StdDuration::from_millis(ttl_ms);
{
let cache = state.cache_disks.lock().await;
if cache.is_fresh(ttl) {
if let Some(v) = cache.take_clone() {
return v;
}
}
}
let mut disks_list = state.disks.lock().await; let mut disks_list = state.disks.lock().await;
disks_list.refresh(false); // don't drop missing disks disks_list.refresh(false); // don't drop missing disks
disks_list let disks: Vec<DiskInfo> = disks_list
.iter() .iter()
.map(|d| DiskInfo { .map(|d| DiskInfo {
name: d.name().to_string_lossy().into_owned(), name: d.name().to_string_lossy().into_owned(),
total: d.total_space(), total: d.total_space(),
available: d.available_space(), available: d.available_space(),
}) })
.collect() .collect();
{
let mut cache = state.cache_disks.lock().await;
cache.set(disks.clone());
}
disks
} }
// Linux-only helpers and implementation using /proc deltas for accurate CPU%. // Linux-only helpers and implementation using /proc deltas for accurate CPU%.
@ -260,8 +298,22 @@ fn read_proc_jiffies(pid: u32) -> Option<u64> {
/// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client. /// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
// Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows). let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_PROCESSES_TTL_MS")
let mut sys = System::new(); .ok()
.and_then(|v| v.parse().ok())
.unwrap_or(1_000);
let ttl = StdDuration::from_millis(ttl_ms);
{
let cache = state.cache_processes.lock().await;
if cache.is_fresh(ttl) {
if let Some(v) = cache.take_clone() {
return v;
}
}
}
// Reuse shared System to avoid reallocation; refresh processes fully.
let mut sys_guard = state.sys.lock().await;
let sys = &mut *sys_guard;
sys.refresh_processes_specifics( sys.refresh_processes_specifics(
ProcessesToUpdate::All, ProcessesToUpdate::All,
false, false,
@ -336,35 +388,52 @@ pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
}) })
.collect(); .collect();
ProcessesPayload { let payload = ProcessesPayload {
process_count: total_count, process_count: total_count,
top_processes: procs, top_processes: procs,
};
{
let mut cache = state.cache_processes.lock().await;
cache.set(payload.clone());
} }
payload
} }
/// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. /// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
use tokio::time::sleep; use tokio::time::sleep;
let ttl_ms: u64 = std::env::var("SOCKTOP_AGENT_PROCESSES_TTL_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(1_000);
let ttl = StdDuration::from_millis(ttl_ms);
{
let cache = state.cache_processes.lock().await;
if cache.is_fresh(ttl) {
if let Some(v) = cache.take_clone() {
return v;
}
}
}
{
let mut sys = state.sys.lock().await; let mut sys = state.sys.lock().await;
// First refresh to set baseline
sys.refresh_processes_specifics( sys.refresh_processes_specifics(
ProcessesToUpdate::All, ProcessesToUpdate::All,
false, false,
ProcessRefreshKind::everything().without_tasks(), ProcessRefreshKind::everything().without_tasks(),
); );
// Small delay so sysinfo can compute CPU deltas on next refresh }
// Release lock during sleep interval
sleep(Duration::from_millis(250)).await; sleep(Duration::from_millis(250)).await;
{
let mut sys = state.sys.lock().await;
sys.refresh_processes_specifics( sys.refresh_processes_specifics(
ProcessesToUpdate::All, ProcessesToUpdate::All,
false, false,
ProcessRefreshKind::everything().without_tasks(), ProcessRefreshKind::everything().without_tasks(),
); );
let total_count = sys.processes().len(); let total_count = sys.processes().len();
let procs: Vec<ProcessInfo> = sys let procs: Vec<ProcessInfo> = sys
.processes() .processes()
.values() .values()
@ -375,11 +444,14 @@ pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
mem_bytes: p.memory(), mem_bytes: p.memory(),
}) })
.collect(); .collect();
ProcessesPayload { let payload = ProcessesPayload {
process_count: total_count, process_count: total_count,
top_processes: procs, top_processes: procs,
};
{
let mut cache = state.cache_processes.lock().await;
cache.set(payload.clone());
}
payload
} }
} }
// Small helper to select and sort top-k by cpu
// Client now handles sorting/pagination.

View File

@ -1,34 +0,0 @@
//! Background sampler: periodically collects metrics and updates precompressed caches,
//! so WS replies just read and send cached bytes.
use crate::state::AppState;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
// 500ms: fast path (cpu/mem/net/temp/gpu)
pub fn spawn_sampler(_state: AppState, _period: Duration) -> JoinHandle<()> {
tokio::spawn(async move {
// no-op background sampler (request-driven collection elsewhere)
loop {
sleep(Duration::from_secs(3600)).await;
}
})
}
// 2s: processes top-k
pub fn spawn_process_sampler(_state: AppState, _period: Duration, _top_k: usize) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(3600)).await;
}
})
}
// 5s: disks
pub fn spawn_disks_sampler(_state: AppState, _period: Duration) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(3600)).await;
}
})
}

View File

@ -4,6 +4,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant};
use sysinfo::{Components, Disks, Networks, System}; use sysinfo::{Components, Disks, Networks, System};
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -25,6 +26,7 @@ pub struct AppState {
pub components: SharedComponents, pub components: SharedComponents,
pub disks: SharedDisks, pub disks: SharedDisks,
pub networks: SharedNetworks, pub networks: SharedNetworks,
pub hostname: String,
// For correct per-process CPU% using /proc deltas (Linux only path uses this tracker) // For correct per-process CPU% using /proc deltas (Linux only path uses this tracker)
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@ -37,6 +39,39 @@ pub struct AppState {
// GPU negative cache (probe once). gpu_checked=true after first attempt; gpu_present reflects result. // GPU negative cache (probe once). gpu_checked=true after first attempt; gpu_present reflects result.
pub gpu_checked: Arc<AtomicBool>, pub gpu_checked: Arc<AtomicBool>,
pub gpu_present: Arc<AtomicBool>, pub gpu_present: Arc<AtomicBool>,
// Lightweight on-demand caches (TTL based) to cap CPU under bursty polling.
pub cache_metrics: Arc<Mutex<CacheEntry<crate::types::Metrics>>>,
pub cache_disks: Arc<Mutex<CacheEntry<Vec<crate::types::DiskInfo>>>>,
pub cache_processes: Arc<Mutex<CacheEntry<crate::types::ProcessesPayload>>>,
}
#[derive(Clone, Debug)]
pub struct CacheEntry<T> {
pub at: Option<Instant>,
pub value: Option<T>,
}
impl<T> CacheEntry<T> {
pub fn new() -> Self {
Self {
at: None,
value: None,
}
}
pub fn is_fresh(&self, ttl: Duration) -> bool {
self.at.is_some_and(|t| t.elapsed() < ttl) && self.value.is_some()
}
pub fn set(&mut self, v: T) {
self.value = Some(v);
self.at = Some(Instant::now());
}
pub fn take_clone(&self) -> Option<T>
where
T: Clone,
{
self.value.clone()
}
} }
impl AppState { impl AppState {
@ -51,6 +86,7 @@ impl AppState {
components: Arc::new(Mutex::new(components)), components: Arc::new(Mutex::new(components)),
disks: Arc::new(Mutex::new(disks)), disks: Arc::new(Mutex::new(disks)),
networks: Arc::new(Mutex::new(networks)), networks: Arc::new(Mutex::new(networks)),
hostname: System::host_name().unwrap_or_else(|| "unknown".into()),
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
proc_cpu: Arc::new(Mutex::new(ProcCpuTracker::default())), proc_cpu: Arc::new(Mutex::new(ProcCpuTracker::default())),
client_count: Arc::new(AtomicUsize::new(0)), client_count: Arc::new(AtomicUsize::new(0)),
@ -59,6 +95,9 @@ impl AppState {
.filter(|s| !s.is_empty()), .filter(|s| !s.is_empty()),
gpu_checked: Arc::new(AtomicBool::new(false)), gpu_checked: Arc::new(AtomicBool::new(false)),
gpu_present: Arc::new(AtomicBool::new(false)), gpu_present: Arc::new(AtomicBool::new(false)),
cache_metrics: Arc::new(Mutex::new(CacheEntry::new())),
cache_disks: Arc::new(Mutex::new(CacheEntry::new())),
cache_processes: Arc::new(Mutex::new(CacheEntry::new())),
} }
} }
} }

View File

@ -0,0 +1,40 @@
//! Unit test for port parsing logic moved out of `main.rs`.
fn parse_port<I: IntoIterator<Item = String>>(args: I, default_port: u16) -> u16 {
let mut it = args.into_iter();
let _ = it.next(); // program name
let mut long: Option<String> = None;
let mut short: Option<String> = None;
while let Some(a) = it.next() {
match a.as_str() {
"--port" => long = it.next(),
"-p" => short = it.next(),
_ if a.starts_with("--port=") => {
if let Some((_, v)) = a.split_once('=') {
long = Some(v.to_string());
}
}
_ => {}
}
}
long.or(short)
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(default_port)
}
#[test]
fn port_long_short_and_assign() {
assert_eq!(
parse_port(vec!["agent".into(), "--port".into(), "9001".into()], 8443),
9001
);
assert_eq!(
parse_port(vec!["agent".into(), "-p".into(), "9002".into()], 8443),
9002
);
assert_eq!(
parse_port(vec!["agent".into(), "--port=9003".into()], 8443),
9003
);
assert_eq!(parse_port(vec!["agent".into()], 8443), 8443);
}