Major refactor: additional comments, performance improvements (including idle-time CPU savings), optional access-token authentication, and port specification.

Release highlights

Introduced split client/agent architecture with a ratatui-based TUI and a lightweight WebSocket agent.
Added adaptive (idle-aware) sampler: agent samples fast only when clients are connected; sleeps when idle.
Implemented metrics JSON caching for instant ws replies; cold-start does one-off collection.
Port configuration: --port/-p, positional PORT, or SOCKTOP_PORT env (default 3000).
Optional token auth: SOCKTOP_TOKEN on agent, ws://HOST:PORT/ws?token=VALUE in client.
Logging via tracing with RUST_LOG control.
CI workflow (fmt, clippy, build) for Linux and Windows.
Systemd unit example for always-on agent.
TUI features

CPU: overall sparkline + per-core history with trend arrows and color thresholds.
Memory/Swap gauges with humanized labels.
Disks panel with per-device usage and icons.
Network download/upload sparklines (KB/s) with peak tracking.
Top processes table (PID, name, CPU%, mem, mem%).
Header with hostname and CPU temperature indicator.
Agent changes

sysinfo 0.36.1 targeted refresh: refresh_cpu_all, refresh_memory, refresh_processes_specifics(ProcessesToUpdate::All, ProcessRefreshKind::new().with_cpu().with_memory(), true).
WebSocket handler: client counting with wake notifications, cold-start handling, proper Response returns.
Sampler uses MissedTickBehavior::Skip to avoid catch-up bursts.
Docs

README updates: running instructions, port configuration, optional token auth, platform notes, example JSON.
Added socktop-agent.service systemd unit.
Platform notes

Linux (AMD/Intel) supported; tested on AMD, targeting Intel next.
Raspberry Pi supported (availability of temps varies by model).
Windows builds/run; CPU temperature may be unavailable (shows N/A).
Known/next

Roadmap includes configurable refresh interval, TUI filtering/sorting, TLS/WSS, and export to file.
Add Context...
README.md
This commit is contained in:
jasonwitty 2025-08-08 12:41:32 -07:00
parent 1c2415bc1b
commit 100434fc3c
26 changed files with 1371 additions and 713 deletions

19
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,19 @@
# CI: format check, lint, and release build on Linux and Windows.
name: CI
on:
  push:
  pull_request:
jobs:
  build:
    strategy:
      matrix:
        os: [ubuntu-latest, windows-latest]
    runs-on: ${{ matrix.os }}
    steps:
      - uses: actions/checkout@v4
      - uses: dtolnay/rust-toolchain@stable
      - name: Cargo fmt
        run: cargo fmt --all -- --check
      - name: Clippy
        # `--` separates cargo's own args from the lint flags forwarded to
        # rustc; without it cargo rejects `-D warnings` as an unknown flag.
        run: cargo clippy --all-targets --all-features -- -D warnings
      - name: Build
        run: cargo build --release --workspace

152
Cargo.lock generated
View File

