diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0bbaaf8..973def7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -5,16 +5,16 @@ on: jobs: build: + runs-on: ${{ matrix.os }} strategy: matrix: os: [ubuntu-latest, windows-latest] - runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable with: components: clippy, rustfmt - - name: Install system dependencies + - name: Install system dependencies (Linux) if: matrix.os == 'ubuntu-latest' run: sudo apt-get update && sudo apt-get install -y libdrm-dev libdrm-amdgpu1 - name: Cargo fmt @@ -23,75 +23,89 @@ jobs: run: cargo clippy --all-targets --all-features -- -D warnings - name: Build (release) run: cargo build --release --workspace - - name: Start agent (Ubuntu) + + - name: "Linux: start agent and run WS probe" if: matrix.os == 'ubuntu-latest' shell: bash run: | set -euo pipefail - # Use debug build for faster startup in CI - RUST_LOG=info cargo run -p socktop_agent -- -p 3000 & + RUST_LOG=info SOCKTOP_ENABLE_SSL=0 SOCKTOP_AGENT_GPU=0 SOCKTOP_AGENT_TEMP=0 ./target/release/socktop_agent -p 3000 > agent.log 2>&1 & AGENT_PID=$! - echo "AGENT_PID=$AGENT_PID" >> $GITHUB_ENV - # Wait for port 3000 to accept connections (30s max) for i in {1..60}; do - if bash -lc "/dev/null; then - echo "agent is ready" - break - fi - sleep 0.5 + if curl -fsS http://127.0.0.1:3000/healthz >/dev/null; then break; fi + sleep 1 done - - name: Run WS probe test (Ubuntu) - if: matrix.os == 'ubuntu-latest' - shell: bash - env: - SOCKTOP_WS: ws://127.0.0.1:3000/ws - run: | - set -euo pipefail - cargo test -p socktop --test ws_probe -- --nocapture - - name: Stop agent (Ubuntu) - if: always() && matrix.os == 'ubuntu-latest' - shell: bash - run: | - if [ -n "${AGENT_PID:-}" ]; then kill $AGENT_PID || true; fi - - name: Start agent (Windows) + if ! curl -fsS http://127.0.0.1:3000/healthz >/dev/null; then + echo "--- agent.log (tail) ---" + tail -n 200 agent.log || true + (command -v ss >/dev/null && ss -ltnp || netstat -ltnp) || true + kill $AGENT_PID || true + exit 1 + fi + SOCKTOP_WS=ws://127.0.0.1:3000/ws cargo test -p socktop --test ws_probe -- --nocapture + kill $AGENT_PID || true + + - name: "Windows: start agent and run WS probe" if: matrix.os == 'windows-latest' shell: pwsh run: | - $p = Start-Process -FilePath "cargo" -ArgumentList "run -p socktop_agent -- -p 3000" -PassThru - echo "AGENT_PID=$($p.Id)" | Out-File -FilePath $env:GITHUB_ENV -Append + $env:SOCKTOP_ENABLE_SSL = "0" + $env:SOCKTOP_AGENT_GPU = "0" + $env:SOCKTOP_AGENT_TEMP = "0" + $out = Join-Path $PWD "agent.out.txt" + $err = Join-Path $PWD "agent.err.txt" + $p = Start-Process -FilePath "${PWD}\target\release\socktop_agent.exe" -ArgumentList "-p 3000" -RedirectStandardOutput $out -RedirectStandardError $err -PassThru -NoNewWindow $ready = $false for ($i = 0; $i -lt 60; $i++) { - if (Test-NetConnection -ComputerName 127.0.0.1 -Port 3000 -InformationLevel Quiet) { $ready = $true; break } - Start-Sleep -Milliseconds 500 + $pinfo = New-Object System.Diagnostics.ProcessStartInfo + $pinfo.FileName = "curl.exe" + $pinfo.Arguments = "-fsS http://127.0.0.1:3000/healthz" + $pinfo.RedirectStandardOutput = $true + $pinfo.RedirectStandardError = $true + $pinfo.UseShellExecute = $false + $proc = [System.Diagnostics.Process]::Start($pinfo) + $proc.WaitForExit() + if ($proc.ExitCode -eq 0) { $ready = $true; break } + Start-Sleep -Seconds 1 + } + if (-not $ready) { + Write-Warning "TCP connect to (127.0.0.1 : 3000) failed" + if (Test-Path $out) { Write-Host "--- agent.out (full) ---"; Get-Content $out } + if (Test-Path $err) { Write-Host "--- agent.err (full) ---"; Get-Content $err } + Write-Host "--- netstat ---" + netstat -ano | Select-String ":3000" | ForEach-Object { $_.Line } + if ($p -and !$p.HasExited) { Stop-Process -Id $p.Id -Force -ErrorAction SilentlyContinue } + throw "agent did not become ready" } - if (-not $ready) { Write-Error "agent did not become ready" } - - name: Run WS probe test (Windows) - if: matrix.os == 'windows-latest' - shell: pwsh - run: | $env:SOCKTOP_WS = "ws://127.0.0.1:3000/ws" - cargo test -p socktop --test ws_probe -- --nocapture - - name: Stop agent (Windows) - if: always() && matrix.os == 'windows-latest' - shell: pwsh - run: | - if ($env:AGENT_PID) { Stop-Process -Id $env:AGENT_PID -Force -ErrorAction SilentlyContinue } + try { + cargo test -p socktop --test ws_probe -- --nocapture + } finally { + if ($p -and !$p.HasExited) { Stop-Process -Id $p.Id -Force -ErrorAction SilentlyContinue } + } + - name: Smoke test (client --help) run: cargo run -p socktop -- --help - - name: Package artifacts + + - name: Package artifacts (Linux) + if: matrix.os == 'ubuntu-latest' shell: bash run: | set -e - mkdir dist - if [[ "${{ matrix.os }}" == "windows-latest" ]]; then - cp target/release/socktop.exe dist/ - cp target/release/socktop_agent.exe dist/ - 7z a socktop-${{ matrix.os }}.zip dist/* - else - cp target/release/socktop dist/ - cp target/release/socktop_agent dist/ - tar czf socktop-${{ matrix.os }}.tar.gz -C dist . - fi + mkdir -p dist + cp target/release/socktop dist/ + cp target/release/socktop_agent dist/ + tar czf socktop-${{ matrix.os }}.tar.gz -C dist . + + - name: Package artifacts (Windows) + if: matrix.os == 'windows-latest' + shell: pwsh + run: | + New-Item -ItemType Directory -Force -Path dist | Out-Null + Copy-Item target\release\socktop.exe dist\ + Copy-Item target\release\socktop_agent.exe dist\ + Compress-Archive -Path dist\* -DestinationPath socktop-${{ matrix.os }}.zip -Force + - name: Upload build artifacts (ephemeral) uses: actions/upload-artifact@v4 with: @@ -99,6 +113,7 @@ jobs: path: | *.tar.gz *.zip + - name: Upload to rolling GitHub Release (main only) if: github.ref == 'refs/heads/main' && github.event_name == 'push' uses: softprops/action-gh-release@v2 diff --git a/Cargo.lock b/Cargo.lock index b4a47f4..934e637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,6 +591,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.2" @@ -1322,6 +1328,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nom" version = "7.1.3" @@ -1506,6 +1518,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -1608,6 +1630,122 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = "protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + [[package]] name = "quote" version = "1.0.40" @@ -2042,11 +2180,15 @@ version = "0.1.11" dependencies = [ "anyhow", "assert_cmd", + "bytes", "chrono", "crossterm 0.27.0", "flate2", "futures", "futures-util", + "prost", + "prost-build", + "protoc-bin-vendored", "ratatui", "rustls 0.23.31", "rustls-pemfile", @@ -2065,6 +2207,7 @@ dependencies = [ "assert_cmd", "axum", "axum-server", + "bytes", "flate2", "futures", "futures-util", @@ -2073,6 +2216,10 @@ dependencies = [ "nvml-wrapper", "once_cell", "openssl", + "prost", + "prost-build", + "prost-types", + "protoc-bin-vendored", "rustls 0.23.31", "rustls-pemfile", "serde", @@ -2080,6 +2227,7 @@ dependencies = [ "sysinfo", "tempfile", "tokio", + "tonic-build", "tracing", "tracing-subscriber", "tungstenite 0.27.0", @@ -2330,6 +2478,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index eb286dd..fa97049 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,16 @@ chrono = { version = "0.4", features = ["serde"] } # web server (remote-agent) axum = { version = "0.7", features = ["ws"] } + +# protobuf +prost = "0.13" +prost-types = "0.13" +bytes = "1" + +[profile.release] +# Favor smaller, simpler binaries with good runtime perf +lto = "thin" +codegen-units = 1 +panic = "abort" +opt-level = 3 +strip = "symbols" \ No newline at end of file diff --git a/nohup.out b/nohup.out new file mode 100644 index 0000000..86587c9 --- /dev/null +++ b/nohup.out @@ -0,0 +1,8 @@ +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +Error: Address already in use (os error 98) +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +Error: Address already in use (os error 98) +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws diff --git a/proto/processes.proto b/proto/processes.proto new file mode 100644 index 0000000..631e162 --- /dev/null +++ b/proto/processes.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package socktop; + +// All running processes. Sorting is done client-side. +message Processes { + uint64 process_count = 1; // total processes in the system + repeated Process rows = 2; // all processes +} + +message Process { + uint32 pid = 1; + string name = 2; + float cpu_usage = 3; // 0..100 + uint64 mem_bytes = 4; // RSS bytes +} diff --git a/socktop/Cargo.toml b/socktop/Cargo.toml index cb9c5ce..526001a 100644 --- a/socktop/Cargo.toml +++ b/socktop/Cargo.toml @@ -21,6 +21,12 @@ anyhow = { workspace = true } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } rustls = "0.23" rustls-pemfile = "2.1" +prost = { workspace = true } +bytes = { workspace = true } [dev-dependencies] -assert_cmd = "2.0" \ No newline at end of file +assert_cmd = "2.0" + +[build-dependencies] +prost-build = "0.13" +protoc-bin-vendored = "3" \ No newline at end of file diff --git a/socktop/build.rs b/socktop/build.rs new file mode 100644 index 0000000..0cf4635 --- /dev/null +++ b/socktop/build.rs @@ -0,0 +1,8 @@ +fn main() { + let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc"); + std::env::set_var("PROTOC", protoc); + let mut cfg = prost_build::Config::new(); + cfg.out_dir(std::env::var("OUT_DIR").unwrap()); + cfg.compile_protos(&["../proto/processes.proto"], &["../proto"]) + .expect("compile protos"); +} diff --git a/socktop/src/ws.rs b/socktop/src/ws.rs index 70a2db7..7f85e9a 100644 --- a/socktop/src/ws.rs +++ b/socktop/src/ws.rs @@ -2,19 +2,24 @@ use flate2::bufread::GzDecoder; use futures_util::{SinkExt, StreamExt}; +use prost::Message as _; use rustls::{ClientConfig, RootCertStore}; use rustls_pemfile::Item; -use std::io::{Read}; +use std::io::Read; use std::{fs::File, io::BufReader, sync::Arc}; use tokio::net::TcpStream; -use tokio::time::{interval, Duration}; use tokio_tungstenite::{ connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest, tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream, }; use url::Url; -use crate::types::{DiskInfo, Metrics, ProcessesPayload}; +use crate::types::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload}; + +mod pb { + // generated by build.rs + include!(concat!(env!("OUT_DIR"), "/socktop.rs")); +} pub type WsStream = WebSocketStream>; @@ -56,7 +61,6 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result Option { if ws.send(Message::Text("get_metrics".into())).await.is_err() { @@ -79,6 +83,16 @@ fn gunzip_to_string(bytes: &[u8]) -> Option { Some(out) } +fn gunzip_to_vec(bytes: &[u8]) -> Option> { + let mut dec = GzDecoder::new(bytes); + let mut out = Vec::new(); + dec.read_to_end(&mut out).ok()?; + Some(out) +} + +fn is_gzip(bytes: &[u8]) -> bool { + bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b +} // Suppress dead_code until these are wired into the app #[allow(dead_code)] pub enum Payload { @@ -87,23 +101,6 @@ pub enum Payload { Processes(ProcessesPayload), } -#[allow(dead_code)] -fn parse_any_payload(json: &str) -> Result { - if let Ok(m) = serde_json::from_str::(json) { - return Ok(Payload::Metrics(m)); - } - if let Ok(d) = serde_json::from_str::>(json) { - return Ok(Payload::Disks(d)); - } - if let Ok(p) = serde_json::from_str::(json) { - return Ok(Payload::Processes(p)); - } - Err(serde_json::Error::io(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "unknown payload", - ))) -} - // Send a "get_disks" request and await a JSON Vec pub async fn request_disks(ws: &mut WsStream) -> Option> { if ws.send(Message::Text("get_disks".into())).await.is_err() { @@ -118,7 +115,7 @@ pub async fn request_disks(ws: &mut WsStream) -> Option> { } } -// Send a "get_processes" request and await a JSON ProcessesPayload +// Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped) pub async fn request_processes(ws: &mut WsStream) -> Option { if ws .send(Message::Text("get_processes".into())) @@ -129,68 +126,38 @@ pub async fn request_processes(ws: &mut WsStream) -> Option { } match ws.next().await { Some(Ok(Message::Binary(b))) => { - gunzip_to_string(&b).and_then(|s| serde_json::from_str::(&s).ok()) + let gz = is_gzip(&b); + let data = if gz { gunzip_to_vec(&b)? } else { b }; + match pb::Processes::decode(data.as_slice()) { + Ok(pb) => { + let rows: Vec = pb + .rows + .into_iter() + .map(|p: pb::Process| ProcessInfo { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + Some(ProcessesPayload { + process_count: pb.process_count as usize, + top_processes: rows, + }) + } + Err(e) => { + if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") { + eprintln!("protobuf decode failed: {e}"); + } + // Fallback: maybe it's JSON (bytes already decompressed if gz) + match String::from_utf8(data) { + Ok(s) => serde_json::from_str::(&s).ok(), + Err(_) => None, + } + } + } } Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), _ => None, } } - -#[allow(dead_code)] -pub async fn start_ws_polling(mut ws: WsStream) { - let mut t_fast = interval(Duration::from_millis(500)); - let mut t_procs = interval(Duration::from_secs(2)); - let mut t_disks = interval(Duration::from_secs(5)); - - let _ = ws.send(Message::Text("get_metrics".into())).await; - let _ = ws.send(Message::Text("get_processes".into())).await; - let _ = ws.send(Message::Text("get_disks".into())).await; - - loop { - tokio::select! { - _ = t_fast.tick() => { - let _ = ws.send(Message::Text("get_metrics".into())).await; - } - _ = t_procs.tick() => { - let _ = ws.send(Message::Text("get_processes".into())).await; - } - _ = t_disks.tick() => { - let _ = ws.send(Message::Text("get_disks".into())).await; - } - maybe = ws.next() => { - let Some(result) = maybe else { break; }; - let Ok(msg) = result else { break; }; - match msg { - Message::Binary(b) => { - if let Some(json) = gunzip_to_string(&b) { - if let Ok(payload) = parse_any_payload(&json) { - match payload { - Payload::Metrics(_m) => { - // update your app state with fast metrics - } - Payload::Disks(_d) => { - // update your app state with disks - } - Payload::Processes(_p) => { - // update your app state with processes - } - } - } - } - } - Message::Text(s) => { - if let Ok(payload) = parse_any_payload(&s) { - match payload { - Payload::Metrics(_m) => {} - Payload::Disks(_d) => {} - Payload::Processes(_p) => {} - } - } - } - Message::Close(_) => break, - _ => {} - } - } - } - } -} diff --git a/socktop/tests/ws_probe.rs b/socktop/tests/ws_probe.rs index 523466a..1c7a04f 100644 --- a/socktop/tests/ws_probe.rs +++ b/socktop/tests/ws_probe.rs @@ -17,9 +17,7 @@ async fn probe_ws_endpoints() { // Optional pinned CA for WSS/self-signed setups let tls_ca = std::env::var("SOCKTOP_TLS_CA").ok(); - let mut ws = connect(&url, tls_ca.as_deref()) - .await - .expect("connect ws"); + let mut ws = connect(&url, tls_ca.as_deref()).await.expect("connect ws"); // Should get fast metrics quickly let m = request_metrics(&mut ws).await; diff --git a/socktop_agent/Cargo.toml b/socktop_agent/Cargo.toml index f9bca56..69b1a24 100644 --- a/socktop_agent/Cargo.toml +++ b/socktop_agent/Cargo.toml @@ -27,6 +27,14 @@ rustls-pemfile = "2.1" openssl = { version = "0.10", features = ["vendored"] } # for cross‑platform self‑signed generation anyhow = "1" hostname = "0.3" +bytes = { workspace = true } +prost = { workspace = true } + +[build-dependencies] +prost-build = "0.13" +prost-types = { workspace = true } +tonic-build = { version = "0.12", default-features = false, optional = true } +protoc-bin-vendored = "3" [dev-dependencies] assert_cmd = "2.0" tempfile = "3.10" \ No newline at end of file diff --git a/socktop_agent/build.rs b/socktop_agent/build.rs new file mode 100644 index 0000000..366d490 --- /dev/null +++ b/socktop_agent/build.rs @@ -0,0 +1,11 @@ +fn main() { + // Ensure protoc exists (vendored for reproducible builds) + let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc"); + std::env::set_var("PROTOC", protoc); + + // Compile protobuf definitions for processes + let mut cfg = prost_build::Config::new(); + cfg.out_dir(std::env::var("OUT_DIR").unwrap()); + cfg.compile_protos(&["../proto/processes.proto"], &["../proto"]) + .expect("compile protos"); +} diff --git a/socktop_agent/src/main.rs b/socktop_agent/src/main.rs index 7d4a22d..4f082bf 100644 --- a/socktop_agent/src/main.rs +++ b/socktop_agent/src/main.rs @@ -3,12 +3,13 @@ mod gpu; mod metrics; +mod proto; mod sampler; mod state; mod types; mod ws; -use axum::{routing::get, Router}; +use axum::{http::StatusCode, routing::get, Router}; use std::net::SocketAddr; use std::str::FromStr; @@ -47,8 +48,13 @@ async fn main() -> anyhow::Result<()> { let _h_disks = spawn_disks_sampler(state.clone(), std::time::Duration::from_secs(5)); // Web app: route /ws to the websocket handler + async fn healthz() -> StatusCode { + println!("/healthz request"); + StatusCode::OK + } let app = Router::new() .route("/ws", get(ws::ws_handler)) + .route("/healthz", get(healthz)) .with_state(state.clone()); let enable_ssl = diff --git a/socktop_agent/src/metrics.rs b/socktop_agent/src/metrics.rs index a1fc744..52b1858 100644 --- a/socktop_agent/src/metrics.rs +++ b/socktop_agent/src/metrics.rs @@ -236,9 +236,9 @@ fn read_proc_jiffies(pid: u32) -> Option { Some(utime.saturating_add(stime)) } -/// Collect top processes (Linux variant): compute CPU% via /proc jiffies delta. +/// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client. #[cfg(target_os = "linux")] -pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { +pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { // Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows). let mut sys = System::new(); sys.refresh_processes_specifics( @@ -291,7 +291,7 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay .collect(); return ProcessesPayload { process_count: total_count, - top_processes: top_k_sorted(procs, k), + top_processes: procs, }; } @@ -317,13 +317,13 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay ProcessesPayload { process_count: total_count, - top_processes: top_k_sorted(procs, k), + top_processes: procs, } } -/// Collect top processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. +/// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. #[cfg(not(target_os = "linux"))] -pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { +pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { use tokio::time::sleep; let mut sys = state.sys.lock().await; @@ -344,7 +344,7 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay let total_count = sys.processes().len(); - let mut procs: Vec = sys + let procs: Vec = sys .processes() .values() .map(|p| ProcessInfo { @@ -354,8 +354,6 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay mem_bytes: p.memory(), }) .collect(); - - procs = top_k_sorted(procs, k); ProcessesPayload { process_count: total_count, top_processes: procs, @@ -363,19 +361,4 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay } // Small helper to select and sort top-k by cpu -fn top_k_sorted(mut v: Vec, k: usize) -> Vec { - if v.len() > k { - v.select_nth_unstable_by(k, |a, b| { - b.cpu_usage - .partial_cmp(&a.cpu_usage) - .unwrap_or(std::cmp::Ordering::Equal) - }); - v.truncate(k); - } - v.sort_by(|a, b| { - b.cpu_usage - .partial_cmp(&a.cpu_usage) - .unwrap_or(std::cmp::Ordering::Equal) - }); - v -} +// Client now handles sorting/pagination. diff --git a/socktop_agent/src/proto.rs b/socktop_agent/src/proto.rs index 3ff466f..13c6f6a 100644 --- a/socktop_agent/src/proto.rs +++ b/socktop_agent/src/proto.rs @@ -1,32 +1,5 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Metrics { - pub ts_unix_ms: i64, - pub host: String, - pub uptime_secs: u64, - pub cpu_overall: f32, - pub cpu_per_core: Vec, - pub load_avg: (f64, f64, f64), - pub mem_total_mb: u64, - pub mem_used_mb: u64, - pub swap_total_mb: u64, - pub swap_used_mb: u64, - pub net_aggregate: NetTotals, - pub top_processes: Vec, +// Generated protobuf modules live under OUT_DIR; include them here. +// This module will expose socktop::Processes and socktop::Process types. +pub mod pb { + include!(concat!(env!("OUT_DIR"), "/socktop.rs")); } - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NetTotals { - pub rx_bytes: u64, - pub tx_bytes: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Proc { - pub pid: i32, - pub name: String, - pub cpu: f32, - pub mem_mb: u64, - pub status: String, -} \ No newline at end of file diff --git a/socktop_agent/src/ws.rs b/socktop_agent/src/ws.rs index 343489e..07cee80 100644 --- a/socktop_agent/src/ws.rs +++ b/socktop_agent/src/ws.rs @@ -10,7 +10,8 @@ use futures_util::StreamExt; use std::collections::HashMap; use std::io::Write; -use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_top_k}; +use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_all}; +use crate::proto::pb; use crate::state::AppState; pub async fn ws_handler( @@ -44,8 +45,39 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { let _ = send_json(&mut socket, &d).await; } Message::Text(ref text) if text == "get_processes" => { - let p = collect_processes_top_k(&state, 50).await; - let _ = send_json(&mut socket, &p).await; + let payload = collect_processes_all(&state).await; + // Map to protobuf message + let rows: Vec = payload + .top_processes + .into_iter() + .map(|p| pb::Process { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + let pb = pb::Processes { + process_count: payload.process_count as u64, + rows, + }; + let mut buf = Vec::with_capacity(8 * 1024); + if prost::Message::encode(&pb, &mut buf).is_err() { + let _ = socket.send(Message::Close(None)).await; + } else { + // compress if large + if buf.len() <= 768 { + let _ = socket.send(Message::Binary(buf)).await; + } else { + let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); + if enc.write_all(&buf).is_ok() { + let bin = enc.finish().unwrap_or(buf); + let _ = socket.send(Message::Binary(bin)).await; + } else { + let _ = socket.send(Message::Binary(buf)).await; + } + } + } } Message::Close(_) => break, _ => {}