Merge pull request #2 from jasonwitty/feature/protobuf-processes

Feature/protobuf processes
This commit is contained in:
jasonwitty 2025-08-21 11:50:27 -07:00 committed by GitHub
commit 4cef273e57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 401 additions and 198 deletions

View File

@ -5,16 +5,16 @@ on:
jobs: jobs:
build: build:
runs-on: ${{ matrix.os }}
strategy: strategy:
matrix: matrix:
os: [ubuntu-latest, windows-latest] os: [ubuntu-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable - uses: dtolnay/rust-toolchain@stable
with: with:
components: clippy, rustfmt components: clippy, rustfmt
- name: Install system dependencies - name: Install system dependencies (Linux)
if: matrix.os == 'ubuntu-latest' if: matrix.os == 'ubuntu-latest'
run: sudo apt-get update && sudo apt-get install -y libdrm-dev libdrm-amdgpu1 run: sudo apt-get update && sudo apt-get install -y libdrm-dev libdrm-amdgpu1
- name: Cargo fmt - name: Cargo fmt
@ -23,75 +23,89 @@ jobs:
run: cargo clippy --all-targets --all-features -- -D warnings run: cargo clippy --all-targets --all-features -- -D warnings
- name: Build (release) - name: Build (release)
run: cargo build --release --workspace run: cargo build --release --workspace
- name: Start agent (Ubuntu)
- name: "Linux: start agent and run WS probe"
if: matrix.os == 'ubuntu-latest' if: matrix.os == 'ubuntu-latest'
shell: bash shell: bash
run: | run: |
set -euo pipefail set -euo pipefail
# Use debug build for faster startup in CI RUST_LOG=info SOCKTOP_ENABLE_SSL=0 SOCKTOP_AGENT_GPU=0 SOCKTOP_AGENT_TEMP=0 ./target/release/socktop_agent -p 3000 > agent.log 2>&1 &
RUST_LOG=info cargo run -p socktop_agent -- -p 3000 &
AGENT_PID=$! AGENT_PID=$!
echo "AGENT_PID=$AGENT_PID" >> $GITHUB_ENV
# Wait for port 3000 to accept connections (30s max)
for i in {1..60}; do for i in {1..60}; do
if bash -lc "</dev/tcp/127.0.0.1/3000" &>/dev/null; then if curl -fsS http://127.0.0.1:3000/healthz >/dev/null; then break; fi
echo "agent is ready" sleep 1
break
fi
sleep 0.5
done done
- name: Run WS probe test (Ubuntu) if ! curl -fsS http://127.0.0.1:3000/healthz >/dev/null; then
if: matrix.os == 'ubuntu-latest' echo "--- agent.log (tail) ---"
shell: bash tail -n 200 agent.log || true
env: (command -v ss >/dev/null && ss -ltnp || netstat -ltnp) || true
SOCKTOP_WS: ws://127.0.0.1:3000/ws kill $AGENT_PID || true
run: | exit 1
set -euo pipefail fi
cargo test -p socktop --test ws_probe -- --nocapture SOCKTOP_WS=ws://127.0.0.1:3000/ws cargo test -p socktop --test ws_probe -- --nocapture
- name: Stop agent (Ubuntu) kill $AGENT_PID || true
if: always() && matrix.os == 'ubuntu-latest'
shell: bash - name: "Windows: start agent and run WS probe"
run: |
if [ -n "${AGENT_PID:-}" ]; then kill $AGENT_PID || true; fi
- name: Start agent (Windows)
if: matrix.os == 'windows-latest' if: matrix.os == 'windows-latest'
shell: pwsh shell: pwsh
run: | run: |
$p = Start-Process -FilePath "cargo" -ArgumentList "run -p socktop_agent -- -p 3000" -PassThru $env:SOCKTOP_ENABLE_SSL = "0"
echo "AGENT_PID=$($p.Id)" | Out-File -FilePath $env:GITHUB_ENV -Append $env:SOCKTOP_AGENT_GPU = "0"
$env:SOCKTOP_AGENT_TEMP = "0"
$out = Join-Path $PWD "agent.out.txt"
$err = Join-Path $PWD "agent.err.txt"
$p = Start-Process -FilePath "${PWD}\target\release\socktop_agent.exe" -ArgumentList "-p 3000" -RedirectStandardOutput $out -RedirectStandardError $err -PassThru -NoNewWindow
$ready = $false $ready = $false
for ($i = 0; $i -lt 60; $i++) { for ($i = 0; $i -lt 60; $i++) {
if (Test-NetConnection -ComputerName 127.0.0.1 -Port 3000 -InformationLevel Quiet) { $ready = $true; break } $pinfo = New-Object System.Diagnostics.ProcessStartInfo
Start-Sleep -Milliseconds 500 $pinfo.FileName = "curl.exe"
$pinfo.Arguments = "-fsS http://127.0.0.1:3000/healthz"
$pinfo.RedirectStandardOutput = $true
$pinfo.RedirectStandardError = $true
$pinfo.UseShellExecute = $false
$proc = [System.Diagnostics.Process]::Start($pinfo)
$proc.WaitForExit()
if ($proc.ExitCode -eq 0) { $ready = $true; break }
Start-Sleep -Seconds 1
}
if (-not $ready) {
Write-Warning "TCP connect to (127.0.0.1 : 3000) failed"
if (Test-Path $out) { Write-Host "--- agent.out (full) ---"; Get-Content $out }
if (Test-Path $err) { Write-Host "--- agent.err (full) ---"; Get-Content $err }
Write-Host "--- netstat ---"
netstat -ano | Select-String ":3000" | ForEach-Object { $_.Line }
if ($p -and !$p.HasExited) { Stop-Process -Id $p.Id -Force -ErrorAction SilentlyContinue }
throw "agent did not become ready"
} }
if (-not $ready) { Write-Error "agent did not become ready" }
- name: Run WS probe test (Windows)
if: matrix.os == 'windows-latest'
shell: pwsh
run: |
$env:SOCKTOP_WS = "ws://127.0.0.1:3000/ws" $env:SOCKTOP_WS = "ws://127.0.0.1:3000/ws"
try {
cargo test -p socktop --test ws_probe -- --nocapture cargo test -p socktop --test ws_probe -- --nocapture
- name: Stop agent (Windows) } finally {
if: always() && matrix.os == 'windows-latest' if ($p -and !$p.HasExited) { Stop-Process -Id $p.Id -Force -ErrorAction SilentlyContinue }
shell: pwsh }
run: |
if ($env:AGENT_PID) { Stop-Process -Id $env:AGENT_PID -Force -ErrorAction SilentlyContinue }
- name: Smoke test (client --help) - name: Smoke test (client --help)
run: cargo run -p socktop -- --help run: cargo run -p socktop -- --help
- name: Package artifacts
- name: Package artifacts (Linux)
if: matrix.os == 'ubuntu-latest'
shell: bash shell: bash
run: | run: |
set -e set -e
mkdir dist mkdir -p dist
if [[ "${{ matrix.os }}" == "windows-latest" ]]; then
cp target/release/socktop.exe dist/
cp target/release/socktop_agent.exe dist/
7z a socktop-${{ matrix.os }}.zip dist/*
else
cp target/release/socktop dist/ cp target/release/socktop dist/
cp target/release/socktop_agent dist/ cp target/release/socktop_agent dist/
tar czf socktop-${{ matrix.os }}.tar.gz -C dist . tar czf socktop-${{ matrix.os }}.tar.gz -C dist .
fi
- name: Package artifacts (Windows)
if: matrix.os == 'windows-latest'
shell: pwsh
run: |
New-Item -ItemType Directory -Force -Path dist | Out-Null
Copy-Item target\release\socktop.exe dist\
Copy-Item target\release\socktop_agent.exe dist\
Compress-Archive -Path dist\* -DestinationPath socktop-${{ matrix.os }}.zip -Force
- name: Upload build artifacts (ephemeral) - name: Upload build artifacts (ephemeral)
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
@ -99,6 +113,7 @@ jobs:
path: | path: |
*.tar.gz *.tar.gz
*.zip *.zip
- name: Upload to rolling GitHub Release (main only) - name: Upload to rolling GitHub Release (main only)
if: github.ref == 'refs/heads/main' && github.event_name == 'push' if: github.ref == 'refs/heads/main' && github.event_name == 'push'
uses: softprops/action-gh-release@v2 uses: softprops/action-gh-release@v2

160
Cargo.lock generated
View File

@ -591,6 +591,12 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "fixedbitset"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.1.2" version = "1.1.2"
@ -1322,6 +1328,12 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "multimap"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"
@ -1506,6 +1518,16 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "petgraph"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.1.10" version = "1.1.10"
@ -1608,6 +1630,122 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "prost"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck",
"itertools 0.13.0",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.13.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [
"prost",
]
[[package]]
name = "protoc-bin-vendored"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa"
dependencies = [
"protoc-bin-vendored-linux-aarch_64",
"protoc-bin-vendored-linux-ppcle_64",
"protoc-bin-vendored-linux-s390_64",
"protoc-bin-vendored-linux-x86_32",
"protoc-bin-vendored-linux-x86_64",
"protoc-bin-vendored-macos-aarch_64",
"protoc-bin-vendored-macos-x86_64",
"protoc-bin-vendored-win32",
]
[[package]]
name = "protoc-bin-vendored-linux-aarch_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c"
[[package]]
name = "protoc-bin-vendored-linux-ppcle_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c"
[[package]]
name = "protoc-bin-vendored-linux-s390_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0"
[[package]]
name = "protoc-bin-vendored-linux-x86_32"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5"
[[package]]
name = "protoc-bin-vendored-linux-x86_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78"
[[package]]
name = "protoc-bin-vendored-macos-aarch_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092"
[[package]]
name = "protoc-bin-vendored-macos-x86_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756"
[[package]]
name = "protoc-bin-vendored-win32"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3"
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.40" version = "1.0.40"
@ -2042,11 +2180,15 @@ version = "0.1.11"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"assert_cmd", "assert_cmd",
"bytes",
"chrono", "chrono",
"crossterm 0.27.0", "crossterm 0.27.0",
"flate2", "flate2",
"futures", "futures",
"futures-util", "futures-util",
"prost",
"prost-build",
"protoc-bin-vendored",
"ratatui", "ratatui",
"rustls 0.23.31", "rustls 0.23.31",
"rustls-pemfile", "rustls-pemfile",
@ -2065,6 +2207,7 @@ dependencies = [
"assert_cmd", "assert_cmd",
"axum", "axum",
"axum-server", "axum-server",
"bytes",
"flate2", "flate2",
"futures", "futures",
"futures-util", "futures-util",
@ -2073,6 +2216,10 @@ dependencies = [
"nvml-wrapper", "nvml-wrapper",
"once_cell", "once_cell",
"openssl", "openssl",
"prost",
"prost-build",
"prost-types",
"protoc-bin-vendored",
"rustls 0.23.31", "rustls 0.23.31",
"rustls-pemfile", "rustls-pemfile",
"serde", "serde",
@ -2080,6 +2227,7 @@ dependencies = [
"sysinfo", "sysinfo",
"tempfile", "tempfile",
"tokio", "tokio",
"tonic-build",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"tungstenite 0.27.0", "tungstenite 0.27.0",
@ -2330,6 +2478,18 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tonic-build"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11"
dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"

View File

@ -34,3 +34,16 @@ chrono = { version = "0.4", features = ["serde"] }
# web server (remote-agent) # web server (remote-agent)
axum = { version = "0.7", features = ["ws"] } axum = { version = "0.7", features = ["ws"] }
# protobuf
prost = "0.13"
prost-types = "0.13"
bytes = "1"
[profile.release]
# Favor smaller, simpler binaries with good runtime perf
lto = "thin"
codegen-units = 1
panic = "abort"
opt-level = 3
strip = "symbols"

8
nohup.out Normal file
View File

@ -0,0 +1,8 @@
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
Error: Address already in use (os error 98)
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws
Error: Address already in use (os error 98)
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws
socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws

15
proto/processes.proto Normal file
View File

@ -0,0 +1,15 @@
syntax = "proto3";
package socktop;
// All running processes. Sorting is done client-side.
message Processes {
uint64 process_count = 1; // total processes in the system
repeated Process rows = 2; // all processes
}
message Process {
uint32 pid = 1;
string name = 2;
float cpu_usage = 3; // 0..100
uint64 mem_bytes = 4; // RSS bytes
}

View File

@ -21,6 +21,12 @@ anyhow = { workspace = true }
flate2 = { version = "1", default-features = false, features = ["rust_backend"] } flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
rustls = "0.23" rustls = "0.23"
rustls-pemfile = "2.1" rustls-pemfile = "2.1"
prost = { workspace = true }
bytes = { workspace = true }
[dev-dependencies] [dev-dependencies]
assert_cmd = "2.0" assert_cmd = "2.0"
[build-dependencies]
prost-build = "0.13"
protoc-bin-vendored = "3"

8
socktop/build.rs Normal file
View File

@ -0,0 +1,8 @@
fn main() {
let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc");
std::env::set_var("PROTOC", protoc);
let mut cfg = prost_build::Config::new();
cfg.out_dir(std::env::var("OUT_DIR").unwrap());
cfg.compile_protos(&["../proto/processes.proto"], &["../proto"])
.expect("compile protos");
}

View File

@ -2,19 +2,24 @@
use flate2::bufread::GzDecoder; use flate2::bufread::GzDecoder;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use prost::Message as _;
use rustls::{ClientConfig, RootCertStore}; use rustls::{ClientConfig, RootCertStore};
use rustls_pemfile::Item; use rustls_pemfile::Item;
use std::io::{Read}; use std::io::Read;
use std::{fs::File, io::BufReader, sync::Arc}; use std::{fs::File, io::BufReader, sync::Arc};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::time::{interval, Duration};
use tokio_tungstenite::{ use tokio_tungstenite::{
connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest, connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest,
tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream, tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream,
}; };
use url::Url; use url::Url;
use crate::types::{DiskInfo, Metrics, ProcessesPayload}; use crate::types::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload};
mod pb {
// generated by build.rs
include!(concat!(env!("OUT_DIR"), "/socktop.rs"));
}
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
@ -56,7 +61,6 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result<WsStream, Box<dyn s
Ok(ws) Ok(ws)
} }
// Send a "get_metrics" request and await a single JSON reply // Send a "get_metrics" request and await a single JSON reply
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> { pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
if ws.send(Message::Text("get_metrics".into())).await.is_err() { if ws.send(Message::Text("get_metrics".into())).await.is_err() {
@ -79,6 +83,16 @@ fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
Some(out) Some(out)
} }
fn gunzip_to_vec(bytes: &[u8]) -> Option<Vec<u8>> {
let mut dec = GzDecoder::new(bytes);
let mut out = Vec::new();
dec.read_to_end(&mut out).ok()?;
Some(out)
}
fn is_gzip(bytes: &[u8]) -> bool {
bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
}
// Suppress dead_code until these are wired into the app // Suppress dead_code until these are wired into the app
#[allow(dead_code)] #[allow(dead_code)]
pub enum Payload { pub enum Payload {
@ -87,23 +101,6 @@ pub enum Payload {
Processes(ProcessesPayload), Processes(ProcessesPayload),
} }
#[allow(dead_code)]
fn parse_any_payload(json: &str) -> Result<Payload, serde_json::Error> {
if let Ok(m) = serde_json::from_str::<Metrics>(json) {
return Ok(Payload::Metrics(m));
}
if let Ok(d) = serde_json::from_str::<Vec<DiskInfo>>(json) {
return Ok(Payload::Disks(d));
}
if let Ok(p) = serde_json::from_str::<ProcessesPayload>(json) {
return Ok(Payload::Processes(p));
}
Err(serde_json::Error::io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"unknown payload",
)))
}
// Send a "get_disks" request and await a JSON Vec<DiskInfo> // Send a "get_disks" request and await a JSON Vec<DiskInfo>
pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> { pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
if ws.send(Message::Text("get_disks".into())).await.is_err() { if ws.send(Message::Text("get_disks".into())).await.is_err() {
@ -118,7 +115,7 @@ pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
} }
} }
// Send a "get_processes" request and await a JSON ProcessesPayload // Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped)
pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> { pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
if ws if ws
.send(Message::Text("get_processes".into())) .send(Message::Text("get_processes".into()))
@ -129,68 +126,38 @@ pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
} }
match ws.next().await { match ws.next().await {
Some(Ok(Message::Binary(b))) => { Some(Ok(Message::Binary(b))) => {
gunzip_to_string(&b).and_then(|s| serde_json::from_str::<ProcessesPayload>(&s).ok()) let gz = is_gzip(&b);
let data = if gz { gunzip_to_vec(&b)? } else { b };
match pb::Processes::decode(data.as_slice()) {
Ok(pb) => {
let rows: Vec<ProcessInfo> = pb
.rows
.into_iter()
.map(|p: pb::Process| ProcessInfo {
pid: p.pid,
name: p.name,
cpu_usage: p.cpu_usage,
mem_bytes: p.mem_bytes,
})
.collect();
Some(ProcessesPayload {
process_count: pb.process_count as usize,
top_processes: rows,
})
}
Err(e) => {
if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") {
eprintln!("protobuf decode failed: {e}");
}
// Fallback: maybe it's JSON (bytes already decompressed if gz)
match String::from_utf8(data) {
Ok(s) => serde_json::from_str::<ProcessesPayload>(&s).ok(),
Err(_) => None,
}
}
}
} }
Some(Ok(Message::Text(json))) => serde_json::from_str::<ProcessesPayload>(&json).ok(), Some(Ok(Message::Text(json))) => serde_json::from_str::<ProcessesPayload>(&json).ok(),
_ => None, _ => None,
} }
} }
#[allow(dead_code)]
pub async fn start_ws_polling(mut ws: WsStream) {
let mut t_fast = interval(Duration::from_millis(500));
let mut t_procs = interval(Duration::from_secs(2));
let mut t_disks = interval(Duration::from_secs(5));
let _ = ws.send(Message::Text("get_metrics".into())).await;
let _ = ws.send(Message::Text("get_processes".into())).await;
let _ = ws.send(Message::Text("get_disks".into())).await;
loop {
tokio::select! {
_ = t_fast.tick() => {
let _ = ws.send(Message::Text("get_metrics".into())).await;
}
_ = t_procs.tick() => {
let _ = ws.send(Message::Text("get_processes".into())).await;
}
_ = t_disks.tick() => {
let _ = ws.send(Message::Text("get_disks".into())).await;
}
maybe = ws.next() => {
let Some(result) = maybe else { break; };
let Ok(msg) = result else { break; };
match msg {
Message::Binary(b) => {
if let Some(json) = gunzip_to_string(&b) {
if let Ok(payload) = parse_any_payload(&json) {
match payload {
Payload::Metrics(_m) => {
// update your app state with fast metrics
}
Payload::Disks(_d) => {
// update your app state with disks
}
Payload::Processes(_p) => {
// update your app state with processes
}
}
}
}
}
Message::Text(s) => {
if let Ok(payload) = parse_any_payload(&s) {
match payload {
Payload::Metrics(_m) => {}
Payload::Disks(_d) => {}
Payload::Processes(_p) => {}
}
}
}
Message::Close(_) => break,
_ => {}
}
}
}
}
}

View File

@ -17,9 +17,7 @@ async fn probe_ws_endpoints() {
// Optional pinned CA for WSS/self-signed setups // Optional pinned CA for WSS/self-signed setups
let tls_ca = std::env::var("SOCKTOP_TLS_CA").ok(); let tls_ca = std::env::var("SOCKTOP_TLS_CA").ok();
let mut ws = connect(&url, tls_ca.as_deref()) let mut ws = connect(&url, tls_ca.as_deref()).await.expect("connect ws");
.await
.expect("connect ws");
// Should get fast metrics quickly // Should get fast metrics quickly
let m = request_metrics(&mut ws).await; let m = request_metrics(&mut ws).await;

View File

@ -27,6 +27,14 @@ rustls-pemfile = "2.1"
openssl = { version = "0.10", features = ["vendored"] } # for crossplatform selfsigned generation openssl = { version = "0.10", features = ["vendored"] } # for crossplatform selfsigned generation
anyhow = "1" anyhow = "1"
hostname = "0.3" hostname = "0.3"
bytes = { workspace = true }
prost = { workspace = true }
[build-dependencies]
prost-build = "0.13"
prost-types = { workspace = true }
tonic-build = { version = "0.12", default-features = false, optional = true }
protoc-bin-vendored = "3"
[dev-dependencies] [dev-dependencies]
assert_cmd = "2.0" assert_cmd = "2.0"
tempfile = "3.10" tempfile = "3.10"

11
socktop_agent/build.rs Normal file
View File

@ -0,0 +1,11 @@
fn main() {
// Ensure protoc exists (vendored for reproducible builds)
let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc");
std::env::set_var("PROTOC", protoc);
// Compile protobuf definitions for processes
let mut cfg = prost_build::Config::new();
cfg.out_dir(std::env::var("OUT_DIR").unwrap());
cfg.compile_protos(&["../proto/processes.proto"], &["../proto"])
.expect("compile protos");
}

View File

@ -3,12 +3,13 @@
mod gpu; mod gpu;
mod metrics; mod metrics;
mod proto;
mod sampler; mod sampler;
mod state; mod state;
mod types; mod types;
mod ws; mod ws;
use axum::{routing::get, Router}; use axum::{http::StatusCode, routing::get, Router};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::str::FromStr; use std::str::FromStr;
@ -47,8 +48,13 @@ async fn main() -> anyhow::Result<()> {
let _h_disks = spawn_disks_sampler(state.clone(), std::time::Duration::from_secs(5)); let _h_disks = spawn_disks_sampler(state.clone(), std::time::Duration::from_secs(5));
// Web app: route /ws to the websocket handler // Web app: route /ws to the websocket handler
async fn healthz() -> StatusCode {
println!("/healthz request");
StatusCode::OK
}
let app = Router::new() let app = Router::new()
.route("/ws", get(ws::ws_handler)) .route("/ws", get(ws::ws_handler))
.route("/healthz", get(healthz))
.with_state(state.clone()); .with_state(state.clone());
let enable_ssl = let enable_ssl =

View File

@ -236,9 +236,9 @@ fn read_proc_jiffies(pid: u32) -> Option<u64> {
Some(utime.saturating_add(stime)) Some(utime.saturating_add(stime))
} }
/// Collect top processes (Linux variant): compute CPU% via /proc jiffies delta. /// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
// Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows). // Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows).
let mut sys = System::new(); let mut sys = System::new();
sys.refresh_processes_specifics( sys.refresh_processes_specifics(
@ -291,7 +291,7 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
.collect(); .collect();
return ProcessesPayload { return ProcessesPayload {
process_count: total_count, process_count: total_count,
top_processes: top_k_sorted(procs, k), top_processes: procs,
}; };
} }
@ -317,13 +317,13 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
ProcessesPayload { ProcessesPayload {
process_count: total_count, process_count: total_count,
top_processes: top_k_sorted(procs, k), top_processes: procs,
} }
} }
/// Collect top processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. /// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload {
use tokio::time::sleep; use tokio::time::sleep;
let mut sys = state.sys.lock().await; let mut sys = state.sys.lock().await;
@ -344,7 +344,7 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
let total_count = sys.processes().len(); let total_count = sys.processes().len();
let mut procs: Vec<ProcessInfo> = sys let procs: Vec<ProcessInfo> = sys
.processes() .processes()
.values() .values()
.map(|p| ProcessInfo { .map(|p| ProcessInfo {
@ -354,8 +354,6 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
mem_bytes: p.memory(), mem_bytes: p.memory(),
}) })
.collect(); .collect();
procs = top_k_sorted(procs, k);
ProcessesPayload { ProcessesPayload {
process_count: total_count, process_count: total_count,
top_processes: procs, top_processes: procs,
@ -363,19 +361,4 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
} }
// Small helper to select and sort top-k by cpu // Small helper to select and sort top-k by cpu
fn top_k_sorted(mut v: Vec<ProcessInfo>, k: usize) -> Vec<ProcessInfo> { // Client now handles sorting/pagination.
if v.len() > k {
v.select_nth_unstable_by(k, |a, b| {
b.cpu_usage
.partial_cmp(&a.cpu_usage)
.unwrap_or(std::cmp::Ordering::Equal)
});
v.truncate(k);
}
v.sort_by(|a, b| {
b.cpu_usage
.partial_cmp(&a.cpu_usage)
.unwrap_or(std::cmp::Ordering::Equal)
});
v
}

View File

@ -1,32 +1,5 @@
use serde::{Deserialize, Serialize}; // Generated protobuf modules live under OUT_DIR; include them here.
// This module will expose socktop::Processes and socktop::Process types.
#[derive(Debug, Clone, Serialize, Deserialize)] pub mod pb {
pub struct Metrics { include!(concat!(env!("OUT_DIR"), "/socktop.rs"));
pub ts_unix_ms: i64,
pub host: String,
pub uptime_secs: u64,
pub cpu_overall: f32,
pub cpu_per_core: Vec<f32>,
pub load_avg: (f64, f64, f64),
pub mem_total_mb: u64,
pub mem_used_mb: u64,
pub swap_total_mb: u64,
pub swap_used_mb: u64,
pub net_aggregate: NetTotals,
pub top_processes: Vec<Proc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetTotals {
pub rx_bytes: u64,
pub tx_bytes: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proc {
pub pid: i32,
pub name: String,
pub cpu: f32,
pub mem_mb: u64,
pub status: String,
} }

View File

@ -10,7 +10,8 @@ use futures_util::StreamExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_top_k}; use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_all};
use crate::proto::pb;
use crate::state::AppState; use crate::state::AppState;
pub async fn ws_handler( pub async fn ws_handler(
@ -44,8 +45,39 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
let _ = send_json(&mut socket, &d).await; let _ = send_json(&mut socket, &d).await;
} }
Message::Text(ref text) if text == "get_processes" => { Message::Text(ref text) if text == "get_processes" => {
let p = collect_processes_top_k(&state, 50).await; let payload = collect_processes_all(&state).await;
let _ = send_json(&mut socket, &p).await; // Map to protobuf message
let rows: Vec<pb::Process> = payload
.top_processes
.into_iter()
.map(|p| pb::Process {
pid: p.pid,
name: p.name,
cpu_usage: p.cpu_usage,
mem_bytes: p.mem_bytes,
})
.collect();
let pb = pb::Processes {
process_count: payload.process_count as u64,
rows,
};
let mut buf = Vec::with_capacity(8 * 1024);
if prost::Message::encode(&pb, &mut buf).is_err() {
let _ = socket.send(Message::Close(None)).await;
} else {
// compress if large
if buf.len() <= 768 {
let _ = socket.send(Message::Binary(buf)).await;
} else {
let mut enc = GzEncoder::new(Vec::new(), Compression::fast());
if enc.write_all(&buf).is_ok() {
let bin = enc.finish().unwrap_or(buf);
let _ = socket.send(Message::Binary(bin)).await;
} else {
let _ = socket.send(Message::Binary(buf)).await;
}
}
}
} }
Message::Close(_) => break, Message::Close(_) => break,
_ => {} _ => {}