patch for macbook compatibility issues.
This commit is contained in:
parent
f980b6ace9
commit
c6b8c9c905
53
.github/workflows/ci.yml
vendored
53
.github/workflows/ci.yml
vendored
@ -23,6 +23,59 @@ jobs:
|
|||||||
run: cargo clippy --all-targets --all-features -- -D warnings
|
run: cargo clippy --all-targets --all-features -- -D warnings
|
||||||
- name: Build (release)
|
- name: Build (release)
|
||||||
run: cargo build --release --workspace
|
run: cargo build --release --workspace
|
||||||
|
- name: Start agent (Ubuntu)
|
||||||
|
if: matrix.os == 'ubuntu-latest'
|
||||||
|
shell: bash
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
# Use debug build for faster startup in CI
|
||||||
|
RUST_LOG=info cargo run -p socktop_agent -- -p 3000 &
|
||||||
|
AGENT_PID=$!
|
||||||
|
echo "AGENT_PID=$AGENT_PID" >> $GITHUB_ENV
|
||||||
|
# Wait for port 3000 to accept connections (30s max)
|
||||||
|
for i in {1..60}; do
|
||||||
|
if bash -lc "</dev/tcp/127.0.0.1/3000" &>/dev/null; then
|
||||||
|
echo "agent is ready"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
sleep 0.5
|
||||||
|
done
|
||||||
|
- name: Run WS probe test (Ubuntu)
|
||||||
|
if: matrix.os == 'ubuntu-latest'
|
||||||
|
shell: bash
|
||||||
|
env:
|
||||||
|
SOCKTOP_WS: ws://127.0.0.1:3000/ws
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
cargo test -p socktop --test ws_probe -- --nocapture
|
||||||
|
- name: Stop agent (Ubuntu)
|
||||||
|
if: always() && matrix.os == 'ubuntu-latest'
|
||||||
|
shell: bash
|
||||||
|
run: |
|
||||||
|
if [ -n "${AGENT_PID:-}" ]; then kill $AGENT_PID || true; fi
|
||||||
|
- name: Start agent (Windows)
|
||||||
|
if: matrix.os == 'windows-latest'
|
||||||
|
shell: pwsh
|
||||||
|
run: |
|
||||||
|
$p = Start-Process -FilePath "cargo" -ArgumentList "run -p socktop_agent -- -p 3000" -PassThru
|
||||||
|
echo "AGENT_PID=$($p.Id)" | Out-File -FilePath $env:GITHUB_ENV -Append
|
||||||
|
$ready = $false
|
||||||
|
for ($i = 0; $i -lt 60; $i++) {
|
||||||
|
if (Test-NetConnection -ComputerName 127.0.0.1 -Port 3000 -InformationLevel Quiet) { $ready = $true; break }
|
||||||
|
Start-Sleep -Milliseconds 500
|
||||||
|
}
|
||||||
|
if (-not $ready) { Write-Error "agent did not become ready" }
|
||||||
|
- name: Run WS probe test (Windows)
|
||||||
|
if: matrix.os == 'windows-latest'
|
||||||
|
shell: pwsh
|
||||||
|
run: |
|
||||||
|
$env:SOCKTOP_WS = "ws://127.0.0.1:3000/ws"
|
||||||
|
cargo test -p socktop --test ws_probe -- --nocapture
|
||||||
|
- name: Stop agent (Windows)
|
||||||
|
if: always() && matrix.os == 'windows-latest'
|
||||||
|
shell: pwsh
|
||||||
|
run: |
|
||||||
|
if ($env:AGENT_PID) { Stop-Process -Id $env:AGENT_PID -Force -ErrorAction SilentlyContinue }
|
||||||
- name: Smoke test (client --help)
|
- name: Smoke test (client --help)
|
||||||
run: cargo run -p socktop -- --help
|
run: cargo run -p socktop -- --help
|
||||||
- name: Package artifacts
|
- name: Package artifacts
|
||||||
|
|||||||
@ -50,7 +50,7 @@ exec bash # or: exec zsh / exec fish
|
|||||||
|
|
||||||
Windows (for the brave): install from https://rustup.rs with the MSVC toolchain. Yes, you’ll need Visual Studio Build Tools. You chose Windows — enjoy the ride.
|
Windows (for the brave): install from https://rustup.rs with the MSVC toolchain. Yes, you’ll need Visual Studio Build Tools. You chose Windows — enjoy the ride.
|
||||||
|
|
||||||
### Raspberry Pi / Ubuntu (required)
|
### Raspberry Pi / Ubuntu / PopOS (required)
|
||||||
|
|
||||||
Install GPU support with apt command below
|
Install GPU support with apt command below
|
||||||
|
|
||||||
|
|||||||
3
rust-toolchain.toml
Normal file
3
rust-toolchain.toml
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
[toolchain]
|
||||||
|
channel = "stable"
|
||||||
|
components = ["clippy", "rustfmt"]
|
||||||
@ -63,6 +63,9 @@ pub struct App {
|
|||||||
last_disks_poll: Instant,
|
last_disks_poll: Instant,
|
||||||
procs_interval: Duration,
|
procs_interval: Duration,
|
||||||
disks_interval: Duration,
|
disks_interval: Duration,
|
||||||
|
|
||||||
|
// For reconnects
|
||||||
|
ws_url: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl App {
|
impl App {
|
||||||
@ -91,11 +94,13 @@ impl App {
|
|||||||
.unwrap_or_else(Instant::now),
|
.unwrap_or_else(Instant::now),
|
||||||
procs_interval: Duration::from_secs(2),
|
procs_interval: Duration::from_secs(2),
|
||||||
disks_interval: Duration::from_secs(5),
|
disks_interval: Duration::from_secs(5),
|
||||||
|
ws_url: String::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self, url: &str) -> Result<(), Box<dyn std::error::Error>> {
|
pub async fn run(&mut self, url: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
// Connect to agent
|
// Connect to agent
|
||||||
|
self.ws_url = url.to_string();
|
||||||
let mut ws = connect(url).await?;
|
let mut ws = connect(url).await?;
|
||||||
|
|
||||||
// Terminal setup
|
// Terminal setup
|
||||||
@ -244,7 +249,10 @@ impl App {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch and update
|
// Draw current frame first so the UI never feels blocked
|
||||||
|
terminal.draw(|f| self.draw(f))?;
|
||||||
|
|
||||||
|
// Then fetch and update
|
||||||
if let Some(m) = request_metrics(ws).await {
|
if let Some(m) = request_metrics(ws).await {
|
||||||
self.update_with_metrics(m);
|
self.update_with_metrics(m);
|
||||||
|
|
||||||
@ -268,10 +276,12 @@ impl App {
|
|||||||
}
|
}
|
||||||
self.last_disks_poll = Instant::now();
|
self.last_disks_poll = Instant::now();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// If we couldn't get metrics, try to reconnect once
|
||||||
|
if let Ok(new_ws) = connect(&self.ws_url).await {
|
||||||
|
*ws = new_ws;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Draw
|
|
||||||
terminal.draw(|f| self.draw(f))?;
|
|
||||||
|
|
||||||
// Tick rate
|
// Tick rate
|
||||||
sleep(Duration::from_millis(500)).await;
|
sleep(Duration::from_millis(500)).await;
|
||||||
@ -461,6 +471,7 @@ impl Default for App {
|
|||||||
.unwrap_or_else(Instant::now),
|
.unwrap_or_else(Instant::now),
|
||||||
procs_interval: Duration::from_secs(2),
|
procs_interval: Duration::from_secs(2),
|
||||||
disks_interval: Duration::from_secs(5),
|
disks_interval: Duration::from_secs(5),
|
||||||
|
ws_url: String::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
4
socktop/src/lib.rs
Normal file
4
socktop/src/lib.rs
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
//! Library surface for integration tests and reuse.
|
||||||
|
|
||||||
|
pub mod types;
|
||||||
|
pub mod ws;
|
||||||
@ -1,44 +1,82 @@
|
|||||||
//! Minimal WebSocket client helpers for requesting metrics from the agent.
|
//! Minimal WebSocket client helpers for requesting metrics from the agent.
|
||||||
|
|
||||||
use flate2::bufread::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use std::io::Read;
|
use std::io::{Cursor, Read};
|
||||||
|
use std::sync::OnceLock;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::time::{interval, Duration};
|
use tokio::time::{interval, timeout, Duration};
|
||||||
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
|
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
|
||||||
|
|
||||||
use crate::types::{DiskInfo, Metrics, ProcessesPayload};
|
use crate::types::{DiskInfo, Metrics, ProcessesPayload};
|
||||||
|
|
||||||
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||||
|
|
||||||
// Connect to the agent and return the WS stream
|
#[inline]
|
||||||
pub async fn connect(url: &str) -> Result<WsStream, Box<dyn std::error::Error>> {
|
fn debug_on() -> bool {
|
||||||
let (ws, _) = connect_async(url).await?;
|
static ON: OnceLock<bool> = OnceLock::new();
|
||||||
Ok(ws)
|
*ON.get_or_init(|| {
|
||||||
|
std::env::var("SOCKTOP_DEBUG")
|
||||||
|
.map(|v| v != "0")
|
||||||
|
.unwrap_or(false)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a "get_metrics" request and await a single JSON reply
|
fn log_msg(msg: &Message) {
|
||||||
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
|
match msg {
|
||||||
if ws.send(Message::Text("get_metrics".into())).await.is_err() {
|
Message::Binary(b) => eprintln!("ws: Binary {} bytes", b.len()),
|
||||||
return None;
|
Message::Text(s) => eprintln!("ws: Text {} bytes", s.len()),
|
||||||
|
Message::Close(_) => eprintln!("ws: Close"),
|
||||||
|
_ => eprintln!("ws: Other frame"),
|
||||||
}
|
}
|
||||||
match ws.next().await {
|
}
|
||||||
Some(Ok(Message::Binary(b))) => {
|
|
||||||
gunzip_to_string(&b).and_then(|s| serde_json::from_str::<Metrics>(&s).ok())
|
// Connect to the agent and return the WS stream
|
||||||
|
pub async fn connect(url: &str) -> Result<WsStream, Box<dyn std::error::Error>> {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: connecting to {url}");
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Text(json))) => serde_json::from_str::<Metrics>(&json).ok(),
|
let (ws, _) = connect_async(url).await?;
|
||||||
_ => None,
|
if debug_on() {
|
||||||
|
eprintln!("ws: connected");
|
||||||
}
|
}
|
||||||
|
Ok(ws)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decompress a gzip-compressed binary frame into a String.
|
// Decompress a gzip-compressed binary frame into a String.
|
||||||
fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
|
fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
|
||||||
let mut dec = GzDecoder::new(bytes);
|
let cursor = Cursor::new(bytes);
|
||||||
|
let mut dec = GzDecoder::new(cursor);
|
||||||
let mut out = String::new();
|
let mut out = String::new();
|
||||||
dec.read_to_string(&mut out).ok()?;
|
dec.read_to_string(&mut out).ok()?;
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: gunzip decoded {} bytes", out.len());
|
||||||
|
}
|
||||||
Some(out)
|
Some(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn message_to_json(msg: &Message) -> Option<String> {
|
||||||
|
match msg {
|
||||||
|
Message::Binary(b) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: <- Binary frame {} bytes", b.len());
|
||||||
|
}
|
||||||
|
if let Some(s) = gunzip_to_string(b) {
|
||||||
|
return Some(s);
|
||||||
|
}
|
||||||
|
// Fallback: try interpreting as UTF-8 JSON in a binary frame
|
||||||
|
String::from_utf8(b.clone()).ok()
|
||||||
|
}
|
||||||
|
Message::Text(s) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: <- Text frame {} bytes", s.len());
|
||||||
|
}
|
||||||
|
Some(s.clone())
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Suppress dead_code until these are wired into the app
|
// Suppress dead_code until these are wired into the app
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub enum Payload {
|
pub enum Payload {
|
||||||
@ -64,22 +102,108 @@ fn parse_any_payload(json: &str) -> Result<Payload, serde_json::Error> {
|
|||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send a "get_metrics" request and await a single JSON reply
|
||||||
|
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: -> get_metrics");
|
||||||
|
}
|
||||||
|
if ws.send(Message::Text("get_metrics".into())).await.is_err() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
// Drain a few messages until we find Metrics (handle out-of-order replies)
|
||||||
|
for _ in 0..8 {
|
||||||
|
match timeout(Duration::from_millis(800), ws.next()).await {
|
||||||
|
Ok(Some(Ok(msg))) => {
|
||||||
|
if debug_on() {
|
||||||
|
log_msg(&msg);
|
||||||
|
}
|
||||||
|
if let Some(json) = message_to_json(&msg) {
|
||||||
|
match parse_any_payload(&json) {
|
||||||
|
Ok(Payload::Metrics(m)) => return Some(m),
|
||||||
|
Ok(Payload::Disks(_)) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: got Disks while waiting for Metrics");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Payload::Processes(_)) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: got Processes while waiting for Metrics");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_e) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!(
|
||||||
|
"ws: unknown payload while waiting for Metrics (len={})",
|
||||||
|
json.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if debug_on() {
|
||||||
|
eprintln!("ws: non-json frame while waiting for Metrics");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Some(Err(_e))) => continue,
|
||||||
|
Ok(None) => return None,
|
||||||
|
Err(_elapsed) => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
// Send a "get_disks" request and await a JSON Vec<DiskInfo>
|
// Send a "get_disks" request and await a JSON Vec<DiskInfo>
|
||||||
pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
|
pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: -> get_disks");
|
||||||
|
}
|
||||||
if ws.send(Message::Text("get_disks".into())).await.is_err() {
|
if ws.send(Message::Text("get_disks".into())).await.is_err() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
match ws.next().await {
|
for _ in 0..8 {
|
||||||
Some(Ok(Message::Binary(b))) => {
|
match timeout(Duration::from_millis(800), ws.next()).await {
|
||||||
gunzip_to_string(&b).and_then(|s| serde_json::from_str::<Vec<DiskInfo>>(&s).ok())
|
Ok(Some(Ok(msg))) => {
|
||||||
|
if debug_on() {
|
||||||
|
log_msg(&msg);
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Text(json))) => serde_json::from_str::<Vec<DiskInfo>>(&json).ok(),
|
if let Some(json) = message_to_json(&msg) {
|
||||||
_ => None,
|
match parse_any_payload(&json) {
|
||||||
|
Ok(Payload::Disks(d)) => return Some(d),
|
||||||
|
Ok(Payload::Metrics(_)) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: got Metrics while waiting for Disks");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Ok(Payload::Processes(_)) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: got Processes while waiting for Disks");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_e) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!(
|
||||||
|
"ws: unknown payload while waiting for Disks (len={})",
|
||||||
|
json.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if debug_on() {
|
||||||
|
eprintln!("ws: non-json frame while waiting for Disks");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Some(Err(_e))) => continue,
|
||||||
|
Ok(None) => return None,
|
||||||
|
Err(_elapsed) => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a "get_processes" request and await a JSON ProcessesPayload
|
// Send a "get_processes" request and await a JSON ProcessesPayload
|
||||||
pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
|
pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: -> get_processes");
|
||||||
|
}
|
||||||
if ws
|
if ws
|
||||||
.send(Message::Text("get_processes".into()))
|
.send(Message::Text("get_processes".into()))
|
||||||
.await
|
.await
|
||||||
@ -87,13 +211,45 @@ pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
|
|||||||
{
|
{
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
match ws.next().await {
|
for _ in 0..16 {
|
||||||
Some(Ok(Message::Binary(b))) => {
|
// allow a few more cycles due to gzip size
|
||||||
gunzip_to_string(&b).and_then(|s| serde_json::from_str::<ProcessesPayload>(&s).ok())
|
match timeout(Duration::from_millis(1200), ws.next()).await {
|
||||||
|
Ok(Some(Ok(msg))) => {
|
||||||
|
if debug_on() {
|
||||||
|
log_msg(&msg);
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Text(json))) => serde_json::from_str::<ProcessesPayload>(&json).ok(),
|
if let Some(json) = message_to_json(&msg) {
|
||||||
_ => None,
|
match parse_any_payload(&json) {
|
||||||
|
Ok(Payload::Processes(p)) => return Some(p),
|
||||||
|
Ok(Payload::Metrics(_)) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: got Metrics while waiting for Processes");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Ok(Payload::Disks(_)) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!("ws: got Disks while waiting for Processes");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_e) => {
|
||||||
|
if debug_on() {
|
||||||
|
eprintln!(
|
||||||
|
"ws: unknown payload while waiting for Processes (len={})",
|
||||||
|
json.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if debug_on() {
|
||||||
|
eprintln!("ws: non-json frame while waiting for Processes");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Some(Err(_e))) => continue,
|
||||||
|
Ok(None) => return None,
|
||||||
|
Err(_elapsed) => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
@ -120,20 +276,15 @@ pub async fn start_ws_polling(mut ws: WsStream) {
|
|||||||
maybe = ws.next() => {
|
maybe = ws.next() => {
|
||||||
let Some(result) = maybe else { break; };
|
let Some(result) = maybe else { break; };
|
||||||
let Ok(msg) = result else { break; };
|
let Ok(msg) = result else { break; };
|
||||||
|
if debug_on() { log_msg(&msg); }
|
||||||
match msg {
|
match msg {
|
||||||
Message::Binary(b) => {
|
Message::Binary(b) => {
|
||||||
if let Some(json) = gunzip_to_string(&b) {
|
if let Some(json) = gunzip_to_string(&b) {
|
||||||
if let Ok(payload) = parse_any_payload(&json) {
|
if let Ok(payload) = parse_any_payload(&json) {
|
||||||
match payload {
|
match payload {
|
||||||
Payload::Metrics(_m) => {
|
Payload::Metrics(_m) => {},
|
||||||
// update your app state with fast metrics
|
Payload::Disks(_d) => {},
|
||||||
}
|
Payload::Processes(_p) => {},
|
||||||
Payload::Disks(_d) => {
|
|
||||||
// update your app state with disks
|
|
||||||
}
|
|
||||||
Payload::Processes(_p) => {
|
|
||||||
// update your app state with processes
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -141,9 +292,9 @@ pub async fn start_ws_polling(mut ws: WsStream) {
|
|||||||
Message::Text(s) => {
|
Message::Text(s) => {
|
||||||
if let Ok(payload) = parse_any_payload(&s) {
|
if let Ok(payload) = parse_any_payload(&s) {
|
||||||
match payload {
|
match payload {
|
||||||
Payload::Metrics(_m) => {}
|
Payload::Metrics(_m) => {},
|
||||||
Payload::Disks(_d) => {}
|
Payload::Disks(_d) => {},
|
||||||
Payload::Processes(_p) => {}
|
Payload::Processes(_p) => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
27
socktop/tests/ws_probe.rs
Normal file
27
socktop/tests/ws_probe.rs
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
use socktop::ws::{connect, request_metrics, request_processes};
|
||||||
|
|
||||||
|
// Integration probe: only runs when SOCKTOP_WS is set to an agent WebSocket URL.
|
||||||
|
// Example: SOCKTOP_WS=ws://127.0.0.1:3000/ws cargo test -p socktop --test ws_probe -- --nocapture
|
||||||
|
#[tokio::test]
|
||||||
|
async fn probe_ws_endpoints() {
|
||||||
|
// Gate the test to avoid CI failures when no agent is running.
|
||||||
|
let url = match std::env::var("SOCKTOP_WS") {
|
||||||
|
Ok(v) if !v.is_empty() => v,
|
||||||
|
_ => {
|
||||||
|
eprintln!(
|
||||||
|
"skipping ws_probe: set SOCKTOP_WS=ws://host:port/ws to run this integration test"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut ws = connect(&url).await.expect("connect ws");
|
||||||
|
|
||||||
|
// Should get fast metrics quickly
|
||||||
|
let m = request_metrics(&mut ws).await;
|
||||||
|
assert!(m.is_some(), "expected Metrics payload within timeout");
|
||||||
|
|
||||||
|
// Processes may be gzipped and a bit slower, but should arrive
|
||||||
|
let p = request_processes(&mut ws).await;
|
||||||
|
assert!(p.is_some(), "expected Processes payload within timeout");
|
||||||
|
}
|
||||||
@ -5,7 +5,9 @@ use crate::state::AppState;
|
|||||||
use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo, ProcessesPayload};
|
use crate::types::{DiskInfo, Metrics, NetworkInfo, ProcessInfo, ProcessesPayload};
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
@ -198,6 +200,8 @@ pub async fn collect_disks(state: &AppState) -> Vec<DiskInfo> {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Linux-only helpers and implementation using /proc deltas for accurate CPU%.
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
#[inline]
|
#[inline]
|
||||||
fn read_total_jiffies() -> io::Result<u64> {
|
fn read_total_jiffies() -> io::Result<u64> {
|
||||||
// /proc/stat first line: "cpu user nice system idle iowait irq softirq steal ..."
|
// /proc/stat first line: "cpu user nice system idle iowait irq softirq steal ..."
|
||||||
@ -216,6 +220,7 @@ fn read_total_jiffies() -> io::Result<u64> {
|
|||||||
Err(io::Error::other("no cpu line"))
|
Err(io::Error::other("no cpu line"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
#[inline]
|
#[inline]
|
||||||
fn read_proc_jiffies(pid: u32) -> Option<u64> {
|
fn read_proc_jiffies(pid: u32) -> Option<u64> {
|
||||||
let path = format!("/proc/{pid}/stat");
|
let path = format!("/proc/{pid}/stat");
|
||||||
@ -230,11 +235,10 @@ fn read_proc_jiffies(pid: u32) -> Option<u64> {
|
|||||||
Some(utime.saturating_add(stime))
|
Some(utime.saturating_add(stime))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replace the body of collect_processes_top_k to use /proc deltas.
|
/// Collect top processes (Linux variant): compute CPU% via /proc jiffies delta.
|
||||||
// This makes CPU% = (delta_proc / delta_total) * 100 over the 2s interval.
|
#[cfg(target_os = "linux")]
|
||||||
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload {
|
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload {
|
||||||
// Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows).
|
// Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows).
|
||||||
// Only processes, no per-thread entries.
|
|
||||||
let mut sys = System::new();
|
let mut sys = System::new();
|
||||||
sys.refresh_processes_specifics(
|
sys.refresh_processes_specifics(
|
||||||
ProcessesToUpdate::All,
|
ProcessesToUpdate::All,
|
||||||
@ -308,6 +312,47 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Collect top processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh.
|
||||||
|
#[cfg(not(target_os = "linux"))]
|
||||||
|
pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload {
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
let mut sys = state.sys.lock().await;
|
||||||
|
|
||||||
|
// First refresh to set baseline
|
||||||
|
sys.refresh_processes_specifics(
|
||||||
|
ProcessesToUpdate::All,
|
||||||
|
false,
|
||||||
|
ProcessRefreshKind::everything().without_tasks(),
|
||||||
|
);
|
||||||
|
// Small delay so sysinfo can compute CPU deltas on next refresh
|
||||||
|
sleep(Duration::from_millis(250)).await;
|
||||||
|
sys.refresh_processes_specifics(
|
||||||
|
ProcessesToUpdate::All,
|
||||||
|
false,
|
||||||
|
ProcessRefreshKind::everything().without_tasks(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let total_count = sys.processes().len();
|
||||||
|
|
||||||
|
let mut procs: Vec<ProcessInfo> = sys
|
||||||
|
.processes()
|
||||||
|
.values()
|
||||||
|
.map(|p| ProcessInfo {
|
||||||
|
pid: p.pid().as_u32(),
|
||||||
|
name: p.name().to_string_lossy().into_owned(),
|
||||||
|
cpu_usage: p.cpu_usage(),
|
||||||
|
mem_bytes: p.memory(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
procs = top_k_sorted(procs, k);
|
||||||
|
ProcessesPayload {
|
||||||
|
process_count: total_count,
|
||||||
|
top_processes: procs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Small helper to select and sort top-k by cpu
|
// Small helper to select and sort top-k by cpu
|
||||||
fn top_k_sorted(mut v: Vec<ProcessInfo>, k: usize) -> Vec<ProcessInfo> {
|
fn top_k_sorted(mut v: Vec<ProcessInfo>, k: usize) -> Vec<ProcessInfo> {
|
||||||
if v.len() > k {
|
if v.len() > k {
|
||||||
|
|||||||
@ -24,7 +24,7 @@ pub struct AppState {
|
|||||||
pub disks: SharedDisks,
|
pub disks: SharedDisks,
|
||||||
pub networks: SharedNetworks,
|
pub networks: SharedNetworks,
|
||||||
|
|
||||||
// For correct per-process CPU% using /proc deltas
|
// For correct per-process CPU% using /proc deltas (Linux only path uses this tracker)
|
||||||
pub proc_cpu: Arc<Mutex<ProcCpuTracker>>,
|
pub proc_cpu: Arc<Mutex<ProcCpuTracker>>,
|
||||||
|
|
||||||
// Connection tracking (to allow future idle sleeps if desired)
|
// Connection tracking (to allow future idle sleeps if desired)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user