From 100434fc3c05ad7466113ebee851971bcfb3cd08 Mon Sep 17 00:00:00 2001 From: jasonwitty Date: Fri, 8 Aug 2025 12:41:32 -0700 Subject: [PATCH] Major refactor, additional comments, performance improvements, idle performance improvements, access token, port specification Release highlights Introduced split client/agent architecture with a ratatui-based TUI and a lightweight WebSocket agent. Added adaptive (idle-aware) sampler: agent samples fast only when clients are connected; sleeps when idle. Implemented metrics JSON caching for instant ws replies; cold-start does one-off collection. Port configuration: --port/-p, positional PORT, or SOCKTOP_PORT env (default 3000). Optional token auth: SOCKTOP_TOKEN on agent, ws://HOST:PORT/ws?token=VALUE in client. Logging via tracing with RUST_LOG control. CI workflow (fmt, clippy, build) for Linux and Windows. Systemd unit example for always-on agent. TUI features CPU: overall sparkline + per-core history with trend arrows and color thresholds. Memory/Swap gauges with humanized labels. Disks panel with per-device usage and icons. Network download/upload sparklines (KB/s) with peak tracking. Top processes table (PID, name, CPU%, mem, mem%). Header with hostname and CPU temperature indicator. Agent changes sysinfo 0.36.1 targeted refresh: refresh_cpu_all, refresh_memory, refresh_processes_specifics(ProcessesToUpdate::All, ProcessRefreshKind::new().with_cpu().with_memory(), true). WebSocket handler: client counting with wake notifications, cold-start handling, proper Response returns. Sampler uses MissedTickBehavior::Skip to avoid catch-up bursts. Docs README updates: running instructions, port configuration, optional token auth, platform notes, example JSON. Added socktop-agent.service systemd unit. Platform notes Linux (AMD/Intel) supported; tested on AMD, targeting Intel next. Raspberry Pi supported (availability of temps varies by model). Windows builds/run; CPU temperature may be unavailable (shows N/A). Known/next Roadmap includes configurable refresh interval, TUI filtering/sorting, TLS/WSS, and export to file. Add Context... README.md --- .github/workflows/ci.yml | 19 + Cargo.lock | 152 +++++ README.md | 85 ++- docs/socktop-agent.service | 18 + .../socktop-screenshot.png | Bin socktop/src/app.rs | 198 +++++++ socktop/src/history.rs | 39 ++ socktop/src/main.rs | 540 +----------------- socktop/src/types.rs | 41 ++ socktop/src/ui/cpu.rs | 91 +++ socktop/src/ui/disks.rs | 73 +++ socktop/src/ui/header.rs | 20 + socktop/src/ui/mem.rs | 23 + socktop/src/ui/mod.rs | 10 + socktop/src/ui/net.rs | 26 + socktop/src/ui/processes.rs | 71 +++ socktop/src/ui/swap.rs | 23 + socktop/src/ui/util.rs | 33 ++ socktop/src/ws.rs | 28 + socktop_agent/Cargo.toml | 4 +- socktop_agent/src/main.rs | 287 ++++------ socktop_agent/src/metrics.rs | 128 +++++ socktop_agent/src/sampler.rs | 36 ++ socktop_agent/src/state.rs | 30 + socktop_agent/src/types.rs | 43 ++ socktop_agent/src/ws.rs | 66 +++ 26 files changed, 1371 insertions(+), 713 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 docs/socktop-agent.service rename socktop-screenshot.png => docs/socktop-screenshot.png (100%) create mode 100644 socktop/src/app.rs create mode 100644 socktop/src/history.rs create mode 100644 socktop/src/types.rs create mode 100644 socktop/src/ui/cpu.rs create mode 100644 socktop/src/ui/disks.rs create mode 100644 socktop/src/ui/header.rs create mode 100644 socktop/src/ui/mem.rs create mode 100644 socktop/src/ui/mod.rs create mode 100644 socktop/src/ui/net.rs create mode 100644 socktop/src/ui/processes.rs create mode 100644 socktop/src/ui/swap.rs create mode 100644 socktop/src/ui/util.rs create mode 100644 socktop/src/ws.rs create mode 100644 socktop_agent/src/metrics.rs create mode 100644 socktop_agent/src/sampler.rs create mode 100644 socktop_agent/src/state.rs create mode 100644 socktop_agent/src/types.rs create mode 100644 socktop_agent/src/ws.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..f5d0e21 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,19 @@ +name: CI +on: + push: + pull_request: +jobs: + build: + strategy: + matrix: + os: [ubuntu-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - name: Cargo fmt + run: cargo fmt --all -- --check + - name: Clippy + run: cargo clippy --all-targets --all-features -D warnings + - name: Build + run: cargo build --release --workspace \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index a7205ab..98aaf20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -821,6 +830,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.174" @@ -864,6 +879,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -924,6 +948,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -967,6 +1001,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.4" @@ -1110,6 +1150,50 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + [[package]] name = "rustc-demangle" version = "0.1.26" @@ -1212,6 +1296,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1300,6 +1393,8 @@ dependencies = [ "serde_json", "sysinfo", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1404,6 +1499,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tinystr" version = "0.8.1" @@ -1493,9 +1597,21 @@ checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.34" @@ -1503,6 +1619,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -1581,6 +1727,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "version_check" version = "0.9.5" diff --git a/README.md b/README.md index e1e360d..929f1d5 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ It lets you watch CPU, memory, disks, network, temperatures, and processes on another machine in real-time — from the comfort of your terminal. -![socktop screenshot](docs/socktop-screenshot.png) +![socktop screenshot](./docs/socktop-screenshot.png) --- @@ -37,6 +37,46 @@ The two communicate over a persistent WebSocket connection. --- +## Adaptive (idle-aware) sampling + +The socktop agent now samples system metrics only when at least one WebSocket client is connected. When idle (no clients), the sampler sleeps and CPU usage drops to ~0%. + +How it works +- The WebSocket handler increments/decrements a client counter in `AppState` on connect/disconnect. +- A background sampler wakes when the counter transitions from 0 → >0 and sleeps when it returns to 0. +- The most recent metrics snapshot is cached as JSON for fast responses. + +Cold start behavior +- If a client requests metrics while the cache is empty (e.g., just started or after a long idle), the agent performs a one-off synchronous collection to respond immediately. + +Tuning +- Sampling interval (active): update `spawn_sampler(state, Duration::from_millis(500))` in `socktop_agent/src/main.rs`. +- Always-on or low-frequency idle sampling: replace the “sleep when idle” logic in `socktop_agent/src/sampler.rs` with a low-frequency interval. Example sketch: + +```rust +// In sampler.rs (sketch): sample every 10s when idle, 500ms when active +let idle_period = Duration::from_secs(10); +loop { + let active = state.client_count.load(Ordering::Relaxed) > 0; + let period = if active { Duration::from_millis(500) } else { idle_period }; + let mut ticker = tokio::time::interval(period); + ticker.tick().await; + if !active { + // wake early if a client connects + tokio::select! { + _ = ticker.tick() => {}, + _ = state.wake_sampler.notified() => continue, + } + } + let m = collect_metrics(&state).await; + if let Ok(js) = serde_json::to_string(&m) { + *state.last_json.write().await = js; + } +} +``` + +--- + ## Installation ### Prerequisites @@ -99,6 +139,26 @@ When connected, `socktop` displays: --- +## Configuring the agent port + +The agent listens on TCP port 3000 by default. You can override this via a CLI flag, a positional port argument, or an environment variable: + +- CLI flag: + - socktop_agent --port 8080 + - socktop_agent -p 8080 +- Positional: + - socktop_agent 8080 +- Environment variable: + - SOCKTOP_PORT=8080 socktop_agent + +Help: +- socktop_agent --help + +The TUI should point to ws://HOST:PORT/ws, e.g.: +- cargo run -p socktop -- ws://127.0.0.1:8080/ws + +--- + ## Keyboard Shortcuts | Key | Action | @@ -107,6 +167,29 @@ When connected, `socktop` displays: --- +## Security (optional token) +By default, the agent exposes metrics over an unauthenticated WebSocket. For untrusted networks, set an auth token and pass it in the client URL: + +- Server: + - SOCKTOP_TOKEN=changeme socktop_agent --port 3000 +- Client: + - socktop ws://HOST:3000/ws?token=changeme + +--- + +## Platform notes +- Linux x86_64/AMD/Intel: fully supported. +- Raspberry Pi: + - 64-bit: rustup target add aarch64-unknown-linux-gnu; build on-device for simplicity. + - 32-bit: rustup target add armv7-unknown-linux-gnueabihf. +- Windows: + - TUI and agent build/run with stable Rust. Use PowerShell: + - cargo run -p socktop_agent -- --port 3000 + - cargo run -p socktop -- ws://127.0.0.1:3000/ws + - CPU temperature may be unavailable; display will show N/A. + +--- + ## Example agent JSON `socktop` expects the agent to send metrics in this shape: ```json diff --git a/docs/socktop-agent.service b/docs/socktop-agent.service new file mode 100644 index 0000000..6a39e67 --- /dev/null +++ b/docs/socktop-agent.service @@ -0,0 +1,18 @@ +[Unit] +Description=Socktop agent +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +ExecStart=/usr/local/bin/socktop_agent --port 3000 +Environment=RUST_LOG=info +# Optional auth: +# Environment=SOCKTOP_TOKEN=changeme +Restart=on-failure +User=socktop +Group=socktop +NoNewPrivileges=true + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/socktop-screenshot.png b/docs/socktop-screenshot.png similarity index 100% rename from socktop-screenshot.png rename to docs/socktop-screenshot.png diff --git a/socktop/src/app.rs b/socktop/src/app.rs new file mode 100644 index 0000000..7d831ec --- /dev/null +++ b/socktop/src/app.rs @@ -0,0 +1,198 @@ +//! App state and main loop: input handling, fetching metrics, updating history, and drawing. + +use std::{collections::VecDeque, io, time::{Duration, Instant}}; + +use crossterm::{ + event::{self, Event, KeyCode}, + execute, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, +}; +use ratatui::{ + backend::CrosstermBackend, + layout::{Constraint, Direction}, + Terminal, +}; +use tokio::time::sleep; + +use crate::history::{push_capped, PerCoreHistory}; +use crate::types::Metrics; +use crate::ui::{header::draw_header, cpu::{draw_cpu_avg_graph, draw_per_core_bars}, mem::draw_mem, swap::draw_swap, disks::draw_disks, net::draw_net_spark, processes::draw_top_processes}; +use crate::ws::{connect, request_metrics}; + +pub struct App { + // Latest metrics + histories + last_metrics: Option, + + // CPU avg history (0..100) + cpu_hist: VecDeque, + + // Per-core history (0..100) + per_core_hist: PerCoreHistory, + + // Network totals snapshot + histories of KB/s + last_net_totals: Option<(u64, u64, Instant)>, + rx_hist: VecDeque, + tx_hist: VecDeque, + rx_peak: u64, + tx_peak: u64, + + // Quit flag + should_quit: bool, +} + +impl App { + pub fn new() -> Self { + Self { + last_metrics: None, + cpu_hist: VecDeque::with_capacity(600), + per_core_hist: PerCoreHistory::new(60), + last_net_totals: None, + rx_hist: VecDeque::with_capacity(600), + tx_hist: VecDeque::with_capacity(600), + rx_peak: 0, + tx_peak: 0, + should_quit: false, + } + } + + pub async fn run(&mut self, url: &str) -> Result<(), Box> { + // Connect to agent + let mut ws = connect(url).await?; + + // Terminal setup + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + terminal.clear()?; + + // Main loop + let res = self.event_loop(&mut terminal, &mut ws).await; + + // Teardown + disable_raw_mode()?; + let backend = terminal.backend_mut(); + execute!(backend, LeaveAlternateScreen)?; + terminal.show_cursor()?; + + res + } + + async fn event_loop( + &mut self, + terminal: &mut Terminal, + ws: &mut crate::ws::WsStream, + ) -> Result<(), Box> { + loop { + // Input (non-blocking) + while event::poll(Duration::from_millis(10))? { + if let Event::Key(k) = event::read()? { + if matches!(k.code, KeyCode::Char('q') | KeyCode::Char('Q') | KeyCode::Esc) { + self.should_quit = true; + } + } + } + if self.should_quit { + break; + } + + // Fetch and update + if let Some(m) = request_metrics(ws).await { + self.update_with_metrics(m); + } + + // Draw + terminal.draw(|f| self.draw(f))?; + + // Tick rate + sleep(Duration::from_millis(500)).await; + } + + Ok(()) + } + + fn update_with_metrics(&mut self, m: Metrics) { + // CPU avg history + let v = m.cpu_total.clamp(0.0, 100.0).round() as u64; + push_capped(&mut self.cpu_hist, v, 600); + + // Per-core history (push current samples) + self.per_core_hist.ensure_cores(m.cpu_per_core.len()); + self.per_core_hist.push_samples(&m.cpu_per_core); + + // NET: sum across all ifaces, compute KB/s via elapsed time + let now = Instant::now(); + let rx_total = m.networks.iter().map(|n| n.received).sum::(); + let tx_total = m.networks.iter().map(|n| n.transmitted).sum::(); + let (rx_kb, tx_kb) = if let Some((prx, ptx, pts)) = self.last_net_totals { + let dt = now.duration_since(pts).as_secs_f64().max(1e-6); + let rx = ((rx_total.saturating_sub(prx)) as f64 / dt / 1024.0).round() as u64; + let tx = ((tx_total.saturating_sub(ptx)) as f64 / dt / 1024.0).round() as u64; + (rx, tx) + } else { (0, 0) }; + self.last_net_totals = Some((rx_total, tx_total, now)); + push_capped(&mut self.rx_hist, rx_kb, 600); + push_capped(&mut self.tx_hist, tx_kb, 600); + self.rx_peak = self.rx_peak.max(rx_kb); + self.tx_peak = self.tx_peak.max(tx_kb); + + self.last_metrics = Some(m); + } + + fn draw(&mut self, f: &mut ratatui::Frame<'_>) { + let area = f.area(); + + let rows = ratatui::layout::Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(1), + Constraint::Ratio(1, 3), + Constraint::Length(3), + Constraint::Length(3), + Constraint::Min(10), + ]) + .split(area); + + draw_header(f, rows[0], self.last_metrics.as_ref()); + + let top = ratatui::layout::Layout::default() + .direction(Direction::Horizontal) + .constraints([Constraint::Percentage(66), Constraint::Percentage(34)]) + .split(rows[1]); + + draw_cpu_avg_graph(f, top[0], &self.cpu_hist, self.last_metrics.as_ref()); + draw_per_core_bars(f, top[1], self.last_metrics.as_ref(), &self.per_core_hist); + + draw_mem(f, rows[2], self.last_metrics.as_ref()); + draw_swap(f, rows[3], self.last_metrics.as_ref()); + + let bottom = ratatui::layout::Layout::default() + .direction(Direction::Horizontal) + .constraints([Constraint::Percentage(66), Constraint::Percentage(34)]) + .split(rows[4]); + + let left_stack = ratatui::layout::Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Min(6), Constraint::Length(4), Constraint::Length(4)]) + .split(bottom[0]); + + draw_disks(f, left_stack[0], self.last_metrics.as_ref()); + draw_net_spark( + f, + left_stack[1], + &format!("Download (KB/s) — now: {} | peak: {}", self.rx_hist.back().copied().unwrap_or(0), self.rx_peak), + &self.rx_hist, + ratatui::style::Color::Green, + ); + draw_net_spark( + f, + left_stack[2], + &format!("Upload (KB/s) — now: {} | peak: {}", self.tx_hist.back().copied().unwrap_or(0), self.tx_peak), + &self.tx_hist, + ratatui::style::Color::Blue, + ); + + draw_top_processes(f, bottom[1], self.last_metrics.as_ref()); + } +} \ No newline at end of file diff --git a/socktop/src/history.rs b/socktop/src/history.rs new file mode 100644 index 0000000..27d3232 --- /dev/null +++ b/socktop/src/history.rs @@ -0,0 +1,39 @@ +//! Small utilities to manage bounded history buffers for charts. + +use std::collections::VecDeque; + +pub fn push_capped(dq: &mut VecDeque, v: T, cap: usize) { + if dq.len() == cap { + dq.pop_front(); + } + dq.push_back(v); +} + +// Keeps a history deque per core with a fixed capacity +pub struct PerCoreHistory { + pub deques: Vec>, + cap: usize, +} + +impl PerCoreHistory { + pub fn new(cap: usize) -> Self { + Self { deques: Vec::new(), cap } + } + + // Ensure we have one deque per core; resize on CPU topology changes + pub fn ensure_cores(&mut self, n: usize) { + if self.deques.len() == n { + return; + } + self.deques = (0..n).map(|_| VecDeque::with_capacity(self.cap)).collect(); + } + + // Push a new sample set for all cores (values 0..=100) + pub fn push_samples(&mut self, samples: &[f32]) { + self.ensure_cores(samples.len()); + for (i, v) in samples.iter().enumerate() { + let val = v.clamp(0.0, 100.0).round() as u16; + push_capped(&mut self.deques[i], val, self.cap); + } + } +} \ No newline at end of file diff --git a/socktop/src/main.rs b/socktop/src/main.rs index e77d479..309e9be 100644 --- a/socktop/src/main.rs +++ b/socktop/src/main.rs @@ -1,537 +1,23 @@ -use std::{collections::VecDeque, env, error::Error, io, time::{Duration, Instant}}; +//! Entry point for the socktop TUI. Parses args and runs the App. -use crossterm::{ - event::{self, Event, KeyCode}, - execute, - terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, -}; -use futures_util::{SinkExt, StreamExt}; -use ratatui::{ - backend::CrosstermBackend, - layout::{Constraint, Direction, Layout, Rect}, - style::{Color, Style}, - widgets::{Block, Borders, Gauge, Row, Sparkline, Table, Cell}, - Terminal, - text::{Line, Span}, -}; +mod app; +mod history; +mod types; +mod ui; +mod ws; -use ratatui::style::{Modifier}; - -use serde::Deserialize; -use tokio::time::sleep; -use tokio_tungstenite::{connect_async, tungstenite::Message}; - - -#[derive(Debug, Deserialize, Clone)] -struct Disk { name: String, total: u64, available: u64 } -#[derive(Debug, Deserialize, Clone)] -struct Network { received: u64, transmitted: u64 } -#[derive(Debug, Deserialize, Clone)] -struct ProcessInfo { - pid: i32, - name: String, - cpu_usage: f32, - mem_bytes: u64, -} - -#[derive(Debug, Deserialize, Clone)] -struct Metrics { - cpu_total: f32, - cpu_per_core: Vec, - mem_total: u64, - mem_used: u64, - swap_total: u64, - swap_used: u64, - process_count: usize, - hostname: String, - cpu_temp_c: Option, - disks: Vec, - networks: Vec, - top_processes: Vec, -} +use std::env; +use app::App; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { let args: Vec = env::args().collect(); if args.len() < 2 { eprintln!("Usage: {} ws://HOST:PORT/ws", args[0]); std::process::exit(1); } - let url = &args[1]; - let (mut ws, _) = connect_async(url).await?; - - // Terminal - enable_raw_mode()?; - let mut stdout = io::stdout(); - execute!(stdout, EnterAlternateScreen)?; - let backend = CrosstermBackend::new(stdout); - let mut terminal = Terminal::new(backend)?; - terminal.clear()?; - - // State - let mut last_metrics: Option = None; - let mut cpu_hist: VecDeque = VecDeque::with_capacity(600); - - let mut per_core_hist: Vec> = Vec::new(); // one deque per core - const CORE_HISTORY: usize = 60; // ~30s if you tick every 500ms - - // Network: keep totals across ALL ifaces + timestamp - let mut last_net_totals: Option<(u64, u64, Instant)> = None; - let mut rx_hist: VecDeque = VecDeque::with_capacity(600); - let mut tx_hist: VecDeque = VecDeque::with_capacity(600); - let mut rx_peak: u64 = 0; - let mut tx_peak: u64 = 0; - - let mut should_quit = false; - - loop { - while event::poll(Duration::from_millis(10))? { - if let Event::Key(k) = event::read()? { - if matches!(k.code, KeyCode::Char('q') | KeyCode::Char('Q') | KeyCode::Esc) { - should_quit = true; - } - } - } - if should_quit { break; } - - ws.send(Message::Text("get_metrics".into())).await.ok(); - - if let Some(Ok(Message::Text(json))) = ws.next().await { - if let Ok(m) = serde_json::from_str::(&json) { - // CPU history - let v = m.cpu_total.clamp(0.0, 100.0).round() as u64; - push_capped(&mut cpu_hist, v, 600); - - // NET: sum across all ifaces, compute KB/s via elapsed time - let now = Instant::now(); - let rx_total = m.networks.iter().map(|n| n.received).sum::(); - let tx_total = m.networks.iter().map(|n| n.transmitted).sum::(); - let (rx_kb, tx_kb) = if let Some((prx, ptx, pts)) = last_net_totals { - let dt = now.duration_since(pts).as_secs_f64().max(1e-6); - let rx = ((rx_total.saturating_sub(prx)) as f64 / dt / 1024.0).round() as u64; - let tx = ((tx_total.saturating_sub(ptx)) as f64 / dt / 1024.0).round() as u64; - (rx, tx) - } else { (0, 0) }; - last_net_totals = Some((rx_total, tx_total, now)); - push_capped(&mut rx_hist, rx_kb, 600); - push_capped(&mut tx_hist, tx_kb, 600); - rx_peak = rx_peak.max(rx_kb); - tx_peak = tx_peak.max(tx_kb); - - if let Some(m) = last_metrics.as_ref() { - // resize history buffers if core count changes - if per_core_hist.len() != m.cpu_per_core.len() { - per_core_hist = (0..m.cpu_per_core.len()) - .map(|_| VecDeque::with_capacity(CORE_HISTORY)) - .collect(); - } - } - // push latest per-core samples - if let Some(m) = last_metrics.as_ref() { - for (i, v) in m.cpu_per_core.iter().enumerate() { - let v = v.clamp(0.0, 100.0).round() as u16; - push_capped(&mut per_core_hist[i], v, CORE_HISTORY); - } - } - - last_metrics = Some(m); - } - } - - terminal.draw(|f| { - let area = f.area(); - - let rows = Layout::default() - .direction(Direction::Vertical) - .constraints([ - Constraint::Length(1), - Constraint::Ratio(1, 3), - Constraint::Length(3), - Constraint::Length(3), - Constraint::Min(10), - ]) - .split(area); - - draw_header(f, rows[0], last_metrics.as_ref()); - - let top = Layout::default() - .direction(Direction::Horizontal) - .constraints([Constraint::Percentage(66), Constraint::Percentage(34)]) - .split(rows[1]); - - draw_cpu_avg_graph(f, top[0], &cpu_hist, last_metrics.as_ref()); - draw_per_core_bars(f, top[1], last_metrics.as_ref(), &per_core_hist); - - draw_mem(f, rows[2], last_metrics.as_ref()); - draw_swap(f, rows[3], last_metrics.as_ref()); - - let bottom = Layout::default() - .direction(Direction::Horizontal) - .constraints([Constraint::Percentage(66), Constraint::Percentage(34)]) - .split(rows[4]); - - let left_stack = Layout::default() - .direction(Direction::Vertical) - .constraints([Constraint::Min(6), Constraint::Length(4), Constraint::Length(4)]) - .split(bottom[0]); - - draw_disks(f, left_stack[0], last_metrics.as_ref()); - draw_net_spark( - f, - left_stack[1], - &format!("Download (KB/s) — now: {} | peak: {}", rx_hist.back().copied().unwrap_or(0), rx_peak), - &rx_hist, - Color::Green, - ); - draw_net_spark( - f, - left_stack[2], - &format!("Upload (KB/s) — now: {} | peak: {}", tx_hist.back().copied().unwrap_or(0), tx_peak), - &tx_hist, - Color::Blue, - ); - - draw_top_processes(f, bottom[1], last_metrics.as_ref()); - })?; - - sleep(Duration::from_millis(500)).await; - } - - disable_raw_mode()?; - let backend = terminal.backend_mut(); - execute!(backend, LeaveAlternateScreen)?; - terminal.show_cursor()?; - Ok(()) -} - -fn push_capped(dq: &mut VecDeque, v: T, cap: usize) { - if dq.len() == cap { dq.pop_front(); } - dq.push_back(v); -} - -fn human(b: u64) -> String { - const K: f64 = 1024.0; - let b = b as f64; - if b < K { return format!("{b:.0}B"); } - let kb = b / K; - if kb < K { return format!("{kb:.1}KB"); } - let mb = kb / K; - if mb < K { return format!("{mb:.1}MB"); } - let gb = mb / K; - if gb < K { return format!("{gb:.1}GB"); } - let tb = gb / K; - format!("{tb:.2}TB") -} - -fn draw_header(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { - let title = if let Some(mm) = m { - let temp = mm.cpu_temp_c.map(|t| { - let icon = if t < 50.0 { "😎" } else if t < 85.0 { "⚠️" } else { "🔥" }; - format!("CPU Temp: {:.1}°C {}", t, icon) - }).unwrap_or_else(|| "CPU Temp: N/A".into()); - format!("socktop — host: {} | {} (press 'q' to quit)", mm.hostname, temp) - } else { - "socktop — connecting... (press 'q' to quit)".into() - }; - f.render_widget(Block::default().title(title).borders(Borders::BOTTOM), area); -} - -fn draw_cpu_avg_graph( - f: &mut ratatui::Frame<'_>, - area: Rect, - hist: &VecDeque, - m: Option<&Metrics>, -) { - let title = if let Some(mm) = m { format!("CPU avg (now: {:>5.1}%)", mm.cpu_total) } else { "CPU avg".into() }; - let max_points = area.width.saturating_sub(2) as usize; - let start = hist.len().saturating_sub(max_points); - let data: Vec = hist.iter().skip(start).cloned().collect(); - let spark = Sparkline::default() - .block(Block::default().borders(Borders::ALL).title(title)) - .data(&data) - .max(100) - .style(Style::default().fg(Color::Cyan)); - f.render_widget(spark, area); -} - -fn draw_per_core_bars( - f: &mut ratatui::Frame<'_>, - area: Rect, - m: Option<&Metrics>, - // 👇 add this param - per_core_hist: &Vec>, -) { - // frame - f.render_widget(Block::default().borders(Borders::ALL).title("Per-core"), area); - let Some(mm) = m else { return; }; - - let inner = Rect { x: area.x + 1, y: area.y + 1, width: area.width.saturating_sub(2), height: area.height.saturating_sub(2) }; - if inner.height == 0 { return; } - - // one row per core - let rows = inner.height as usize; - let show_n = rows.min(mm.cpu_per_core.len()); - let constraints: Vec = (0..show_n).map(|_| Constraint::Length(1)).collect(); - let vchunks = Layout::default().direction(Direction::Vertical).constraints(constraints).split(inner); - - for i in 0..show_n { - let rect = vchunks[i]; - - // split each row: sparkline (history) | stat text - let hchunks = Layout::default() - .direction(Direction::Horizontal) - .constraints([Constraint::Min(6), Constraint::Length(12)]) // was 10 → now 12 - .split(rect); - - let curr = mm.cpu_per_core[i].clamp(0.0, 100.0); - let older = per_core_hist.get(i) - .and_then(|d| d.iter().rev().nth(20).copied()) // ~10s back - .map(|v| v as f32) - .unwrap_or(curr); - let trend = if curr > older + 0.2 { "↑" } - else if curr + 0.2 < older { "↓" } - else { "╌" }; - - // colors by current load - let fg = match curr { - x if x < 25.0 => Color::Green, - x if x < 60.0 => Color::Yellow, - _ => Color::Red, - }; - - // history - let hist: Vec = per_core_hist - .get(i) - .map(|d| { - let max_points = hchunks[0].width as usize; - let start = d.len().saturating_sub(max_points); - d.iter().skip(start).map(|&v| v as u64).collect() - }) - .unwrap_or_default(); - - // sparkline - let spark = Sparkline::default() - .data(&hist) - .max(100) - .style(Style::default().fg(fg)); - f.render_widget(spark, hchunks[0]); // ✅ render_widget on rect - - // right stat “cpuN 37.2% ↑” - let label = format!("cpu{:<2}{}{:>5.1}%", i, trend, curr); - let line = Line::from(Span::styled(label, Style::default().fg(fg).add_modifier(Modifier::BOLD))); - let block = Block::default(); // no borders per row to keep it clean - f.render_widget(block, hchunks[1]); - f.render_widget(ratatui::widgets::Paragraph::new(line).right_aligned(), hchunks[1]); - } -} - -fn draw_mem(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { - let (used, total, pct) = if let Some(mm) = m { - let pct = if mm.mem_total > 0 { (mm.mem_used as f64 / mm.mem_total as f64 * 100.0) as u16 } else { 0 }; - (mm.mem_used, mm.mem_total, pct) - } else { (0, 0, 0) }; - - let g = Gauge::default() - .block(Block::default().borders(Borders::ALL).title("Memory")) - .gauge_style(Style::default().fg(Color::Magenta)) - .percent(pct) - .label(format!("{} / {}", human(used), human(total))); - f.render_widget(g, area); -} - -fn draw_swap(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { - let (used, total, pct) = if let Some(mm) = m { - let pct = if mm.swap_total > 0 { (mm.swap_used as f64 / mm.swap_total as f64 * 100.0) as u16 } else { 0 }; - (mm.swap_used, mm.swap_total, pct) - } else { (0, 0, 0) }; - - let g = Gauge::default() - .block(Block::default().borders(Borders::ALL).title("Swap")) - .gauge_style(Style::default().fg(Color::Yellow)) - .percent(pct) - .label(format!("{} / {}", human(used), human(total))); - f.render_widget(g, area); -} - -fn draw_disks(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { - // Panel frame - f.render_widget(Block::default().borders(Borders::ALL).title("Disks"), area); - - let Some(mm) = m else { return; }; - - // Inner area inside the "Disks" panel - let inner = Rect { - x: area.x + 1, - y: area.y + 1, - width: area.width.saturating_sub(2), - height: area.height.saturating_sub(2), - }; - if inner.height < 3 { return; } - - // Each disk gets a 3-row card: [title line] + [gauge line] + [spacer] - // If we run out of height, we show as many as we can. - let per_disk_h = 3u16; - let max_cards = (inner.height / per_disk_h).min(mm.disks.len() as u16) as usize; - - // Build rows layout (Length(3) per disk) - let constraints: Vec = (0..max_cards).map(|_| Constraint::Length(per_disk_h)).collect(); - let rows = Layout::default() - .direction(Direction::Vertical) - .constraints(constraints) - .split(inner); - - for (i, slot) in rows.iter().enumerate() { - let d = &mm.disks[i]; - let used = d.total.saturating_sub(d.available); - let ratio = if d.total > 0 { used as f64 / d.total as f64 } else { 0.0 }; - let pct = (ratio * 100.0).round() as u16; - - // Color by severity - let color = if pct < 70 { Color::Green } else if pct < 90 { Color::Yellow } else { Color::Red }; - - // 1) Title line (name left, usage right), inside its own little block - let title = format!( - "{} {} {} / {} ({}%)", - disk_icon(&d.name), - truncate_middle(&d.name, (slot.width.saturating_sub(6)) as usize / 2), - human(used), - human(d.total), - pct - ); - - // Card frame (thin border per disk) - let card = Block::default().borders(Borders::ALL).title(title); - - // Render card covering the whole 3-row slot - f.render_widget(card, *slot); - - // 2) Gauge on the second line inside the card - // Compute an inner rect (strip card borders), then pick the middle line for the bar - let inner_card = Rect { - x: slot.x + 1, - y: slot.y + 1, - width: slot.width.saturating_sub(2), - height: slot.height.saturating_sub(2), - }; - if inner_card.height == 0 { continue; } - - // Center line for the gauge - let gauge_rect = Rect { - x: inner_card.x, - y: inner_card.y + inner_card.height / 2, // 1 line down inside the card - width: inner_card.width, - height: 1, - }; - - let g = Gauge::default() - .percent(pct) - .gauge_style(Style::default().fg(color)); - - f.render_widget(g, gauge_rect); - } -} - -fn disk_icon(name: &str) -> &'static str { - let n = name.to_ascii_lowercase(); - if n.contains(":") { "🗄️" } // network mount - else if n.contains("nvme") { "⚡" } // nvme - else if n.starts_with("sd") { "💽" } // sata - else if n.contains("overlay") { "📦" } // containers/overlayfs - else { "🖴" } // generic drive -} - -// Optional helper to keep device names tidy in the title -fn truncate_middle(s: &str, max: usize) -> String { - if s.len() <= max { return s.to_string(); } - if max <= 3 { return "...".into(); } - let keep = max - 3; - let left = keep / 2; - let right = keep - left; - format!("{}...{}", &s[..left], &s[s.len()-right..]) -} - - -fn draw_net_spark( - f: &mut ratatui::Frame<'_>, - area: Rect, - title: &str, - hist: &VecDeque, - color: Color, -) { - let max_points = area.width.saturating_sub(2) as usize; - let start = hist.len().saturating_sub(max_points); - let data: Vec = hist.iter().skip(start).cloned().collect(); - - let spark = Sparkline::default() - .block(Block::default().borders(Borders::ALL).title(title.to_string())) - .data(&data) - .style(Style::default().fg(color)); - f.render_widget(spark, area); -} - -fn draw_top_processes(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { - let Some(mm) = m else { - f.render_widget(Block::default().borders(Borders::ALL).title("Top Processes"), area); - return; - }; - - let total_mem_bytes = mm.mem_total.max(1); // avoid div-by-zero - let title = format!("Top Processes ({} total)", mm.process_count); - - // Precompute peak CPU to highlight the hog - let peak_cpu = mm.top_processes.iter().map(|p| p.cpu_usage).fold(0.0_f32, f32::max); - - // Build rows with per-cell coloring + zebra striping - let rows: Vec = mm.top_processes.iter().enumerate().map(|(i, p)| { - let mem_pct = (p.mem_bytes as f64 / total_mem_bytes as f64) * 100.0; - - // Color helpers - let cpu_fg = match p.cpu_usage { - x if x < 25.0 => Color::Green, - x if x < 60.0 => Color::Yellow, - _ => Color::Red, - }; - let mem_fg = match mem_pct { - x if x < 5.0 => Color::Blue, - x if x < 20.0 => Color::Magenta, - _ => Color::Red, - }; - - // Light zebra striping (only foreground shift to avoid loud backgrounds) - let zebra = if i % 2 == 0 { Style::default().fg(Color::Gray) } else { Style::default() }; - - // Emphasize the single top CPU row - let emphasis = if (p.cpu_usage - peak_cpu).abs() < f32::EPSILON { - Style::default().add_modifier(Modifier::BOLD) - } else { Style::default() }; - - Row::new(vec![ - Cell::from(p.pid.to_string()).style(Style::default().fg(Color::DarkGray)), - Cell::from(p.name.clone()), - Cell::from(format!("{:.1}%", p.cpu_usage)).style(Style::default().fg(cpu_fg)), - Cell::from(human(p.mem_bytes)), - Cell::from(format!("{:.2}%", mem_pct)).style(Style::default().fg(mem_fg)), - ]) - .style(zebra.patch(emphasis)) - }).collect(); - - let header = Row::new(vec!["PID", "Name", "CPU %", "Mem", "Mem %"]) - .style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)); - - let table = Table::new( - rows, - vec![ - Constraint::Length(8), // PID - Constraint::Percentage(40), // Name - Constraint::Length(8), // CPU % - Constraint::Length(12), // Mem - Constraint::Length(8), // Mem % - ], - ) - .header(header) - .column_spacing(1) - .block(Block::default().borders(Borders::ALL).title(title)); - - f.render_widget(table, area); -} + let url = args[1].clone(); + let mut app = App::new(); + app.run(&url).await +} \ No newline at end of file diff --git a/socktop/src/types.rs b/socktop/src/types.rs new file mode 100644 index 0000000..8a4093f --- /dev/null +++ b/socktop/src/types.rs @@ -0,0 +1,41 @@ +//! Types that mirror the agent's JSON schema. + +use serde::Deserialize; + +#[derive(Debug, Deserialize, Clone)] +pub struct Disk { + pub name: String, + pub total: u64, + pub available: u64, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct Network { + // cumulative totals; client diffs to compute rates + pub received: u64, + pub transmitted: u64, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct ProcessInfo { + pub pid: u32, + pub name: String, + pub cpu_usage: f32, + pub mem_bytes: u64, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct Metrics { + pub cpu_total: f32, + pub cpu_per_core: Vec, + pub mem_total: u64, + pub mem_used: u64, + pub swap_total: u64, + pub swap_used: u64, + pub process_count: usize, + pub hostname: String, + pub cpu_temp_c: Option, + pub disks: Vec, + pub networks: Vec, + pub top_processes: Vec, +} \ No newline at end of file diff --git a/socktop/src/ui/cpu.rs b/socktop/src/ui/cpu.rs new file mode 100644 index 0000000..84082d3 --- /dev/null +++ b/socktop/src/ui/cpu.rs @@ -0,0 +1,91 @@ +//! CPU average sparkline + per-core mini bars. + +use ratatui::{ + layout::{Constraint, Direction, Layout, Rect}, + style::{Color, Style}, + text::{Line, Span}, + widgets::{Block, Borders, Paragraph, Sparkline}, +}; +use ratatui::style::Modifier; + +use crate::history::PerCoreHistory; +use crate::types::Metrics; + +pub fn draw_cpu_avg_graph( + f: &mut ratatui::Frame<'_>, + area: Rect, + hist: &std::collections::VecDeque, + m: Option<&Metrics>, +) { + let title = if let Some(mm) = m { format!("CPU avg (now: {:>5.1}%)", mm.cpu_total) } else { "CPU avg".into() }; + let max_points = area.width.saturating_sub(2) as usize; + let start = hist.len().saturating_sub(max_points); + let data: Vec = hist.iter().skip(start).cloned().collect(); + let spark = Sparkline::default() + .block(Block::default().borders(Borders::ALL).title(title)) + .data(&data) + .max(100) + .style(Style::default().fg(Color::Cyan)); + f.render_widget(spark, area); +} + +pub fn draw_per_core_bars( + f: &mut ratatui::Frame<'_>, + area: Rect, + m: Option<&Metrics>, + per_core_hist: &PerCoreHistory, +) { + f.render_widget(Block::default().borders(Borders::ALL).title("Per-core"), area); + let Some(mm) = m else { return; }; + + let inner = Rect { x: area.x + 1, y: area.y + 1, width: area.width.saturating_sub(2), height: area.height.saturating_sub(2) }; + if inner.height == 0 { return; } + + let rows = inner.height as usize; + let show_n = rows.min(mm.cpu_per_core.len()); + let constraints: Vec = (0..show_n).map(|_| Constraint::Length(1)).collect(); + let vchunks = Layout::default().direction(Direction::Vertical).constraints(constraints).split(inner); + + for i in 0..show_n { + let rect = vchunks[i]; + let hchunks = Layout::default() + .direction(Direction::Horizontal) + .constraints([Constraint::Min(6), Constraint::Length(12)]) + .split(rect); + + let curr = mm.cpu_per_core[i].clamp(0.0, 100.0); + let older = per_core_hist.deques.get(i) + .and_then(|d| d.iter().rev().nth(20).copied()) + .map(|v| v as f32) + .unwrap_or(curr); + let trend = if curr > older + 0.2 { "↑" } + else if curr + 0.2 < older { "↓" } + else { "╌" }; + + let fg = match curr { + x if x < 25.0 => Color::Green, + x if x < 60.0 => Color::Yellow, + _ => Color::Red, + }; + + let hist: Vec = per_core_hist + .deques + .get(i) + .map(|d| { + let max_points = hchunks[0].width as usize; + let start = d.len().saturating_sub(max_points); + d.iter().skip(start).map(|&v| v as u64).collect() + }) + .unwrap_or_default(); + + let spark = Sparkline::default() + .data(&hist) + .max(100) + .style(Style::default().fg(fg)); + f.render_widget(spark, hchunks[0]); + + let label = format!("cpu{:<2}{}{:>5.1}%", i, trend, curr); + let line = Line::from(Span::styled(label, Style::default().fg(fg).add_modifier(Modifier::BOLD))); + f.render_widget(Paragraph::new(line).right_aligned(), hchunks[1]); + } +} \ No newline at end of file diff --git a/socktop/src/ui/disks.rs b/socktop/src/ui/disks.rs new file mode 100644 index 0000000..1da594c --- /dev/null +++ b/socktop/src/ui/disks.rs @@ -0,0 +1,73 @@ +//! Disk cards with per-device gauge and title line. + +use ratatui::{ + layout::{Constraint, Direction, Layout, Rect}, + style::Style, + widgets::{Block, Borders, Gauge}, +}; +use crate::types::Metrics; +use crate::ui::util::{human, truncate_middle, disk_icon}; + +pub fn draw_disks(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { + f.render_widget(Block::default().borders(Borders::ALL).title("Disks"), area); + let Some(mm) = m else { return; }; + + let inner = Rect { + x: area.x + 1, + y: area.y + 1, + width: area.width.saturating_sub(2), + height: area.height.saturating_sub(2), + }; + if inner.height < 3 { return; } + + let per_disk_h = 3u16; + let max_cards = (inner.height / per_disk_h).min(mm.disks.len() as u16) as usize; + + let constraints: Vec = (0..max_cards).map(|_| Constraint::Length(per_disk_h)).collect(); + let rows = Layout::default() + .direction(Direction::Vertical) + .constraints(constraints) + .split(inner); + + for (i, slot) in rows.iter().enumerate() { + let d = &mm.disks[i]; + let used = d.total.saturating_sub(d.available); + let ratio = if d.total > 0 { used as f64 / d.total as f64 } else { 0.0 }; + let pct = (ratio * 100.0).round() as u16; + + let color = if pct < 70 { ratatui::style::Color::Green } else if pct < 90 { ratatui::style::Color::Yellow } else { ratatui::style::Color::Red }; + + let title = format!( + "{} {} {} / {} ({}%)", + disk_icon(&d.name), + truncate_middle(&d.name, (slot.width.saturating_sub(6)) as usize / 2), + human(used), + human(d.total), + pct + ); + + let card = Block::default().borders(Borders::ALL).title(title); + f.render_widget(card, *slot); + + let inner_card = Rect { + x: slot.x + 1, + y: slot.y + 1, + width: slot.width.saturating_sub(2), + height: slot.height.saturating_sub(2), + }; + if inner_card.height == 0 { continue; } + + let gauge_rect = Rect { + x: inner_card.x, + y: inner_card.y + inner_card.height / 2, + width: inner_card.width, + height: 1, + }; + + let g = Gauge::default() + .percent(pct) + .gauge_style(Style::default().fg(color)); + + f.render_widget(g, gauge_rect); + } +} \ No newline at end of file diff --git a/socktop/src/ui/header.rs b/socktop/src/ui/header.rs new file mode 100644 index 0000000..bd255ab --- /dev/null +++ b/socktop/src/ui/header.rs @@ -0,0 +1,20 @@ +//! Top header with hostname and CPU temperature indicator. + +use ratatui::{ + layout::Rect, + widgets::{Block, Borders}, +}; +use crate::types::Metrics; + +pub fn draw_header(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { + let title = if let Some(mm) = m { + let temp = mm.cpu_temp_c.map(|t| { + let icon = if t < 50.0 { "😎" } else if t < 85.0 { "⚠️" } else { "🔥" }; + format!("CPU Temp: {:.1}°C {}", t, icon) + }).unwrap_or_else(|| "CPU Temp: N/A".into()); + format!("socktop — host: {} | {} (press 'q' to quit)", mm.hostname, temp) + } else { + "socktop — connecting... (press 'q' to quit)".into() + }; + f.render_widget(Block::default().title(title).borders(Borders::BOTTOM), area); +} \ No newline at end of file diff --git a/socktop/src/ui/mem.rs b/socktop/src/ui/mem.rs new file mode 100644 index 0000000..f873e92 --- /dev/null +++ b/socktop/src/ui/mem.rs @@ -0,0 +1,23 @@ +//! Memory gauge. + +use ratatui::{ + layout::Rect, + style::{Color, Style}, + widgets::{Block, Borders, Gauge}, +}; +use crate::types::Metrics; +use crate::ui::util::human; + +pub fn draw_mem(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { + let (used, total, pct) = if let Some(mm) = m { + let pct = if mm.mem_total > 0 { (mm.mem_used as f64 / mm.mem_total as f64 * 100.0) as u16 } else { 0 }; + (mm.mem_used, mm.mem_total, pct) + } else { (0, 0, 0) }; + + let g = Gauge::default() + .block(Block::default().borders(Borders::ALL).title("Memory")) + .gauge_style(Style::default().fg(Color::Magenta)) + .percent(pct) + .label(format!("{} / {}", human(used), human(total))); + f.render_widget(g, area); +} \ No newline at end of file diff --git a/socktop/src/ui/mod.rs b/socktop/src/ui/mod.rs new file mode 100644 index 0000000..1ef9fb0 --- /dev/null +++ b/socktop/src/ui/mod.rs @@ -0,0 +1,10 @@ +//! UI module root: exposes drawing functions for individual panels. + +pub mod header; +pub mod cpu; +pub mod mem; +pub mod swap; +pub mod disks; +pub mod net; +pub mod processes; +pub mod util; \ No newline at end of file diff --git a/socktop/src/ui/net.rs b/socktop/src/ui/net.rs new file mode 100644 index 0000000..adf048c --- /dev/null +++ b/socktop/src/ui/net.rs @@ -0,0 +1,26 @@ +//! Network sparklines (download/upload). + +use std::collections::VecDeque; +use ratatui::{ + layout::Rect, + style::{Color, Style}, + widgets::{Block, Borders, Sparkline}, +}; + +pub fn draw_net_spark( + f: &mut ratatui::Frame<'_>, + area: Rect, + title: &str, + hist: &VecDeque, + color: Color, +) { + let max_points = area.width.saturating_sub(2) as usize; + let start = hist.len().saturating_sub(max_points); + let data: Vec = hist.iter().skip(start).cloned().collect(); + + let spark = Sparkline::default() + .block(Block::default().borders(Borders::ALL).title(title.to_string())) + .data(&data) + .style(Style::default().fg(color)); + f.render_widget(spark, area); +} \ No newline at end of file diff --git a/socktop/src/ui/processes.rs b/socktop/src/ui/processes.rs new file mode 100644 index 0000000..5826309 --- /dev/null +++ b/socktop/src/ui/processes.rs @@ -0,0 +1,71 @@ +//! Top processes table with per-cell coloring and zebra striping. + +use ratatui::{ + layout::{Constraint, Rect}, + style::{Color, Style}, + widgets::{Block, Borders, Cell, Row, Table}, +}; +use ratatui::style::Modifier; + +use crate::types::Metrics; +use crate::ui::util::human; + +pub fn draw_top_processes(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { + let Some(mm) = m else { + f.render_widget(Block::default().borders(Borders::ALL).title("Top Processes"), area); + return; + }; + + let total_mem_bytes = mm.mem_total.max(1); + let title = format!("Top Processes ({} total)", mm.process_count); + let peak_cpu = mm.top_processes.iter().map(|p| p.cpu_usage).fold(0.0_f32, f32::max); + + let rows: Vec = mm.top_processes.iter().enumerate().map(|(i, p)| { + let mem_pct = (p.mem_bytes as f64 / total_mem_bytes as f64) * 100.0; + + let cpu_fg = match p.cpu_usage { + x if x < 25.0 => Color::Green, + x if x < 60.0 => Color::Yellow, + _ => Color::Red, + }; + let mem_fg = match mem_pct { + x if x < 5.0 => Color::Blue, + x if x < 20.0 => Color::Magenta, + _ => Color::Red, + }; + + let zebra = if i % 2 == 0 { Style::default().fg(Color::Gray) } else { Style::default() }; + + let emphasis = if (p.cpu_usage - peak_cpu).abs() < f32::EPSILON { + Style::default().add_modifier(Modifier::BOLD) + } else { Style::default() }; + + Row::new(vec![ + Cell::from(p.pid.to_string()).style(Style::default().fg(Color::DarkGray)), + Cell::from(p.name.clone()), + Cell::from(format!("{:.1}%", p.cpu_usage)).style(Style::default().fg(cpu_fg)), + Cell::from(human(p.mem_bytes)), + Cell::from(format!("{:.2}%", mem_pct)).style(Style::default().fg(mem_fg)), + ]) + .style(zebra.patch(emphasis)) + }).collect(); + + let header = Row::new(vec!["PID", "Name", "CPU %", "Mem", "Mem %"]) + .style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)); + + let table = Table::new( + rows, + vec![ + Constraint::Length(8), + Constraint::Percentage(40), + Constraint::Length(8), + Constraint::Length(12), + Constraint::Length(8), + ], + ) + .header(header) + .column_spacing(1) + .block(Block::default().borders(Borders::ALL).title(title)); + + f.render_widget(table, area); +} \ No newline at end of file diff --git a/socktop/src/ui/swap.rs b/socktop/src/ui/swap.rs new file mode 100644 index 0000000..a077296 --- /dev/null +++ b/socktop/src/ui/swap.rs @@ -0,0 +1,23 @@ +//! Swap gauge. + +use ratatui::{ + layout::Rect, + style::{Color, Style}, + widgets::{Block, Borders, Gauge}, +}; +use crate::types::Metrics; +use crate::ui::util::human; + +pub fn draw_swap(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) { + let (used, total, pct) = if let Some(mm) = m { + let pct = if mm.swap_total > 0 { (mm.swap_used as f64 / mm.swap_total as f64 * 100.0) as u16 } else { 0 }; + (mm.swap_used, mm.swap_total, pct) + } else { (0, 0, 0) }; + + let g = Gauge::default() + .block(Block::default().borders(Borders::ALL).title("Swap")) + .gauge_style(Style::default().fg(Color::Yellow)) + .percent(pct) + .label(format!("{} / {}", human(used), human(total))); + f.render_widget(g, area); +} \ No newline at end of file diff --git a/socktop/src/ui/util.rs b/socktop/src/ui/util.rs new file mode 100644 index 0000000..c78afde --- /dev/null +++ b/socktop/src/ui/util.rs @@ -0,0 +1,33 @@ +//! Small UI helpers: human-readable sizes, truncation, icons. + +pub fn human(b: u64) -> String { + const K: f64 = 1024.0; + let b = b as f64; + if b < K { return format!("{b:.0}B"); } + let kb = b / K; + if kb < K { return format!("{kb:.1}KB"); } + let mb = kb / K; + if mb < K { return format!("{mb:.1}MB"); } + let gb = mb / K; + if gb < K { return format!("{gb:.1}GB"); } + let tb = gb / K; + format!("{tb:.2}TB") +} + +pub fn truncate_middle(s: &str, max: usize) -> String { + if s.len() <= max { return s.to_string(); } + if max <= 3 { return "...".into(); } + let keep = max - 3; + let left = keep / 2; + let right = keep - left; + format!("{}...{}", &s[..left], &s[s.len()-right..]) +} + +pub fn disk_icon(name: &str) -> &'static str { + let n = name.to_ascii_lowercase(); + if n.contains(':') { "🗄️" } + else if n.contains("nvme") { "⚡" } + else if n.starts_with("sd") { "💽" } + else if n.contains("overlay") { "📦" } + else { "🖴" } +} \ No newline at end of file diff --git a/socktop/src/ws.rs b/socktop/src/ws.rs new file mode 100644 index 0000000..d032f2d --- /dev/null +++ b/socktop/src/ws.rs @@ -0,0 +1,28 @@ +//! Minimal WebSocket client helpers for requesting metrics from the agent. + +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +use crate::types::Metrics; + +pub type WsStream = WebSocketStream>; + +// Connect to the agent and return the WS stream +pub async fn connect(url: &str) -> Result> { + let (ws, _) = connect_async(url).await?; + Ok(ws) +} + +// Send a "get_metrics" request and await a single JSON reply +pub async fn request_metrics(ws: &mut WsStream) -> Option { + if ws.send(Message::Text("get_metrics".into())).await.is_err() { + return None; + } + match ws.next().await { + Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), + _ => None, + } +} + +// Re-export SinkExt/StreamExt for call sites +use futures_util::{SinkExt, StreamExt}; \ No newline at end of file diff --git a/socktop_agent/Cargo.toml b/socktop_agent/Cargo.toml index 752ea65..f8222a8 100644 --- a/socktop_agent/Cargo.toml +++ b/socktop_agent/Cargo.toml @@ -12,4 +12,6 @@ sysinfo = "0.36.1" serde = { version = "1", features = ["derive"] } serde_json = "1" futures = "0.3" -futures-util = "0.3.31" \ No newline at end of file +futures-util = "0.3.31" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } \ No newline at end of file diff --git a/socktop_agent/src/main.rs b/socktop_agent/src/main.rs index bf49448..14a940d 100644 --- a/socktop_agent/src/main.rs +++ b/socktop_agent/src/main.rs @@ -1,217 +1,136 @@ -use axum::{ - extract::{ - ws::{Message, WebSocket, WebSocketUpgrade}, - State, - }, - response::IntoResponse, - routing::get, - Router, -}; -use futures_util::stream::StreamExt; -use serde::Serialize; -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +//! socktop agent entrypoint: sets up sysinfo handles, launches a sampler, +//! and serves a WebSocket endpoint at /ws. + +mod metrics; +mod sampler; +mod state; +mod ws; +mod types; + +use axum::{routing::get, Router}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration, sync::atomic::AtomicUsize}; use sysinfo::{ Components, CpuRefreshKind, Disks, MemoryRefreshKind, Networks, ProcessRefreshKind, RefreshKind, System, }; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock, Notify}; +use tracing_subscriber::EnvFilter; -// ---------- Data types sent to the client ---------- - -#[derive(Debug, Serialize, Clone)] -struct ProcessInfo { - pid: u32, - name: String, - cpu_usage: f32, - mem_bytes: u64, -} - -#[derive(Debug, Serialize, Clone)] -struct DiskInfo { - name: String, - total: u64, - available: u64, -} - -#[derive(Debug, Serialize, Clone)] -struct NetworkInfo { - name: String, - // cumulative totals since the agent started (client should diff to get rates) - received: u64, - transmitted: u64, -} - -#[derive(Debug, Serialize, Clone)] -struct Metrics { - cpu_total: f32, - cpu_per_core: Vec, - mem_total: u64, - mem_used: u64, - swap_total: u64, - swap_used: u64, - process_count: usize, - hostname: String, - cpu_temp_c: Option, - disks: Vec, - networks: Vec, - top_processes: Vec, -} - -// ---------- Shared state ---------- - -type SharedSystem = Arc>; -type SharedNetworks = Arc>; -type SharedTotals = Arc>>; // iface -> (rx_total, tx_total) - -#[derive(Clone)] -struct AppState { - sys: SharedSystem, - nets: SharedNetworks, - net_totals: SharedTotals, -} +use state::{AppState, SharedTotals}; +use sampler::spawn_sampler; +use ws::ws_handler; #[tokio::main] async fn main() { - // sysinfo 0.36: build specifics + + // Init logging; configure with RUST_LOG (e.g., RUST_LOG=info). + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .with_target(false) + .compact() + .init(); + + // sysinfo build specifics (scopes what refresh_all() will touch internally) let refresh_kind = RefreshKind::nothing() .with_cpu(CpuRefreshKind::everything()) .with_memory(MemoryRefreshKind::everything()) .with_processes(ProcessRefreshKind::everything()); + // Initialize sysinfo handles once and keep them alive let mut sys = System::new_with_specifics(refresh_kind); sys.refresh_all(); - // Keep Networks alive across requests so received()/transmitted() deltas work let mut nets = Networks::new(); nets.refresh(true); - let shared = Arc::new(Mutex::new(sys)); - let shared_nets = Arc::new(Mutex::new(nets)); - let net_totals: SharedTotals = Arc::new(Mutex::new(HashMap::new())); + let mut components = Components::new(); + components.refresh(true); - let app = Router::new() - .route("/ws", get(ws_handler)) - .with_state(AppState { - sys: shared, - nets: shared_nets, - net_totals, - }); + let mut disks = Disks::new(); + disks.refresh(true); - let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); + // Shared state across requests + let state = AppState { + sys: Arc::new(Mutex::new(sys)), + nets: Arc::new(Mutex::new(nets)), + net_totals: Arc::new(Mutex::new(HashMap::::new())) as SharedTotals, + components: Arc::new(Mutex::new(components)), + disks: Arc::new(Mutex::new(disks)), + last_json: Arc::new(RwLock::new(String::new())), + // new: adaptive sampling controls + client_count: Arc::new(AtomicUsize::new(0)), + wake_sampler: Arc::new(Notify::new()), + auth_token: std::env::var("SOCKTOP_TOKEN").ok().filter(|s| !s.is_empty()), + }; + + // Start background sampler (adjust cadence as needed) + let _sampler = spawn_sampler(state.clone(), Duration::from_millis(500)); + + // Web app + let port = resolve_port(); + let app = Router::new().route("/ws", get(ws_handler)).with_state(state); + + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + + //output to console println!("Remote agent running at http://{}", addr); + println!("WebSocket endpoint: ws://{}/ws", addr); + + //trace logging + tracing::info!("Remote agent running at http://{} (ws at /ws)", addr); + tracing::info!("WebSocket endpoint: ws://{}/ws", addr); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); axum::serve(listener, app).await.unwrap(); + } -async fn ws_handler(ws: WebSocketUpgrade, State(state): State) -> impl IntoResponse { - ws.on_upgrade(move |socket| handle_socket(socket, state)) -} +// Resolve the listening port from CLI args/env with a 3000 default. +// Supports: --port , -p , a bare numeric positional arg, or SOCKTOP_PORT. +fn resolve_port() -> u16 { + const DEFAULT: u16 = 3000; -async fn handle_socket(mut socket: WebSocket, state: AppState) { - while let Some(Ok(msg)) = socket.next().await { - if let Message::Text(text) = msg { - if text == "get_metrics" { - let metrics = collect_metrics(&state).await; - let json = serde_json::to_string(&metrics).unwrap(); - let _ = socket.send(Message::Text(json)).await; + // Env takes precedence over positional, but is overridden by explicit flags if present. + if let Ok(s) = std::env::var("SOCKTOP_PORT") { + if let Ok(p) = s.parse::() { + if p != 0 { + return p; + } + } + eprintln!("Warning: invalid SOCKTOP_PORT='{}'; using default {}", s, DEFAULT); + } + + let mut args = std::env::args().skip(1); + while let Some(arg) = args.next() { + match arg.as_str() { + "--port" | "-p" => { + if let Some(v) = args.next() { + match v.parse::() { + Ok(p) if p != 0 => return p, + _ => { + eprintln!("Invalid port '{}'; using default {}", v, DEFAULT); + return DEFAULT; + } + } + } else { + eprintln!("Missing value for {} ; using default {}", arg, DEFAULT); + return DEFAULT; + } + } + "--help" | "-h" => { + println!("Usage: socktop_agent [--port ] [PORT]\n SOCKTOP_PORT= socktop_agent"); + std::process::exit(0); + } + s => { + if let Ok(p) = s.parse::() { + if p != 0 { + return p; + } + } } } } + + DEFAULT } -// ---------- Metrics collection ---------- - -async fn collect_metrics(state: &AppState) -> Metrics { - // System (CPU/mem/proc) - let mut sys = state.sys.lock().await; - sys.refresh_all(); - - let hostname = System::host_name().unwrap_or_else(|| "unknown".into()); - - // Temps via Components (separate handle in 0.36) - let mut components = Components::new(); - components.refresh(true); - let cpu_temp_c = best_cpu_temp(&components); - - // Disks (separate handle in 0.36) - let mut disks_struct = Disks::new(); - disks_struct.refresh(true); - // Filter anything with available == 0 (e.g., overlay) - let disks: Vec = disks_struct - .list() - .iter() - .filter(|d| d.available_space() > 0) - .map(|d| DiskInfo { - name: d.name().to_string_lossy().to_string(), - total: d.total_space(), - available: d.available_space(), - }) - .collect(); - - // Networks: use a persistent Networks + rolling totals - let mut nets = state.nets.lock().await; - nets.refresh(true); - - let mut totals = state.net_totals.lock().await; - let mut networks: Vec = Vec::new(); - - for (name, data) in nets.iter() { - // sysinfo 0.36: data.received()/transmitted() are deltas since *last* refresh - let delta_rx = data.received(); - let delta_tx = data.transmitted(); - - let entry = totals.entry(name.clone()).or_insert((0, 0)); - entry.0 = entry.0.saturating_add(delta_rx); - entry.1 = entry.1.saturating_add(delta_tx); - - networks.push(NetworkInfo { - name: name.clone(), - received: entry.0, - transmitted: entry.1, - }); - } - - // get number of cpu cores - let n_cpus = sys.cpus().len().max(1) as f32; - - // Top processes: include PID and memory, top 20 by CPU - let mut top_processes: Vec = sys - .processes() - .values() - .map(|p| ProcessInfo { - pid: p.pid().as_u32(), - name: p.name().to_string_lossy().to_string(), - cpu_usage: (p.cpu_usage() / n_cpus).min(100.0), - mem_bytes: p.memory(), // sysinfo 0.36: bytes - }) - .collect(); - top_processes.sort_by(|a, b| b.cpu_usage.partial_cmp(&a.cpu_usage).unwrap()); - top_processes.truncate(20); - - Metrics { - cpu_total: sys.global_cpu_usage(), - cpu_per_core: sys.cpus().iter().map(|c| c.cpu_usage()).collect(), - mem_total: sys.total_memory(), - mem_used: sys.used_memory(), - swap_total: sys.total_swap(), - swap_used: sys.used_swap(), - process_count: sys.processes().len(), - hostname, - cpu_temp_c, - disks, - networks, - top_processes, - } -} - -fn best_cpu_temp(components: &Components) -> Option { - components - .iter() - .filter(|c| { - let label = c.label().to_lowercase(); - label.contains("cpu") || label.contains("package") || label.contains("tctl") || label.contains("tdie") - }) - .filter_map(|c| c.temperature()) - .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)) -} diff --git a/socktop_agent/src/metrics.rs b/socktop_agent/src/metrics.rs new file mode 100644 index 0000000..751889f --- /dev/null +++ b/socktop_agent/src/metrics.rs @@ -0,0 +1,128 @@ +//! Metrics collection using sysinfo. Keeps sysinfo handles in AppState to +//! avoid repeated allocations and allow efficient refreshes. + +use crate::state::AppState; +use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo}; +use sysinfo::{Components, System}; + +pub async fn collect_metrics(state: &AppState) -> Metrics { + // System (CPU/mem/proc) + let mut sys = state.sys.lock().await; + // Simple and safe — can be replaced by more granular refresh if desired: + // sys.refresh_cpu(); sys.refresh_memory(); sys.refresh_processes_specifics(...); + //sys.refresh_all(); + //refresh all was found to use 2X CPU rather than individual refreshes + sys.refresh_cpu_all(); + sys.refresh_memory(); + sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true); + + let hostname = System::host_name().unwrap_or_else(|| "unknown".into()); + + // Temps via a persistent Components handle + let mut components = state.components.lock().await; + components.refresh(true); + let cpu_temp_c = best_cpu_temp(&components); + + // Disks via a persistent Disks handle + let mut disks_struct = state.disks.lock().await; + disks_struct.refresh(true); + // Filter anything with available == 0 (e.g., overlay/virtual) + let disks: Vec = disks_struct + .list() + .iter() + .filter(|d| d.available_space() > 0) + .map(|d| DiskInfo { + name: d.name().to_string_lossy().to_string(), + total: d.total_space(), + available: d.available_space(), + }) + .collect(); + + // Networks: use a persistent Networks + rolling totals + let mut nets = state.nets.lock().await; + nets.refresh(true); + let mut totals = state.net_totals.lock().await; + let mut networks: Vec = Vec::new(); + for (name, data) in nets.iter() { + // sysinfo: received()/transmitted() are deltas since last refresh + let delta_rx = data.received(); + let delta_tx = data.transmitted(); + + let entry = totals.entry(name.clone()).or_insert((0, 0)); + entry.0 = entry.0.saturating_add(delta_rx); + entry.1 = entry.1.saturating_add(delta_tx); + + networks.push(NetworkInfo { + name: name.clone(), + received: entry.0, + transmitted: entry.1, + }); + } + + // Normalize process CPU to 0..100 across all cores + let n_cpus = sys.cpus().len().max(1) as f32; + + // Build process list + let mut procs: Vec = sys + .processes() + .values() + .map(|p| ProcessInfo { + pid: p.pid().as_u32(), + name: p.name().to_string_lossy().to_string(), + cpu_usage: (p.cpu_usage() / n_cpus).min(100.0), + mem_bytes: p.memory(), + }) + .collect(); + + // Partial select: get the top 20 by CPU without fully sorting the vector + const TOP_N: usize = 20; + if procs.len() > TOP_N { + // nth index is TOP_N-1 (0-based) + let nth = TOP_N - 1; + procs.select_nth_unstable_by(nth, |a, b| { + b.cpu_usage + .partial_cmp(&a.cpu_usage) + .unwrap_or(std::cmp::Ordering::Equal) + }); + procs.truncate(TOP_N); + // Order those 20 nicely for display + procs.sort_by(|a, b| { + b.cpu_usage + .partial_cmp(&a.cpu_usage) + .unwrap_or(std::cmp::Ordering::Equal) + }); + } else { + procs.sort_by(|a, b| { + b.cpu_usage + .partial_cmp(&a.cpu_usage) + .unwrap_or(std::cmp::Ordering::Equal) + }); + } + + Metrics { + cpu_total: sys.global_cpu_usage(), + cpu_per_core: sys.cpus().iter().map(|c| c.cpu_usage()).collect(), + mem_total: sys.total_memory(), + mem_used: sys.used_memory(), + swap_total: sys.total_swap(), + swap_used: sys.used_swap(), + process_count: sys.processes().len(), + hostname, + cpu_temp_c, + disks, + networks, + top_processes: procs, + } +} + +// Pick the hottest CPU-like sensor (labels vary by platform) +pub fn best_cpu_temp(components: &Components) -> Option { + components + .iter() + .filter(|c| { + let label = c.label().to_lowercase(); + label.contains("cpu") || label.contains("package") || label.contains("tctl") || label.contains("tdie") + }) + .filter_map(|c| c.temperature()) + .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)) +} \ No newline at end of file diff --git a/socktop_agent/src/sampler.rs b/socktop_agent/src/sampler.rs new file mode 100644 index 0000000..3ce3843 --- /dev/null +++ b/socktop_agent/src/sampler.rs @@ -0,0 +1,36 @@ +//! Background sampler: periodically collects metrics and updates a JSON cache, +//! so WS replies are just a read of the cached string. + +use crate::metrics::collect_metrics; +use crate::state::AppState; +//use serde_json::to_string; +use tokio::task::JoinHandle; +use tokio::time::{Duration, interval, MissedTickBehavior}; + +pub fn spawn_sampler(state: AppState, period: Duration) -> JoinHandle<()> { + tokio::spawn(async move { + let idle_period = Duration::from_secs(10); + loop { + let active = state.client_count.load(std::sync::atomic::Ordering::Relaxed) > 0; + let mut ticker = interval(if active { period } else { idle_period }); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + ticker.tick().await; + + if !active { + tokio::select! { + _ = ticker.tick() => {}, + _ = state.wake_sampler.notified() => continue, + } + } + + if let Ok(json) = async { + let m = collect_metrics(&state).await; + serde_json::to_string(&m) + } + .await + { + *state.last_json.write().await = json; + } + } + }) +} \ No newline at end of file diff --git a/socktop_agent/src/state.rs b/socktop_agent/src/state.rs new file mode 100644 index 0000000..eb4000d --- /dev/null +++ b/socktop_agent/src/state.rs @@ -0,0 +1,30 @@ +//! Shared agent state: sysinfo handles and hot JSON cache. + +use std::{collections::HashMap, sync::Arc}; +use std::sync::atomic::AtomicUsize; +use sysinfo::{Components, Disks, Networks, System}; +use tokio::sync::{Mutex, RwLock, Notify}; + +pub type SharedSystem = Arc>; +pub type SharedNetworks = Arc>; +pub type SharedTotals = Arc>>; +pub type SharedComponents = Arc>; +pub type SharedDisks = Arc>; + +#[derive(Clone)] +pub struct AppState { + // Persistent sysinfo handles + pub sys: SharedSystem, + pub nets: SharedNetworks, + pub net_totals: SharedTotals, // iface -> (rx_total, tx_total) + pub components: SharedComponents, + pub disks: SharedDisks, + + // Last serialized JSON snapshot for fast WS responses + pub last_json: Arc>, + + // Adaptive sampling controls + pub client_count: Arc, + pub wake_sampler: Arc, + pub auth_token: Option, +} \ No newline at end of file diff --git a/socktop_agent/src/types.rs b/socktop_agent/src/types.rs new file mode 100644 index 0000000..b5fc602 --- /dev/null +++ b/socktop_agent/src/types.rs @@ -0,0 +1,43 @@ +//! Data types sent to the client over WebSocket. +//! Keep this module minimal and stable — it defines the wire format. + +use serde::Serialize; + +#[derive(Debug, Serialize, Clone)] +pub struct ProcessInfo { + pub pid: u32, + pub name: String, + pub cpu_usage: f32, + pub mem_bytes: u64, +} + +#[derive(Debug, Serialize, Clone)] +pub struct DiskInfo { + pub name: String, + pub total: u64, + pub available: u64, +} + +#[derive(Debug, Serialize, Clone)] +pub struct NetworkInfo { + pub name: String, + // cumulative totals since the agent started (client should diff to get rates) + pub received: u64, + pub transmitted: u64, +} + +#[derive(Debug, Serialize, Clone)] +pub struct Metrics { + pub cpu_total: f32, + pub cpu_per_core: Vec, + pub mem_total: u64, + pub mem_used: u64, + pub swap_total: u64, + pub swap_used: u64, + pub process_count: usize, + pub hostname: String, + pub cpu_temp_c: Option, + pub disks: Vec, + pub networks: Vec, + pub top_processes: Vec, +} \ No newline at end of file diff --git a/socktop_agent/src/ws.rs b/socktop_agent/src/ws.rs new file mode 100644 index 0000000..ecbf7eb --- /dev/null +++ b/socktop_agent/src/ws.rs @@ -0,0 +1,66 @@ +//! WebSocket upgrade and per-connection handler. Serves cached JSON quickly. + +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Query, State, + }, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use futures_util::stream::StreamExt; + +use crate::metrics::collect_metrics; +use crate::state::AppState; + +use std::collections::HashMap; +use std::sync::atomic::Ordering; + +pub async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State, + Query(q): Query>, +) -> Response { + if let Some(expected) = state.auth_token.as_ref() { + match q.get("token") { + Some(t) if t == expected => {} + _ => return StatusCode::UNAUTHORIZED.into_response(), + } + } + ws.on_upgrade(move |socket| handle_socket(socket, state)) +} + +async fn handle_socket(mut socket: WebSocket, state: AppState) { + // Bump client count on connect and wake the sampler. + state.client_count.fetch_add(1, Ordering::Relaxed); + state.wake_sampler.notify_waiters(); + + // Ensure we decrement on disconnect (drop). + struct ClientGuard(AppState); + impl Drop for ClientGuard { + fn drop(&mut self) { + self.0.client_count.fetch_sub(1, Ordering::Relaxed); + self.0.wake_sampler.notify_waiters(); + } + } + let _guard = ClientGuard(state.clone()); + + while let Some(Ok(msg)) = socket.next().await { + match msg { + Message::Text(text) if text == "get_metrics" => { + // Serve the cached JSON quickly; if empty (cold start), collect once. + let cached = state.last_json.read().await.clone(); + if !cached.is_empty() { + let _ = socket.send(Message::Text(cached)).await; + } else { + let metrics = collect_metrics(&state).await; + if let Ok(js) = serde_json::to_string(&metrics) { + let _ = socket.send(Message::Text(js)).await; + } + } + } + Message::Close(_) => break, + _ => {} + } + } +} \ No newline at end of file