diff --git a/Cargo.lock b/Cargo.lock index 721f3d2..f90e60e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2182,7 +2182,7 @@ dependencies = [ [[package]] name = "socktop_agent" -version = "1.40.67" +version = "1.40.7" dependencies = [ "anyhow", "assert_cmd", @@ -2213,7 +2213,7 @@ dependencies = [ [[package]] name = "socktop_connector" -version = "0.1.5" +version = "0.1.6" dependencies = [ "flate2", "futures-util", diff --git a/socktop_agent/Cargo.toml b/socktop_agent/Cargo.toml index 01d3e87..69d1dbf 100644 --- a/socktop_agent/Cargo.toml +++ b/socktop_agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "socktop_agent" -version = "1.40.67" +version = "1.40.7" authors = ["Jason Witty "] description = "Socktop agent daemon. Serves host metrics over WebSocket." edition = "2024" diff --git a/socktop_connector/Cargo.toml b/socktop_connector/Cargo.toml index 36c7cb6..8c628e3 100644 --- a/socktop_connector/Cargo.toml +++ b/socktop_connector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "socktop_connector" -version = "0.1.5" +version = "0.1.6" edition = "2024" license = "MIT" description = "WebSocket connector library for socktop agent communication" @@ -40,8 +40,8 @@ rustls-pemfile = { version = "2.1", optional = true } serde = { workspace = true } serde_json = { workspace = true } -# Compression - only for networking -flate2 = { version = "1.0", optional = true } +# Compression - used in both networking and WASM modes +flate2 = "1.0" # Protobuf - always available prost = { workspace = true } @@ -55,6 +55,6 @@ protoc-bin-vendored = "3.0" [features] default = ["networking", "tls"] -networking = ["tokio-tungstenite", "tokio", "futures-util", "url", "flate2"] +networking = ["tokio-tungstenite", "tokio", "futures-util", "url"] tls = ["networking", "rustls", "rustls-pemfile"] -wasm = ["wasm-bindgen", "wasm-bindgen-futures", "js-sys", "web-sys", "flate2"] # WASM-compatible networking with compression +wasm = ["wasm-bindgen", "wasm-bindgen-futures", "js-sys", "web-sys"] # WASM-compatible networking with compression diff --git a/socktop_connector/README.md b/socktop_connector/README.md index 6839aef..adf22ff 100644 --- a/socktop_connector/README.md +++ b/socktop_connector/README.md @@ -340,7 +340,7 @@ The library provides flexible configuration through the `ConnectorConfig` builde **Note**: Hostname verification only applies to TLS connections (`wss://`). Non-TLS connections (`ws://`) don't use certificates, so hostname verification is not applicable. -## WASM Compatibility +## WASM Compatibility (experimental) `socktop_connector` provides **full WebSocket support** for WebAssembly (WASM) environments, including complete networking functionality with automatic compression and protobuf decoding. diff --git a/socktop_connector/src/config.rs b/socktop_connector/src/config.rs new file mode 100644 index 0000000..c5eeeba --- /dev/null +++ b/socktop_connector/src/config.rs @@ -0,0 +1,48 @@ +//! Configuration for socktop WebSocket connections. + +/// Configuration for connecting to a socktop agent. +#[derive(Debug, Clone)] +pub struct ConnectorConfig { + pub url: String, + pub tls_ca_path: Option, + pub verify_hostname: bool, + pub ws_protocols: Option>, + pub ws_version: Option, +} + +impl ConnectorConfig { + /// Create a new connector configuration with the given URL. + pub fn new(url: impl Into) -> Self { + Self { + url: url.into(), + tls_ca_path: None, + verify_hostname: false, + ws_protocols: None, + ws_version: None, + } + } + + /// Set the path to a custom TLS CA certificate file. + pub fn with_tls_ca(mut self, ca_path: impl Into) -> Self { + self.tls_ca_path = Some(ca_path.into()); + self + } + + /// Enable or disable hostname verification for TLS connections. + pub fn with_hostname_verification(mut self, verify: bool) -> Self { + self.verify_hostname = verify; + self + } + + /// Set WebSocket sub-protocols to negotiate. + pub fn with_protocols(mut self, protocols: Vec) -> Self { + self.ws_protocols = Some(protocols); + self + } + + /// Set WebSocket protocol version (default is "13"). + pub fn with_version(mut self, version: impl Into) -> Self { + self.ws_version = Some(version.into()); + self + } +} diff --git a/socktop_connector/src/connector.rs b/socktop_connector/src/connector.rs index 404d341..ed3d109 100644 --- a/socktop_connector/src/connector.rs +++ b/socktop_connector/src/connector.rs @@ -1,13 +1,35 @@ //! WebSocket connector for communicating with socktop agents. -#[cfg(feature = "networking")] -use flate2::bufread::GzDecoder; +// WebSocket state constants +#[cfg(feature = "wasm")] +#[allow(dead_code)] +const WEBSOCKET_CONNECTING: u16 = 0; +#[cfg(feature = "wasm")] +#[allow(dead_code)] +const WEBSOCKET_OPEN: u16 = 1; +#[cfg(feature = "wasm")] +#[allow(dead_code)] +const WEBSOCKET_CLOSING: u16 = 2; +#[cfg(feature = "wasm")] +#[allow(dead_code)] +const WEBSOCKET_CLOSED: u16 = 3; + +// Gzip magic header constants +const GZIP_MAGIC_1: u8 = 0x1f; +const GZIP_MAGIC_2: u8 = 0x8b; + +// Shared imports for both networking and WASM +#[cfg(any(feature = "networking", feature = "wasm"))] +use flate2::read::GzDecoder; +#[cfg(any(feature = "networking", feature = "wasm"))] +use std::io::Read; +#[cfg(any(feature = "networking", feature = "wasm"))] +use prost::Message as ProstMessage; + #[cfg(feature = "networking")] use futures_util::{SinkExt, StreamExt}; #[cfg(feature = "networking")] -use prost::Message as _; -#[cfg(feature = "networking")] -use std::io::Read; +use std::io::BufReader; #[cfg(feature = "networking")] use tokio::net::TcpStream; #[cfg(feature = "networking")] @@ -24,8 +46,6 @@ use web_sys::WebSocket; #[cfg(all(feature = "wasm", not(feature = "networking")))] use pb::Processes; #[cfg(all(feature = "wasm", not(feature = "networking")))] -use prost::Message as ProstMessage; -#[cfg(all(feature = "wasm", not(feature = "networking")))] use wasm_bindgen::{JsCast, JsValue, closure::Closure}; #[cfg(feature = "tls")] @@ -39,7 +59,7 @@ use rustls::{DigitallySignedStruct, SignatureScheme}; #[cfg(feature = "tls")] use rustls_pemfile::Item; #[cfg(feature = "tls")] -use std::{fs::File, io::BufReader, sync::Arc}; +use std::{fs::File, sync::Arc}; #[cfg(feature = "tls")] use tokio_tungstenite::{Connector, connect_async_tls_with_config}; @@ -349,7 +369,7 @@ async fn request_metrics(ws: &mut WsStream) -> Option { } match ws.next().await { Some(Ok(Message::Binary(b))) => { - gunzip_to_string(&b).and_then(|s| serde_json::from_str::(&s).ok()) + gunzip_to_string(&b).ok().and_then(|s| serde_json::from_str::(&s).ok()) } Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), _ => None, @@ -364,7 +384,7 @@ async fn request_disks(ws: &mut WsStream) -> Option> { } match ws.next().await { Some(Ok(Message::Binary(b))) => { - gunzip_to_string(&b).and_then(|s| serde_json::from_str::>(&s).ok()) + gunzip_to_string(&b).ok().and_then(|s| serde_json::from_str::>(&s).ok()) } Some(Ok(Message::Text(json))) => serde_json::from_str::>(&json).ok(), _ => None, @@ -384,7 +404,7 @@ async fn request_processes(ws: &mut WsStream) -> Option { match ws.next().await { Some(Ok(Message::Binary(b))) => { let gz = is_gzip(&b); - let data = if gz { gunzip_to_vec(&b)? } else { b }; + let data = if gz { gunzip_to_vec(&b).ok()? } else { b }; match pb::Processes::decode(data.as_slice()) { Ok(pb) => { let rows: Vec = pb @@ -420,25 +440,32 @@ async fn request_processes(ws: &mut WsStream) -> Option { } // Decompress a gzip-compressed binary frame into a String. -#[cfg(feature = "networking")] -fn gunzip_to_string(bytes: &[u8]) -> Option { - let mut dec = GzDecoder::new(bytes); - let mut out = String::new(); - dec.read_to_string(&mut out).ok()?; - Some(out) +/// Unified gzip decompression to string for both networking and WASM +#[cfg(any(feature = "networking", feature = "wasm"))] +fn gunzip_to_string(bytes: &[u8]) -> Result { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed = String::new(); + decoder.read_to_string(&mut decompressed).map_err(|e| { + ConnectorError::protocol_error(format!("Gzip decompression failed: {e}")) + })?; + Ok(decompressed) } -#[cfg(feature = "networking")] -fn gunzip_to_vec(bytes: &[u8]) -> Option> { - let mut dec = GzDecoder::new(bytes); - let mut out = Vec::new(); - dec.read_to_end(&mut out).ok()?; - Some(out) +/// Unified gzip decompression to bytes for both networking and WASM +#[cfg(any(feature = "networking", feature = "wasm"))] +fn gunzip_to_vec(bytes: &[u8]) -> Result> { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed).map_err(|e| { + ConnectorError::protocol_error(format!("Gzip decompression failed: {e}")) + })?; + Ok(decompressed) } -#[cfg(feature = "networking")] +/// Unified gzip detection for both networking and WASM +#[cfg(any(feature = "networking", feature = "wasm"))] fn is_gzip(bytes: &[u8]) -> bool { - bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b + bytes.len() >= 2 && bytes[0] == GZIP_MAGIC_1 && bytes[1] == GZIP_MAGIC_2 } /// Convenience function to create a connector and connect in one step. @@ -522,7 +549,7 @@ impl SocktopConnector { /// Connect to the agent using WASM WebSocket pub async fn connect(&mut self) -> Result<()> { let websocket = WebSocket::new(&self.config.url).map_err(|e| { - ConnectorError::protocol_error(&format!("Failed to create WebSocket: {:?}", e)) + ConnectorError::protocol_error(format!("Failed to create WebSocket: {e:?}")) })?; // Set binary type for proper message handling @@ -536,15 +563,15 @@ impl SocktopConnector { loop { let ready_state = websocket.ready_state(); - if ready_state == 1 { + if ready_state == WEBSOCKET_OPEN { // OPEN - connection is ready break; - } else if ready_state == 3 { + } else if ready_state == WEBSOCKET_CLOSED { // CLOSED return Err(ConnectorError::protocol_error( "WebSocket connection closed", )); - } else if ready_state == 2 { + } else if ready_state == WEBSOCKET_CLOSING { // CLOSING return Err(ConnectorError::protocol_error("WebSocket is closing")); } @@ -589,7 +616,7 @@ impl SocktopConnector { // Send request ws.send_with_str(&request_string).map_err(|e| { - ConnectorError::protocol_error(&format!("Failed to send message: {:?}", e)) + ConnectorError::protocol_error(format!("Failed to send message: {e:?}")) })?; // Wait for response using JavaScript Promise @@ -616,7 +643,7 @@ impl SocktopConnector { mem_used: 0, swap_total: 0, swap_used: 0, - hostname: format!("Binary protobuf data ({} bytes)", byte_count), + hostname: format!("Binary protobuf data ({byte_count} bytes)"), cpu_temp_c: None, disks: vec![], networks: vec![], @@ -628,9 +655,8 @@ impl SocktopConnector { } else { // Try to parse as JSON (fallback) let metrics: Metrics = serde_json::from_str(&response).map_err(|e| { - ConnectorError::serialization_error(&format!( - "Failed to parse metrics: {}", - e + ConnectorError::serialization_error(format!( + "Failed to parse metrics: {e}" )) })?; Ok(AgentResponse::Metrics(metrics)) @@ -638,7 +664,7 @@ impl SocktopConnector { } AgentRequest::Disks => { let disks: Vec = serde_json::from_str(&response).map_err(|e| { - ConnectorError::serialization_error(&format!("Failed to parse disks: {}", e)) + ConnectorError::serialization_error(format!("Failed to parse disks: {e}")) })?; Ok(AgentResponse::Disks(disks)) } @@ -658,9 +684,9 @@ impl SocktopConnector { if let Some(ref data) = binary_data { log_debug(&format!("🔍 Binary data size: {} bytes", data.len())); // Check if it's gzipped data and decompress it first - if is_gzip_data(data) { + if is_gzip(data) { log_debug("🔍 Process data is gzipped, decompressing..."); - match gunzip_to_vec_wasm(data) { + match gunzip_to_vec(data) { Ok(decompressed_bytes) => { log_debug(&format!( "🔍 Successfully decompressed {} bytes, now decoding protobuf...", @@ -697,16 +723,14 @@ impl SocktopConnector { } Err(e) => { log_debug(&format!( - "❌ Failed to decode decompressed protobuf: {}", - e + "❌ Failed to decode decompressed protobuf: {e}" )); } } } Err(e) => { log_debug(&format!( - "❌ Failed to decompress gzipped process data: {}", - e + "❌ Failed to decompress gzipped process data: {e}" )); } } @@ -750,16 +774,16 @@ impl SocktopConnector { top_processes: processes, process_count: protobuf_processes.process_count as usize, }; - return Ok(AgentResponse::Processes(processes_payload)); + Ok(AgentResponse::Processes(processes_payload)) } Err(e) => { - log_debug(&format!("❌ Failed to decode protobuf: {}", e)); + log_debug(&format!("❌ Failed to decode protobuf: {e}")); // Fallback to empty processes let processes = ProcessesPayload { top_processes: vec![], process_count: 0, }; - return Ok(AgentResponse::Processes(processes)); + Ok(AgentResponse::Processes(processes)) } } } else { @@ -770,15 +794,14 @@ impl SocktopConnector { top_processes: vec![], process_count: 0, }; - return Ok(AgentResponse::Processes(processes)); + Ok(AgentResponse::Processes(processes)) } } else { // Try to parse as JSON (fallback) let processes: ProcessesPayload = serde_json::from_str(&response).map_err(|e| { - ConnectorError::serialization_error(&format!( - "Failed to parse processes: {}", - e + ConnectorError::serialization_error(format!( + "Failed to parse processes: {e}" )) })?; Ok(AgentResponse::Processes(processes)) @@ -823,7 +846,7 @@ impl SocktopConnector { } else { message.clone() }; - log_debug(&format!("🔍 Received text: {}", preview)); + log_debug(&format!("🔍 Received text: {preview}")); *response_cell.borrow_mut() = Some(message); *response_received.borrow_mut() = true; @@ -836,7 +859,7 @@ impl SocktopConnector { let mut bytes = vec![0u8; length]; uint8_array.copy_to(&mut bytes); - log_debug(&format!("🔍 Received binary data: {} bytes", length)); + log_debug(&format!("🔍 Received binary data: {length} bytes")); // Debug: Log the first few bytes to see what we're dealing with let first_bytes = if bytes.len() >= 4 { @@ -847,7 +870,7 @@ impl SocktopConnector { } else { format!("Only {} bytes available", bytes.len()) }; - log_debug(&format!("🔍 First bytes: {}", first_bytes)); + log_debug(&format!("🔍 First bytes: {first_bytes}")); // Try to decode as UTF-8 text first (in case it's JSON sent as binary) match String::from_utf8(bytes.clone()) { @@ -882,13 +905,12 @@ impl SocktopConnector { } Err(_) => { // If it's not valid UTF-8, check if it's gzipped data - if is_gzip_data(&bytes) { + if is_gzip(&bytes) { log_debug(&format!( - "🔍 Binary data appears to be gzipped ({} bytes)", - length + "🔍 Binary data appears to be gzipped ({length} bytes)" )); - // Try to decompress using WASI-compatible decompression - match decompress_gzip_browser(&bytes) { + // Try to decompress using unified gzip decompression + match gunzip_to_string(&bytes) { Ok(decompressed_text) => { log_debug(&format!( "🔍 Gzipped data decompressed to text: {}", @@ -903,25 +925,23 @@ impl SocktopConnector { } Err(e) => { log_debug(&format!( - "🔍 Failed to decompress gzip: {}", - e + "🔍 Failed to decompress gzip: {e}" )); // Fallback: treat as actual binary protobuf data *binary_data_cell.borrow_mut() = Some(bytes.clone()); *response_cell.borrow_mut() = - Some(format!("BINARY_DATA:{}", length)); + Some(format!("BINARY_DATA:{length}")); *response_received.borrow_mut() = true; } } } else { // If it's not valid UTF-8 and not gzipped, it's likely actual binary protobuf data log_debug(&format!( - "🔍 Binary data is actual protobuf ({} bytes)", - length + "🔍 Binary data is actual protobuf ({length} bytes)" )); *binary_data_cell.borrow_mut() = Some(bytes); *response_cell.borrow_mut() = - Some(format!("BINARY_DATA:{}", length)); + Some(format!("BINARY_DATA:{length}")); *response_received.borrow_mut() = true; } } @@ -990,7 +1010,7 @@ impl SocktopConnector { pub fn is_connected(&self) -> bool { self.websocket .as_ref() - .map_or(false, |ws| ws.ready_state() == 1) // 1 = OPEN + .is_some_and(|ws| ws.ready_state() == WEBSOCKET_OPEN) } /// Disconnect from the agent @@ -1033,43 +1053,17 @@ impl SocktopConnector { } // Helper function for logging that works in WASI environments -#[cfg(all(feature = "wasm", not(feature = "networking")))] +/// Unified debug logging for both networking and WASM modes +#[cfg(any(feature = "networking", feature = "wasm"))] +#[allow(dead_code)] fn log_debug(message: &str) { - // For WASI environments like Zellij plugins, use eprintln - eprintln!("{}", message); -} -#[cfg(all(feature = "wasm", not(feature = "networking")))] -fn is_gzip_data(bytes: &[u8]) -> bool { - // Gzip files start with the magic bytes 0x1f 0x8b - bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b -} - -#[cfg(all(feature = "wasm", not(feature = "networking")))] -fn decompress_gzip_browser(bytes: &[u8]) -> Result { - use flate2::read::GzDecoder; - use std::io::Read; - - let mut decoder = GzDecoder::new(bytes); - let mut decompressed = String::new(); - decoder.read_to_string(&mut decompressed).map_err(|e| { - ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e)) - })?; - - Ok(decompressed) -} - -#[cfg(all(feature = "wasm", not(feature = "networking")))] -fn gunzip_to_vec_wasm(bytes: &[u8]) -> Result> { - use flate2::read::GzDecoder; - use std::io::Read; - - let mut decoder = GzDecoder::new(bytes); - let mut decompressed = Vec::new(); - decoder.read_to_end(&mut decompressed).map_err(|e| { - ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e)) - })?; - - Ok(decompressed) + #[cfg(feature = "networking")] + if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") { + eprintln!("{message}"); + } + + #[cfg(all(feature = "wasm", not(feature = "networking")))] + eprintln!("{message}"); } // Stub implementations when neither networking nor wasm is enabled diff --git a/socktop_connector/src/connector_impl.rs b/socktop_connector/src/connector_impl.rs new file mode 100644 index 0000000..bcac691 --- /dev/null +++ b/socktop_connector/src/connector_impl.rs @@ -0,0 +1,261 @@ +//! Modular SocktopConnector implementation using networking and WASM modules. + +use crate::config::ConnectorConfig; +use crate::error::{ConnectorError, Result}; +use crate::{AgentRequest, AgentResponse}; + +#[cfg(feature = "networking")] +use crate::networking::{ + WsStream, connect_to_agent, request_disks, request_metrics, request_processes, +}; + +#[cfg(all(feature = "wasm", not(feature = "networking")))] +use crate::wasm::{connect_to_agent, send_request_and_wait}; + +#[cfg(all(feature = "wasm", not(feature = "networking")))] +use crate::{DiskInfo, Metrics, ProcessesPayload}; + +#[cfg(all(feature = "wasm", not(feature = "networking")))] +use web_sys::WebSocket; + +/// Main connector for communicating with socktop agents +pub struct SocktopConnector { + pub config: ConnectorConfig, + #[cfg(feature = "networking")] + stream: Option, + #[cfg(all(feature = "wasm", not(feature = "networking")))] + websocket: Option, +} + +impl SocktopConnector { + /// Create a new connector with the given configuration + pub fn new(config: ConnectorConfig) -> Self { + Self { + config, + #[cfg(feature = "networking")] + stream: None, + #[cfg(all(feature = "wasm", not(feature = "networking")))] + websocket: None, + } + } +} + +#[cfg(feature = "networking")] +impl SocktopConnector { + /// Connect to the agent + pub async fn connect(&mut self) -> Result<()> { + let stream = connect_to_agent(&self.config).await?; + self.stream = Some(stream); + Ok(()) + } + + /// Send a request to the agent and get the response + pub async fn request(&mut self, request: AgentRequest) -> Result { + let stream = self.stream.as_mut().ok_or(ConnectorError::NotConnected)?; + + match request { + AgentRequest::Metrics => { + let metrics = request_metrics(stream) + .await + .ok_or_else(|| ConnectorError::invalid_response("Failed to get metrics"))?; + Ok(AgentResponse::Metrics(metrics)) + } + AgentRequest::Disks => { + let disks = request_disks(stream) + .await + .ok_or_else(|| ConnectorError::invalid_response("Failed to get disks"))?; + Ok(AgentResponse::Disks(disks)) + } + AgentRequest::Processes => { + let processes = request_processes(stream) + .await + .ok_or_else(|| ConnectorError::invalid_response("Failed to get processes"))?; + Ok(AgentResponse::Processes(processes)) + } + } + } + + /// Check if the connector is connected + pub fn is_connected(&self) -> bool { + self.stream.is_some() + } + + /// Disconnect from the agent + pub async fn disconnect(&mut self) -> Result<()> { + if let Some(mut stream) = self.stream.take() { + let _ = stream.close(None).await; + } + Ok(()) + } +} + +// WASM WebSocket implementation +#[cfg(all(feature = "wasm", not(feature = "networking")))] +impl SocktopConnector { + /// Connect to the agent using WASM WebSocket + pub async fn connect(&mut self) -> Result<()> { + let websocket = connect_to_agent(&self.config).await?; + self.websocket = Some(websocket); + Ok(()) + } + + /// Send a request to the agent and get the response + pub async fn request(&mut self, request: AgentRequest) -> Result { + let ws = self + .websocket + .as_ref() + .ok_or(ConnectorError::NotConnected)?; + + send_request_and_wait(ws, request).await + } + + /// Check if the connector is connected + pub fn is_connected(&self) -> bool { + use crate::utils::WEBSOCKET_OPEN; + self.websocket + .as_ref() + .is_some_and(|ws| ws.ready_state() == WEBSOCKET_OPEN) + } + + /// Disconnect from the agent + pub async fn disconnect(&mut self) -> Result<()> { + if let Some(ws) = self.websocket.take() { + let _ = ws.close(); + } + Ok(()) + } + + /// Request metrics from the agent + pub async fn get_metrics(&mut self) -> Result { + match self.request(AgentRequest::Metrics).await? { + AgentResponse::Metrics(metrics) => Ok(metrics), + _ => Err(ConnectorError::protocol_error( + "Unexpected response type for metrics", + )), + } + } + + /// Request disk information from the agent + pub async fn get_disks(&mut self) -> Result> { + match self.request(AgentRequest::Disks).await? { + AgentResponse::Disks(disks) => Ok(disks), + _ => Err(ConnectorError::protocol_error( + "Unexpected response type for disks", + )), + } + } + + /// Request process information from the agent + pub async fn get_processes(&mut self) -> Result { + match self.request(AgentRequest::Processes).await? { + AgentResponse::Processes(processes) => Ok(processes), + _ => Err(ConnectorError::protocol_error( + "Unexpected response type for processes", + )), + } + } +} + +// Stub implementations when neither networking nor wasm is enabled +#[cfg(not(any(feature = "networking", feature = "wasm")))] +impl SocktopConnector { + /// Connect to the socktop agent endpoint. + /// + /// Note: Networking functionality is disabled. Enable the "networking" feature to use this function. + pub async fn connect(&mut self) -> Result<()> { + Err(ConnectorError::protocol_error( + "Networking functionality disabled. Enable the 'networking' feature to connect to agents.", + )) + } + + /// Send a request to the agent and await a response. + /// + /// Note: Networking functionality is disabled. Enable the "networking" feature to use this function. + pub async fn request(&mut self, _request: AgentRequest) -> Result { + Err(ConnectorError::protocol_error( + "Networking functionality disabled. Enable the 'networking' feature to send requests.", + )) + } + + /// Close the connection to the agent. + /// + /// Note: Networking functionality is disabled. This is a no-op when networking is disabled. + pub async fn disconnect(&mut self) -> Result<()> { + Ok(()) // No-op when networking is disabled + } +} + +/// Convenience function to create a connector and connect in one step. +/// +/// This function is for non-TLS WebSocket connections (`ws://`). Since there's no +/// certificate involved, hostname verification is not applicable. +/// +/// For TLS connections with certificate pinning, use `connect_to_socktop_agent_with_tls()`. +#[cfg(feature = "networking")] +pub async fn connect_to_socktop_agent(url: impl Into) -> Result { + let config = ConnectorConfig::new(url); + let mut connector = SocktopConnector::new(config); + connector.connect().await?; + Ok(connector) +} + +/// Convenience function to create a connector with TLS and connect in one step. +/// +/// This function enables TLS with certificate pinning using the provided CA certificate. +/// The `verify_hostname` parameter controls whether the server's hostname is verified +/// against the certificate (recommended for production, can be disabled for testing). +#[cfg(feature = "tls")] +#[cfg(feature = "networking")] +#[cfg_attr(docsrs, doc(cfg(feature = "tls")))] +pub async fn connect_to_socktop_agent_with_tls( + url: impl Into, + ca_path: impl Into, + verify_hostname: bool, +) -> Result { + let config = ConnectorConfig::new(url) + .with_tls_ca(ca_path) + .with_hostname_verification(verify_hostname); + let mut connector = SocktopConnector::new(config); + connector.connect().await?; + Ok(connector) +} + +/// Convenience function to create a connector with custom WebSocket protocol configuration. +/// +/// This function allows you to specify WebSocket protocol version and sub-protocols. +/// Most users should use the simpler `connect_to_socktop_agent()` function instead. +/// +/// # Example +/// ```no_run +/// use socktop_connector::connect_to_socktop_agent_with_config; +/// +/// # #[tokio::main] +/// # async fn main() -> Result<(), Box> { +/// let connector = connect_to_socktop_agent_with_config( +/// "ws://localhost:3000/ws", +/// Some(vec!["socktop".to_string()]), // WebSocket sub-protocols +/// Some("13".to_string()), // WebSocket version (13 is standard) +/// ).await?; +/// # Ok(()) +/// # } +/// ``` +#[cfg(feature = "networking")] +pub async fn connect_to_socktop_agent_with_config( + url: impl Into, + protocols: Option>, + version: Option, +) -> Result { + let mut config = ConnectorConfig::new(url); + + if let Some(protocols) = protocols { + config = config.with_protocols(protocols); + } + + if let Some(version) = version { + config = config.with_version(version); + } + + let mut connector = SocktopConnector::new(config); + connector.connect().await?; + Ok(connector) +} diff --git a/socktop_connector/src/lib.rs b/socktop_connector/src/lib.rs index 5ea1660..a819aaf 100644 --- a/socktop_connector/src/lib.rs +++ b/socktop_connector/src/lib.rs @@ -140,19 +140,43 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -pub mod connector; +// Core modules +pub mod config; pub mod error; pub mod types; +pub mod utils; -pub use connector::{ConnectorConfig, SocktopConnector}; - +// Implementation modules #[cfg(feature = "networking")] -pub use connector::{WsStream, connect_to_socktop_agent, connect_to_socktop_agent_with_config}; +pub mod networking; -#[cfg(all(feature = "tls", feature = "networking"))] -pub use connector::connect_to_socktop_agent_with_tls; +#[cfg(feature = "wasm")] +pub mod wasm; + +// Main connector implementation +pub mod connector_impl; + +// Re-export the main types +pub use config::ConnectorConfig; +pub use connector_impl::SocktopConnector; pub use error::{ConnectorError, Result}; pub use types::{ AgentRequest, AgentResponse, DiskInfo, GpuInfo, Metrics, NetworkInfo, ProcessInfo, ProcessesPayload, }; + +// Re-export convenience functions +#[cfg(feature = "networking")] +pub use connector_impl::{connect_to_socktop_agent, connect_to_socktop_agent_with_config}; + +#[cfg(all(feature = "tls", feature = "networking"))] +pub use connector_impl::connect_to_socktop_agent_with_tls; + +#[cfg(feature = "networking")] +pub use networking::WsStream; + +// Protobuf types for internal use +#[cfg(any(feature = "networking", feature = "wasm"))] +pub mod pb { + include!(concat!(env!("OUT_DIR"), "/socktop.rs")); +} diff --git a/socktop_connector/src/networking/connection.rs b/socktop_connector/src/networking/connection.rs new file mode 100644 index 0000000..6816444 --- /dev/null +++ b/socktop_connector/src/networking/connection.rs @@ -0,0 +1,185 @@ +//! WebSocket connection handling for native (non-WASM) environments. + +use crate::config::ConnectorConfig; +use crate::error::{ConnectorError, Result}; + +use std::io::BufReader; +use std::sync::Arc; +use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async}; +use url::Url; + +#[cfg(feature = "tls")] +use { + rustls::{self, ClientConfig}, + rustls::{ + DigitallySignedStruct, RootCertStore, SignatureScheme, + client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}, + crypto::ring, + pki_types::{CertificateDer, ServerName, UnixTime}, + }, + rustls_pemfile::Item, + std::fs::File, + tokio_tungstenite::Connector, +}; + +pub type WsStream = WebSocketStream>; + +/// Connect to the agent and return the WS stream +pub async fn connect_to_agent(config: &ConnectorConfig) -> Result { + #[cfg(feature = "tls")] + ensure_crypto_provider(); + + let mut u = Url::parse(&config.url)?; + if let Some(ca_path) = &config.tls_ca_path { + if u.scheme() == "ws" { + let _ = u.set_scheme("wss"); + } + return connect_with_ca_and_config(u.as_str(), ca_path, config).await; + } + // No TLS - hostname verification is not applicable + connect_without_ca_and_config(u.as_str(), config).await +} + +async fn connect_without_ca_and_config(url: &str, config: &ConnectorConfig) -> Result { + let mut req = url.into_client_request()?; + + // Apply WebSocket protocol configuration + if let Some(version) = &config.ws_version { + req.headers_mut().insert( + "Sec-WebSocket-Version", + version + .parse() + .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket version"))?, + ); + } + + if let Some(protocols) = &config.ws_protocols { + let protocols_str = protocols.join(", "); + req.headers_mut().insert( + "Sec-WebSocket-Protocol", + protocols_str + .parse() + .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket protocols"))?, + ); + } + + let (ws, _) = connect_async(req).await?; + Ok(ws) +} + +#[cfg(feature = "tls")] +async fn connect_with_ca_and_config( + url: &str, + ca_path: &str, + config: &ConnectorConfig, +) -> Result { + // Initialize the crypto provider for rustls + let _ = rustls::crypto::ring::default_provider().install_default(); + + let mut root = RootCertStore::empty(); + let mut reader = BufReader::new(File::open(ca_path)?); + let mut der_certs = Vec::new(); + while let Ok(Some(item)) = rustls_pemfile::read_one(&mut reader) { + if let Item::X509Certificate(der) = item { + der_certs.push(der); + } + } + root.add_parsable_certificates(der_certs); + + let mut cfg = ClientConfig::builder() + .with_root_certificates(root) + .with_no_client_auth(); + + let mut req = url.into_client_request()?; + + // Apply WebSocket protocol configuration + if let Some(version) = &config.ws_version { + req.headers_mut().insert( + "Sec-WebSocket-Version", + version + .parse() + .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket version"))?, + ); + } + + if let Some(protocols) = &config.ws_protocols { + let protocols_str = protocols.join(", "); + req.headers_mut().insert( + "Sec-WebSocket-Protocol", + protocols_str + .parse() + .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket protocols"))?, + ); + } + + if !config.verify_hostname { + #[derive(Debug)] + struct NoVerify; + impl ServerCertVerifier for NoVerify { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> std::result::Result { + Ok(ServerCertVerified::assertion()) + } + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> std::result::Result { + Ok(HandshakeSignatureValid::assertion()) + } + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> std::result::Result { + Ok(HandshakeSignatureValid::assertion()) + } + fn supported_verify_schemes(&self) -> Vec { + vec![ + SignatureScheme::ECDSA_NISTP256_SHA256, + SignatureScheme::ED25519, + SignatureScheme::RSA_PSS_SHA256, + ] + } + } + cfg.dangerous().set_certificate_verifier(Arc::new(NoVerify)); + eprintln!( + "socktop_connector: hostname verification disabled (default). Set SOCKTOP_VERIFY_NAME=1 to enable strict SAN checking." + ); + } + let cfg = Arc::new(cfg); + let (ws, _) = tokio_tungstenite::connect_async_tls_with_config( + req, + None, + config.verify_hostname, + Some(Connector::Rustls(cfg)), + ) + .await?; + Ok(ws) +} + +#[cfg(not(feature = "tls"))] +async fn connect_with_ca_and_config( + _url: &str, + _ca_path: &str, + _config: &ConnectorConfig, +) -> Result { + Err(ConnectorError::tls_error( + "TLS support not compiled in", + std::io::Error::new(std::io::ErrorKind::Unsupported, "TLS not available"), + )) +} + +#[cfg(feature = "tls")] +fn ensure_crypto_provider() { + let _ = ring::default_provider().install_default(); +} diff --git a/socktop_connector/src/networking/mod.rs b/socktop_connector/src/networking/mod.rs new file mode 100644 index 0000000..1137f1f --- /dev/null +++ b/socktop_connector/src/networking/mod.rs @@ -0,0 +1,7 @@ +//! Networking module for native WebSocket connections. + +pub mod connection; +pub mod requests; + +pub use connection::*; +pub use requests::*; diff --git a/socktop_connector/src/networking/requests.rs b/socktop_connector/src/networking/requests.rs new file mode 100644 index 0000000..9cb85bc --- /dev/null +++ b/socktop_connector/src/networking/requests.rs @@ -0,0 +1,84 @@ +//! WebSocket request handlers for native (non-WASM) environments. + +use crate::networking::WsStream; +use crate::utils::{gunzip_to_string, gunzip_to_vec, is_gzip}; +use crate::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload, pb}; + +use futures_util::{SinkExt, StreamExt}; +use prost::Message as ProstMessage; +use tokio_tungstenite::tungstenite::Message; + +/// Send a "get_metrics" request and await a single JSON reply +pub async fn request_metrics(ws: &mut WsStream) -> Option { + if ws.send(Message::Text("get_metrics".into())).await.is_err() { + return None; + } + match ws.next().await { + Some(Ok(Message::Binary(b))) => gunzip_to_string(&b) + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()), + Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), + _ => None, + } +} + +/// Send a "get_disks" request and await a JSON Vec +pub async fn request_disks(ws: &mut WsStream) -> Option> { + if ws.send(Message::Text("get_disks".into())).await.is_err() { + return None; + } + match ws.next().await { + Some(Ok(Message::Binary(b))) => gunzip_to_string(&b) + .ok() + .and_then(|s| serde_json::from_str::>(&s).ok()), + Some(Ok(Message::Text(json))) => serde_json::from_str::>(&json).ok(), + _ => None, + } +} + +/// Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped) +pub async fn request_processes(ws: &mut WsStream) -> Option { + if ws + .send(Message::Text("get_processes".into())) + .await + .is_err() + { + return None; + } + match ws.next().await { + Some(Ok(Message::Binary(b))) => { + let gz = is_gzip(&b); + let data = if gz { gunzip_to_vec(&b).ok()? } else { b }; + match pb::Processes::decode(data.as_slice()) { + Ok(pb) => { + let rows: Vec = pb + .rows + .into_iter() + .map(|p: pb::Process| ProcessInfo { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + Some(ProcessesPayload { + process_count: pb.process_count as usize, + top_processes: rows, + }) + } + Err(e) => { + if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") { + eprintln!("protobuf decode failed: {e}"); + } + // Fallback: maybe it's JSON (bytes already decompressed if gz) + match String::from_utf8(data) { + Ok(s) => serde_json::from_str::(&s).ok(), + Err(_) => None, + } + } + } + } + Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), + _ => None, + } +} diff --git a/socktop_connector/src/utils.rs b/socktop_connector/src/utils.rs new file mode 100644 index 0000000..f42c3cd --- /dev/null +++ b/socktop_connector/src/utils.rs @@ -0,0 +1,67 @@ +//! Shared utilities for both networking and WASM implementations. + +#[cfg(any(feature = "networking", feature = "wasm"))] +use flate2::read::GzDecoder; +#[cfg(any(feature = "networking", feature = "wasm"))] +use std::io::Read; + +use crate::error::{ConnectorError, Result}; + +// WebSocket state constants +#[cfg(feature = "wasm")] +#[allow(dead_code)] +pub const WEBSOCKET_CONNECTING: u16 = 0; +#[cfg(feature = "wasm")] +#[allow(dead_code)] +pub const WEBSOCKET_OPEN: u16 = 1; +#[cfg(feature = "wasm")] +#[allow(dead_code)] +pub const WEBSOCKET_CLOSING: u16 = 2; +#[cfg(feature = "wasm")] +#[allow(dead_code)] +pub const WEBSOCKET_CLOSED: u16 = 3; + +// Gzip magic header constants +pub const GZIP_MAGIC_1: u8 = 0x1f; +pub const GZIP_MAGIC_2: u8 = 0x8b; + +/// Unified gzip decompression to string for both networking and WASM +#[cfg(any(feature = "networking", feature = "wasm"))] +pub fn gunzip_to_string(bytes: &[u8]) -> Result { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed = String::new(); + decoder + .read_to_string(&mut decompressed) + .map_err(|e| ConnectorError::protocol_error(format!("Gzip decompression failed: {e}")))?; + Ok(decompressed) +} + +/// Unified gzip decompression to bytes for both networking and WASM +#[cfg(any(feature = "networking", feature = "wasm"))] +pub fn gunzip_to_vec(bytes: &[u8]) -> Result> { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed = Vec::new(); + decoder + .read_to_end(&mut decompressed) + .map_err(|e| ConnectorError::protocol_error(format!("Gzip decompression failed: {e}")))?; + Ok(decompressed) +} + +/// Unified gzip detection for both networking and WASM +#[cfg(any(feature = "networking", feature = "wasm"))] +pub fn is_gzip(bytes: &[u8]) -> bool { + bytes.len() >= 2 && bytes[0] == GZIP_MAGIC_1 && bytes[1] == GZIP_MAGIC_2 +} + +/// Unified debug logging for both networking and WASM modes +#[cfg(any(feature = "networking", feature = "wasm"))] +#[allow(dead_code)] +pub fn log_debug(message: &str) { + #[cfg(feature = "networking")] + if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") { + eprintln!("{message}"); + } + + #[cfg(all(feature = "wasm", not(feature = "networking")))] + eprintln!("{message}"); +} diff --git a/socktop_connector/src/wasm/connection.rs b/socktop_connector/src/wasm/connection.rs new file mode 100644 index 0000000..9280d12 --- /dev/null +++ b/socktop_connector/src/wasm/connection.rs @@ -0,0 +1,66 @@ +//! WebSocket connection handling for WASM environments. + +use crate::config::ConnectorConfig; +use crate::error::{ConnectorError, Result}; +use crate::utils::{WEBSOCKET_CLOSED, WEBSOCKET_CLOSING, WEBSOCKET_OPEN}; + +use wasm_bindgen::JsCast; +use wasm_bindgen::prelude::*; +use web_sys::WebSocket; + +/// Connect to the agent using WASM WebSocket +pub async fn connect_to_agent(config: &ConnectorConfig) -> Result { + let websocket = WebSocket::new(&config.url).map_err(|e| { + ConnectorError::protocol_error(format!("Failed to create WebSocket: {e:?}")) + })?; + + // Set binary type for proper message handling + websocket.set_binary_type(web_sys::BinaryType::Arraybuffer); + + // Wait for connection to be ready with proper async delays + let start_time = js_sys::Date::now(); + let timeout_ms = 10000.0; // 10 second timeout (increased from 5) + + // Poll connection status until ready or timeout + loop { + let ready_state = websocket.ready_state(); + + if ready_state == WEBSOCKET_OPEN { + // OPEN - connection is ready + break; + } else if ready_state == WEBSOCKET_CLOSED { + // CLOSED + return Err(ConnectorError::protocol_error( + "WebSocket connection closed", + )); + } else if ready_state == WEBSOCKET_CLOSING { + // CLOSING + return Err(ConnectorError::protocol_error("WebSocket is closing")); + } + + // Check timeout + let now = js_sys::Date::now(); + if now - start_time > timeout_ms { + return Err(ConnectorError::protocol_error( + "WebSocket connection timeout", + )); + } + + // Proper async delay using setTimeout Promise + let promise = js_sys::Promise::new(&mut |resolve, _| { + let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED)); + web_sys::window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0( + closure.as_ref().unchecked_ref(), + 100, // 100ms delay between polls + ) + .unwrap(); + closure.forget(); + }); + + let _ = wasm_bindgen_futures::JsFuture::from(promise).await; + } + + Ok(websocket) +} diff --git a/socktop_connector/src/wasm/mod.rs b/socktop_connector/src/wasm/mod.rs new file mode 100644 index 0000000..6452ac4 --- /dev/null +++ b/socktop_connector/src/wasm/mod.rs @@ -0,0 +1,7 @@ +//! WASM module for browser WebSocket connections. + +pub mod connection; +pub mod requests; + +pub use connection::*; +pub use requests::*; diff --git a/socktop_connector/src/wasm/requests.rs b/socktop_connector/src/wasm/requests.rs new file mode 100644 index 0000000..9ecea53 --- /dev/null +++ b/socktop_connector/src/wasm/requests.rs @@ -0,0 +1,398 @@ +//! WebSocket request handlers for WASM environments. + +use crate::error::{ConnectorError, Result}; +use crate::pb::Processes; +use crate::utils::{gunzip_to_string, gunzip_to_vec, is_gzip, log_debug}; +use crate::{AgentRequest, AgentResponse, DiskInfo, Metrics, ProcessInfo, ProcessesPayload}; + +use prost::Message as ProstMessage; +use std::cell::RefCell; +use std::rc::Rc; +use wasm_bindgen::JsCast; +use wasm_bindgen::prelude::*; +use web_sys::WebSocket; + +/// Send a request and wait for response with binary data handling +pub async fn send_request_and_wait( + websocket: &WebSocket, + request: AgentRequest, +) -> Result { + // Use the legacy string format that the agent expects + let request_string = request.to_legacy_string(); + + // Send request + websocket + .send_with_str(&request_string) + .map_err(|e| ConnectorError::protocol_error(format!("Failed to send message: {e:?}")))?; + + // Wait for response using JavaScript Promise + let (response, binary_data) = wait_for_response_with_binary(websocket).await?; + + // Parse the response based on the request type + match request { + AgentRequest::Metrics => { + // Check if this is binary data (protobuf from agent) + if response.starts_with("BINARY_DATA:") { + // Extract the byte count + let byte_count: usize = response + .strip_prefix("BINARY_DATA:") + .unwrap_or("0") + .parse() + .unwrap_or(0); + + // For now, return a placeholder metrics response indicating binary data received + // TODO: Implement proper protobuf decoding for binary data + let placeholder_metrics = Metrics { + cpu_total: 0.0, + cpu_per_core: vec![0.0], + mem_total: 0, + mem_used: 0, + swap_total: 0, + swap_used: 0, + hostname: format!("Binary protobuf data ({byte_count} bytes)"), + cpu_temp_c: None, + disks: vec![], + networks: vec![], + top_processes: vec![], + gpus: None, + process_count: None, + }; + Ok(AgentResponse::Metrics(placeholder_metrics)) + } else { + // Try to parse as JSON (fallback) + let metrics: Metrics = serde_json::from_str(&response).map_err(|e| { + ConnectorError::serialization_error(format!("Failed to parse metrics: {e}")) + })?; + Ok(AgentResponse::Metrics(metrics)) + } + } + AgentRequest::Disks => { + let disks: Vec = serde_json::from_str(&response).map_err(|e| { + ConnectorError::serialization_error(format!("Failed to parse disks: {e}")) + })?; + Ok(AgentResponse::Disks(disks)) + } + AgentRequest::Processes => { + log_debug(&format!( + "🔍 Processing process request - response: {}", + if response.len() > 100 { + format!("{}...", &response[..100]) + } else { + response.clone() + } + )); + log_debug(&format!( + "🔍 Binary data available: {}", + binary_data.is_some() + )); + if let Some(ref data) = binary_data { + log_debug(&format!("🔍 Binary data size: {} bytes", data.len())); + // Check if it's gzipped data and decompress it first + if is_gzip(data) { + log_debug("🔍 Process data is gzipped, decompressing..."); + match gunzip_to_vec(data) { + Ok(decompressed_bytes) => { + log_debug(&format!( + "🔍 Successfully decompressed {} bytes, now decoding protobuf...", + decompressed_bytes.len() + )); + // Now decode the decompressed bytes as protobuf + match ::decode(decompressed_bytes.as_slice()) + { + Ok(protobuf_processes) => { + log_debug(&format!( + "✅ Successfully decoded {} processes from gzipped protobuf", + protobuf_processes.rows.len() + )); + + // Convert protobuf processes to ProcessInfo structs + let processes: Vec = protobuf_processes + .rows + .into_iter() + .map(|p| ProcessInfo { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + + let processes_payload = ProcessesPayload { + top_processes: processes, + process_count: protobuf_processes.process_count as usize, + }; + return Ok(AgentResponse::Processes(processes_payload)); + } + Err(e) => { + log_debug(&format!( + "❌ Failed to decode decompressed protobuf: {e}" + )); + } + } + } + Err(e) => { + log_debug(&format!( + "❌ Failed to decompress gzipped process data: {e}" + )); + } + } + } + } + + // Check if this is binary data (protobuf from agent) + if response.starts_with("BINARY_DATA:") { + // Extract the binary data size and decode protobuf + let byte_count_str = response.strip_prefix("BINARY_DATA:").unwrap_or("0"); + let _byte_count: usize = byte_count_str.parse().unwrap_or(0); + + // Check if we have the actual binary data + if let Some(binary_bytes) = binary_data { + log_debug(&format!( + "🔧 Decoding {} bytes of protobuf process data", + binary_bytes.len() + )); + + // Try to decode the protobuf data using the prost Message trait + match ::decode(&binary_bytes[..]) { + Ok(protobuf_processes) => { + log_debug(&format!( + "✅ Successfully decoded {} processes from protobuf", + protobuf_processes.rows.len() + )); + + // Convert protobuf processes to ProcessInfo structs + let processes: Vec = protobuf_processes + .rows + .into_iter() + .map(|p| ProcessInfo { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + + let processes_payload = ProcessesPayload { + top_processes: processes, + process_count: protobuf_processes.process_count as usize, + }; + Ok(AgentResponse::Processes(processes_payload)) + } + Err(e) => { + log_debug(&format!("❌ Failed to decode protobuf: {e}")); + // Fallback to empty processes + let processes = ProcessesPayload { + top_processes: vec![], + process_count: 0, + }; + Ok(AgentResponse::Processes(processes)) + } + } + } else { + log_debug( + "❌ Binary data indicator received but no actual binary data preserved", + ); + let processes = ProcessesPayload { + top_processes: vec![], + process_count: 0, + }; + Ok(AgentResponse::Processes(processes)) + } + } else { + // Try to parse as JSON (fallback) + let processes: ProcessesPayload = serde_json::from_str(&response).map_err(|e| { + ConnectorError::serialization_error(format!("Failed to parse processes: {e}")) + })?; + Ok(AgentResponse::Processes(processes)) + } + } + } +} + +async fn wait_for_response_with_binary(websocket: &WebSocket) -> Result<(String, Option>)> { + let start_time = js_sys::Date::now(); + let timeout_ms = 10000.0; // 10 second timeout + + // Store the response in a shared location + let response_cell = Rc::new(RefCell::new(None::)); + let binary_data_cell = Rc::new(RefCell::new(None::>)); + let error_cell = Rc::new(RefCell::new(None::)); + + // Use a unique request ID to avoid message collision + let _request_id = js_sys::Math::random(); + let response_received = Rc::new(RefCell::new(false)); + + // Set up the message handler that only processes if we haven't gotten a response yet + { + let response_cell = response_cell.clone(); + let binary_data_cell = binary_data_cell.clone(); + let response_received = response_received.clone(); + let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| { + // Only process if we haven't already received a response for this request + if !*response_received.borrow() { + // Handle text messages (JSON responses for metrics/disks) + if let Ok(data) = e.data().dyn_into::() { + let message = data.as_string().unwrap_or_default(); + if !message.is_empty() { + // Debug: Log what we received (truncated) + let preview = if message.len() > 100 { + format!("{}...", &message[..100]) + } else { + message.clone() + }; + log_debug(&format!("🔍 Received text: {preview}")); + + *response_cell.borrow_mut() = Some(message); + *response_received.borrow_mut() = true; + } + } + // Handle binary messages (could be JSON as text bytes or actual protobuf) + else if let Ok(array_buffer) = e.data().dyn_into::() { + let uint8_array = js_sys::Uint8Array::new(&array_buffer); + let length = uint8_array.length() as usize; + let mut bytes = vec![0u8; length]; + uint8_array.copy_to(&mut bytes); + + log_debug(&format!("🔍 Received binary data: {length} bytes")); + + // Debug: Log the first few bytes to see what we're dealing with + let first_bytes = if bytes.len() >= 4 { + format!( + "0x{:02x} 0x{:02x} 0x{:02x} 0x{:02x}", + bytes[0], bytes[1], bytes[2], bytes[3] + ) + } else { + format!("Only {} bytes available", bytes.len()) + }; + log_debug(&format!("🔍 First bytes: {first_bytes}")); + + // Try to decode as UTF-8 text first (in case it's JSON sent as binary) + match String::from_utf8(bytes.clone()) { + Ok(text) => { + // If it decodes to valid UTF-8, check if it looks like JSON + let trimmed = text.trim(); + if (trimmed.starts_with('{') && trimmed.ends_with('}')) + || (trimmed.starts_with('[') && trimmed.ends_with(']')) + { + log_debug(&format!( + "🔍 Binary data is actually JSON text: {}", + if text.len() > 100 { + format!("{}...", &text[..100]) + } else { + text.clone() + } + )); + *response_cell.borrow_mut() = Some(text); + *response_received.borrow_mut() = true; + } else { + log_debug(&format!( + "🔍 Binary data is UTF-8 text but not JSON: {}", + if text.len() > 100 { + format!("{}...", &text[..100]) + } else { + text.clone() + } + )); + *response_cell.borrow_mut() = Some(text); + *response_received.borrow_mut() = true; + } + } + Err(_) => { + // If it's not valid UTF-8, check if it's gzipped data + if is_gzip(&bytes) { + log_debug(&format!( + "🔍 Binary data appears to be gzipped ({length} bytes)" + )); + // Try to decompress using unified gzip decompression + match gunzip_to_string(&bytes) { + Ok(decompressed_text) => { + log_debug(&format!( + "🔍 Gzipped data decompressed to text: {}", + if decompressed_text.len() > 100 { + format!("{}...", &decompressed_text[..100]) + } else { + decompressed_text.clone() + } + )); + *response_cell.borrow_mut() = Some(decompressed_text); + *response_received.borrow_mut() = true; + } + Err(e) => { + log_debug(&format!("🔍 Failed to decompress gzip: {e}")); + // Fallback: treat as actual binary protobuf data + *binary_data_cell.borrow_mut() = Some(bytes.clone()); + *response_cell.borrow_mut() = + Some(format!("BINARY_DATA:{length}")); + *response_received.borrow_mut() = true; + } + } + } else { + // If it's not valid UTF-8 and not gzipped, it's likely actual binary protobuf data + log_debug(&format!( + "🔍 Binary data is actual protobuf ({length} bytes)" + )); + *binary_data_cell.borrow_mut() = Some(bytes); + *response_cell.borrow_mut() = Some(format!("BINARY_DATA:{length}")); + *response_received.borrow_mut() = true; + } + } + } + } else { + // Log what type of data we got + log_debug(&format!("🔍 Received unknown data type: {:?}", e.data())); + } + } + }) as Box); + websocket.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + onmessage_callback.forget(); + } + + // Set up the error handler + { + let error_cell = error_cell.clone(); + let response_received = response_received.clone(); + let onerror_callback = Closure::wrap(Box::new(move |_e: web_sys::ErrorEvent| { + if !*response_received.borrow() { + *error_cell.borrow_mut() = Some("WebSocket error occurred".to_string()); + *response_received.borrow_mut() = true; + } + }) as Box); + websocket.set_onerror(Some(onerror_callback.as_ref().unchecked_ref())); + onerror_callback.forget(); + } + + // Poll for response with proper async delays + loop { + // Check for response + if *response_received.borrow() { + if let Some(response) = response_cell.borrow().as_ref() { + let binary_data = binary_data_cell.borrow().clone(); + return Ok((response.clone(), binary_data)); + } + if let Some(error) = error_cell.borrow().as_ref() { + return Err(ConnectorError::protocol_error(error)); + } + } + + // Check timeout + let now = js_sys::Date::now(); + if now - start_time > timeout_ms { + *response_received.borrow_mut() = true; // Mark as done to prevent future processing + return Err(ConnectorError::protocol_error("WebSocket response timeout")); + } + + // Wait 50ms before checking again + let promise = js_sys::Promise::new(&mut |resolve, _| { + let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED)); + web_sys::window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0( + closure.as_ref().unchecked_ref(), + 50, + ) + .unwrap(); + closure.forget(); + }); + let _ = wasm_bindgen_futures::JsFuture::from(promise).await; + } +}