Merge branch 'master' into feature/wss-selfsigned

This commit is contained in:
jasonwitty 2025-08-19 15:31:10 -07:00 committed by GitHub
commit 6b58ac67f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 181 additions and 17 deletions

View File

@ -23,6 +23,59 @@ jobs:
run: cargo clippy --all-targets --all-features -- -D warnings run: cargo clippy --all-targets --all-features -- -D warnings
- name: Build (release) - name: Build (release)
run: cargo build --release --workspace run: cargo build --release --workspace
- name: Start agent (Ubuntu)
if: matrix.os == 'ubuntu-latest'
shell: bash
run: |
set -euo pipefail
# Use debug build for faster startup in CI
RUST_LOG=info cargo run -p socktop_agent -- -p 3000 &
AGENT_PID=$!
echo "AGENT_PID=$AGENT_PID" >> $GITHUB_ENV
# Wait for port 3000 to accept connections (30s max)
for i in {1..60}; do
if bash -lc "</dev/tcp/127.0.0.1/3000" &>/dev/null; then
echo "agent is ready"
break
fi
sleep 0.5
done
- name: Run WS probe test (Ubuntu)
if: matrix.os == 'ubuntu-latest'
shell: bash
env:
SOCKTOP_WS: ws://127.0.0.1:3000/ws
run: |
set -euo pipefail
cargo test -p socktop --test ws_probe -- --nocapture
- name: Stop agent (Ubuntu)
if: always() && matrix.os == 'ubuntu-latest'
shell: bash
run: |
if [ -n "${AGENT_PID:-}" ]; then kill $AGENT_PID || true; fi
- name: Start agent (Windows)
if: matrix.os == 'windows-latest'
shell: pwsh
run: |
$p = Start-Process -FilePath "cargo" -ArgumentList "run -p socktop_agent -- -p 3000" -PassThru
echo "AGENT_PID=$($p.Id)" | Out-File -FilePath $env:GITHUB_ENV -Append
$ready = $false
for ($i = 0; $i -lt 60; $i++) {
if (Test-NetConnection -ComputerName 127.0.0.1 -Port 3000 -InformationLevel Quiet) { $ready = $true; break }
Start-Sleep -Milliseconds 500
}
if (-not $ready) { Write-Error "agent did not become ready" }
- name: Run WS probe test (Windows)
if: matrix.os == 'windows-latest'
shell: pwsh
run: |
$env:SOCKTOP_WS = "ws://127.0.0.1:3000/ws"
cargo test -p socktop --test ws_probe -- --nocapture
- name: Stop agent (Windows)
if: always() && matrix.os == 'windows-latest'
shell: pwsh
run: |
if ($env:AGENT_PID) { Stop-Process -Id $env:AGENT_PID -Force -ErrorAction SilentlyContinue }
- name: Smoke test (client --help) - name: Smoke test (client --help)
run: cargo run -p socktop -- --help run: cargo run -p socktop -- --help
- name: Package artifacts - name: Package artifacts

4
Cargo.lock generated
View File