@ -17,6 +17,15 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.21"
@ -821,6 +830,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.174"
@ -864,6 +879,15 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
@ -924,6 +948,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-traits"
version = "0.2.19"
@ -967,6 +1001,12 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.4"
@ -1110,6 +1150,50 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-automata"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rustc-demangle"
version = "0.1.26"
@ -1212,6 +1296,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
@ -1300,6 +1393,8 @@ dependencies = [
"serde_json",
"sysinfo",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -1404,6 +1499,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "tinystr"
version = "0.8.1"
@ -1493,9 +1597,21 @@ checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.34"
@ -1503,6 +1619,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -1581,6 +1727,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "version_check"
version = "0.9.5"

View File

@ -4,7 +4,7 @@
It lets you watch CPU, memory, disks, network, temperatures, and processes on another machine in real-time — from the comfort of your terminal.
![socktop screenshot](docs/socktop-screenshot.png)
![socktop screenshot](./docs/socktop-screenshot.png)
---
@ -37,6 +37,46 @@ The two communicate over a persistent WebSocket connection.
---
## Adaptive (idle-aware) sampling
The socktop agent now samples system metrics only when at least one WebSocket client is connected. When idle (no clients), the sampler sleeps and CPU usage drops to ~0%.
How it works
- The WebSocket handler increments/decrements a client counter in `AppState` on connect/disconnect.
- A background sampler wakes when the counter transitions from 0 → >0 and sleeps when it returns to 0.
- The most recent metrics snapshot is cached as JSON for fast responses.
Cold start behavior
- If a client requests metrics while the cache is empty (e.g., just started or after a long idle), the agent performs a one-off synchronous collection to respond immediately.
Tuning
- Sampling interval (active): update `spawn_sampler(state, Duration::from_millis(500))` in `socktop_agent/src/main.rs`.
- Always-on or low-frequency idle sampling: replace the “sleep when idle” logic in `socktop_agent/src/sampler.rs` with a low-frequency interval. Example sketch:
```rust
// In sampler.rs (sketch): sample every 10s when idle, 500ms when active
let idle_period = Duration::from_secs(10);
loop {
let active = state.client_count.load(Ordering::Relaxed) > 0;
let period = if active { Duration::from_millis(500) } else { idle_period };
let mut ticker = tokio::time::interval(period);
ticker.tick().await;
if !active {
// wake early if a client connects
tokio::select! {
_ = ticker.tick() => {},
_ = state.wake_sampler.notified() => continue,
}
}
let m = collect_metrics(&state).await;
if let Ok(js) = serde_json::to_string(&m) {
*state.last_json.write().await = js;
}
}
```
---
## Installation
### Prerequisites
@ -99,6 +139,26 @@ When connected, `socktop` displays:
---
## Configuring the agent port
The agent listens on TCP port 3000 by default. You can override this via a CLI flag, a positional port argument, or an environment variable:
- CLI flag:
- socktop_agent --port 8080
- socktop_agent -p 8080
- Positional:
- socktop_agent 8080
- Environment variable:
- SOCKTOP_PORT=8080 socktop_agent
Help:
- socktop_agent --help
The TUI should point to ws://HOST:PORT/ws, e.g.:
- cargo run -p socktop -- ws://127.0.0.1:8080/ws
---
## Keyboard Shortcuts
| Key | Action |
@ -107,6 +167,29 @@ When connected, `socktop` displays:
---
## Security (optional token)
By default, the agent exposes metrics over an unauthenticated WebSocket. For untrusted networks, set an auth token and pass it in the client URL:
- Server:
- SOCKTOP_TOKEN=changeme socktop_agent --port 3000
- Client:
- socktop ws://HOST:3000/ws?token=changeme
---
## Platform notes
- Linux x86_64/AMD/Intel: fully supported.
- Raspberry Pi:
- 64-bit: rustup target add aarch64-unknown-linux-gnu; build on-device for simplicity.
- 32-bit: rustup target add armv7-unknown-linux-gnueabihf.
- Windows:
- TUI and agent build/run with stable Rust. Use PowerShell:
- cargo run -p socktop_agent -- --port 3000
- cargo run -p socktop -- ws://127.0.0.1:3000/ws
- CPU temperature may be unavailable; display will show N/A.
---
## Example agent JSON
`socktop` expects the agent to send metrics in this shape:
```json

View File

@ -0,0 +1,18 @@
[Unit]
Description=Socktop agent
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/usr/local/bin/socktop_agent --port 3000
Environment=RUST_LOG=info
# Optional auth:
# Environment=SOCKTOP_TOKEN=changeme
Restart=on-failure
User=socktop
Group=socktop
NoNewPrivileges=true
[Install]
WantedBy=multi-user.target

View File

Before

Width:  |  Height:  |  Size: 151 KiB

After

Width:  |  Height:  |  Size: 151 KiB

198
socktop/src/app.rs Normal file
View File

@ -0,0 +1,198 @@
//! App state and main loop: input handling, fetching metrics, updating history, and drawing.
use std::{collections::VecDeque, io, time::{Duration, Instant}};
use crossterm::{
event::{self, Event, KeyCode},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
backend::CrosstermBackend,
layout::{Constraint, Direction},
Terminal,
};
use tokio::time::sleep;
use crate::history::{push_capped, PerCoreHistory};
use crate::types::Metrics;
use crate::ui::{header::draw_header, cpu::{draw_cpu_avg_graph, draw_per_core_bars}, mem::draw_mem, swap::draw_swap, disks::draw_disks, net::draw_net_spark, processes::draw_top_processes};
use crate::ws::{connect, request_metrics};
/// All client-side TUI state: the most recent metrics snapshot plus the
/// bounded history buffers used to draw sparklines, trends, and peaks.
pub struct App {
// Latest metrics + histories
last_metrics: Option<Metrics>,
// CPU avg history (0..100)
cpu_hist: VecDeque<u64>,
// Per-core history (0..100)
per_core_hist: PerCoreHistory,
// Network totals snapshot + histories of KB/s
// (cumulative rx bytes, cumulative tx bytes, timestamp of that sample)
last_net_totals: Option<(u64, u64, Instant)>,
rx_hist: VecDeque<u64>,
tx_hist: VecDeque<u64>,
// Highest KB/s observed this session; shown in the sparkline titles.
rx_peak: u64,
tx_peak: u64,
// Quit flag
// Set when the event loop sees 'q', 'Q', or Esc.
should_quit: bool,
}
impl App {
/// Construct the TUI state with empty histories.
/// Capacities: 600 samples for CPU-average and network histories
/// (~5 minutes at the 500 ms tick used in `event_loop`), 60 per core.
pub fn new() -> Self {
Self {
last_metrics: None,
cpu_hist: VecDeque::with_capacity(600),
per_core_hist: PerCoreHistory::new(60),
last_net_totals: None,
rx_hist: VecDeque::with_capacity(600),
tx_hist: VecDeque::with_capacity(600),
rx_peak: 0,
tx_peak: 0,
should_quit: false,
}
}
/// Connect to the agent at `url`, switch the terminal into raw mode and
/// the alternate screen, run the event loop, then restore the terminal.
/// Teardown runs after `event_loop` resolves regardless of whether it
/// returned Ok or Err, so the terminal is restored on loop errors too
/// (errors during connect/setup itself propagate before teardown).
pub async fn run(&mut self, url: &str) -> Result<(), Box<dyn std::error::Error>> {
// Connect to agent
let mut ws = connect(url).await?;
// Terminal setup
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
// Main loop
let res = self.event_loop(&mut terminal, &mut ws).await;
// Teardown
disable_raw_mode()?;
let backend = terminal.backend_mut();
execute!(backend, LeaveAlternateScreen)?;
terminal.show_cursor()?;
res
}
/// Core loop: drain pending key events, fetch one metrics snapshot over
/// the WebSocket, redraw, then sleep 500 ms. Exits when 'q'/'Q'/Esc is
/// pressed. A failed `request_metrics` (None) skips the update but keeps
/// drawing the last known state.
async fn event_loop<B: ratatui::backend::Backend>(
&mut self,
terminal: &mut Terminal<B>,
ws: &mut crate::ws::WsStream,
) -> Result<(), Box<dyn std::error::Error>> {
loop {
// Input (non-blocking)
while event::poll(Duration::from_millis(10))? {
if let Event::Key(k) = event::read()? {
if matches!(k.code, KeyCode::Char('q') | KeyCode::Char('Q') | KeyCode::Esc) {
self.should_quit = true;
}
}
}
if self.should_quit {
break;
}
// Fetch and update
if let Some(m) = request_metrics(ws).await {
self.update_with_metrics(m);
}
// Draw
terminal.draw(|f| self.draw(f))?;
// Tick rate
sleep(Duration::from_millis(500)).await;
}
Ok(())
}
/// Fold a fresh metrics snapshot into the histories: CPU average,
/// per-core samples, and network KB/s derived from the delta of the
/// agent's cumulative byte counters. The very first snapshot reports
/// 0 KB/s because there is no previous total to diff against.
fn update_with_metrics(&mut self, m: Metrics) {
// CPU avg history
let v = m.cpu_total.clamp(0.0, 100.0).round() as u64;
push_capped(&mut self.cpu_hist, v, 600);
// Per-core history (push current samples)
self.per_core_hist.ensure_cores(m.cpu_per_core.len());
self.per_core_hist.push_samples(&m.cpu_per_core);
// NET: sum across all ifaces, compute KB/s via elapsed time
let now = Instant::now();
let rx_total = m.networks.iter().map(|n| n.received).sum::<u64>();
let tx_total = m.networks.iter().map(|n| n.transmitted).sum::<u64>();
let (rx_kb, tx_kb) = if let Some((prx, ptx, pts)) = self.last_net_totals {
// max(1e-6) guards against a zero elapsed time; saturating_sub
// guards against counter resets (e.g. agent restart).
let dt = now.duration_since(pts).as_secs_f64().max(1e-6);
let rx = ((rx_total.saturating_sub(prx)) as f64 / dt / 1024.0).round() as u64;
let tx = ((tx_total.saturating_sub(ptx)) as f64 / dt / 1024.0).round() as u64;
(rx, tx)
} else { (0, 0) };
self.last_net_totals = Some((rx_total, tx_total, now));
push_capped(&mut self.rx_hist, rx_kb, 600);
push_capped(&mut self.tx_hist, tx_kb, 600);
self.rx_peak = self.rx_peak.max(rx_kb);
self.tx_peak = self.tx_peak.max(tx_kb);
self.last_metrics = Some(m);
}
/// Lay out and render one frame:
/// row 0 header, row 1 CPU (avg graph | per-core), rows 2-3 memory and
/// swap gauges, bottom row disks + net sparklines (left) and top
/// processes (right).
fn draw(&mut self, f: &mut ratatui::Frame<'_>) {
let area = f.area();
let rows = ratatui::layout::Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Length(1),
Constraint::Ratio(1, 3),
Constraint::Length(3),
Constraint::Length(3),
Constraint::Min(10),
])
.split(area);
draw_header(f, rows[0], self.last_metrics.as_ref());
let top = ratatui::layout::Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(66), Constraint::Percentage(34)])
.split(rows[1]);
draw_cpu_avg_graph(f, top[0], &self.cpu_hist, self.last_metrics.as_ref());
draw_per_core_bars(f, top[1], self.last_metrics.as_ref(), &self.per_core_hist);
draw_mem(f, rows[2], self.last_metrics.as_ref());
draw_swap(f, rows[3], self.last_metrics.as_ref());
let bottom = ratatui::layout::Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(66), Constraint::Percentage(34)])
.split(rows[4]);
let left_stack = ratatui::layout::Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Min(6), Constraint::Length(4), Constraint::Length(4)])
.split(bottom[0]);
draw_disks(f, left_stack[0], self.last_metrics.as_ref());
draw_net_spark(
f,
left_stack[1],
&format!("Download (KB/s) — now: {} | peak: {}", self.rx_hist.back().copied().unwrap_or(0), self.rx_peak),
&self.rx_hist,
ratatui::style::Color::Green,
);
draw_net_spark(
f,
left_stack[2],
&format!("Upload (KB/s) — now: {} | peak: {}", self.tx_hist.back().copied().unwrap_or(0), self.tx_peak),
&self.tx_hist,
ratatui::style::Color::Blue,
);
draw_top_processes(f, bottom[1], self.last_metrics.as_ref());
}
}

39
socktop/src/history.rs Normal file
View File

@ -0,0 +1,39 @@
//! Small utilities to manage bounded history buffers for charts.
use std::collections::VecDeque;
/// Append `v` to `dq`, evicting from the front so the deque never holds
/// more than `cap` elements. A `cap` of 0 keeps the deque empty.
pub fn push_capped<T>(dq: &mut VecDeque<T>, v: T, cap: usize) {
    if cap == 0 {
        // Nothing can be retained; drop stale contents and the new value.
        dq.clear();
        return;
    }
    // `while` (not `if`) so the deque is trimmed even if it is already
    // over capacity, e.g. after `cap` shrank between calls. The original
    // `if len == cap` would let an oversized deque grow without bound.
    while dq.len() >= cap {
        dq.pop_front();
    }
    dq.push_back(v);
}
// Bounded per-core usage history: one fixed-capacity deque per CPU core.
pub struct PerCoreHistory {
    pub deques: Vec<VecDeque<u16>>,
    cap: usize,
}
impl PerCoreHistory {
    /// New, empty history that will keep at most `cap` samples per core.
    pub fn new(cap: usize) -> Self {
        Self {
            deques: Vec::new(),
            cap,
        }
    }
    /// Guarantee exactly one deque per core. Any change in core count
    /// (a topology change) discards existing history and starts fresh.
    pub fn ensure_cores(&mut self, n: usize) {
        if self.deques.len() != n {
            self.deques = std::iter::repeat_with(|| VecDeque::with_capacity(self.cap))
                .take(n)
                .collect();
        }
    }
    /// Record one sample per core; each value is clamped to 0..=100 and
    /// rounded before being stored, evicting the oldest sample if full.
    pub fn push_samples(&mut self, samples: &[f32]) {
        self.ensure_cores(samples.len());
        let cap = self.cap;
        for (dq, &raw) in self.deques.iter_mut().zip(samples.iter()) {
            push_capped(dq, raw.clamp(0.0, 100.0).round() as u16, cap);
        }
    }
}

View File

@ -1,537 +1,23 @@
use std::{collections::VecDeque, env, error::Error, io, time::{Duration, Instant}};
//! Entry point for the socktop TUI. Parses args and runs the App.
use crossterm::{
event::{self, Event, KeyCode},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use futures_util::{SinkExt, StreamExt};
use ratatui::{
backend::CrosstermBackend,
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Style},
widgets::{Block, Borders, Gauge, Row, Sparkline, Table, Cell},
Terminal,
text::{Line, Span},
};
mod app;
mod history;
mod types;
mod ui;
mod ws;
use ratatui::style::{Modifier};
use serde::Deserialize;
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite::Message};
// Wire types mirroring the agent's metrics JSON.
// One disk/mount: capacity and remaining space (bytes, per usage in draw_disks).
#[derive(Debug, Deserialize, Clone)]
struct Disk { name: String, total: u64, available: u64 }
// Per-interface byte counters; diffs between snapshots yield KB/s.
// NOTE(review): assumed cumulative totals from the agent — confirm agent side.
#[derive(Debug, Deserialize, Clone)]
struct Network { received: u64, transmitted: u64 }
// One row of the top-processes table.
#[derive(Debug, Deserialize, Clone)]
struct ProcessInfo {
pid: i32,
name: String,
// CPU usage in percent (0..100 per core scale — TODO confirm with agent).
cpu_usage: f32,
// Resident memory in bytes; rendered as a % of mem_total.
mem_bytes: u64,
}
// Full snapshot the agent sends per "get_metrics" request.
#[derive(Debug, Deserialize, Clone)]
struct Metrics {
cpu_total: f32,
cpu_per_core: Vec<f32>,
mem_total: u64,
mem_used: u64,
swap_total: u64,
swap_used: u64,
process_count: usize,
hostname: String,
// Celsius; None when the platform exposes no sensor (e.g. Windows).
cpu_temp_c: Option<f32>,
disks: Vec<Disk>,
networks: Vec<Network>,
top_processes: Vec<ProcessInfo>,
}
use std::env;
use app::App;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} ws://HOST:PORT/ws", args[0]);
std::process::exit(1);
}
let url = &args[1];
let (mut ws, _) = connect_async(url).await?;
let url = args[1].clone();
// Terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
// State
let mut last_metrics: Option<Metrics> = None;
let mut cpu_hist: VecDeque<u64> = VecDeque::with_capacity(600);
let mut per_core_hist: Vec<VecDeque<u16>> = Vec::new(); // one deque per core
const CORE_HISTORY: usize = 60; // ~30s if you tick every 500ms
// Network: keep totals across ALL ifaces + timestamp
let mut last_net_totals: Option<(u64, u64, Instant)> = None;
let mut rx_hist: VecDeque<u64> = VecDeque::with_capacity(600);
let mut tx_hist: VecDeque<u64> = VecDeque::with_capacity(600);
let mut rx_peak: u64 = 0;
let mut tx_peak: u64 = 0;
let mut should_quit = false;
loop {
while event::poll(Duration::from_millis(10))? {
if let Event::Key(k) = event::read()? {
if matches!(k.code, KeyCode::Char('q') | KeyCode::Char('Q') | KeyCode::Esc) {
should_quit = true;
}
}
}
if should_quit { break; }
ws.send(Message::Text("get_metrics".into())).await.ok();
if let Some(Ok(Message::Text(json))) = ws.next().await {
if let Ok(m) = serde_json::from_str::<Metrics>(&json) {
// CPU history
let v = m.cpu_total.clamp(0.0, 100.0).round() as u64;
push_capped(&mut cpu_hist, v, 600);
// NET: sum across all ifaces, compute KB/s via elapsed time
let now = Instant::now();
let rx_total = m.networks.iter().map(|n| n.received).sum::<u64>();
let tx_total = m.networks.iter().map(|n| n.transmitted).sum::<u64>();
let (rx_kb, tx_kb) = if let Some((prx, ptx, pts)) = last_net_totals {
let dt = now.duration_since(pts).as_secs_f64().max(1e-6);
let rx = ((rx_total.saturating_sub(prx)) as f64 / dt / 1024.0).round() as u64;
let tx = ((tx_total.saturating_sub(ptx)) as f64 / dt / 1024.0).round() as u64;
(rx, tx)
} else { (0, 0) };
last_net_totals = Some((rx_total, tx_total, now));
push_capped(&mut rx_hist, rx_kb, 600);
push_capped(&mut tx_hist, tx_kb, 600);
rx_peak = rx_peak.max(rx_kb);
tx_peak = tx_peak.max(tx_kb);
if let Some(m) = last_metrics.as_ref() {
// resize history buffers if core count changes
if per_core_hist.len() != m.cpu_per_core.len() {
per_core_hist = (0..m.cpu_per_core.len())
.map(|_| VecDeque::with_capacity(CORE_HISTORY))
.collect();
}
}
// push latest per-core samples
if let Some(m) = last_metrics.as_ref() {
for (i, v) in m.cpu_per_core.iter().enumerate() {
let v = v.clamp(0.0, 100.0).round() as u16;
push_capped(&mut per_core_hist[i], v, CORE_HISTORY);
}
}
last_metrics = Some(m);
}
}
terminal.draw(|f| {
let area = f.area();
let rows = Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Length(1),
Constraint::Ratio(1, 3),
Constraint::Length(3),
Constraint::Length(3),
Constraint::Min(10),
])
.split(area);
draw_header(f, rows[0], last_metrics.as_ref());
let top = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(66), Constraint::Percentage(34)])
.split(rows[1]);
draw_cpu_avg_graph(f, top[0], &cpu_hist, last_metrics.as_ref());
draw_per_core_bars(f, top[1], last_metrics.as_ref(), &per_core_hist);
draw_mem(f, rows[2], last_metrics.as_ref());
draw_swap(f, rows[3], last_metrics.as_ref());
let bottom = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(66), Constraint::Percentage(34)])
.split(rows[4]);
let left_stack = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Min(6), Constraint::Length(4), Constraint::Length(4)])
.split(bottom[0]);
draw_disks(f, left_stack[0], last_metrics.as_ref());
draw_net_spark(
f,
left_stack[1],
&format!("Download (KB/s) — now: {} | peak: {}", rx_hist.back().copied().unwrap_or(0), rx_peak),
&rx_hist,
Color::Green,
);
draw_net_spark(
f,
left_stack[2],
&format!("Upload (KB/s) — now: {} | peak: {}", tx_hist.back().copied().unwrap_or(0), tx_peak),
&tx_hist,
Color::Blue,
);
draw_top_processes(f, bottom[1], last_metrics.as_ref());
})?;
sleep(Duration::from_millis(500)).await;
}
disable_raw_mode()?;
let backend = terminal.backend_mut();
execute!(backend, LeaveAlternateScreen)?;
terminal.show_cursor()?;
Ok(())
let mut app = App::new();
app.run(&url).await
}
// FIFO-bounded append: keep the deque at no more than `cap` entries by
// dropping the oldest element before pushing the new one.
fn push_capped<T>(dq: &mut VecDeque<T>, v: T, cap: usize) {
    let full = dq.len() == cap;
    if full {
        dq.pop_front();
    }
    dq.push_back(v);
}
// Humanize a byte count with 1024-based units: bytes as an integer,
// KB/MB/GB with one decimal, TB with two decimals.
fn human(b: u64) -> String {
    const K: f64 = 1024.0;
    let mut value = b as f64;
    if value < K {
        return format!("{value:.0}B");
    }
    for unit in ["KB", "MB", "GB"] {
        value /= K;
        if value < K {
            return format!("{value:.1}{unit}");
        }
    }
    format!("{:.2}TB", value / K)
}
// Render the one-line header: hostname + CPU temperature (with an icon
// picked by threshold: <50°C relaxed, <85°C warning, else overheating),
// or a "connecting..." placeholder before the first metrics arrive.
fn draw_header(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
let title = if let Some(mm) = m {
// cpu_temp_c is None on platforms without a sensor; show N/A then.
let temp = mm.cpu_temp_c.map(|t| {
let icon = if t < 50.0 { "😎" } else if t < 85.0 { "⚠️" } else { "🔥" };
format!("CPU Temp: {:.1}°C {}", t, icon)
}).unwrap_or_else(|| "CPU Temp: N/A".into());
format!("socktop — host: {} | {} (press 'q' to quit)", mm.hostname, temp)
} else {
"socktop — connecting... (press 'q' to quit)".into()
};
// Bottom border only, so the header reads as a separator line.
f.render_widget(Block::default().title(title).borders(Borders::BOTTOM), area);
}
// Render the overall-CPU sparkline. Only the newest samples that fit
// inside the bordered area are shown; the scale is fixed to 0..=100%.
fn draw_cpu_avg_graph(
f: &mut ratatui::Frame<'_>,
area: Rect,
hist: &VecDeque<u64>,
m: Option<&Metrics>,
) {
let title = if let Some(mm) = m { format!("CPU avg (now: {:>5.1}%)", mm.cpu_total) } else { "CPU avg".into() };
// Width minus 2 for the left/right border columns.
let max_points = area.width.saturating_sub(2) as usize;
let start = hist.len().saturating_sub(max_points);
let data: Vec<u64> = hist.iter().skip(start).cloned().collect();
let spark = Sparkline::default()
.block(Block::default().borders(Borders::ALL).title(title))
.data(&data)
.max(100)
.style(Style::default().fg(Color::Cyan));
f.render_widget(spark, area);
}
// Render the "Per-core" panel: one row per core (as many as fit), each
// with a history sparkline on the left and a "cpuN  xx.x%" stat on the
// right, colored by current load (<25% green, <60% yellow, else red).
fn draw_per_core_bars(
f: &mut ratatui::Frame<'_>,
area: Rect,
m: Option<&Metrics>,
// 👇 add this param
per_core_hist: &Vec<VecDeque<u16>>,
) {
// frame
f.render_widget(Block::default().borders(Borders::ALL).title("Per-core"), area);
let Some(mm) = m else { return; };
// Inner rect = panel minus its 1-cell border on each side.
let inner = Rect { x: area.x + 1, y: area.y + 1, width: area.width.saturating_sub(2), height: area.height.saturating_sub(2) };
if inner.height == 0 { return; }
// one row per core
let rows = inner.height as usize;
let show_n = rows.min(mm.cpu_per_core.len());
let constraints: Vec<Constraint> = (0..show_n).map(|_| Constraint::Length(1)).collect();
let vchunks = Layout::default().direction(Direction::Vertical).constraints(constraints).split(inner);
for i in 0..show_n {
let rect = vchunks[i];
// split each row: sparkline (history) | stat text
let hchunks = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Min(6), Constraint::Length(12)]) // was 10 → now 12
.split(rect);
let curr = mm.cpu_per_core[i].clamp(0.0, 100.0);
// Compare against the sample ~20 ticks back to decide the trend;
// falls back to the current value (flat) when history is short.
let older = per_core_hist.get(i)
.and_then(|d| d.iter().rev().nth(20).copied()) // ~10s back
.map(|v| v as f32)
.unwrap_or(curr);
// NOTE(review): these trend markers are empty strings here — the
// arrow glyphs appear to have been lost in extraction; confirm
// against the repository before relying on this text.
let trend = if curr > older + 0.2 { "" }
else if curr + 0.2 < older { "" }
else { "" };
// colors by current load
let fg = match curr {
x if x < 25.0 => Color::Green,
x if x < 60.0 => Color::Yellow,
_ => Color::Red,
};
// history
let hist: Vec<u64> = per_core_hist
.get(i)
.map(|d| {
// Keep only as many samples as the sparkline column can show.
let max_points = hchunks[0].width as usize;
let start = d.len().saturating_sub(max_points);
d.iter().skip(start).map(|&v| v as u64).collect()
})
.unwrap_or_default();
// sparkline
let spark = Sparkline::default()
.data(&hist)
.max(100)
.style(Style::default().fg(fg));
f.render_widget(spark, hchunks[0]); // ✅ render_widget on rect
// right stat “cpuN 37.2% ↑”
let label = format!("cpu{:<2}{}{:>5.1}%", i, trend, curr);
let line = Line::from(Span::styled(label, Style::default().fg(fg).add_modifier(Modifier::BOLD)));
let block = Block::default(); // no borders per row to keep it clean
f.render_widget(block, hchunks[1]);
f.render_widget(ratatui::widgets::Paragraph::new(line).right_aligned(), hchunks[1]);
}
}
// Render the "Memory" gauge: percentage fill plus a "used / total" label
// (humanized). With no metrics yet, draws an empty 0% gauge.
fn draw_mem(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    let (used, total, pct) = m
        .map(|mm| {
            // Guard the division: mem_total can be 0 before real data lands.
            let pct = if mm.mem_total > 0 {
                (mm.mem_used as f64 / mm.mem_total as f64 * 100.0) as u16
            } else {
                0
            };
            (mm.mem_used, mm.mem_total, pct)
        })
        .unwrap_or((0, 0, 0));
    let gauge = Gauge::default()
        .block(Block::default().borders(Borders::ALL).title("Memory"))
        .gauge_style(Style::default().fg(Color::Magenta))
        .percent(pct)
        .label(format!("{} / {}", human(used), human(total)));
    f.render_widget(gauge, area);
}
// Render the "Swap" gauge: percentage fill plus a "used / total" label
// (humanized). With no metrics yet, draws an empty 0% gauge.
fn draw_swap(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    let (used, total, pct) = m
        .map(|mm| {
            // Guard the division: swap_total is 0 on swap-less systems.
            let pct = if mm.swap_total > 0 {
                (mm.swap_used as f64 / mm.swap_total as f64 * 100.0) as u16
            } else {
                0
            };
            (mm.swap_used, mm.swap_total, pct)
        })
        .unwrap_or((0, 0, 0));
    let gauge = Gauge::default()
        .block(Block::default().borders(Borders::ALL).title("Swap"))
        .gauge_style(Style::default().fg(Color::Yellow))
        .percent(pct)
        .label(format!("{} / {}", human(used), human(total)));
    f.render_widget(gauge, area);
}
// Render the "Disks" panel: a 3-row card per disk (titled border + a
// centered usage gauge). Shows as many disks as fit, in the order the
// agent reported them; usage color: <70% green, <90% yellow, else red.
fn draw_disks(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
// Panel frame
f.render_widget(Block::default().borders(Borders::ALL).title("Disks"), area);
let Some(mm) = m else { return; };
// Inner area inside the "Disks" panel
let inner = Rect {
x: area.x + 1,
y: area.y + 1,
width: area.width.saturating_sub(2),
height: area.height.saturating_sub(2),
};
if inner.height < 3 { return; }
// Each disk gets a 3-row card: [title line] + [gauge line] + [spacer]
// If we run out of height, we show as many as we can.
let per_disk_h = 3u16;
let max_cards = (inner.height / per_disk_h).min(mm.disks.len() as u16) as usize;
// Build rows layout (Length(3) per disk)
let constraints: Vec<Constraint> = (0..max_cards).map(|_| Constraint::Length(per_disk_h)).collect();
let rows = Layout::default()
.direction(Direction::Vertical)
.constraints(constraints)
.split(inner);
for (i, slot) in rows.iter().enumerate() {
let d = &mm.disks[i];
// Used = capacity minus free; saturating in case the agent reports
// available > total transiently.
let used = d.total.saturating_sub(d.available);
let ratio = if d.total > 0 { used as f64 / d.total as f64 } else { 0.0 };
let pct = (ratio * 100.0).round() as u16;
// Color by severity
let color = if pct < 70 { Color::Green } else if pct < 90 { Color::Yellow } else { Color::Red };
// 1) Title line (name left, usage right), inside its own little block
let title = format!(
"{} {} {} / {} ({}%)",
disk_icon(&d.name),
truncate_middle(&d.name, (slot.width.saturating_sub(6)) as usize / 2),
human(used),
human(d.total),
pct
);
// Card frame (thin border per disk)
let card = Block::default().borders(Borders::ALL).title(title);
// Render card covering the whole 3-row slot
f.render_widget(card, *slot);
// 2) Gauge on the second line inside the card
// Compute an inner rect (strip card borders), then pick the middle line for the bar
let inner_card = Rect {
x: slot.x + 1,
y: slot.y + 1,
width: slot.width.saturating_sub(2),
height: slot.height.saturating_sub(2),
};
if inner_card.height == 0 { continue; }
// Center line for the gauge
let gauge_rect = Rect {
x: inner_card.x,
y: inner_card.y + inner_card.height / 2, // 1 line down inside the card
width: inner_card.width,
height: 1,
};
let g = Gauge::default()
.percent(pct)
.gauge_style(Style::default().fg(color));
f.render_widget(g, gauge_rect);
}
}
// Pick a per-device icon from the (lowercased) device name. The checks
// run in priority order: network mounts first, then nvme, sata, overlay,
// and finally a generic-drive fallback.
// NOTE(review): the nvme arm returns an empty string here — the glyph may
// have been lost in extraction; confirm against the repository.
fn disk_icon(name: &str) -> &'static str {
    let lower = name.to_ascii_lowercase();
    if lower.contains(':') {
        "🗄️" // network mount
    } else if lower.contains("nvme") {
        "" // nvme
    } else if lower.starts_with("sd") {
        "💽" // sata
    } else if lower.contains("overlay") {
        "📦" // containers/overlayfs
    } else {
        "🖴" // generic drive
    }
}
// Optional helper to keep device names tidy in the title.
// Shortens `s` to at most `max` characters by replacing the middle with
// "...". Operates on chars, not bytes: the original sliced with byte
// indices (`&s[..left]`), which panics on non-ASCII names (e.g. a
// network mount with accented characters) when the cut lands inside a
// multi-byte UTF-8 sequence.
fn truncate_middle(s: &str, max: usize) -> String {
    let char_count = s.chars().count();
    if char_count <= max {
        return s.to_string();
    }
    if max <= 3 {
        // Not enough room for any content around the ellipsis.
        return "...".into();
    }
    let keep = max - 3;
    let left = keep / 2;
    let right = keep - left;
    let head: String = s.chars().take(left).collect();
    let tail: String = s.chars().skip(char_count - right).collect();
    format!("{head}...{tail}")
}
// Render one network-rate sparkline (KB/s history) inside a titled border.
fn draw_net_spark(
    f: &mut ratatui::Frame<'_>,
    area: Rect,
    title: &str,
    hist: &VecDeque<u64>,
    color: Color,
) {
    // Keep only the most recent samples that fit between the side borders.
    let visible = area.width.saturating_sub(2) as usize;
    let tail = hist.len().saturating_sub(visible);
    let points: Vec<u64> = hist.iter().skip(tail).copied().collect();
    let widget = Sparkline::default()
        .block(Block::default().borders(Borders::ALL).title(title.to_string()))
        .data(&points)
        .style(Style::default().fg(color));
    f.render_widget(widget, area);
}
/// Render the "Top Processes" table (PID, name, CPU%, mem, mem%).
/// Before the first metrics arrive (`m == None`) only a bordered placeholder
/// is drawn.
fn draw_top_processes(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    let Some(mm) = m else {
        f.render_widget(Block::default().borders(Borders::ALL).title("Top Processes"), area);
        return;
    };
    let total_mem_bytes = mm.mem_total.max(1); // avoid div-by-zero when computing mem%
    let title = format!("Top Processes ({} total)", mm.process_count);
    // Precompute peak CPU so the single hottest row can be bolded below.
    let peak_cpu = mm.top_processes.iter().map(|p| p.cpu_usage).fold(0.0_f32, f32::max);
    // Build rows with per-cell coloring + zebra striping.
    let rows: Vec<Row> = mm.top_processes.iter().enumerate().map(|(i, p)| {
        let mem_pct = (p.mem_bytes as f64 / total_mem_bytes as f64) * 100.0;
        // Severity colors: CPU green < 25%, yellow < 60%, red otherwise.
        let cpu_fg = match p.cpu_usage {
            x if x < 25.0 => Color::Green,
            x if x < 60.0 => Color::Yellow,
            _ => Color::Red,
        };
        // Memory share: blue < 5%, magenta < 20%, red otherwise.
        let mem_fg = match mem_pct {
            x if x < 5.0 => Color::Blue,
            x if x < 20.0 => Color::Magenta,
            _ => Color::Red,
        };
        // Light zebra striping (foreground-only to avoid loud backgrounds).
        let zebra = if i % 2 == 0 { Style::default().fg(Color::Gray) } else { Style::default() };
        // Emphasize the row whose CPU equals the precomputed peak (exact-ish
        // float compare via EPSILON; fine here since both values come from the
        // same unmodified f32 samples).
        let emphasis = if (p.cpu_usage - peak_cpu).abs() < f32::EPSILON {
            Style::default().add_modifier(Modifier::BOLD)
        } else { Style::default() };
        Row::new(vec![
            Cell::from(p.pid.to_string()).style(Style::default().fg(Color::DarkGray)),
            Cell::from(p.name.clone()),
            Cell::from(format!("{:.1}%", p.cpu_usage)).style(Style::default().fg(cpu_fg)),
            Cell::from(human(p.mem_bytes)),
            Cell::from(format!("{:.2}%", mem_pct)).style(Style::default().fg(mem_fg)),
        ])
        // Row style is zebra overridden by emphasis where set.
        .style(zebra.patch(emphasis))
    }).collect();
    let header = Row::new(vec!["PID", "Name", "CPU %", "Mem", "Mem %"])
        .style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD));
    let table = Table::new(
        rows,
        vec![
            Constraint::Length(8), // PID
            Constraint::Percentage(40), // Name
            Constraint::Length(8), // CPU %
            Constraint::Length(12), // Mem
            Constraint::Length(8), // Mem %
        ],
    )
    .header(header)
    .column_spacing(1)
    .block(Block::default().borders(Borders::ALL).title(title));
    f.render_widget(table, area);
}

41
socktop/src/types.rs Normal file
View File

@ -0,0 +1,41 @@
//! Types that mirror the agent's JSON schema.
use serde::Deserialize;
/// One disk/volume as reported by the agent.
#[derive(Debug, Deserialize, Clone)]
pub struct Disk {
    // Device name (e.g. "sda1", "nvme0n1").
    pub name: String,
    // Total capacity in bytes.
    pub total: u64,
    // Free space in bytes; used space = total - available.
    pub available: u64,
}
/// Per-interface traffic counters.
#[derive(Debug, Deserialize, Clone)]
pub struct Network {
    // cumulative totals; client diffs to compute rates
    pub received: u64,
    pub transmitted: u64,
}
/// One row of the top-processes table.
#[derive(Debug, Deserialize, Clone)]
pub struct ProcessInfo {
    pub pid: u32,
    pub name: String,
    // CPU usage in percent (agent normalizes across cores).
    pub cpu_usage: f32,
    // Resident memory in bytes.
    pub mem_bytes: u64,
}
/// Full metrics snapshot — must stay field-compatible with the agent's
/// serialized `Metrics` (see the agent's types module).
#[derive(Debug, Deserialize, Clone)]
pub struct Metrics {
    pub cpu_total: f32,
    pub cpu_per_core: Vec<f32>,
    // Memory/swap figures are all in bytes.
    pub mem_total: u64,
    pub mem_used: u64,
    pub swap_total: u64,
    pub swap_used: u64,
    pub process_count: usize,
    pub hostname: String,
    // None when no CPU-like temperature sensor is available (e.g. Windows).
    pub cpu_temp_c: Option<f32>,
    pub disks: Vec<Disk>,
    pub networks: Vec<Network>,
    pub top_processes: Vec<ProcessInfo>,
}

91
socktop/src/ui/cpu.rs Normal file
View File

@ -0,0 +1,91 @@
//! CPU average sparkline + per-core mini bars.
use ratatui::{
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Style},
text::{Line, Span},
widgets::{Block, Borders, Paragraph, Sparkline},
};
use ratatui::style::Modifier;
use crate::history::PerCoreHistory;
use crate::types::Metrics;
/// Overall-CPU sparkline; the title shows the latest reading when metrics
/// are available, otherwise just "CPU avg".
pub fn draw_cpu_avg_graph(
    f: &mut ratatui::Frame<'_>,
    area: Rect,
    hist: &std::collections::VecDeque<u64>,
    m: Option<&Metrics>,
) {
    let title = match m {
        Some(mm) => format!("CPU avg (now: {:>5.1}%)", mm.cpu_total),
        None => "CPU avg".into(),
    };
    // Trim history to the points that fit inside the borders.
    let visible = area.width.saturating_sub(2) as usize;
    let tail = hist.len().saturating_sub(visible);
    let points: Vec<u64> = hist.iter().skip(tail).copied().collect();
    let widget = Sparkline::default()
        .block(Block::default().borders(Borders::ALL).title(title))
        .data(&points)
        .max(100)
        .style(Style::default().fg(Color::Cyan));
    f.render_widget(widget, area);
}
/// Per-core panel: one row per core (as many as fit vertically), each with a
/// mini sparkline of its recent history plus a trend marker and colored label.
pub fn draw_per_core_bars(
    f: &mut ratatui::Frame<'_>,
    area: Rect,
    m: Option<&Metrics>,
    per_core_hist: &PerCoreHistory,
) {
    // Frame first; with no metrics yet we leave it empty.
    f.render_widget(Block::default().borders(Borders::ALL).title("Per-core"), area);
    let Some(mm) = m else { return; };
    // Inner rect with the 1-cell border stripped.
    let inner = Rect { x: area.x + 1, y: area.y + 1, width: area.width.saturating_sub(2), height: area.height.saturating_sub(2) };
    if inner.height == 0 { return; }
    // Show at most one row per available line; extra cores are dropped.
    let rows = inner.height as usize;
    let show_n = rows.min(mm.cpu_per_core.len());
    let constraints: Vec<Constraint> = (0..show_n).map(|_| Constraint::Length(1)).collect();
    let vchunks = Layout::default().direction(Direction::Vertical).constraints(constraints).split(inner);
    for i in 0..show_n {
        let rect = vchunks[i];
        // Split each row: flexible sparkline on the left, fixed 12-col label on the right.
        let hchunks = Layout::default()
            .direction(Direction::Horizontal)
            .constraints([Constraint::Min(6), Constraint::Length(12)])
            .split(rect);
        let curr = mm.cpu_per_core[i].clamp(0.0, 100.0);
        // Compare against the sample ~20 ticks back to derive the trend;
        // falls back to `curr` (flat trend) when history is too short.
        let older = per_core_hist.deques.get(i)
            .and_then(|d| d.iter().rev().nth(20).copied())
            .map(|v| v as f32)
            .unwrap_or(curr);
        // Trend marker with a 0.2% dead-band.
        // NOTE(review): all three trend strings are empty — the up/down arrow
        // glyphs were likely lost (commit notes mention "trend arrows"); confirm
        // and restore the intended characters.
        let trend = if curr > older + 0.2 { "" }
        else if curr + 0.2 < older { "" }
        else { "" };
        // Severity color: green < 25%, yellow < 60%, red otherwise.
        let fg = match curr {
            x if x < 25.0 => Color::Green,
            x if x < 60.0 => Color::Yellow,
            _ => Color::Red,
        };
        // History trimmed to the sparkline's width.
        let hist: Vec<u64> = per_core_hist
            .deques
            .get(i)
            .map(|d| {
                let max_points = hchunks[0].width as usize;
                let start = d.len().saturating_sub(max_points);
                d.iter().skip(start).map(|&v| v as u64).collect()
            })
            .unwrap_or_default();
        let spark = Sparkline::default()
            .data(&hist)
            .max(100)
            .style(Style::default().fg(fg));
        f.render_widget(spark, hchunks[0]);
        // Right-aligned "cpuN <trend> NN.N%" label in the core's color.
        let label = format!("cpu{:<2}{}{:>5.1}%", i, trend, curr);
        let line = Line::from(Span::styled(label, Style::default().fg(fg).add_modifier(Modifier::BOLD)));
        f.render_widget(Paragraph::new(line).right_aligned(), hchunks[1]);
    }
}

73
socktop/src/ui/disks.rs Normal file
View File

@ -0,0 +1,73 @@
//! Disk cards with per-device gauge and title line.
use ratatui::{
layout::{Constraint, Direction, Layout, Rect},
style::Style,
widgets::{Block, Borders, Gauge},
};
use crate::types::Metrics;
use crate::ui::util::{human, truncate_middle, disk_icon};
/// Disks panel: one 3-row "card" per device (as many as fit), each with a
/// title line (icon, name, used/total, percent) and a severity-colored gauge.
pub fn draw_disks(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    // Outer frame; stays empty until metrics arrive.
    f.render_widget(Block::default().borders(Borders::ALL).title("Disks"), area);
    let Some(mm) = m else { return; };
    // Inner rect with the panel border stripped.
    let inner = Rect {
        x: area.x + 1,
        y: area.y + 1,
        width: area.width.saturating_sub(2),
        height: area.height.saturating_sub(2),
    };
    if inner.height < 3 { return; }
    // Fixed card height; disks that don't fit are simply not shown.
    let per_disk_h = 3u16;
    let max_cards = (inner.height / per_disk_h).min(mm.disks.len() as u16) as usize;
    let constraints: Vec<Constraint> = (0..max_cards).map(|_| Constraint::Length(per_disk_h)).collect();
    let rows = Layout::default()
        .direction(Direction::Vertical)
        .constraints(constraints)
        .split(inner);
    for (i, slot) in rows.iter().enumerate() {
        let d = &mm.disks[i];
        let used = d.total.saturating_sub(d.available);
        let ratio = if d.total > 0 { used as f64 / d.total as f64 } else { 0.0 };
        let pct = (ratio * 100.0).round() as u16;
        // Severity color: green < 70%, yellow < 90%, red otherwise.
        let color = if pct < 70 { ratatui::style::Color::Green } else if pct < 90 { ratatui::style::Color::Yellow } else { ratatui::style::Color::Red };
        // Title: icon + middle-truncated name + humanized used/total + percent.
        let title = format!(
            "{} {} {} / {} ({}%)",
            disk_icon(&d.name),
            truncate_middle(&d.name, (slot.width.saturating_sub(6)) as usize / 2),
            human(used),
            human(d.total),
            pct
        );
        // Thin-bordered card covering the whole 3-row slot.
        let card = Block::default().borders(Borders::ALL).title(title);
        f.render_widget(card, *slot);
        // Inner rect of the card (borders stripped) for the gauge line.
        let inner_card = Rect {
            x: slot.x + 1,
            y: slot.y + 1,
            width: slot.width.saturating_sub(2),
            height: slot.height.saturating_sub(2),
        };
        if inner_card.height == 0 { continue; }
        // Single middle line of the card hosts the usage gauge.
        let gauge_rect = Rect {
            x: inner_card.x,
            y: inner_card.y + inner_card.height / 2,
            width: inner_card.width,
            height: 1,
        };
        let g = Gauge::default()
            .percent(pct)
            .gauge_style(Style::default().fg(color));
        f.render_widget(g, gauge_rect);
    }
}

20
socktop/src/ui/header.rs Normal file
View File

@ -0,0 +1,20 @@
//! Top header with hostname and CPU temperature indicator.
use ratatui::{
layout::Rect,
widgets::{Block, Borders},
};
use crate::types::Metrics;
/// One-line header: hostname plus a CPU-temperature badge once metrics have
/// arrived, or a "connecting" banner before the first sample.
pub fn draw_header(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    let title = match m {
        Some(mm) => {
            let temp = match mm.cpu_temp_c {
                Some(t) => {
                    // Badge thresholds: cool < 50°C, warning < 85°C, hot otherwise.
                    let icon = if t < 50.0 {
                        "😎"
                    } else if t < 85.0 {
                        "⚠️"
                    } else {
                        "🔥"
                    };
                    format!("CPU Temp: {:.1}°C {}", t, icon)
                }
                None => "CPU Temp: N/A".into(),
            };
            format!("socktop — host: {} | {} (press 'q' to quit)", mm.hostname, temp)
        }
        None => "socktop — connecting... (press 'q' to quit)".into(),
    };
    f.render_widget(Block::default().title(title).borders(Borders::BOTTOM), area);
}

23
socktop/src/ui/mem.rs Normal file
View File

@ -0,0 +1,23 @@
//! Memory gauge.
use ratatui::{
layout::Rect,
style::{Color, Style},
widgets::{Block, Borders, Gauge},
};
use crate::types::Metrics;
use crate::ui::util::human;
/// Memory gauge with a humanized "used / total" label; shows 0 / 0 before the
/// first metrics arrive.
pub fn draw_mem(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    let (used, total) = m.map(|mm| (mm.mem_used, mm.mem_total)).unwrap_or((0, 0));
    // Integer percent for the gauge; guard the zero-total case.
    let pct = if total > 0 {
        (used as f64 / total as f64 * 100.0) as u16
    } else {
        0
    };
    let gauge = Gauge::default()
        .block(Block::default().borders(Borders::ALL).title("Memory"))
        .gauge_style(Style::default().fg(Color::Magenta))
        .percent(pct)
        .label(format!("{} / {}", human(used), human(total)));
    f.render_widget(gauge, area);
}

10
socktop/src/ui/mod.rs Normal file
View File

@ -0,0 +1,10 @@
//! UI module root: exposes drawing functions for individual panels.
pub mod header;
pub mod cpu;
pub mod mem;
pub mod swap;
pub mod disks;
pub mod net;
pub mod processes;
pub mod util;

26
socktop/src/ui/net.rs Normal file
View File

@ -0,0 +1,26 @@
//! Network sparklines (download/upload).
use std::collections::VecDeque;
use ratatui::{
layout::Rect,
style::{Color, Style},
widgets::{Block, Borders, Sparkline},
};
/// Render one network-rate sparkline (download or upload, KB/s) with a titled
/// border in the given `color`.
pub fn draw_net_spark(
    f: &mut ratatui::Frame<'_>,
    area: Rect,
    title: &str,
    hist: &VecDeque<u64>,
    color: Color,
) {
    // Only the most recent points that fit between the side borders are shown.
    let max_points = area.width.saturating_sub(2) as usize;
    let start = hist.len().saturating_sub(max_points);
    let data: Vec<u64> = hist.iter().skip(start).cloned().collect();
    let spark = Sparkline::default()
        .block(Block::default().borders(Borders::ALL).title(title.to_string()))
        .data(&data)
        .style(Style::default().fg(color));
    f.render_widget(spark, area);
}

View File

@ -0,0 +1,71 @@
//! Top processes table with per-cell coloring and zebra striping.
use ratatui::{
layout::{Constraint, Rect},
style::{Color, Style},
widgets::{Block, Borders, Cell, Row, Table},
};
use ratatui::style::Modifier;
use crate::types::Metrics;
use crate::ui::util::human;
/// Render the "Top Processes" table (PID, name, CPU%, mem, mem%).
/// Draws only the bordered placeholder until the first metrics arrive.
pub fn draw_top_processes(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    let Some(mm) = m else {
        f.render_widget(Block::default().borders(Borders::ALL).title("Top Processes"), area);
        return;
    };
    // Guard against div-by-zero when computing each row's memory share.
    let total_mem_bytes = mm.mem_total.max(1);
    let title = format!("Top Processes ({} total)", mm.process_count);
    // Peak CPU value, used to bold the hottest row below.
    let peak_cpu = mm.top_processes.iter().map(|p| p.cpu_usage).fold(0.0_f32, f32::max);
    let rows: Vec<Row> = mm.top_processes.iter().enumerate().map(|(i, p)| {
        let mem_pct = (p.mem_bytes as f64 / total_mem_bytes as f64) * 100.0;
        // CPU severity: green < 25%, yellow < 60%, red otherwise.
        let cpu_fg = match p.cpu_usage {
            x if x < 25.0 => Color::Green,
            x if x < 60.0 => Color::Yellow,
            _ => Color::Red,
        };
        // Memory share: blue < 5%, magenta < 20%, red otherwise.
        let mem_fg = match mem_pct {
            x if x < 5.0 => Color::Blue,
            x if x < 20.0 => Color::Magenta,
            _ => Color::Red,
        };
        // Foreground-only zebra striping on even rows.
        let zebra = if i % 2 == 0 { Style::default().fg(Color::Gray) } else { Style::default() };
        // Bold the row matching the precomputed peak CPU (same f32 source, so
        // the EPSILON compare reliably hits the max row).
        let emphasis = if (p.cpu_usage - peak_cpu).abs() < f32::EPSILON {
            Style::default().add_modifier(Modifier::BOLD)
        } else { Style::default() };
        Row::new(vec![
            Cell::from(p.pid.to_string()).style(Style::default().fg(Color::DarkGray)),
            Cell::from(p.name.clone()),
            Cell::from(format!("{:.1}%", p.cpu_usage)).style(Style::default().fg(cpu_fg)),
            Cell::from(human(p.mem_bytes)),
            Cell::from(format!("{:.2}%", mem_pct)).style(Style::default().fg(mem_fg)),
        ])
        .style(zebra.patch(emphasis))
    }).collect();
    let header = Row::new(vec!["PID", "Name", "CPU %", "Mem", "Mem %"])
        .style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD));
    let table = Table::new(
        rows,
        vec![
            Constraint::Length(8),      // PID
            Constraint::Percentage(40), // Name
            Constraint::Length(8),      // CPU %
            Constraint::Length(12),     // Mem
            Constraint::Length(8),      // Mem %
        ],
    )
    .header(header)
    .column_spacing(1)
    .block(Block::default().borders(Borders::ALL).title(title));
    f.render_widget(table, area);
}

23
socktop/src/ui/swap.rs Normal file
View File

@ -0,0 +1,23 @@
//! Swap gauge.
use ratatui::{
layout::Rect,
style::{Color, Style},
widgets::{Block, Borders, Gauge},
};
use crate::types::Metrics;
use crate::ui::util::human;
/// Swap gauge with a humanized "used / total" label; shows 0 / 0 before the
/// first metrics arrive (and on hosts without swap).
pub fn draw_swap(f: &mut ratatui::Frame<'_>, area: Rect, m: Option<&Metrics>) {
    let (used, total) = m.map(|mm| (mm.swap_used, mm.swap_total)).unwrap_or((0, 0));
    // Integer percent for the gauge; guard the no-swap (zero total) case.
    let pct = if total > 0 {
        (used as f64 / total as f64 * 100.0) as u16
    } else {
        0
    };
    let gauge = Gauge::default()
        .block(Block::default().borders(Borders::ALL).title("Swap"))
        .gauge_style(Style::default().fg(Color::Yellow))
        .percent(pct)
        .label(format!("{} / {}", human(used), human(total)));
    f.render_widget(gauge, area);
}

33
socktop/src/ui/util.rs Normal file
View File

@ -0,0 +1,33 @@
//! Small UI helpers: human-readable sizes, truncation, icons.
/// Humanize a byte count: "512B", "1.5KB", "2.0MB", "3.4GB", "1.00TB".
/// Bytes print with no decimals, KB/MB/GB with one, TB with two.
pub fn human(b: u64) -> String {
    let mut v = b as f64;
    if v < 1024.0 {
        return format!("{v:.0}B");
    }
    for unit in ["KB", "MB", "GB"] {
        v /= 1024.0;
        if v < 1024.0 {
            return format!("{v:.1}{unit}");
        }
    }
    // Anything >= 1024 GB is reported in TB (no PB tier).
    format!("{:.2}TB", v / 1024.0)
}
pub fn truncate_middle(s: &str, max: usize) -> String {
if s.len() <= max { return s.to_string(); }
if max <= 3 { return "...".into(); }
let keep = max - 3;
let left = keep / 2;
let right = keep - left;
format!("{}...{}", &s[..left], &s[s.len()-right..])
}
/// Pick a display icon for a device name; case-insensitive, first rule wins.
pub fn disk_icon(name: &str) -> &'static str {
    let lower = name.to_ascii_lowercase();
    match () {
        _ if lower.contains(':') => "🗄️",          // drive-letter / network-style name
        _ if lower.contains("nvme") => "",          // NVMe
        _ if lower.starts_with("sd") => "💽",        // SATA/SCSI block device
        _ if lower.contains("overlay") => "📦",      // containers/overlayfs
        _ => "🖴",                                   // generic drive
    }
}

28
socktop/src/ws.rs Normal file
View File

@ -0,0 +1,28 @@
//! Minimal WebSocket client helpers for requesting metrics from the agent.
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use crate::types::Metrics;
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
// Connect to the agent and return the WS stream.
// `url` is a full ws:// URL, e.g. "ws://host:3000/ws" (optionally with
// "?token=..." when the agent has SOCKTOP_TOKEN set). Errors from DNS, TCP,
// or the WebSocket handshake are propagated to the caller.
pub async fn connect(url: &str) -> Result<WsStream, Box<dyn std::error::Error>> {
    let (ws, _) = connect_async(url).await?;
    Ok(ws)
}
// Send a "get_metrics" request and await a single JSON reply.
// Returns None on send failure, stream end, a non-text frame, or a payload
// that doesn't deserialize into `Metrics` — callers treat None as
// "disconnected / retry".
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
    if ws.send(Message::Text("get_metrics".into())).await.is_err() {
        return None;
    }
    match ws.next().await {
        Some(Ok(Message::Text(json))) => serde_json::from_str::<Metrics>(&json).ok(),
        _ => None,
    }
}
// Re-export SinkExt/StreamExt for call sites
use futures_util::{SinkExt, StreamExt};

View File

@ -13,3 +13,5 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
futures-util = "0.3.31"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -1,217 +1,136 @@
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
routing::get,
Router,
};
use futures_util::stream::StreamExt;
use serde::Serialize;
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
//! socktop agent entrypoint: sets up sysinfo handles, launches a sampler,
//! and serves a WebSocket endpoint at /ws.
mod metrics;
mod sampler;
mod state;
mod ws;
mod types;
use axum::{routing::get, Router};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration, sync::atomic::AtomicUsize};
use sysinfo::{
Components, CpuRefreshKind, Disks, MemoryRefreshKind, Networks, ProcessRefreshKind, RefreshKind,
System,
};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock, Notify};
use tracing_subscriber::EnvFilter;
// ---------- Data types sent to the client ----------
#[derive(Debug, Serialize, Clone)]
struct ProcessInfo {
pid: u32,
name: String,
cpu_usage: f32,
mem_bytes: u64,
}
#[derive(Debug, Serialize, Clone)]
struct DiskInfo {
name: String,
total: u64,
available: u64,
}
#[derive(Debug, Serialize, Clone)]
struct NetworkInfo {
name: String,
// cumulative totals since the agent started (client should diff to get rates)
received: u64,
transmitted: u64,
}
#[derive(Debug, Serialize, Clone)]
struct Metrics {
cpu_total: f32,
cpu_per_core: Vec<f32>,
mem_total: u64,
mem_used: u64,
swap_total: u64,
swap_used: u64,
process_count: usize,
hostname: String,
cpu_temp_c: Option<f32>,
disks: Vec<DiskInfo>,
networks: Vec<NetworkInfo>,
top_processes: Vec<ProcessInfo>,
}
// ---------- Shared state ----------
type SharedSystem = Arc<Mutex<System>>;
type SharedNetworks = Arc<Mutex<Networks>>;
type SharedTotals = Arc<Mutex<HashMap<String, (u64, u64)>>>; // iface -> (rx_total, tx_total)
#[derive(Clone)]
struct AppState {
sys: SharedSystem,
nets: SharedNetworks,
net_totals: SharedTotals,
}
use state::{AppState, SharedTotals};
use sampler::spawn_sampler;
use ws::ws_handler;
#[tokio::main]
async fn main() {
// sysinfo 0.36: build specifics
// Init logging; configure with RUST_LOG (e.g., RUST_LOG=info).
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_target(false)
.compact()
.init();
// sysinfo build specifics (scopes what refresh_all() will touch internally)
let refresh_kind = RefreshKind::nothing()
.with_cpu(CpuRefreshKind::everything())
.with_memory(MemoryRefreshKind::everything())
.with_processes(ProcessRefreshKind::everything());
// Initialize sysinfo handles once and keep them alive
let mut sys = System::new_with_specifics(refresh_kind);
sys.refresh_all();
// Keep Networks alive across requests so received()/transmitted() deltas work
let mut nets = Networks::new();
nets.refresh(true);
let shared = Arc::new(Mutex::new(sys));
let shared_nets = Arc::new(Mutex::new(nets));
let net_totals: SharedTotals = Arc::new(Mutex::new(HashMap::new()));
let mut components = Components::new();
components.refresh(true);
let app = Router::new()
.route("/ws", get(ws_handler))
.with_state(AppState {
sys: shared,
nets: shared_nets,
net_totals,
});
let mut disks = Disks::new();
disks.refresh(true);
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
// Shared state across requests
let state = AppState {
sys: Arc::new(Mutex::new(sys)),
nets: Arc::new(Mutex::new(nets)),
net_totals: Arc::new(Mutex::new(HashMap::<String, (u64, u64)>::new())) as SharedTotals,
components: Arc::new(Mutex::new(components)),
disks: Arc::new(Mutex::new(disks)),
last_json: Arc::new(RwLock::new(String::new())),
// new: adaptive sampling controls
client_count: Arc::new(AtomicUsize::new(0)),
wake_sampler: Arc::new(Notify::new()),
auth_token: std::env::var("SOCKTOP_TOKEN").ok().filter(|s| !s.is_empty()),
};
// Start background sampler (adjust cadence as needed)
let _sampler = spawn_sampler(state.clone(), Duration::from_millis(500));
// Web app
let port = resolve_port();
let app = Router::new().route("/ws", get(ws_handler)).with_state(state);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
//output to console
println!("Remote agent running at http://{}", addr);
println!("WebSocket endpoint: ws://{}/ws", addr);
//trace logging
tracing::info!("Remote agent running at http://{} (ws at /ws)", addr);
tracing::info!("WebSocket endpoint: ws://{}/ws", addr);
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn ws_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}
// Resolve the listening port from CLI args/env with a 3000 default.
// Supports: --port <PORT>, -p <PORT>, a bare numeric positional arg, or SOCKTOP_PORT.
fn resolve_port() -> u16 {
const DEFAULT: u16 = 3000;
async fn handle_socket(mut socket: WebSocket, state: AppState) {
while let Some(Ok(msg)) = socket.next().await {
if let Message::Text(text) = msg {
if text == "get_metrics" {
let metrics = collect_metrics(&state).await;
let json = serde_json::to_string(&metrics).unwrap();
let _ = socket.send(Message::Text(json)).await;
// Env takes precedence over positional, but is overridden by explicit flags if present.
if let Ok(s) = std::env::var("SOCKTOP_PORT") {
if let Ok(p) = s.parse::<u16>() {
if p != 0 {
return p;
}
}
eprintln!("Warning: invalid SOCKTOP_PORT='{}'; using default {}", s, DEFAULT);
}
let mut args = std::env::args().skip(1);
while let Some(arg) = args.next() {
match arg.as_str() {
"--port" | "-p" => {
if let Some(v) = args.next() {
match v.parse::<u16>() {
Ok(p) if p != 0 => return p,
_ => {
eprintln!("Invalid port '{}'; using default {}", v, DEFAULT);
return DEFAULT;
}
}
} else {
eprintln!("Missing value for {} ; using default {}", arg, DEFAULT);
return DEFAULT;
}
}
"--help" | "-h" => {
println!("Usage: socktop_agent [--port <PORT>] [PORT]\n SOCKTOP_PORT=<PORT> socktop_agent");
std::process::exit(0);
}
s => {
if let Ok(p) = s.parse::<u16>() {
if p != 0 {
return p;
}
}
}
}
}
DEFAULT
}
// ---------- Metrics collection ----------
async fn collect_metrics(state: &AppState) -> Metrics {
// System (CPU/mem/proc)
let mut sys = state.sys.lock().await;
sys.refresh_all();
let hostname = System::host_name().unwrap_or_else(|| "unknown".into());
// Temps via Components (separate handle in 0.36)
let mut components = Components::new();
components.refresh(true);
let cpu_temp_c = best_cpu_temp(&components);
// Disks (separate handle in 0.36)
let mut disks_struct = Disks::new();
disks_struct.refresh(true);
// Filter anything with available == 0 (e.g., overlay)
let disks: Vec<DiskInfo> = disks_struct
.list()
.iter()
.filter(|d| d.available_space() > 0)
.map(|d| DiskInfo {
name: d.name().to_string_lossy().to_string(),
total: d.total_space(),
available: d.available_space(),
})
.collect();
// Networks: use a persistent Networks + rolling totals
let mut nets = state.nets.lock().await;
nets.refresh(true);
let mut totals = state.net_totals.lock().await;
let mut networks: Vec<NetworkInfo> = Vec::new();
for (name, data) in nets.iter() {
// sysinfo 0.36: data.received()/transmitted() are deltas since *last* refresh
let delta_rx = data.received();
let delta_tx = data.transmitted();
let entry = totals.entry(name.clone()).or_insert((0, 0));
entry.0 = entry.0.saturating_add(delta_rx);
entry.1 = entry.1.saturating_add(delta_tx);
networks.push(NetworkInfo {
name: name.clone(),
received: entry.0,
transmitted: entry.1,
});
}
// get number of cpu cores
let n_cpus = sys.cpus().len().max(1) as f32;
// Top processes: include PID and memory, top 20 by CPU
let mut top_processes: Vec<ProcessInfo> = sys
.processes()
.values()
.map(|p| ProcessInfo {
pid: p.pid().as_u32(),
name: p.name().to_string_lossy().to_string(),
cpu_usage: (p.cpu_usage() / n_cpus).min(100.0),
mem_bytes: p.memory(), // sysinfo 0.36: bytes
})
.collect();
top_processes.sort_by(|a, b| b.cpu_usage.partial_cmp(&a.cpu_usage).unwrap());
top_processes.truncate(20);
Metrics {
cpu_total: sys.global_cpu_usage(),
cpu_per_core: sys.cpus().iter().map(|c| c.cpu_usage()).collect(),
mem_total: sys.total_memory(),
mem_used: sys.used_memory(),
swap_total: sys.total_swap(),
swap_used: sys.used_swap(),
process_count: sys.processes().len(),
hostname,
cpu_temp_c,
disks,
networks,
top_processes,
}
}
fn best_cpu_temp(components: &Components) -> Option<f32> {
components
.iter()
.filter(|c| {
let label = c.label().to_lowercase();
label.contains("cpu") || label.contains("package") || label.contains("tctl") || label.contains("tdie")
})
.filter_map(|c| c.temperature())
.max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
}

View File

@ -0,0 +1,128 @@
//! Metrics collection using sysinfo. Keeps sysinfo handles in AppState to
//! avoid repeated allocations and allow efficient refreshes.
use crate::state::AppState;
use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo};
use sysinfo::{Components, System};
/// Collect one full `Metrics` snapshot from the persistent sysinfo handles in
/// `AppState`. Takes the state locks one at a time (sys, components, disks,
/// nets, net_totals) in a fixed order.
pub async fn collect_metrics(state: &AppState) -> Metrics {
    // System (CPU/mem/proc)
    let mut sys = state.sys.lock().await;
    // Targeted refreshes instead of refresh_all(): measured to roughly halve
    // agent CPU cost versus a blanket refresh.
    sys.refresh_cpu_all();
    sys.refresh_memory();
    sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
    let hostname = System::host_name().unwrap_or_else(|| "unknown".into());
    // Temps via a persistent Components handle
    let mut components = state.components.lock().await;
    components.refresh(true);
    let cpu_temp_c = best_cpu_temp(&components);
    // Disks via a persistent Disks handle
    let mut disks_struct = state.disks.lock().await;
    disks_struct.refresh(true);
    // Filter anything with available == 0 (e.g., overlay/virtual mounts).
    let disks: Vec<DiskInfo> = disks_struct
        .list()
        .iter()
        .filter(|d| d.available_space() > 0)
        .map(|d| DiskInfo {
            name: d.name().to_string_lossy().to_string(),
            total: d.total_space(),
            available: d.available_space(),
        })
        .collect();
    // Networks: persistent Networks handle + rolling totals, because sysinfo's
    // received()/transmitted() report deltas since the last refresh while the
    // wire format promises cumulative totals (the client diffs them itself).
    let mut nets = state.nets.lock().await;
    nets.refresh(true);
    let mut totals = state.net_totals.lock().await;
    let mut networks: Vec<NetworkInfo> = Vec::new();
    for (name, data) in nets.iter() {
        let delta_rx = data.received();
        let delta_tx = data.transmitted();
        let entry = totals.entry(name.clone()).or_insert((0, 0));
        entry.0 = entry.0.saturating_add(delta_rx);
        entry.1 = entry.1.saturating_add(delta_tx);
        networks.push(NetworkInfo {
            name: name.clone(),
            received: entry.0,
            transmitted: entry.1,
        });
    }
    // Normalize per-process CPU to 0..100 across all cores.
    let n_cpus = sys.cpus().len().max(1) as f32;
    // Build the full process list once.
    let mut procs: Vec<ProcessInfo> = sys
        .processes()
        .values()
        .map(|p| ProcessInfo {
            pid: p.pid().as_u32(),
            name: p.name().to_string_lossy().to_string(),
            cpu_usage: (p.cpu_usage() / n_cpus).min(100.0),
            mem_bytes: p.memory(),
        })
        .collect();
    // Partial select: O(n) pick of the top 20 by CPU, then sort just those 20
    // for display order — cheaper than sorting the whole list.
    const TOP_N: usize = 20;
    if procs.len() > TOP_N {
        // nth index is TOP_N-1 (0-based); descending order via reversed compare.
        let nth = TOP_N - 1;
        procs.select_nth_unstable_by(nth, |a, b| {
            b.cpu_usage
                .partial_cmp(&a.cpu_usage)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        procs.truncate(TOP_N);
        procs.sort_by(|a, b| {
            b.cpu_usage
                .partial_cmp(&a.cpu_usage)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
    } else {
        procs.sort_by(|a, b| {
            b.cpu_usage
                .partial_cmp(&a.cpu_usage)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
    }
    Metrics {
        cpu_total: sys.global_cpu_usage(),
        cpu_per_core: sys.cpus().iter().map(|c| c.cpu_usage()).collect(),
        mem_total: sys.total_memory(),
        mem_used: sys.used_memory(),
        swap_total: sys.total_swap(),
        swap_used: sys.used_swap(),
        process_count: sys.processes().len(),
        hostname,
        cpu_temp_c,
        disks,
        networks,
        top_processes: procs,
    }
}
// Pick the hottest CPU-like sensor (labels vary by platform)
/// Pick the hottest CPU-like temperature sensor. Sensor labels vary by
/// platform, so anything mentioning "cpu", "package", "tctl" or "tdie"
/// (AMD naming) is considered; returns None when no such sensor reports.
pub fn best_cpu_temp(components: &Components) -> Option<f32> {
    let mut hottest: Option<f32> = None;
    for c in components.iter() {
        let label = c.label().to_lowercase();
        let cpu_like = ["cpu", "package", "tctl", "tdie"]
            .iter()
            .any(|key| label.contains(key));
        if !cpu_like {
            continue;
        }
        if let Some(t) = c.temperature() {
            hottest = Some(match hottest {
                Some(best) if best >= t => best,
                _ => t,
            });
        }
    }
    hottest
}

View File

@ -0,0 +1,36 @@
//! Background sampler: periodically collects metrics and updates a JSON cache,
//! so WS replies are just a read of the cached string.
use crate::metrics::collect_metrics;
use crate::state::AppState;
//use serde_json::to_string;
use tokio::task::JoinHandle;
use tokio::time::{Duration, interval, MissedTickBehavior};
/// Spawn the background sampler task.
///
/// While at least one WS client is connected, metrics are collected every
/// `period` and serialized into `state.last_json`. With no clients the task
/// drops to a slow idle cadence (10s) and can be woken immediately through
/// `state.wake_sampler` when a client connects.
pub fn spawn_sampler(state: AppState, period: Duration) -> JoinHandle<()> {
    tokio::spawn(async move {
        let idle_period = Duration::from_secs(10);
        loop {
            let active = state.client_count.load(std::sync::atomic::Ordering::Relaxed) > 0;
            let mut ticker = interval(if active { period } else { idle_period });
            // Skip (rather than burst through) ticks missed while a collection ran long.
            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
            // `interval()`'s first tick completes immediately; consume it so the
            // waits below actually last a full period. Previously the active path
            // fell through right after this immediate tick, so with clients
            // connected the loop collected metrics back-to-back with no delay,
            // ignoring `period` entirely.
            ticker.tick().await;
            if active {
                ticker.tick().await;
            } else {
                tokio::select! {
                    _ = ticker.tick() => {},
                    // A client connected: restart the loop in fast mode.
                    _ = state.wake_sampler.notified() => continue,
                }
            }
            // Refresh the hot JSON cache that the WS handler serves directly.
            let m = collect_metrics(&state).await;
            if let Ok(json) = serde_json::to_string(&m) {
                *state.last_json.write().await = json;
            }
        }
    })
}

View File

@ -0,0 +1,30 @@
//! Shared agent state: sysinfo handles and hot JSON cache.
use std::{collections::HashMap, sync::Arc};
use std::sync::atomic::AtomicUsize;
use sysinfo::{Components, Disks, Networks, System};
use tokio::sync::{Mutex, RwLock, Notify};
pub type SharedSystem = Arc<Mutex<System>>;
pub type SharedNetworks = Arc<Mutex<Networks>>;
pub type SharedTotals = Arc<Mutex<HashMap<String, (u64, u64)>>>;
pub type SharedComponents = Arc<Mutex<Components>>;
pub type SharedDisks = Arc<Mutex<Disks>>;
/// Shared agent state. `Clone` is cheap: every field is an `Arc`, so handlers
/// and the sampler all observe the same underlying data.
#[derive(Clone)]
pub struct AppState {
    // Persistent sysinfo handles (kept alive so refreshes are incremental)
    pub sys: SharedSystem,
    pub nets: SharedNetworks,
    pub net_totals: SharedTotals, // iface -> (rx_total, tx_total), accumulated from per-refresh deltas
    pub components: SharedComponents,
    pub disks: SharedDisks,
    // Last serialized JSON snapshot for fast WS responses; empty string until
    // the sampler's first pass (cold start handled in the WS handler).
    pub last_json: Arc<RwLock<String>>,
    // Adaptive sampling controls: connected-client count and a Notify the WS
    // handler uses to wake the sampler out of its idle cadence.
    pub client_count: Arc<AtomicUsize>,
    pub wake_sampler: Arc<Notify>,
    // Optional shared secret from SOCKTOP_TOKEN; None disables auth.
    pub auth_token: Option<String>,
}

View File

@ -0,0 +1,43 @@
//! Data types sent to the client over WebSocket.
//! Keep this module minimal and stable — it defines the wire format.
use serde::Serialize;
/// One row of the top-processes list.
#[derive(Debug, Serialize, Clone)]
pub struct ProcessInfo {
    pub pid: u32,
    pub name: String,
    // CPU percent, normalized across cores by the collector.
    pub cpu_usage: f32,
    // Resident memory in bytes.
    pub mem_bytes: u64,
}
/// One disk/volume; all sizes in bytes.
#[derive(Debug, Serialize, Clone)]
pub struct DiskInfo {
    pub name: String,
    pub total: u64,
    pub available: u64,
}
/// Per-interface traffic counters.
#[derive(Debug, Serialize, Clone)]
pub struct NetworkInfo {
    pub name: String,
    // cumulative totals since the agent started (client should diff to get rates)
    pub received: u64,
    pub transmitted: u64,
}
/// Full snapshot serialized to clients. Field names/types are the wire
/// contract — the client's Deserialize mirror must match exactly.
#[derive(Debug, Serialize, Clone)]
pub struct Metrics {
    pub cpu_total: f32,
    pub cpu_per_core: Vec<f32>,
    // Memory/swap figures in bytes.
    pub mem_total: u64,
    pub mem_used: u64,
    pub swap_total: u64,
    pub swap_used: u64,
    pub process_count: usize,
    pub hostname: String,
    // None when no CPU-like temperature sensor is available.
    pub cpu_temp_c: Option<f32>,
    pub disks: Vec<DiskInfo>,
    pub networks: Vec<NetworkInfo>,
    pub top_processes: Vec<ProcessInfo>,
}

66
socktop_agent/src/ws.rs Normal file
View File

@ -0,0 +1,66 @@
//! WebSocket upgrade and per-connection handler. Serves cached JSON quickly.
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Query, State,
},
http::StatusCode,
response::{IntoResponse, Response},
};
use futures_util::stream::StreamExt;
use crate::metrics::collect_metrics;
use crate::state::AppState;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
/// HTTP handler for `GET /ws`: enforces the optional shared-secret token and
/// upgrades the connection to a WebSocket on success.
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<AppState>,
    Query(q): Query<HashMap<String, String>>,
) -> Response {
    // When SOCKTOP_TOKEN is configured, the client must present a matching
    // `?token=` query parameter before the upgrade is allowed.
    let authorized = match state.auth_token.as_ref() {
        None => true,
        Some(expected) => q.get("token").map_or(false, |t| t == expected),
    };
    if !authorized {
        return StatusCode::UNAUTHORIZED.into_response();
    }
    ws.on_upgrade(move |socket| handle_socket(socket, state))
}
/// Per-connection WebSocket loop: answers "get_metrics" requests from the
/// cached JSON snapshot, and keeps `client_count` accurate so the sampler
/// can switch between fast and idle cadence.
async fn handle_socket(mut socket: WebSocket, state: AppState) {
    // Bump client count on connect and wake the sampler out of idle mode.
    state.client_count.fetch_add(1, Ordering::Relaxed);
    state.wake_sampler.notify_waiters();
    // RAII guard: decrements the count on ANY exit path (clean close, error,
    // task cancellation) and notifies the sampler so it can fall back to idle.
    struct ClientGuard(AppState);
    impl Drop for ClientGuard {
        fn drop(&mut self) {
            self.0.client_count.fetch_sub(1, Ordering::Relaxed);
            self.0.wake_sampler.notify_waiters();
        }
    }
    let _guard = ClientGuard(state.clone());
    // Loop ends when the client disconnects or a receive error occurs.
    while let Some(Ok(msg)) = socket.next().await {
        match msg {
            Message::Text(text) if text == "get_metrics" => {
                // Serve the cached JSON quickly; if empty (cold start, before
                // the sampler's first pass), do a one-off collection inline.
                let cached = state.last_json.read().await.clone();
                if !cached.is_empty() {
                    let _ = socket.send(Message::Text(cached)).await;
                } else {
                    let metrics = collect_metrics(&state).await;
                    if let Ok(js) = serde_json::to_string(&metrics) {
                        let _ = socket.send(Message::Text(js)).await;
                    }
                }
            }
            Message::Close(_) => break,
            // Ignore pings/pongs/binary frames.
            _ => {}
        }
    }
}