Refactor for additional socktop connector library

- socktop connector allows you to communicate with socktop agent directly from you code without needing to implement the agent API directly.
- will also be used for non tui implementation of "socktop collector" in the future.
- moved to rust 2024 to take advantage of some new features that helped with refactor.
- fixed everything that exploded with update.
- added rust docs for lib.
This commit is contained in:
jasonwitty 2025-09-04 05:30:25 -07:00
parent 622767a605
commit e51054811c
25 changed files with 823 additions and 229 deletions

27
Cargo.lock generated
View File

@ -1953,6 +1953,7 @@ dependencies = [
"aws-lc-rs", "aws-lc-rs",
"log", "log",
"once_cell", "once_cell",
"ring",
"rustls-pki-types", "rustls-pki-types",
"rustls-webpki 0.103.4", "rustls-webpki 0.103.4",
"subtle", "subtle",
@ -2168,20 +2169,14 @@ dependencies = [
"assert_cmd", "assert_cmd",
"crossterm 0.27.0", "crossterm 0.27.0",
"dirs-next", "dirs-next",
"flate2",
"futures-util", "futures-util",
"prost",
"prost-build",
"protoc-bin-vendored",
"ratatui", "ratatui",
"rustls 0.23.31",
"rustls-pemfile",
"serde", "serde",
"serde_json", "serde_json",
"socktop_connector",
"sysinfo", "sysinfo",
"tempfile", "tempfile",
"tokio", "tokio",
"tokio-tungstenite 0.24.0",
"url", "url",
] ]
@ -2216,6 +2211,24 @@ dependencies = [
"tracing-subscriber", "tracing-subscriber",
] ]
[[package]]
name = "socktop_connector"
version = "0.1.0"
dependencies = [
"anyhow",
"flate2",
"futures-util",
"prost",
"prost-build",
"rustls 0.23.31",
"rustls-pemfile",
"serde",
"serde_json",
"tokio",
"tokio-tungstenite 0.24.0",
"url",
]
[[package]] [[package]]
name = "stable_deref_trait" name = "stable_deref_trait"
version = "1.2.0" version = "1.2.0"

View File

@ -2,7 +2,8 @@
resolver = "2" resolver = "2"
members = [ members = [
"socktop", "socktop",
"socktop_agent" "socktop_agent",
"socktop_connector"
] ]
[workspace.dependencies] [workspace.dependencies]
@ -26,7 +27,6 @@ sysinfo = "0.37"
ratatui = "0.28" ratatui = "0.28"
crossterm = "0.27" crossterm = "0.27"
# web server (remote-agent) # web server (remote-agent)
axum = { version = "0.7", features = ["ws"] } axum = { version = "0.7", features = ["ws"] }
@ -34,6 +34,13 @@ axum = { version = "0.7", features = ["ws"] }
prost = "0.13" prost = "0.13"
dirs-next = "2" dirs-next = "2"
# compression
flate2 = "1.0"
# TLS
rustls = { version = "0.23", features = ["ring"] }
rustls-pemfile = "2.1"
[profile.release] [profile.release]
# Favor smaller, simpler binaries with good runtime perf # Favor smaller, simpler binaries with good runtime perf
lto = "thin" lto = "thin"

View File

@ -3,13 +3,15 @@ name = "socktop"
version = "1.40.0" version = "1.40.0"
authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"] authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
description = "Remote system monitor over WebSocket, TUI like top" description = "Remote system monitor over WebSocket, TUI like top"
edition = "2021" edition = "2024"
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"
[dependencies] [dependencies]
# socktop connector for agent communication
socktop_connector = { path = "../socktop_connector" }
tokio = { workspace = true } tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
futures-util = { workspace = true } futures-util = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
@ -17,17 +19,9 @@ url = { workspace = true }
ratatui = { workspace = true } ratatui = { workspace = true }
crossterm = { workspace = true } crossterm = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
dirs-next = { workspace = true } dirs-next = { workspace = true }
sysinfo = { workspace = true } sysinfo = { workspace = true }
rustls = "0.23"
rustls-pemfile = "2.1"
prost = { workspace = true }
[dev-dependencies] [dev-dependencies]
assert_cmd = "2.0" assert_cmd = "2.0"
tempfile = "3" tempfile = "3"
[build-dependencies]
prost-build = "0.13"
protoc-bin-vendored = "3"

View File

@ -1,14 +0,0 @@
fn main() {
// Vendored protoc for reproducible builds (works on crates.io build machines)
let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc");
std::env::set_var("PROTOC", &protoc);
// Tell Cargo when to re-run
println!("cargo:rerun-if-changed=proto/processes.proto");
let mut cfg = prost_build::Config::new();
cfg.out_dir(std::env::var("OUT_DIR").unwrap());
// Use in-crate relative path so `cargo package` includes the file
cfg.compile_protos(&["proto/processes.proto"], &["proto"]) // paths relative to CARGO_MANIFEST_DIR
.expect("compile protos");
}

View File