@ -2038,7 +2038,7 @@ dependencies = [
[[package]] [[package]]
name = "socktop" name = "socktop"
version = "0.1.1" version = "0.1.11"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"assert_cmd", "assert_cmd",
@ -2059,7 +2059,7 @@ dependencies = [
[[package]] [[package]]
name = "socktop_agent" name = "socktop_agent"
version = "0.1.1" version = "0.1.11"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"assert_cmd", "assert_cmd",

View File

@ -51,7 +51,7 @@ exec bash # or: exec zsh / exec fish
Windows (for the brave): install from https://rustup.rs with the MSVC toolchain. Yes, youll need Visual Studio Build Tools. You chose Windows — enjoy the ride. Windows (for the brave): install from https://rustup.rs with the MSVC toolchain. Yes, youll need Visual Studio Build Tools. You chose Windows — enjoy the ride.
### Raspberry Pi (required) ### Raspberry Pi / Ubuntu / PopOS (required)
Install GPU support with apt command below Install GPU support with apt command below

3
rust-toolchain.toml Normal file
View File

@ -0,0 +1,3 @@
[toolchain]
channel = "stable"
components = ["clippy", "rustfmt"]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "socktop" name = "socktop"
version = "0.1.1" version = "0.1.11"
authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"] authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
description = "Remote system monitor over WebSocket, TUI like top" description = "Remote system monitor over WebSocket, TUI like top"
edition = "2021" edition = "2021"

View File

@ -63,6 +63,9 @@ pub struct App {
last_disks_poll: Instant, last_disks_poll: Instant,
procs_interval: Duration, procs_interval: Duration,
disks_interval: Duration, disks_interval: Duration,
// For reconnects
ws_url: String,
} }
impl App { impl App {
@ -91,6 +94,7 @@ impl App {
.unwrap_or_else(Instant::now), .unwrap_or_else(Instant::now),
procs_interval: Duration::from_secs(2), procs_interval: Duration::from_secs(2),
disks_interval: Duration::from_secs(5), disks_interval: Duration::from_secs(5),
ws_url: String::new(),
} }
} }
@ -99,7 +103,10 @@ impl App {
url: &str, url: &str,
tls_ca: Option<&str>, tls_ca: Option<&str>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// Connect to agent // Connect to agent
//let mut ws = connect(url, tls_ca).await?;
self.ws_url = url.to_string();
let mut ws = connect(url, tls_ca).await?; let mut ws = connect(url, tls_ca).await?;
// Terminal setup // Terminal setup
@ -465,6 +472,7 @@ impl Default for App {
.unwrap_or_else(Instant::now), .unwrap_or_else(Instant::now),
procs_interval: Duration::from_secs(2), procs_interval: Duration::from_secs(2),
disks_interval: Duration::from_secs(5), disks_interval: Duration::from_secs(5),
ws_url: String::new(),
} }
} }
} }

4
socktop/src/lib.rs Normal file
View File

@ -0,0 +1,4 @@
//! Library surface for integration tests and reuse.
pub mod types;
pub mod ws;

View File

