protobuff Process list
BREAKING: Process list over WS is now Protocol Buffers; client required. Agent: returns all processes (no server-side top-k); large payloads gzip-compressed. Client: decodes protobuf (gz/raw), moves sorting/pagination to TUI. Build: add prost/prost-build with vendored protoc; enable thin LTO, panic=abort, strip symbols. Cleanup: cfg-gate Linux-only code; fix Clippy across platforms; tests updated (ws probe TLS CA).
This commit is contained in:
parent
10501168c5
commit
554a2c349f
160
Cargo.lock
generated
160
Cargo.lock
generated
@ -591,6 +591,12 @@ version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.5.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.1.2"
|
||||
@ -1322,6 +1328,12 @@ dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "multimap"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
@ -1506,6 +1518,16 @@ version = "2.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||
|
||||
[[package]]
|
||||
name = "petgraph"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772"
|
||||
dependencies = [
|
||||
"fixedbitset",
|
||||
"indexmap",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.10"
|
||||
@ -1608,6 +1630,122 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-build"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"itertools 0.13.0",
|
||||
"log",
|
||||
"multimap",
|
||||
"once_cell",
|
||||
"petgraph",
|
||||
"prettyplease",
|
||||
"prost",
|
||||
"prost-types",
|
||||
"regex",
|
||||
"syn",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.13.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
|
||||
dependencies = [
|
||||
"prost",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa"
|
||||
dependencies = [
|
||||
"protoc-bin-vendored-linux-aarch_64",
|
||||
"protoc-bin-vendored-linux-ppcle_64",
|
||||
"protoc-bin-vendored-linux-s390_64",
|
||||
"protoc-bin-vendored-linux-x86_32",
|
||||
"protoc-bin-vendored-linux-x86_64",
|
||||
"protoc-bin-vendored-macos-aarch_64",
|
||||
"protoc-bin-vendored-macos-x86_64",
|
||||
"protoc-bin-vendored-win32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-linux-aarch_64"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c"
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-linux-ppcle_64"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c"
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-linux-s390_64"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0"
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-linux-x86_32"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5"
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-linux-x86_64"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78"
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-macos-aarch_64"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092"
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-macos-x86_64"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756"
|
||||
|
||||
[[package]]
|
||||
name = "protoc-bin-vendored-win32"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3"
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.40"
|
||||
@ -2042,11 +2180,15 @@ version = "0.1.11"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"assert_cmd",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crossterm 0.27.0",
|
||||
"flate2",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"protoc-bin-vendored",
|
||||
"ratatui",
|
||||
"rustls 0.23.31",
|
||||
"rustls-pemfile",
|
||||
@ -2065,6 +2207,7 @@ dependencies = [
|
||||
"assert_cmd",
|
||||
"axum",
|
||||
"axum-server",
|
||||
"bytes",
|
||||
"flate2",
|
||||
"futures",
|
||||
"futures-util",
|
||||
@ -2073,6 +2216,10 @@ dependencies = [
|
||||
"nvml-wrapper",
|
||||
"once_cell",
|
||||
"openssl",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"prost-types",
|
||||
"protoc-bin-vendored",
|
||||
"rustls 0.23.31",
|
||||
"rustls-pemfile",
|
||||
"serde",
|
||||
@ -2080,6 +2227,7 @@ dependencies = [
|
||||
"sysinfo",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tonic-build",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tungstenite 0.27.0",
|
||||
@ -2330,6 +2478,18 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-build"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11"
|
||||
dependencies = [
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
|
||||
13
Cargo.toml
13
Cargo.toml
@ -34,3 +34,16 @@ chrono = { version = "0.4", features = ["serde"] }
|
||||
|
||||
# web server (remote-agent)
|
||||
axum = { version = "0.7", features = ["ws"] }
|
||||
|
||||
# protobuf
|
||||
prost = "0.13"
|
||||
prost-types = "0.13"
|
||||
bytes = "1"
|
||||
|
||||
[profile.release]
|
||||
# Favor smaller, simpler binaries with good runtime perf
|
||||
lto = "thin"
|
||||
codegen-units = 1
|
||||
panic = "abort"
|
||||
opt-level = 3
|
||||
strip = "symbols"
|
||||
8
nohup.out
Normal file
8
nohup.out
Normal file
@ -0,0 +1,8 @@
|
||||
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
|
||||
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
|
||||
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
|
||||
Error: Address already in use (os error 98)
|
||||
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
|
||||
Error: Address already in use (os error 98)
|
||||
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws
|
||||
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws
|
||||
15
proto/processes.proto
Normal file
15
proto/processes.proto
Normal file
@ -0,0 +1,15 @@
|
||||
syntax = "proto3";
|
||||
package socktop;
|
||||
|
||||
// All running processes. Sorting is done client-side.
|
||||
message Processes {
|
||||
uint64 process_count = 1; // total processes in the system
|
||||
repeated Process rows = 2; // all processes
|
||||
}
|
||||
|
||||
message Process {
|
||||
uint32 pid = 1;
|
||||
string name = 2;
|
||||
float cpu_usage = 3; // 0..100
|
||||
uint64 mem_bytes = 4; // RSS bytes
|
||||
}
|
||||
@ -21,6 +21,12 @@ anyhow = { workspace = true }
|
||||
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
|
||||
rustls = "0.23"
|
||||
rustls-pemfile = "2.1"
|
||||
prost = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
assert_cmd = "2.0"
|
||||
assert_cmd = "2.0"
|
||||
|
||||
[build-dependencies]
|
||||
prost-build = "0.13"
|
||||
protoc-bin-vendored = "3"
|
||||
8
socktop/build.rs
Normal file
8
socktop/build.rs
Normal file
@ -0,0 +1,8 @@
|
||||
fn main() {
|
||||
let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc");
|
||||
std::env::set_var("PROTOC", protoc);
|
||||
let mut cfg = prost_build::Config::new();
|
||||
cfg.out_dir(std::env::var("OUT_DIR").unwrap());
|
||||
cfg.compile_protos(&["../proto/processes.proto"], &["../proto"])
|
||||
.expect("compile protos");
|
||||
}
|
||||
@ -2,19 +2,24 @@
|
||||
|
||||
use flate2::bufread::GzDecoder;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use prost::Message as _;
|
||||
use rustls::{ClientConfig, RootCertStore};
|
||||
use rustls_pemfile::Item;
|
||||
use std::io::{Read};
|
||||
use std::io::Read;
|
||||
use std::{fs::File, io::BufReader, sync::Arc};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::{interval, Duration};
|
||||
use tokio_tungstenite::{
|
||||
connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest,
|
||||
tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream,
|
||||
};
|
||||
use url::Url;
|
||||
|
||||
use crate::types::{DiskInfo, Metrics, ProcessesPayload};
|
||||
use crate::types::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload};
|
||||
|
||||
mod pb {
|
||||
// generated by build.rs
|
||||
include!(concat!(env!("OUT_DIR"), "/socktop.rs"));
|
||||
}
|
||||
|
||||
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||
|
||||
@ -56,7 +61,6 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result<WsStream, Box<dyn s
|
||||
Ok(ws)
|
||||
}
|
||||
|
||||
|
||||
// Send a "get_metrics" request and await a single JSON reply
|
||||
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
|
||||
if ws.send(Message::Text("get_metrics".into())).await.is_err() {
|
||||
@ -79,6 +83,16 @@ fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
|
||||
Some(out)
|
||||
}
|
||||
|
||||
fn gunzip_to_vec(bytes: &[u8]) -> Option<Vec<u8>> {
|
||||
let mut dec = GzDecoder::new(bytes);
|
||||
let mut out = Vec::new();
|
||||
dec.read_to_end(&mut out).ok()?;
|
||||
Some(out)
|
||||
}
|
||||
|
||||
fn is_gzip(bytes: &[u8]) -> bool {
|
||||
bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
|
||||
}
|
||||
// Suppress dead_code until these are wired into the app
|
||||
#[allow(dead_code)]
|
||||
pub enum Payload {
|
||||
@ -87,23 +101,6 @@ pub enum Payload {
|
||||
Processes(ProcessesPayload),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn parse_any_payload(json: &str) -> Result<Payload, serde_json::Error> {
|
||||
if let Ok(m) = serde_json::from_str::<Metrics>(json) {
|
||||
return Ok(Payload::Metrics(m));
|
||||
}
|
||||
if let Ok(d) = serde_json::from_str::<Vec<DiskInfo>>(json) {
|
||||
return Ok(Payload::Disks(d));
|
||||
}
|
||||
if let Ok(p) = serde_json::from_str::<ProcessesPayload>(json) {
|
||||
return Ok(Payload::Processes(p));
|
||||
}
|
||||
Err(serde_json::Error::io(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"unknown payload",
|
||||
)))
|
||||
}
|
||||
|
||||
// Send a "get_disks" request and await a JSON Vec<DiskInfo>
|
||||
pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
|
||||
if ws.send(Message::Text("get_disks".into())).await.is_err() {
|
||||
@ -118,7 +115,7 @@ pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
|
||||
}
|
||||
}
|
||||
|
||||
// Send a "get_processes" request and await a JSON ProcessesPayload
|
||||
// Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped)
|
||||
pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
|
||||
if ws
|
||||
.send(Message::Text("get_processes".into()))
|
||||
@ -129,68 +126,38 @@ pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
|
||||
}
|
||||
match ws.next().await {
|
||||
Some(Ok(Message::Binary(b))) => {
|
||||
gunzip_to_string(&b).and_then(|s| serde_json::from_str::<ProcessesPayload>(&s).ok())
|
||||
let gz = is_gzip(&b);
|
||||
let data = if gz { gunzip_to_vec(&b)? } else { b };
|
||||
match pb::Processes::decode(data.as_slice()) {
|
||||
Ok(pb) => {
|
||||
let rows: Vec<ProcessInfo> = pb
|
||||
.rows
|
||||
.into_iter()
|
||||
.map(|p: pb::Process| ProcessInfo {
|
||||
pid: p.pid,
|
||||
name: p.name,
|
||||
cpu_usage: p.cpu_usage,
|
||||
mem_bytes: p.mem_bytes,
|
||||
})
|
||||
.collect();
|
||||
Some(ProcessesPayload {
|
||||
process_count: pb.process_count as usize,
|
||||
top_processes: rows,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") {
|
||||
eprintln!("protobuf decode failed: {e}");
|
||||
}
|
||||
// Fallback: maybe it's JSON (bytes already decompressed if gz)
|
||||
match String::from_utf8(data) {
|
||||
Ok(s) => serde_json::from_str::<ProcessesPayload>(&s).ok(),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Text(json))) => serde_json::from_str::<ProcessesPayload>(&json).ok(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn start_ws_polling(mut ws: WsStream) {
|
||||
let mut t_fast = interval(Duration::from_millis(500));
|
||||
let mut t_procs = interval(Duration::from_secs(2));
|
||||
let mut t_disks = interval(Duration::from_secs(5));
|
||||
|
||||
let _ = ws.send(Message::Text("get_metrics".into())).await;
|
||||
let _ = ws.send(Message::Text("get_processes".into())).await;
|
||||
let _ = ws.send(Message::Text("get_disks".into())).await;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = t_fast.tick() => {
|
||||
let _ = ws.send(Message::Text("get_metrics".into())).await;
|
||||
}
|
||||
_ = t_procs.tick() => {
|
||||
let _ = ws.send(Message::Text("get_processes".into())).await;
|
||||
}
|
||||
_ = t_disks.tick() => {
|
||||
let _ = ws.send(Message::Text("get_disks".into())).await;
|
||||
}
|
||||
maybe = ws.next() => {
|
||||
let Some(result) = maybe else { break; };
|
||||
let Ok(msg) = result else { break; };
|
||||
match msg {
|
||||
Message::Binary(b) => {
|
||||
if let Some(json) = gunzip_to_string(&b) {
|
||||
if let Ok(payload) = parse_any_payload(&json) {
|
||||
match payload {
|
||||
Payload::Metrics(_m) => {
|
||||
// update your app state with fast metrics
|
||||
}
|
||||
Payload::Disks(_d) => {
|
||||
// update your app state with disks
|
||||
}
|
||||
Payload::Processes(_p) => {
|
||||
// update your app state with processes
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::Text(s) => {
|
||||
if let Ok(payload) = parse_any_payload(&s) {
|
||||
match payload {
|
||||
Payload::Metrics(_m) => {}
|
||||
Payload::Disks(_d) => {}
|
||||
Payload::Processes(_p) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::Close(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,9 +17,7 @@ async fn probe_ws_endpoints() {
|
||||
|
||||
// Optional pinned CA for WSS/self-signed setups
|
||||
let tls_ca = std::env::var("SOCKTOP_TLS_CA").ok();
|
||||
let mut ws = connect(&url, tls_ca.as_deref())
|
||||
.await
|
||||
.expect("connect ws");
|
||||
let mut ws = connect(&url, tls_ca.as_deref()).await.expect("connect ws");
|
||||
|
||||
// Should get fast metrics quickly
|
||||
let m = request_metrics(&mut ws).await;
|
||||
|
||||
@ -27,6 +27,14 @@ rustls-pemfile = "2.1"
|
||||
openssl = { version = "0.10", features = ["vendored"] } # for cross‑platform self‑signed generation
|
||||
anyhow = "1"
|
||||
hostname = "0.3"
|
||||
bytes = { workspace = true }
|
||||
prost = { workspace = true }
|
||||
|
||||
[build-dependencies]
|
||||
prost-build = "0.13"
|
||||
prost-types = { workspace = true }
|
||||
tonic-build = { version = "0.12", default-features = false, optional = true }
|
||||
protoc-bin-vendored = "3"
|
||||
[dev-dependencies]
|
||||
assert_cmd = "2.0"
|
||||
tempfile = "3.10"
|
||||
11
socktop_agent/build.rs
Normal file
11
socktop_agent/build.rs
Normal file
@ -0,0 +1,11 @@
|
||||
fn main() {
|
||||
// Ensure protoc exists (vendored for reproducible builds)
|
||||
let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc");
|
||||
std::env::set_var("PROTOC", protoc);
|
||||
|
||||
// Compile protobuf definitions for processes
|
||||
let mut cfg = prost_build::Config::new();
|
||||
cfg.out_dir(std::env::var("OUT_DIR").unwrap());
|
||||
cfg.compile_protos(&["../proto/processes.proto"], &["../proto"])
|
||||
.expect("compile protos");
|
||||
}
|
||||
@ -3,6 +3,7 @@
|
||||
|
||||
mod gpu;
|
||||
mod metrics;
|
||||
mod proto;
|
||||
mod sampler;
|
||||
mod state;
|
||||
mod types;
|
||||
|
||||
@ -236,9 +236,9 @@ fn read_proc_jiffies(pid: u32) -> Option<u64> {
|
||||
Some(utime.saturating_add(stime))
|
||||
}
|
||||
|
||||
/// Collect top processes (Linux variant): compute CPU% via /proc jiffies delta.
|
||||
/// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client.
|
||||
#[cfg(target_os = "linux")]
|
||||
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload {
|
||||
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
|
||||
// Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows).
|
||||
let mut sys = System::new();
|
||||
sys.refresh_processes_specifics(
|
||||
@ -291,7 +291,7 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
|
||||
.collect();
|
||||
return ProcessesPayload {
|
||||
process_count: total_count,
|
||||
top_processes: top_k_sorted(procs, k),
|
||||
top_processes: procs,
|
||||
};
|
||||
}
|
||||
|
||||
@ -317,13 +317,13 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
|
||||
|
||||
ProcessesPayload {
|
||||
process_count: total_count,
|
||||
top_processes: top_k_sorted(procs, k),
|
||||
top_processes: procs,
|
||||
}
|
||||
}
|
||||
|
||||
/// Collect top processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
|
||||
/// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload {
|
||||
pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
|
||||
use tokio::time::sleep;
|
||||
|
||||
let mut sys = state.sys.lock().await;
|
||||
@ -354,8 +354,6 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
|
||||
mem_bytes: p.memory(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
procs = top_k_sorted(procs, k);
|
||||
ProcessesPayload {
|
||||
process_count: total_count,
|
||||
top_processes: procs,
|
||||
@ -363,19 +361,4 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
|
||||
}
|
||||
|
||||
// Small helper to select and sort top-k by cpu
|
||||
fn top_k_sorted(mut v: Vec<ProcessInfo>, k: usize) -> Vec<ProcessInfo> {
|
||||
if v.len() > k {
|
||||
v.select_nth_unstable_by(k, |a, b| {
|
||||
b.cpu_usage
|
||||
.partial_cmp(&a.cpu_usage)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
});
|
||||
v.truncate(k);
|
||||
}
|
||||
v.sort_by(|a, b| {
|
||||
b.cpu_usage
|
||||
.partial_cmp(&a.cpu_usage)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
});
|
||||
v
|
||||
}
|
||||
// Client now handles sorting/pagination.
|
||||
|
||||
@ -1,32 +1,5 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Metrics {
|
||||
pub ts_unix_ms: i64,
|
||||
pub host: String,
|
||||
pub uptime_secs: u64,
|
||||
pub cpu_overall: f32,
|
||||
pub cpu_per_core: Vec<f32>,
|
||||
pub load_avg: (f64, f64, f64),
|
||||
pub mem_total_mb: u64,
|
||||
pub mem_used_mb: u64,
|
||||
pub swap_total_mb: u64,
|
||||
pub swap_used_mb: u64,
|
||||
pub net_aggregate: NetTotals,
|
||||
pub top_processes: Vec<Proc>,
|
||||
// Generated protobuf modules live under OUT_DIR; include them here.
|
||||
// This module will expose socktop::Processes and socktop::Process types.
|
||||
pub mod pb {
|
||||
include!(concat!(env!("OUT_DIR"), "/socktop.rs"));
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NetTotals {
|
||||
pub rx_bytes: u64,
|
||||
pub tx_bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Proc {
|
||||
pub pid: i32,
|
||||
pub name: String,
|
||||
pub cpu: f32,
|
||||
pub mem_mb: u64,
|
||||
pub status: String,
|
||||
}
|
||||
@ -10,7 +10,8 @@ use futures_util::StreamExt;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
|
||||
use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_top_k};
|
||||
use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_all};
|
||||
use crate::proto::pb;
|
||||
use crate::state::AppState;
|
||||
|
||||
pub async fn ws_handler(
|
||||
@ -44,8 +45,39 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
||||
let _ = send_json(&mut socket, &d).await;
|
||||
}
|
||||
Message::Text(ref text) if text == "get_processes" => {
|
||||
let p = collect_processes_top_k(&state, 50).await;
|
||||
let _ = send_json(&mut socket, &p).await;
|
||||
let payload = collect_processes_all(&state).await;
|
||||
// Map to protobuf message
|
||||
let rows: Vec<pb::Process> = payload
|
||||
.top_processes
|
||||
.into_iter()
|
||||
.map(|p| pb::Process {
|
||||
pid: p.pid,
|
||||
name: p.name,
|
||||
cpu_usage: p.cpu_usage,
|
||||
mem_bytes: p.mem_bytes,
|
||||
})
|
||||
.collect();
|
||||
let pb = pb::Processes {
|
||||
process_count: payload.process_count as u64,
|
||||
rows,
|
||||
};
|
||||
let mut buf = Vec::with_capacity(8 * 1024);
|
||||
if prost::Message::encode(&pb, &mut buf).is_err() {
|
||||
let _ = socket.send(Message::Close(None)).await;
|
||||
} else {
|
||||
// compress if large
|
||||
if buf.len() <= 768 {
|
||||
let _ = socket.send(Message::Binary(buf)).await;
|
||||
} else {
|
||||
let mut enc = GzEncoder::new(Vec::new(), Compression::fast());
|
||||
if enc.write_all(&buf).is_ok() {
|
||||
let bin = enc.finish().unwrap_or(buf);
|
||||
let _ = socket.send(Message::Binary(bin)).await;
|
||||
} else {
|
||||
let _ = socket.send(Message::Binary(buf)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::Close(_) => break,
|
||||
_ => {}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user