@ -9,28 +9,36 @@ use std::{
use crossterm::{ use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
execute, execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
}; };
use ratatui::{ use ratatui::{
backend::CrosstermBackend,
layout::{Constraint, Direction, Rect},
//style::Color, // + add Color //style::Color, // + add Color
Terminal, Terminal,
backend::CrosstermBackend,
layout::{Constraint, Direction, Rect},
}; };
use tokio::time::sleep; use tokio::time::sleep;
use crate::history::{push_capped, PerCoreHistory}; use crate::history::{PerCoreHistory, push_capped};
use crate::types::Metrics; use crate::types::Metrics;
use crate::ui::cpu::{ use crate::ui::cpu::{
draw_cpu_avg_graph, draw_per_core_bars, per_core_clamp, per_core_content_area, PerCoreScrollDrag, draw_cpu_avg_graph, draw_per_core_bars, per_core_clamp,
per_core_handle_key, per_core_handle_mouse, per_core_handle_scrollbar_mouse, PerCoreScrollDrag, per_core_content_area, per_core_handle_key, per_core_handle_mouse,
per_core_handle_scrollbar_mouse,
}; };
use crate::ui::processes::{processes_handle_key, processes_handle_mouse, ProcSortBy}; use crate::ui::processes::{ProcSortBy, processes_handle_key, processes_handle_mouse};
use crate::ui::{ use crate::ui::{
disks::draw_disks, gpu::draw_gpu, header::draw_header, mem::draw_mem, net::draw_net_spark, disks::draw_disks, gpu::draw_gpu, header::draw_header, mem::draw_mem, net::draw_net_spark,
swap::draw_swap, swap::draw_swap,
}; };
use crate::ws::{connect, request_disks, request_metrics, request_processes}; use socktop_connector::{
AgentRequest, AgentResponse, SocktopConnector, connect_to_socktop_agent,
connect_to_socktop_agent_with_tls,
};
// Constants for minimum intervals to ensure reasonable performance
const MIN_METRICS_INTERVAL_MS: u64 = 100;
const MIN_PROCESSES_INTERVAL_MS: u64 = 200;
pub struct App { pub struct App {
// Latest metrics + histories // Latest metrics + histories
@ -106,12 +114,12 @@ impl App {
} }
pub fn with_intervals(mut self, metrics_ms: Option<u64>, procs_ms: Option<u64>) -> Self { pub fn with_intervals(mut self, metrics_ms: Option<u64>, procs_ms: Option<u64>) -> Self {
if let Some(m) = metrics_ms { metrics_ms.inspect(|&m| {
self.metrics_interval = Duration::from_millis(m.max(100)); self.metrics_interval = Duration::from_millis(m.max(MIN_METRICS_INTERVAL_MS));
} });
if let Some(p) = procs_ms { procs_ms.inspect(|&p| {
self.procs_interval = Duration::from_millis(p.max(200)); self.procs_interval = Duration::from_millis(p.max(MIN_PROCESSES_INTERVAL_MS));
} });
self self
} }
@ -125,11 +133,15 @@ impl App {
&mut self, &mut self,
url: &str, url: &str,
tls_ca: Option<&str>, tls_ca: Option<&str>,
verify_hostname: bool,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// Connect to agent // Connect to agent
//let mut ws = connect(url, tls_ca).await?;
self.ws_url = url.to_string(); self.ws_url = url.to_string();
let mut ws = connect(url, tls_ca).await?; let mut ws = if let Some(ca_path) = tls_ca {
connect_to_socktop_agent_with_tls(url, ca_path, verify_hostname).await?
} else {
connect_to_socktop_agent(url).await?
};
// Terminal setup // Terminal setup
enable_raw_mode()?; enable_raw_mode()?;
@ -154,7 +166,7 @@ impl App {
async fn event_loop<B: ratatui::backend::Backend>( async fn event_loop<B: ratatui::backend::Backend>(
&mut self, &mut self,
terminal: &mut Terminal<B>, terminal: &mut Terminal<B>,
ws: &mut crate::ws::WsStream, ws: &mut SocktopConnector,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
loop { loop {
// Input (non-blocking) // Input (non-blocking)
@ -278,12 +290,16 @@ impl App {
} }
// Fetch and update // Fetch and update
if let Some(m) = request_metrics(ws).await { if let Ok(response) = ws.request(AgentRequest::Metrics).await {
self.update_with_metrics(m); if let AgentResponse::Metrics(m) = response {
self.update_with_metrics(m);
}
// Only poll processes every 2s // Only poll processes every 2s
if self.last_procs_poll.elapsed() >= self.procs_interval { if self.last_procs_poll.elapsed() >= self.procs_interval {
if let Some(procs) = request_processes(ws).await { if let Ok(AgentResponse::Processes(procs)) =
ws.request(AgentRequest::Processes).await
{
if let Some(mm) = self.last_metrics.as_mut() { if let Some(mm) = self.last_metrics.as_mut() {
mm.top_processes = procs.top_processes; mm.top_processes = procs.top_processes;
mm.process_count = Some(procs.process_count); mm.process_count = Some(procs.process_count);
@ -294,7 +310,7 @@ impl App {
// Only poll disks every 5s // Only poll disks every 5s
if self.last_disks_poll.elapsed() >= self.disks_interval { if self.last_disks_poll.elapsed() >= self.disks_interval {
if let Some(disks) = request_disks(ws).await { if let Ok(AgentResponse::Disks(disks)) = ws.request(AgentRequest::Disks).await {
if let Some(mm) = self.last_metrics.as_mut() { if let Some(mm) = self.last_metrics.as_mut() {
mm.disks = disks; mm.disks = disks;
} }

View File

@ -1,4 +1,6 @@
//! Library surface for integration tests and reuse. //! Library surface for integration tests and reuse.
pub mod types; pub mod types;
pub mod ws;
// Re-export connector functionality
pub use socktop_connector::{SocktopConnector, connect_to_socktop_agent};

View File

@ -5,10 +5,9 @@ mod history;
mod profiles; mod profiles;
mod types; mod types;
mod ui; mod ui;
mod ws;
use app::App; use app::App;
use profiles::{load_profiles, save_profiles, ProfileEntry, ProfileRequest, ResolveProfile}; use profiles::{ProfileEntry, ProfileRequest, ResolveProfile, load_profiles, save_profiles};
use std::env; use std::env;
use std::io::{self, Write}; use std::io::{self, Write};
@ -39,7 +38,9 @@ pub(crate) fn parse_args<I: IntoIterator<Item = String>>(args: I) -> Result<Pars
while let Some(arg) = it.next() { while let Some(arg) = it.next() {
match arg.as_str() { match arg.as_str() {
"-h" | "--help" => { "-h" | "--help" => {
return Err(format!("Usage: {prog} [--tls-ca CERT_PEM|-t CERT_PEM] [--verify-hostname] [--profile NAME|-P NAME] [--save] [--demo] [--metrics-interval-ms N] [--processes-interval-ms N] [ws://HOST:PORT/ws]\n")); return Err(format!(
"Usage: {prog} [--tls-ca CERT_PEM|-t CERT_PEM] [--verify-hostname] [--profile NAME|-P NAME] [--save] [--demo] [--metrics-interval-ms N] [--processes-interval-ms N] [ws://HOST:PORT/ws]\n"
));
} }
"--tls-ca" | "-t" => { "--tls-ca" | "-t" => {
tls_ca = it.next(); tls_ca = it.next();
@ -97,7 +98,9 @@ pub(crate) fn parse_args<I: IntoIterator<Item = String>>(args: I) -> Result<Pars
if url.is_none() { if url.is_none() {
url = Some(arg); url = Some(arg);
} else { } else {
return Err(format!("Unexpected argument. Usage: {prog} [--tls-ca CERT_PEM|-t CERT_PEM] [--verify-hostname] [--profile NAME|-P NAME] [--save] [--demo] [ws://HOST:PORT/ws]")); return Err(format!(
"Unexpected argument. Usage: {prog} [--tls-ca CERT_PEM|-t CERT_PEM] [--verify-hostname] [--profile NAME|-P NAME] [--save] [--demo] [ws://HOST:PORT/ws]"
));
} }
} }
} }
@ -135,11 +138,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
return run_demo_mode(parsed.tls_ca.as_deref()).await; return run_demo_mode(parsed.tls_ca.as_deref()).await;
} }
if parsed.verify_hostname {
// Set env var consumed by ws::connect logic
std::env::set_var("SOCKTOP_VERIFY_NAME", "1");
}
let profiles_file = load_profiles(); let profiles_file = load_profiles();
let req = ProfileRequest { let req = ProfileRequest {
profile_name: parsed.profile.clone(), profile_name: parsed.profile.clone(),
@ -239,7 +237,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut line = String::new(); let mut line = String::new();
if io::stdin().read_line(&mut line).is_ok() { if io::stdin().read_line(&mut line).is_ok() {
if let Ok(idx) = line.trim().parse::<usize>() { if let Ok(idx) = line.trim().parse::<usize>() {
if idx >= 1 && idx <= names.len() { if (1..=names.len()).contains(&idx) {
let name = &names[idx - 1]; let name = &names[idx - 1];
if name == "demo" { if name == "demo" {
return run_demo_mode(parsed.tls_ca.as_deref()).await; return run_demo_mode(parsed.tls_ca.as_deref()).await;
@ -297,7 +295,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
if profiles_mut.profiles.is_empty() && parsed.url.is_none() { if profiles_mut.profiles.is_empty() && parsed.url.is_none() {
eprintln!("Welcome to socktop!"); eprintln!("Welcome to socktop!");
eprintln!("It looks like this is your first time running the application."); eprintln!("It looks like this is your first time running the application.");
eprintln!("You can connect to a socktop_agent instance to monitor system metrics and processes."); eprintln!(
"You can connect to a socktop_agent instance to monitor system metrics and processes."
);
eprintln!("If you don't have an agent running, you can try the demo mode."); eprintln!("If you don't have an agent running, you can try the demo mode.");
if prompt_yes_no("Would you like to start the demo mode now? [Y/n]: ") { if prompt_yes_no("Would you like to start the demo mode now? [Y/n]: ") {
return run_demo_mode(parsed.tls_ca.as_deref()).await; return run_demo_mode(parsed.tls_ca.as_deref()).await;
@ -318,7 +318,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
if parsed.dry_run { if parsed.dry_run {
return Ok(()); return Ok(());
} }
app.run(&url, tls_ca.as_deref()).await app.run(&url, tls_ca.as_deref(), parsed.verify_hostname)
.await
} }
fn prompt_yes_no(prompt: &str) -> bool { fn prompt_yes_no(prompt: &str) -> bool {
@ -382,7 +383,8 @@ async fn run_demo_mode(_tls_ca: Option<&str>) -> Result<(), Box<dyn std::error::
let url = format!("ws://127.0.0.1:{port}/ws"); let url = format!("ws://127.0.0.1:{port}/ws");
let child = spawn_demo_agent(port)?; let child = spawn_demo_agent(port)?;
let mut app = App::new(); let mut app = App::new();
tokio::select! { res=app.run(&url,None)=>{ drop(child); res } _=tokio::signal::ctrl_c()=>{ drop(child); Ok(()) } } // Demo mode connects to localhost, so disable hostname verification
tokio::select! { res=app.run(&url,None,false)=>{ drop(child); res } _=tokio::signal::ctrl_c()=>{ drop(child); Ok(()) } }
} }
struct DemoGuard { struct DemoGuard {
port: u16, port: u16,

View File

@ -77,12 +77,13 @@ impl ProfileRequest {
pub fn resolve(self, pf: &ProfilesFile) -> ResolveProfile { pub fn resolve(self, pf: &ProfilesFile) -> ResolveProfile {
// Case: only profile name given -> try load // Case: only profile name given -> try load
if self.url.is_none() && self.profile_name.is_some() { if self.url.is_none() && self.profile_name.is_some() {
let name = self.profile_name.unwrap(); let Some(name) = self.profile_name else {
if let Some(entry) = pf.profiles.get(&name) { unreachable!("Already checked profile_name.is_some()")
return ResolveProfile::Loaded(entry.url.clone(), entry.tls_ca.clone()); };
} else { let Some(entry) = pf.profiles.get(&name) else {
return ResolveProfile::PromptCreate(name); return ResolveProfile::PromptCreate(name);
} };
return ResolveProfile::Loaded(entry.url.clone(), entry.tls_ca.clone());
} }
// Both provided -> direct (maybe later saved by caller) // Both provided -> direct (maybe later saved by caller)
if let Some(u) = self.url { if let Some(u) = self.url {

View File

@ -1,78 +1,4 @@
//! Types that mirror the agent's JSON schema. //! Types that mirror the agent's JSON schema.
use serde::Deserialize; // Re-export commonly used types from socktop_connector
pub use socktop_connector::Metrics;
#[derive(Debug, Clone, Deserialize)]
pub struct ProcessInfo {
pub pid: u32,
pub name: String,
pub cpu_usage: f32,
pub mem_bytes: u64,
}
#[derive(Debug, Clone, Deserialize)]
pub struct DiskInfo {
pub name: String,
pub total: u64,
pub available: u64,
}
#[derive(Debug, Clone, Deserialize)]
pub struct NetworkInfo {
#[allow(dead_code)]
pub name: String,
pub received: u64,
pub transmitted: u64,
}
#[derive(Debug, Clone, Deserialize)]
pub struct GpuInfo {
pub name: Option<String>,
#[allow(dead_code)]
pub vendor: Option<String>,
// Accept both the new and legacy keys
#[serde(
default,
alias = "utilization_gpu_pct",
alias = "gpu_util_pct",
alias = "gpu_utilization"
)]
pub utilization: Option<f32>,
#[serde(default, alias = "mem_used_bytes", alias = "vram_used_bytes")]
pub mem_used: Option<u64>,
#[serde(default, alias = "mem_total_bytes", alias = "vram_total_bytes")]
pub mem_total: Option<u64>,
#[allow(dead_code)]
#[serde(default, alias = "temp_c", alias = "temperature_c")]
pub temperature: Option<f32>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct Metrics {
pub cpu_total: f32,
pub cpu_per_core: Vec<f32>,
pub mem_total: u64,
pub mem_used: u64,
pub swap_total: u64,
pub swap_used: u64,
pub hostname: String,
pub cpu_temp_c: Option<f32>,
pub disks: Vec<DiskInfo>,
pub networks: Vec<NetworkInfo>,
pub top_processes: Vec<ProcessInfo>,
pub gpus: Option<Vec<GpuInfo>>,
// New: keep the last reported total process count
#[serde(default)]
pub process_count: Option<usize>,
}
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize)]
pub struct ProcessesPayload {
pub process_count: usize,
pub top_processes: Vec<ProcessInfo>,
}

View File

@ -60,7 +60,9 @@ fn test_profile_created_on_first_use() {
let _guard = ENV_LOCK.lock().unwrap(); let _guard = ENV_LOCK.lock().unwrap();
// Isolate config in a temp dir // Isolate config in a temp dir
let td = tempfile::tempdir().unwrap(); let td = tempfile::tempdir().unwrap();
std::env::set_var("XDG_CONFIG_HOME", td.path()); unsafe {
std::env::set_var("XDG_CONFIG_HOME", td.path());
}
// Ensure directory exists fresh // Ensure directory exists fresh
std::fs::create_dir_all(td.path().join("socktop")).unwrap(); std::fs::create_dir_all(td.path().join("socktop")).unwrap();
let _ = fs::remove_file(profiles_path()); let _ = fs::remove_file(profiles_path());
@ -78,7 +80,9 @@ fn test_profile_created_on_first_use() {
fn test_profile_overwrite_only_when_changed() { fn test_profile_overwrite_only_when_changed() {
let _guard = ENV_LOCK.lock().unwrap(); let _guard = ENV_LOCK.lock().unwrap();
let td = tempfile::tempdir().unwrap(); let td = tempfile::tempdir().unwrap();
std::env::set_var("XDG_CONFIG_HOME", td.path()); unsafe {
std::env::set_var("XDG_CONFIG_HOME", td.path());
}
std::fs::create_dir_all(td.path().join("socktop")).unwrap(); std::fs::create_dir_all(td.path().join("socktop")).unwrap();
let _ = fs::remove_file(profiles_path()); let _ = fs::remove_file(profiles_path());
// Initial create // Initial create
@ -101,7 +105,9 @@ fn test_profile_overwrite_only_when_changed() {
fn test_profile_tls_ca_persisted() { fn test_profile_tls_ca_persisted() {
let _guard = ENV_LOCK.lock().unwrap(); let _guard = ENV_LOCK.lock().unwrap();
let td = tempfile::tempdir().unwrap(); let td = tempfile::tempdir().unwrap();
std::env::set_var("XDG_CONFIG_HOME", td.path()); unsafe {
std::env::set_var("XDG_CONFIG_HOME", td.path());
}
std::fs::create_dir_all(td.path().join("socktop")).unwrap(); std::fs::create_dir_all(td.path().join("socktop")).unwrap();
let _ = fs::remove_file(profiles_path()); let _ = fs::remove_file(profiles_path());
let (_ok, _out) = run_socktop(&[ let (_ok, _out) = run_socktop(&[

View File

@ -1,29 +0,0 @@
use socktop::ws::{connect, request_metrics, request_processes};
// Integration probe: only runs when SOCKTOP_WS is set to an agent WebSocket URL.
// Example: SOCKTOP_WS=ws://127.0.0.1:3000/ws cargo test -p socktop --test ws_probe -- --nocapture
#[tokio::test]
async fn probe_ws_endpoints() {
// Gate the test to avoid CI failures when no agent is running.
let url = match std::env::var("SOCKTOP_WS") {
Ok(v) if !v.is_empty() => v,
_ => {
eprintln!(
"skipping ws_probe: set SOCKTOP_WS=ws://host:port/ws to run this integration test"
);
return;
}
};
// Optional pinned CA for WSS/self-signed setups
let tls_ca = std::env::var("SOCKTOP_TLS_CA").ok();
let mut ws = connect(&url, tls_ca.as_deref()).await.expect("connect ws");
// Should get fast metrics quickly
let m = request_metrics(&mut ws).await;
assert!(m.is_some(), "expected Metrics payload within timeout");
// Processes may be gzipped and a bit slower, but should arrive
let p = request_processes(&mut ws).await;
assert!(p.is_some(), "expected Processes payload within timeout");
}

View File

@ -2,8 +2,8 @@
name = "socktop_agent" name = "socktop_agent"
version = "1.40.67" version = "1.40.67"
authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"] authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
description = "Remote system monitor over WebSocket, TUI like top" description = "Socktop agent daemon. Serves host metrics over WebSocket."
edition = "2021" edition = "2024"
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"

View File

@ -1,13 +1,13 @@
fn main() { fn main() {
// Vendored protoc for reproducible builds // Vendored protoc for reproducible builds
let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc"); let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc");
std::env::set_var("PROTOC", &protoc);
println!("cargo:rerun-if-changed=proto/processes.proto"); println!("cargo:rerun-if-changed=proto/processes.proto");
// Compile protobuf definitions for processes // Compile protobuf definitions for processes
let mut cfg = prost_build::Config::new(); let mut cfg = prost_build::Config::new();
cfg.out_dir(std::env::var("OUT_DIR").unwrap()); cfg.out_dir(std::env::var("OUT_DIR").unwrap());
cfg.protoc_executable(protoc); // Use the vendored protoc directly
// Use local path (ensures file is inside published crate tarball) // Use local path (ensures file is inside published crate tarball)
cfg.compile_protos(&["proto/processes.proto"], &["proto"]) // relative to CARGO_MANIFEST_DIR cfg.compile_protos(&["proto/processes.proto"], &["proto"]) // relative to CARGO_MANIFEST_DIR
.expect("compile protos"); .expect("compile protos");

View File

@ -8,7 +8,7 @@ mod state;
mod types; mod types;
mod ws; mod ws;
use axum::{http::StatusCode, routing::get, Router}; use axum::{Router, http::StatusCode, routing::get};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::str::FromStr; use std::str::FromStr;

View File

@ -1,8 +1,8 @@
//! Shared agent state: sysinfo handles and hot JSON cache. //! Shared agent state: sysinfo handles and hot JSON cache.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use sysinfo::{Components, Disks, Networks, System}; use sysinfo::{Components, Disks, Networks, System};
use tokio::sync::Mutex; use tokio::sync::Mutex;

View File

@ -5,7 +5,7 @@ use axum::{
extract::{Query, State, WebSocketUpgrade}, extract::{Query, State, WebSocketUpgrade},
response::Response, response::Response,
}; };
use flate2::{write::GzEncoder, Compression}; use flate2::{Compression, write::GzEncoder};
use futures_util::StreamExt; use futures_util::StreamExt;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::collections::HashMap; use std::collections::HashMap;

View File

@ -0,0 +1,48 @@
[package]
name = "socktop_connector"
version = "0.1.0"
edition = "2024"
license = "MIT"
description = "WebSocket connector library for socktop agent communication"
authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
repository = "https://github.com/jasonwitty/socktop"
readme = "README.md"
keywords = ["monitoring", "websocket", "metrics", "system"]
categories = ["network-programming", "development-tools", "system-tools"]
documentation = "https://docs.rs/socktop_connector"
# docs.rs specific metadata
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
# WebSocket client
tokio-tungstenite = { workspace = true }
tokio = { workspace = true }
futures-util = { workspace = true }
url = { workspace = true }
# TLS support
rustls = { version = "0.23", features = ["ring"], optional = true }
rustls-pemfile = { version = "2.1", optional = true }
# Serialization
serde = { workspace = true }
serde_json = { workspace = true }
# Compression
flate2 = "1.0"
# Protobuf
prost = { workspace = true }
# Error handling
anyhow = { workspace = true }
[build-dependencies]
prost-build = "0.13"
[features]
default = ["tls"]
tls = ["rustls", "rustls-pemfile"]

21
socktop_connector/LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Jason Witty
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

171
socktop_connector/README.md Normal file
View File

@ -0,0 +1,171 @@
# socktop_connector
A WebSocket connector library for communicating with socktop agents.
## Overview
`socktop_connector` provides a high-level, type-safe interface for connecting to socktop agents over WebSocket connections. It handles connection management, TLS certificate pinning, compression, and protocol buffer decoding automatically.
## Features
- **WebSocket Communication**: Support for both `ws://` and `wss://` connections
- **TLS Security**: Certificate pinning for secure connections with self-signed certificates
- **Hostname Verification**: Configurable hostname verification for TLS connections
- **Type Safety**: Strongly typed requests and responses
- **Automatic Compression**: Handles gzip compression/decompression transparently
- **Protocol Buffer Support**: Decodes binary process data automatically
- **Error Handling**: Comprehensive error handling with detailed error messages
## Connection Types
### Non-TLS Connections (`ws://`)
Use `connect_to_socktop_agent()` for unencrypted WebSocket connections.
### TLS Connections (`wss://`)
Use `connect_to_socktop_agent_with_tls()` for encrypted connections with certificate pinning. You can control hostname verification with the `verify_hostname` parameter.
## Quick Start
Add this to your `Cargo.toml`:
```toml
[dependencies]
socktop_connector = "0.1"
tokio = { version = "1", features = ["full"] }
```
### Basic Usage
```rust
use socktop_connector::{connect_to_socktop_agent, AgentRequest, AgentResponse};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to a socktop agent (non-TLS connections are always unverified)
let mut connector = connect_to_socktop_agent("ws://localhost:3000/ws").await?;
// Request metrics
match connector.request(AgentRequest::Metrics).await? {
AgentResponse::Metrics(metrics) => {
println!("CPU: {}%, Memory: {}/{}MB",
metrics.cpu_total,
metrics.mem_used / 1024 / 1024,
metrics.mem_total / 1024 / 1024
);
}
_ => unreachable!(),
}
// Request process list
match connector.request(AgentRequest::Processes).await? {
AgentResponse::Processes(processes) => {
println!("Total processes: {}", processes.process_count);
for process in processes.top_processes.iter().take(5) {
println!(" {} (PID: {}) - CPU: {}%",
process.name, process.pid, process.cpu_usage);
}
}
_ => unreachable!(),
}
Ok(())
}
```
### TLS with Certificate Pinning
```rust
use socktop_connector::{connect_to_socktop_agent_with_tls, AgentRequest};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect with TLS certificate pinning and hostname verification
let mut connector = connect_to_socktop_agent_with_tls(
"wss://remote-host:8443/ws",
"/path/to/cert.pem",
false // Enable hostname verification
).await?;
let response = connector.request(AgentRequest::Disks).await?;
println!("Got disk info: {:?}", response);
Ok(())
}
```
### Advanced Configuration
```rust
use socktop_connector::{ConnectorConfig, SocktopConnector, AgentRequest};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a custom configuration
let config = ConnectorConfig::new("wss://remote-host:8443/ws")
.with_tls_ca("/path/to/cert.pem")
.with_hostname_verification(false);
// Create and connect
let mut connector = SocktopConnector::new(config);
connector.connect().await?;
// Make requests
let response = connector.request(AgentRequest::Metrics).await?;
// Clean disconnect
connector.disconnect().await?;
Ok(())
}
```
## Request Types
The library supports three types of requests:
- `AgentRequest::Metrics` - Get current system metrics (CPU, memory, network, etc.)
- `AgentRequest::Disks` - Get disk usage information
- `AgentRequest::Processes` - Get running process information
## Response Types
Responses are automatically parsed into strongly-typed structures:
- `AgentResponse::Metrics(Metrics)` - System metrics with CPU, memory, network data
- `AgentResponse::Disks(Vec<DiskInfo>)` - List of disk usage information
- `AgentResponse::Processes(ProcessesPayload)` - Process list with CPU and memory usage
## Configuration Options
The library provides flexible configuration through the `ConnectorConfig` builder:
- `with_tls_ca(path)` - Enable TLS with certificate pinning
- `with_hostname_verification(bool)` - Control hostname verification for TLS connections
- `true` (recommended): Verify the server hostname matches the certificate
- `false`: Skip hostname verification (useful for localhost or IP-based connections)
**Note**: Hostname verification only applies to TLS connections (`wss://`). Non-TLS connections (`ws://`) don't use certificates, so hostname verification is not applicable.
## Security Considerations
- **Production TLS**: Always enable hostname verification (`verify_hostname: true`) for production
- **Development/Testing**: You may disable hostname verification for localhost or IP addresses
- **Certificate Pinning**: Use `with_tls_ca()` for self-signed certificates
- **Non-TLS**: Use only for development or trusted networks
## Environment Variables
Currently no environment variables are used. All configuration is done through the API.
## Error Handling
The library uses `anyhow::Error` for error handling, providing detailed error messages for common failure scenarios:
- Connection failures
- TLS certificate validation errors
- Protocol errors
- Parsing errors
## License
MIT License - see the LICENSE file for details.

View File

@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
prost_build::compile_protos(&["processes.proto"], &["."])?;
Ok(())
}

View File

@ -0,0 +1,15 @@
syntax = "proto3";
package socktop;
// All running processes. Sorting is done client-side.
message Processes {
uint64 process_count = 1; // total processes in the system
repeated Process rows = 2; // all processes
}
message Process {
uint32 pid = 1;
string name = 2;
float cpu_usage = 3; // 0..100
uint64 mem_bytes = 4; // RSS bytes
}

View File

@ -1,4 +1,4 @@
//! Minimal WebSocket client helpers for requesting metrics from the agent. //! WebSocket connector for communicating with socktop agents.
use flate2::bufread::GzDecoder; use flate2::bufread::GzDecoder;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
@ -12,12 +12,21 @@ use std::io::Read;
use std::{fs::File, io::BufReader, sync::Arc}; use std::{fs::File, io::BufReader, sync::Arc};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_tungstenite::{ use tokio_tungstenite::{
connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest, Connector, MaybeTlsStream, WebSocketStream, connect_async, connect_async_tls_with_config,
tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream, tungstenite::Message, tungstenite::client::IntoClientRequest,
}; };
use url::Url; use url::Url;
use crate::types::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload}; use crate::types::{AgentRequest, AgentResponse, DiskInfo, Metrics, ProcessInfo, ProcessesPayload};
#[cfg(feature = "tls")]
fn ensure_crypto_provider() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
let _ = rustls::crypto::ring::default_provider().install_default();
});
}
mod pb { mod pb {
// generated by build.rs // generated by build.rs
@ -26,23 +35,132 @@ mod pb {
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Configuration for connecting to a socktop agent
#[derive(Debug, Clone)]
pub struct ConnectorConfig {
pub url: String,
pub tls_ca_path: Option<String>,
pub verify_hostname: bool,
}
impl ConnectorConfig {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
tls_ca_path: None,
verify_hostname: false,
}
}
pub fn with_tls_ca(mut self, ca_path: impl Into<String>) -> Self {
self.tls_ca_path = Some(ca_path.into());
self
}
pub fn with_hostname_verification(mut self, verify: bool) -> Self {
self.verify_hostname = verify;
self
}
}
/// A WebSocket connector for communicating with socktop agents
pub struct SocktopConnector {
config: ConnectorConfig,
stream: Option<WsStream>,
}
impl SocktopConnector {
/// Create a new connector with the given configuration
pub fn new(config: ConnectorConfig) -> Self {
Self {
config,
stream: None,
}
}
/// Connect to the agent
pub async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let stream = connect_to_agent(
&self.config.url,
self.config.tls_ca_path.as_deref(),
self.config.verify_hostname,
)
.await?;
self.stream = Some(stream);
Ok(())
}
/// Send a request to the agent and get the response
pub async fn request(
&mut self,
request: AgentRequest,
) -> Result<AgentResponse, Box<dyn std::error::Error>> {
let stream = self.stream.as_mut().ok_or("Not connected")?;
match request {
AgentRequest::Metrics => {
let metrics = request_metrics(stream)
.await
.ok_or("Failed to get metrics")?;
Ok(AgentResponse::Metrics(metrics))
}
AgentRequest::Disks => {
let disks = request_disks(stream).await.ok_or("Failed to get disks")?;
Ok(AgentResponse::Disks(disks))
}
AgentRequest::Processes => {
let processes = request_processes(stream)
.await
.ok_or("Failed to get processes")?;
Ok(AgentResponse::Processes(processes))
}
}
}
/// Check if the connector is connected
pub fn is_connected(&self) -> bool {
self.stream.is_some()
}
/// Disconnect from the agent
pub async fn disconnect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if let Some(mut stream) = self.stream.take() {
let _ = stream.close(None).await;
}
Ok(())
}
}
// Connect to the agent and return the WS stream // Connect to the agent and return the WS stream
pub async fn connect( async fn connect_to_agent(
url: &str, url: &str,
tls_ca: Option<&str>, tls_ca: Option<&str>,
verify_hostname: bool,
) -> Result<WsStream, Box<dyn std::error::Error>> { ) -> Result<WsStream, Box<dyn std::error::Error>> {
#[cfg(feature = "tls")]
ensure_crypto_provider();
let mut u = Url::parse(url)?; let mut u = Url::parse(url)?;
if let Some(ca_path) = tls_ca { if let Some(ca_path) = tls_ca {
if u.scheme() == "ws" { if u.scheme() == "ws" {
let _ = u.set_scheme("wss"); let _ = u.set_scheme("wss");
} }
return connect_with_ca(u.as_str(), ca_path).await; return connect_with_ca(u.as_str(), ca_path, verify_hostname).await;
} }
// No TLS - hostname verification is not applicable
let (ws, _) = connect_async(u.as_str()).await?; let (ws, _) = connect_async(u.as_str()).await?;
Ok(ws) Ok(ws)
} }
async fn connect_with_ca(url: &str, ca_path: &str) -> Result<WsStream, Box<dyn std::error::Error>> { #[cfg(feature = "tls")]
async fn connect_with_ca(
url: &str,
ca_path: &str,
verify_hostname: bool,
) -> Result<WsStream, Box<dyn std::error::Error>> {
// Initialize the crypto provider for rustls
let _ = rustls::crypto::ring::default_provider().install_default();
let mut root = RootCertStore::empty(); let mut root = RootCertStore::empty();
let mut reader = BufReader::new(File::open(ca_path)?); let mut reader = BufReader::new(File::open(ca_path)?);
let mut der_certs = Vec::new(); let mut der_certs = Vec::new();
@ -58,8 +176,7 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result<WsStream, Box<dyn s
.with_no_client_auth(); .with_no_client_auth();
let req = url.into_client_request()?; let req = url.into_client_request()?;
let verify_domain = std::env::var("SOCKTOP_VERIFY_NAME").ok().as_deref() == Some("1"); if !verify_hostname {
if !verify_domain {
#[derive(Debug)] #[derive(Debug)]
struct NoVerify; struct NoVerify;
impl ServerCertVerifier for NoVerify { impl ServerCertVerifier for NoVerify {
@ -90,7 +207,6 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result<WsStream, Box<dyn s
Ok(HandshakeSignatureValid::assertion()) Ok(HandshakeSignatureValid::assertion())
} }
fn supported_verify_schemes(&self) -> Vec<SignatureScheme> { fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
// Provide common schemes; not strictly needed for skipping but keeps API happy
vec![ vec![
SignatureScheme::ECDSA_NISTP256_SHA256, SignatureScheme::ECDSA_NISTP256_SHA256,
SignatureScheme::ED25519, SignatureScheme::ED25519,
@ -99,17 +215,27 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result<WsStream, Box<dyn s
} }
} }
cfg.dangerous().set_certificate_verifier(Arc::new(NoVerify)); cfg.dangerous().set_certificate_verifier(Arc::new(NoVerify));
eprintln!("socktop: hostname verification disabled (default). Use --verify-hostname to enable strict SAN checking."); eprintln!(
"socktop_connector: hostname verification disabled (default). Set SOCKTOP_VERIFY_NAME=1 to enable strict SAN checking."
);
} }
let cfg = Arc::new(cfg); let cfg = Arc::new(cfg);
let (ws, _) = let (ws, _) =
connect_async_tls_with_config(req, None, verify_domain, Some(Connector::Rustls(cfg))) connect_async_tls_with_config(req, None, verify_hostname, Some(Connector::Rustls(cfg)))
.await?; .await?;
Ok(ws) Ok(ws)
} }
#[cfg(not(feature = "tls"))]
async fn connect_with_ca(
_url: &str,
_ca_path: &str,
) -> Result<WsStream, Box<dyn std::error::Error>> {
Err("TLS support not compiled in".into())
}
// Send a "get_metrics" request and await a single JSON reply // Send a "get_metrics" request and await a single JSON reply
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> { async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
if ws.send(Message::Text("get_metrics".into())).await.is_err() { if ws.send(Message::Text("get_metrics".into())).await.is_err() {
return None; return None;
} }
@ -122,34 +248,8 @@ pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
} }
} }
// Decompress a gzip-compressed binary frame into a String.
fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
let mut dec = GzDecoder::new(bytes);
let mut out = String::new();
dec.read_to_string(&mut out).ok()?;
Some(out)
}
fn gunzip_to_vec(bytes: &[u8]) -> Option<Vec<u8>> {
let mut dec = GzDecoder::new(bytes);
let mut out = Vec::new();
dec.read_to_end(&mut out).ok()?;
Some(out)
}
fn is_gzip(bytes: &[u8]) -> bool {
bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
}
// Suppress dead_code until these are wired into the app
#[allow(dead_code)]
pub enum Payload {
Metrics(Metrics),
Disks(Vec<DiskInfo>),
Processes(ProcessesPayload),
}
// Send a "get_disks" request and await a JSON Vec<DiskInfo> // Send a "get_disks" request and await a JSON Vec<DiskInfo>
pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> { async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
if ws.send(Message::Text("get_disks".into())).await.is_err() { if ws.send(Message::Text("get_disks".into())).await.is_err() {
return None; return None;
} }
@ -163,7 +263,7 @@ pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
} }
// Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped) // Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped)
pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> { async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
if ws if ws
.send(Message::Text("get_processes".into())) .send(Message::Text("get_processes".into()))
.await .await
@ -208,3 +308,57 @@ pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
_ => None, _ => None,
} }
} }
// Decompress a gzip-compressed binary frame into a String.
fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
let mut dec = GzDecoder::new(bytes);
let mut out = String::new();
dec.read_to_string(&mut out).ok()?;
Some(out)
}
fn gunzip_to_vec(bytes: &[u8]) -> Option<Vec<u8>> {
let mut dec = GzDecoder::new(bytes);
let mut out = Vec::new();
dec.read_to_end(&mut out).ok()?;
Some(out)
}
fn is_gzip(bytes: &[u8]) -> bool {
bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
}
/// Convenience function to create a connector and connect in one step.
///
/// This function is for non-TLS WebSocket connections (`ws://`). Since there's no
/// certificate involved, hostname verification is not applicable.
///
/// For TLS connections with certificate pinning, use `connect_to_socktop_agent_with_tls()`.
pub async fn connect_to_socktop_agent(
url: impl Into<String>,
) -> Result<SocktopConnector, Box<dyn std::error::Error>> {
let config = ConnectorConfig::new(url);
let mut connector = SocktopConnector::new(config);
connector.connect().await?;
Ok(connector)
}
/// Convenience function to create a connector with TLS and connect in one step.
///
/// This function enables TLS with certificate pinning using the provided CA certificate.
/// The `verify_hostname` parameter controls whether the server's hostname is verified
/// against the certificate (recommended for production, can be disabled for testing).
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub async fn connect_to_socktop_agent_with_tls(
url: impl Into<String>,
ca_path: impl Into<String>,
verify_hostname: bool,
) -> Result<SocktopConnector, Box<dyn std::error::Error>> {
let config = ConnectorConfig::new(url)
.with_tls_ca(ca_path)
.with_hostname_verification(verify_hostname);
let mut connector = SocktopConnector::new(config);
connector.connect().await?;
Ok(connector)
}

View File

@ -0,0 +1,101 @@
//! WebSocket connector library for socktop agents.
//!
//! This library provides a high-level interface for connecting to socktop agents
//! over WebSocket connections with support for TLS and certificate pinning.
//!
//! # Quick Start
//!
//! ```no_run
//! use socktop_connector::{connect_to_socktop_agent, AgentRequest, AgentResponse};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut connector = connect_to_socktop_agent("ws://localhost:3000/ws").await?;
//!
//! // Get comprehensive system metrics
//! if let Ok(AgentResponse::Metrics(metrics)) = connector.request(AgentRequest::Metrics).await {
//! println!("Hostname: {}", metrics.hostname);
//! println!("CPU Usage: {:.1}%", metrics.cpu_total);
//!
//! // CPU temperature if available
//! if let Some(temp) = metrics.cpu_temp_c {
//! println!("CPU Temperature: {:.1}°C", temp);
//! }
//!
//! // Memory usage
//! println!("Memory: {:.1} GB / {:.1} GB",
//! metrics.mem_used as f64 / 1_000_000_000.0,
//! metrics.mem_total as f64 / 1_000_000_000.0);
//!
//! // Per-core CPU usage
//! for (i, usage) in metrics.cpu_per_core.iter().enumerate() {
//! println!("Core {}: {:.1}%", i, usage);
//! }
//!
//! // GPU information
//! if let Some(gpus) = &metrics.gpus {
//! for gpu in gpus {
//! if let Some(name) = &gpu.name {
//! println!("GPU {}: {:.1}% usage", name, gpu.utilization.unwrap_or(0.0));
//! if let Some(temp) = gpu.temp {
//! println!(" Temperature: {:.1}°C", temp);
//! }
//! }
//! }
//! }
//! }
//!
//! // Get process information
//! if let Ok(AgentResponse::Processes(processes)) = connector.request(AgentRequest::Processes).await {
//! println!("Running processes: {}", processes.process_count);
//! for proc in &processes.top_processes {
//! println!(" PID {}: {} ({:.1}% CPU, {:.1} MB RAM)",
//! proc.pid, proc.name, proc.cpu_usage, proc.mem_bytes as f64 / 1_000_000.0);
//! }
//! }
//!
//! // Get disk information
//! if let Ok(AgentResponse::Disks(disks)) = connector.request(AgentRequest::Disks).await {
//! for disk in disks {
//! let used_gb = (disk.total - disk.available) as f64 / 1_000_000_000.0;
//! let total_gb = disk.total as f64 / 1_000_000_000.0;
//! println!("Disk {}: {:.1} GB / {:.1} GB", disk.name, used_gb, total_gb);
//! }
//! }
//!
//! Ok(())
//! }
//! ```
//!
//! # TLS Support
//!
//! ```no_run
//! use socktop_connector::connect_to_socktop_agent_with_tls;
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let connector = connect_to_socktop_agent_with_tls(
//! "wss://secure-host:3000/ws",
//! "/path/to/ca.pem",
//! false // Enable hostname verification
//! ).await?;
//! # Ok(())
//! # }
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]
pub mod connector;
pub mod types;
pub use connector::{
ConnectorConfig, SocktopConnector, WsStream, connect_to_socktop_agent,
connect_to_socktop_agent_with_tls,
};
pub use types::{
AgentRequest, AgentResponse, DiskInfo, GpuInfo, Metrics, NetworkInfo, ProcessInfo,
ProcessesPayload,
};
/// Re-export commonly used error type
pub use anyhow::Error;

View File

@ -0,0 +1,105 @@
//! Types that represent data from the socktop agent.
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ProcessInfo {
pub pid: u32,
pub name: String,
pub cpu_usage: f32,
pub mem_bytes: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DiskInfo {
pub name: String,
pub total: u64,
pub available: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct NetworkInfo {
pub name: String,
pub received: u64,
pub transmitted: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GpuInfo {
pub name: Option<String>,
pub vendor: Option<String>,
// Accept both the new and legacy keys
#[serde(
default,
alias = "utilization_gpu_pct",
alias = "gpu_util_pct",
alias = "gpu_utilization"
)]
pub utilization: Option<f32>,
#[serde(default, alias = "mem_used_bytes", alias = "vram_used_bytes")]
pub mem_used: Option<u64>,
#[serde(default, alias = "mem_total_bytes", alias = "vram_total_bytes")]
pub mem_total: Option<u64>,
#[serde(default, alias = "temp_c", alias = "temperature_c")]
pub temp: Option<f32>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Metrics {
pub cpu_total: f32,
pub cpu_per_core: Vec<f32>,
pub mem_total: u64,
pub mem_used: u64,
pub swap_total: u64,
pub swap_used: u64,
pub hostname: String,
pub cpu_temp_c: Option<f32>,
pub disks: Vec<DiskInfo>,
pub networks: Vec<NetworkInfo>,
pub top_processes: Vec<ProcessInfo>,
pub gpus: Option<Vec<GpuInfo>>,
// New: keep the last reported total process count
#[serde(default)]
pub process_count: Option<usize>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ProcessesPayload {
pub process_count: usize,
pub top_processes: Vec<ProcessInfo>,
}
/// Request types that can be sent to the agent
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum AgentRequest {
#[serde(rename = "metrics")]
Metrics,
#[serde(rename = "disks")]
Disks,
#[serde(rename = "processes")]
Processes,
}
impl AgentRequest {
/// Convert to the legacy string format used by the agent
pub fn to_legacy_string(&self) -> String {
match self {
AgentRequest::Metrics => "get_metrics".to_string(),
AgentRequest::Disks => "get_disks".to_string(),
AgentRequest::Processes => "get_processes".to_string(),
}
}
}
/// Response types that can be received from the agent
#[derive(Debug, Clone)]
pub enum AgentResponse {
Metrics(Metrics),
Disks(Vec<DiskInfo>),
Processes(ProcessesPayload),
}

View File

@ -0,0 +1,51 @@
use socktop_connector::{
AgentRequest, AgentResponse, connect_to_socktop_agent, connect_to_socktop_agent_with_tls,
};
// Integration probe: only runs when SOCKTOP_WS is set to an agent WebSocket URL.
// Example: SOCKTOP_WS=ws://127.0.0.1:3000/ws cargo test -p socktop_connector --test integration_test -- --nocapture
#[tokio::test]
async fn probe_ws_endpoints() {
// Gate the test to avoid CI failures when no agent is running.
let url = match std::env::var("SOCKTOP_WS") {
Ok(v) if !v.is_empty() => v,
_ => {
eprintln!(
"skipping ws_probe: set SOCKTOP_WS=ws://host:port/ws to run this integration test"
);
return;
}
};
// Optional pinned CA for WSS/self-signed setups
let tls_ca = std::env::var("SOCKTOP_TLS_CA").ok();
let mut connector = if let Some(ca_path) = tls_ca {
connect_to_socktop_agent_with_tls(&url, ca_path, true)
.await
.expect("connect ws with TLS")
} else {
connect_to_socktop_agent(&url).await.expect("connect ws")
};
// Should get fast metrics quickly
let response = connector.request(AgentRequest::Metrics).await;
assert!(response.is_ok(), "expected Metrics payload within timeout");
if let Ok(AgentResponse::Metrics(_)) = response {
// Success
} else {
panic!("expected Metrics response");
}
// Processes may be gzipped and a bit slower, but should arrive
let response = connector.request(AgentRequest::Processes).await;
assert!(
response.is_ok(),
"expected Processes payload within timeout"
);
if let Ok(AgentResponse::Processes(_)) = response {
// Success
} else {
panic!("expected Processes response");
}
}