@ -4,10 +4,11 @@ use flate2::bufread::GzDecoder;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use rustls::{ClientConfig, RootCertStore}; use rustls::{ClientConfig, RootCertStore};
use rustls_pemfile::Item; use rustls_pemfile::Item;
use std::io::Read; use std::io::{Cursor, Read};
use std::sync::OnceLock;
use std::{fs::File, io::BufReader, sync::Arc}; use std::{fs::File, io::BufReader, sync::Arc};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::time::{interval, Duration}; use tokio::time::{interval, timeout, Duration};
use tokio_tungstenite::{ use tokio_tungstenite::{
connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest, connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest,
tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream, tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream,
@ -56,6 +57,16 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result<WsStream, Box<dyn s
Ok(ws) Ok(ws)
} }
#[inline]
fn debug_on() -> bool {
static ON: OnceLock<bool> = OnceLock::new();
*ON.get_or_init(|| {
std::env::var("SOCKTOP_DEBUG")
.map(|v| v != "0")
.unwrap_or(false)
})
}
// Send a "get_metrics" request and await a single JSON reply // Send a "get_metrics" request and await a single JSON reply
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> { pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
if ws.send(Message::Text("get_metrics".into())).await.is_err() { if ws.send(Message::Text("get_metrics".into())).await.is_err() {

27
socktop/tests/ws_probe.rs Normal file
View File

@ -0,0 +1,27 @@
use socktop::ws::{connect, request_metrics, request_processes};
// Integration probe: only runs when SOCKTOP_WS is set to an agent WebSocket URL.
// Example: SOCKTOP_WS=ws://127.0.0.1:3000/ws cargo test -p socktop --test ws_probe -- --nocapture
#[tokio::test]
async fn probe_ws_endpoints() {
// Gate the test to avoid CI failures when no agent is running.
let url = match std::env::var("SOCKTOP_WS") {
Ok(v) if !v.is_empty() => v,
_ => {
eprintln!(
"skipping ws_probe: set SOCKTOP_WS=ws://host:port/ws to run this integration test"
);
return;
}
};
let mut ws = connect(&url).await.expect("connect ws");
// Should get fast metrics quickly
let m = request_metrics(&mut ws).await;
assert!(m.is_some(), "expected Metrics payload within timeout");
// Processes may be gzipped and a bit slower, but should arrive
let p = request_processes(&mut ws).await;
assert!(p.is_some(), "expected Processes payload within timeout");
}

View File

@ -1,6 +1,6 @@
[package] [package]
name = "socktop_agent" name = "socktop_agent"
version = "0.1.1" version = "0.1.11"
authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"] authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
description = "Remote system monitor over WebSocket, TUI like top" description = "Remote system monitor over WebSocket, TUI like top"
edition = "2021" edition = "2021"

View File

@ -4,8 +4,11 @@ use crate::gpu::collect_all_gpus;
use crate::state::AppState; use crate::state::AppState;
use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo, ProcessesPayload}; use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo, ProcessesPayload};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
#[cfg(target_os = "linux")]
use std::collections::HashMap; use std::collections::HashMap;
#[cfg(target_os = "linux")]
use std::fs; use std::fs;
#[cfg(target_os = "linux")]
use std::io; use std::io;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -198,6 +201,8 @@ pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> {
.collect() .collect()
} }
// Linux-only helpers and implementation using /proc deltas for accurate CPU%.
#[cfg(target_os = "linux")]
#[inline] #[inline]
fn read_total_jiffies() -> io::Result<u64> { fn read_total_jiffies() -> io::Result<u64> {
// /proc/stat first line: "cpu user nice system idle iowait irq softirq steal ..." // /proc/stat first line: "cpu user nice system idle iowait irq softirq steal ..."
@ -216,6 +221,7 @@ fn read_total_jiffies() -> io::Result<u64> {
Err(io::Error::other("no cpu line")) Err(io::Error::other("no cpu line"))
} }
#[cfg(target_os = "linux")]
#[inline] #[inline]
fn read_proc_jiffies(pid: u32) -> Option<u64> { fn read_proc_jiffies(pid: u32) -> Option<u64> {
let path = format!("/proc/{pid}/stat"); let path = format!("/proc/{pid}/stat");
@ -230,11 +236,10 @@ fn read_proc_jiffies(pid: u32) -> Option<u64> {
Some(utime.saturating_add(stime)) Some(utime.saturating_add(stime))
} }
// Replace the body of collect_processes_top_k to use /proc deltas. /// Collect top processes (Linux variant): compute CPU% via /proc jiffies delta.
// This makes CPU% = (delta_proc / delta_total) * 100 over the 2s interval. #[cfg(target_os = "linux")]
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload {
// Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows). // Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows).
// Only processes, no per-thread entries.
let mut sys = System::new(); let mut sys = System::new();
sys.refresh_processes_specifics( sys.refresh_processes_specifics(
ProcessesToUpdate::All, ProcessesToUpdate::All,
@ -256,12 +261,20 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
// Compute deltas vs last sample // Compute deltas vs last sample
let (last_total, mut last_map) = { let (last_total, mut last_map) = {
#[cfg(target_os = "linux")]
{
let mut t = state.proc_cpu.lock().await; let mut t = state.proc_cpu.lock().await;
let lt = t.last_total; let lt = t.last_total;
let lm = std::mem::take(&mut t.last_per_pid); let lm = std::mem::take(&mut t.last_per_pid);
t.last_total = total_now; t.last_total = total_now;
t.last_per_pid = current.clone(); t.last_per_pid = current.clone();
(lt, lm) (lt, lm)
}
#[cfg(not(target_os = "linux"))]
{
let _: u64 = total_now; // silence unused warning
(0u64, HashMap::new())
}
}; };
// On first run or if total delta is tiny, report zeros // On first run or if total delta is tiny, report zeros
@ -308,6 +321,47 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
} }
} }
/// Collect top processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
#[cfg(not(target_os = "linux"))]
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload {
use tokio::time::sleep;
let mut sys = state.sys.lock().await;
// First refresh to set baseline
sys.refresh_processes_specifics(
ProcessesToUpdate::All,
false,
ProcessRefreshKind::everything().without_tasks(),
);
// Small delay so sysinfo can compute CPU deltas on next refresh
sleep(Duration::from_millis(250)).await;
sys.refresh_processes_specifics(
ProcessesToUpdate::All,
false,
ProcessRefreshKind::everything().without_tasks(),
);
let total_count = sys.processes().len();
let mut procs: Vec<ProcessInfo> = sys
.processes()
.values()
.map(|p| ProcessInfo {
pid: p.pid().as_u32(),
name: p.name().to_string_lossy().into_owned(),
cpu_usage: p.cpu_usage(),
mem_bytes: p.memory(),
})
.collect();
procs = top_k_sorted(procs, k);
ProcessesPayload {
process_count: total_count,
top_processes: procs,
}
}
// Small helper to select and sort top-k by cpu // Small helper to select and sort top-k by cpu
fn top_k_sorted(mut v: Vec<ProcessInfo>, k: usize) -> Vec<ProcessInfo> { fn top_k_sorted(mut v: Vec<ProcessInfo>, k: usize) -> Vec<ProcessInfo> {
if v.len() > k { if v.len() > k {

View File

@ -1,5 +1,6 @@
//! Shared agent state: sysinfo handles and hot JSON cache. //! Shared agent state: sysinfo handles and hot JSON cache.
#[cfg(target_os = "linux")]
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
@ -11,6 +12,7 @@ pub type SharedComponents = Arc<Mutex<Components>>;
pub type SharedDisks = Arc<Mutex<Disks>>; pub type SharedDisks = Arc<Mutex<Disks>>;
pub type SharedNetworks = Arc<Mutex<Networks>>; pub type SharedNetworks = Arc<Mutex<Networks>>;
#[cfg(target_os = "linux")]
#[derive(Default)] #[derive(Default)]
pub struct ProcCpuTracker { pub struct ProcCpuTracker {
pub last_total: u64, pub last_total: u64,
@ -24,7 +26,8 @@ pub struct AppState {
pub disks: SharedDisks, pub disks: SharedDisks,
pub networks: SharedNetworks, pub networks: SharedNetworks,
// For correct per-process CPU% using /proc deltas // For correct per-process CPU% using /proc deltas (Linux only path uses this tracker)
#[cfg(target_os = "linux")]
pub proc_cpu: Arc<Mutex<ProcCpuTracker>>, pub proc_cpu: Arc<Mutex<ProcCpuTracker>>,
// Connection tracking (to allow future idle sleeps if desired) // Connection tracking (to allow future idle sleeps if desired)
@ -45,6 +48,7 @@ impl AppState {
components: Arc::new(Mutex::new(components)), components: Arc::new(Mutex::new(components)),
disks: Arc::new(Mutex::new(disks)), disks: Arc::new(Mutex::new(disks)),
networks: Arc::new(Mutex::new(networks)), networks: Arc::new(Mutex::new(networks)),
#[cfg(target_os = "linux")]
proc_cpu: Arc::new(Mutex::new(ProcCpuTracker::default())), proc_cpu: Arc::new(Mutex::new(ProcCpuTracker::default())),
client_count: Arc::new(AtomicUsize::new(0)), client_count: Arc::new(AtomicUsize::new(0)),
auth_token: std::env::var("SOCKTOP_TOKEN") auth_token: std::env::var("SOCKTOP_TOKEN")