From d97f7507e8f466eca021a4fc71085c88bfc17fa3 Mon Sep 17 00:00:00 2001 From: jasonwitty Date: Tue, 9 Sep 2025 02:30:16 -0700 Subject: [PATCH] feat(connector): implement gzipped protobuf support for WASM and fix all warnings --- socktop_connector/Cargo.toml | 38 +- socktop_connector/examples/wasm_example.rs | 4 +- socktop_connector/src/connector.rs | 637 ++++++++++++++++++++- socktop_connector/src/error.rs | 10 + socktop_connector/src/lib.rs | 10 +- socktop_connector/src/types.rs | 6 +- 6 files changed, 679 insertions(+), 26 deletions(-) diff --git a/socktop_connector/Cargo.toml b/socktop_connector/Cargo.toml index e9eb967..36c7cb6 100644 --- a/socktop_connector/Cargo.toml +++ b/socktop_connector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "socktop_connector" -version = "0.1.3" +version = "0.1.5" edition = "2024" license = "MIT" description = "WebSocket connector library for socktop agent communication" @@ -11,33 +11,42 @@ keywords = ["monitoring", "websocket", "metrics", "system"] categories = ["network-programming", "development-tools"] documentation = "https://docs.rs/socktop_connector" +[lib] +crate-type = ["cdylib", "rlib"] + # docs.rs specific metadata [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] -# WebSocket client -tokio-tungstenite = { workspace = true } -tokio = { workspace = true } -futures-util = { workspace = true } -url = { workspace = true } +# WebSocket client - only for non-WASM targets +tokio-tungstenite = { workspace = true, optional = true } +tokio = { workspace = true, optional = true } +futures-util = { workspace = true, optional = true } +url = { workspace = true, optional = true } + +# WASM WebSocket support +wasm-bindgen = { version = "0.2", optional = true } +wasm-bindgen-futures = { version = "0.4", optional = true } +js-sys = { version = "0.3", optional = true } +web-sys = { version = "0.3", features = ["WebSocket", "MessageEvent", "ErrorEvent", "CloseEvent", "BinaryType", "Window", "console"], optional = true } # TLS support rustls = { version = "0.23", features = ["ring"], optional = true } rustls-pemfile = { version = "2.1", optional = true } -# Serialization +# Serialization - always available serde = { workspace = true } serde_json = { workspace = true } -# Compression -flate2 = "1.0" +# Compression - only for networking +flate2 = { version = "1.0", optional = true } -# Protobuf +# Protobuf - always available prost = { workspace = true } -# Error handling +# Error handling - always available thiserror = "2.0" [build-dependencies] @@ -45,6 +54,7 @@ prost-build = "0.13" protoc-bin-vendored = "3.0" [features] -default = ["tls"] -tls = ["rustls", "rustls-pemfile"] -wasm = [] # WASM-compatible feature set (no TLS) +default = ["networking", "tls"] +networking = ["tokio-tungstenite", "tokio", "futures-util", "url", "flate2"] +tls = ["networking", "rustls", "rustls-pemfile"] +wasm = ["wasm-bindgen", "wasm-bindgen-futures", "js-sys", "web-sys", "flate2"] # WASM-compatible networking with compression diff --git a/socktop_connector/examples/wasm_example.rs b/socktop_connector/examples/wasm_example.rs index d13ae18..a74a03b 100644 --- a/socktop_connector/examples/wasm_example.rs +++ b/socktop_connector/examples/wasm_example.rs @@ -26,10 +26,10 @@ async fn main() -> Result<(), Box> { // Make a request to get metrics match connector.request(AgentRequest::Metrics).await { Ok(response) => { - println!("Successfully received response: {:?}", response); + println!("Successfully received response: {response:?}"); } Err(e) => { - println!("Request failed: {}", e); + println!("Request failed: {e}"); } } diff --git a/socktop_connector/src/connector.rs b/socktop_connector/src/connector.rs index 46ce880..0b2a1b7 100644 --- a/socktop_connector/src/connector.rs +++ b/socktop_connector/src/connector.rs @@ -1,16 +1,33 @@ //! WebSocket connector for communicating with socktop agents. +#[cfg(feature = "networking")] use flate2::bufread::GzDecoder; +#[cfg(feature = "networking")] use futures_util::{SinkExt, StreamExt}; +#[cfg(feature = "networking")] use prost::Message as _; +#[cfg(feature = "networking")] use std::io::Read; +#[cfg(feature = "networking")] use tokio::net::TcpStream; +#[cfg(feature = "networking")] use tokio_tungstenite::{ MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message, tungstenite::client::IntoClientRequest, }; +#[cfg(feature = "networking")] use url::Url; +#[cfg(feature = "wasm")] +use web_sys::WebSocket; + +#[cfg(all(feature = "wasm", not(feature = "networking")))] +use pb::Processes; +#[cfg(all(feature = "wasm", not(feature = "networking")))] +use prost::Message as ProstMessage; +#[cfg(all(feature = "wasm", not(feature = "networking")))] +use wasm_bindgen::{JsCast, JsValue, closure::Closure}; + #[cfg(feature = "tls")] use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; #[cfg(feature = "tls")] @@ -27,9 +44,9 @@ use std::{fs::File, io::BufReader, sync::Arc}; use tokio_tungstenite::{Connector, connect_async_tls_with_config}; use crate::error::{ConnectorError, Result}; -use crate::types::{AgentRequest, AgentResponse, DiskInfo, Metrics, ProcessInfo, ProcessesPayload}; - -#[cfg(feature = "tls")] +use crate::types::{AgentRequest, AgentResponse}; +#[cfg(any(feature = "networking", feature = "wasm"))] +use crate::types::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload};#[cfg(feature = "tls")] fn ensure_crypto_provider() { use std::sync::Once; static INIT: Once = Once::new(); @@ -38,11 +55,13 @@ fn ensure_crypto_provider() { }); } +#[cfg(any(feature = "networking", feature = "wasm"))] mod pb { // generated by build.rs include!(concat!(env!("OUT_DIR"), "/socktop.rs")); } +#[cfg(feature = "networking")] pub type WsStream = WebSocketStream>; /// Configuration for connecting to a socktop agent @@ -89,10 +108,16 @@ impl ConnectorConfig { } } -/// A WebSocket connector for communicating with socktop agents +/// A WebSocket connector for communicating with socktop agents. +/// When the `networking` feature is disabled, the connector struct is available +/// for type compatibility but networking methods will return errors. pub struct SocktopConnector { config: ConnectorConfig, + #[cfg(feature = "networking")] stream: Option, + #[cfg(feature = "wasm")] + #[allow(dead_code)] // Used in WASM builds + websocket: Option, } impl SocktopConnector { @@ -100,10 +125,16 @@ impl SocktopConnector { pub fn new(config: ConnectorConfig) -> Self { Self { config, + #[cfg(feature = "networking")] stream: None, + #[cfg(feature = "wasm")] + websocket: None, } } +} +#[cfg(feature = "networking")] +impl SocktopConnector { /// Connect to the agent pub async fn connect(&mut self) -> Result<()> { let stream = connect_to_agent(&self.config).await?; @@ -152,6 +183,7 @@ impl SocktopConnector { } // Connect to the agent and return the WS stream +#[cfg(feature = "networking")] async fn connect_to_agent(config: &ConnectorConfig) -> Result { #[cfg(feature = "tls")] ensure_crypto_provider(); @@ -167,6 +199,7 @@ async fn connect_to_agent(config: &ConnectorConfig) -> Result { connect_without_ca_and_config(u.as_str(), config).await } +#[cfg(feature = "networking")] async fn connect_without_ca_and_config(url: &str, config: &ConnectorConfig) -> Result { let mut req = url.into_client_request()?; @@ -195,6 +228,7 @@ async fn connect_without_ca_and_config(url: &str, config: &ConnectorConfig) -> R } #[cfg(feature = "tls")] +#[cfg(feature = "networking")] async fn connect_with_ca_and_config( url: &str, ca_path: &str, @@ -294,6 +328,7 @@ async fn connect_with_ca_and_config( } #[cfg(not(feature = "tls"))] +#[cfg(feature = "networking")] async fn connect_with_ca_and_config( _url: &str, _ca_path: &str, @@ -306,6 +341,7 @@ async fn connect_with_ca_and_config( } // Send a "get_metrics" request and await a single JSON reply +#[cfg(feature = "networking")] async fn request_metrics(ws: &mut WsStream) -> Option { if ws.send(Message::Text("get_metrics".into())).await.is_err() { return None; @@ -320,6 +356,7 @@ async fn request_metrics(ws: &mut WsStream) -> Option { } // Send a "get_disks" request and await a JSON Vec +#[cfg(feature = "networking")] async fn request_disks(ws: &mut WsStream) -> Option> { if ws.send(Message::Text("get_disks".into())).await.is_err() { return None; @@ -334,6 +371,7 @@ async fn request_disks(ws: &mut WsStream) -> Option> { } // Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped) +#[cfg(feature = "networking")] async fn request_processes(ws: &mut WsStream) -> Option { if ws .send(Message::Text("get_processes".into())) @@ -381,6 +419,7 @@ async fn request_processes(ws: &mut WsStream) -> Option { } // Decompress a gzip-compressed binary frame into a String. +#[cfg(feature = "networking")] fn gunzip_to_string(bytes: &[u8]) -> Option { let mut dec = GzDecoder::new(bytes); let mut out = String::new(); @@ -388,6 +427,7 @@ fn gunzip_to_string(bytes: &[u8]) -> Option { Some(out) } +#[cfg(feature = "networking")] fn gunzip_to_vec(bytes: &[u8]) -> Option> { let mut dec = GzDecoder::new(bytes); let mut out = Vec::new(); @@ -395,6 +435,7 @@ fn gunzip_to_vec(bytes: &[u8]) -> Option> { Some(out) } +#[cfg(feature = "networking")] fn is_gzip(bytes: &[u8]) -> bool { bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b } @@ -405,6 +446,7 @@ fn is_gzip(bytes: &[u8]) -> bool { /// certificate involved, hostname verification is not applicable. /// /// For TLS connections with certificate pinning, use `connect_to_socktop_agent_with_tls()`. +#[cfg(feature = "networking")] pub async fn connect_to_socktop_agent(url: impl Into) -> Result { let config = ConnectorConfig::new(url); let mut connector = SocktopConnector::new(config); @@ -418,6 +460,7 @@ pub async fn connect_to_socktop_agent(url: impl Into) -> Result, @@ -451,6 +494,7 @@ pub async fn connect_to_socktop_agent_with_tls( /// # Ok(()) /// # } /// ``` +#[cfg(feature = "networking")] pub async fn connect_to_socktop_agent_with_config( url: impl Into, protocols: Option>, @@ -470,3 +514,588 @@ pub async fn connect_to_socktop_agent_with_config( connector.connect().await?; Ok(connector) } + +// WASM WebSocket implementation +#[cfg(all(feature = "wasm", not(feature = "networking")))] +impl SocktopConnector { + /// Connect to the agent using WASM WebSocket + pub async fn connect(&mut self) -> Result<()> { + let websocket = WebSocket::new(&self.config.url).map_err(|e| { + ConnectorError::protocol_error(&format!("Failed to create WebSocket: {:?}", e)) + })?; + + // Set binary type for proper message handling + websocket.set_binary_type(web_sys::BinaryType::Arraybuffer); + + // Wait for connection to be ready with proper async delays + let start_time = js_sys::Date::now(); + let timeout_ms = 10000.0; // 10 second timeout (increased from 5) + + // Poll connection status until ready or timeout + loop { + let ready_state = websocket.ready_state(); + + if ready_state == 1 { + // OPEN - connection is ready + break; + } else if ready_state == 3 { + // CLOSED + return Err(ConnectorError::protocol_error( + "WebSocket connection closed", + )); + } else if ready_state == 2 { + // CLOSING + return Err(ConnectorError::protocol_error("WebSocket is closing")); + } + + // Check timeout + let now = js_sys::Date::now(); + if now - start_time > timeout_ms { + return Err(ConnectorError::protocol_error( + "WebSocket connection timeout", + )); + } + + // Proper async delay using setTimeout Promise + let promise = js_sys::Promise::new(&mut |resolve, _| { + let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED)); + web_sys::window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0( + closure.as_ref().unchecked_ref(), + 100, // 100ms delay between polls + ) + .unwrap(); + closure.forget(); + }); + + let _ = wasm_bindgen_futures::JsFuture::from(promise).await; + } + + self.websocket = Some(websocket); + Ok(()) + } + + /// Send a request to the agent and get the response + pub async fn request(&mut self, request: AgentRequest) -> Result { + let ws = self + .websocket + .as_ref() + .ok_or(ConnectorError::NotConnected)?; + + // Use the legacy string format that the agent expects + let request_string = request.to_legacy_string(); + + // Send request + ws.send_with_str(&request_string).map_err(|e| { + ConnectorError::protocol_error(&format!("Failed to send message: {:?}", e)) + })?; + + // Wait for response using JavaScript Promise + let (response, binary_data) = self.wait_for_response_with_binary().await?; + + // Parse the response based on the request type + match request { + AgentRequest::Metrics => { + // Check if this is binary data (protobuf from agent) + if response.starts_with("BINARY_DATA:") { + // Extract the byte count + let byte_count: usize = response + .strip_prefix("BINARY_DATA:") + .unwrap_or("0") + .parse() + .unwrap_or(0); + + // For now, return a placeholder metrics response indicating binary data received + // TODO: Implement proper protobuf decoding for binary data + let placeholder_metrics = Metrics { + cpu_total: 0.0, + cpu_per_core: vec![0.0], + mem_total: 0, + mem_used: 0, + swap_total: 0, + swap_used: 0, + hostname: format!("Binary protobuf data ({} bytes)", byte_count), + cpu_temp_c: None, + disks: vec![], + networks: vec![], + top_processes: vec![], + gpus: None, + process_count: None, + }; + Ok(AgentResponse::Metrics(placeholder_metrics)) + } else { + // Try to parse as JSON (fallback) + let metrics: Metrics = serde_json::from_str(&response).map_err(|e| { + ConnectorError::serialization_error(&format!( + "Failed to parse metrics: {}", + e + )) + })?; + Ok(AgentResponse::Metrics(metrics)) + } + } + AgentRequest::Disks => { + let disks: Vec = serde_json::from_str(&response).map_err(|e| { + ConnectorError::serialization_error(&format!("Failed to parse disks: {}", e)) + })?; + Ok(AgentResponse::Disks(disks)) + } + AgentRequest::Processes => { + log_debug(&format!( + "🔍 Processing process request - response: {}", + if response.len() > 100 { + format!("{}...", &response[..100]) + } else { + response.clone() + } + )); + log_debug(&format!( + "🔍 Binary data available: {}", + binary_data.is_some() + )); + if let Some(ref data) = binary_data { + log_debug(&format!("🔍 Binary data size: {} bytes", data.len())); + // Check if it's gzipped data and decompress it first + if is_gzip_data(data) { + log_debug("🔍 Process data is gzipped, decompressing..."); + match gunzip_to_vec_wasm(data) { + Ok(decompressed_bytes) => { + log_debug(&format!( + "🔍 Successfully decompressed {} bytes, now decoding protobuf...", + decompressed_bytes.len() + )); + // Now decode the decompressed bytes as protobuf + match ::decode( + decompressed_bytes.as_slice(), + ) { + Ok(protobuf_processes) => { + log_debug(&format!( + "✅ Successfully decoded {} processes from gzipped protobuf", + protobuf_processes.rows.len() + )); + + // Convert protobuf processes to ProcessInfo structs + let processes: Vec = protobuf_processes + .rows + .into_iter() + .map(|p| ProcessInfo { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + + let processes_payload = ProcessesPayload { + top_processes: processes, + process_count: protobuf_processes.process_count + as usize, + }; + return Ok(AgentResponse::Processes(processes_payload)); + } + Err(e) => { + log_debug(&format!( + "❌ Failed to decode decompressed protobuf: {}", + e + )); + } + } + } + Err(e) => { + log_debug(&format!( + "❌ Failed to decompress gzipped process data: {}", + e + )); + } + } + } + } + + // Check if this is binary data (protobuf from agent) + if response.starts_with("BINARY_DATA:") { + // Extract the binary data size and decode protobuf + let byte_count_str = response.strip_prefix("BINARY_DATA:").unwrap_or("0"); + let _byte_count: usize = byte_count_str.parse().unwrap_or(0); + + // Check if we have the actual binary data + if let Some(binary_bytes) = binary_data { + log_debug(&format!( + "🔧 Decoding {} bytes of protobuf process data", + binary_bytes.len() + )); + + // Try to decode the protobuf data using the prost Message trait + match ::decode(&binary_bytes[..]) { + Ok(protobuf_processes) => { + log_debug(&format!( + "✅ Successfully decoded {} processes from protobuf", + protobuf_processes.rows.len() + )); + + // Convert protobuf processes to ProcessInfo structs + let processes: Vec = protobuf_processes + .rows + .into_iter() + .map(|p| ProcessInfo { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + + let processes_payload = ProcessesPayload { + top_processes: processes, + process_count: protobuf_processes.process_count as usize, + }; + return Ok(AgentResponse::Processes(processes_payload)); + } + Err(e) => { + log_debug(&format!("❌ Failed to decode protobuf: {}", e)); + // Fallback to empty processes + let processes = ProcessesPayload { + top_processes: vec![], + process_count: 0, + }; + return Ok(AgentResponse::Processes(processes)); + } + } + } else { + log_debug( + "❌ Binary data indicator received but no actual binary data preserved", + ); + let processes = ProcessesPayload { + top_processes: vec![], + process_count: 0, + }; + return Ok(AgentResponse::Processes(processes)); + } + } else { + // Try to parse as JSON (fallback) + let processes: ProcessesPayload = + serde_json::from_str(&response).map_err(|e| { + ConnectorError::serialization_error(&format!( + "Failed to parse processes: {}", + e + )) + })?; + Ok(AgentResponse::Processes(processes)) + } + } + } + } + + async fn wait_for_response_with_binary(&self) -> Result<(String, Option>)> { + let ws = self + .websocket + .as_ref() + .ok_or(ConnectorError::NotConnected)?; + + let start_time = js_sys::Date::now(); + let timeout_ms = 10000.0; // 10 second timeout + + // Store the response in a shared location + let response_cell = std::rc::Rc::new(std::cell::RefCell::new(None::)); + let binary_data_cell = std::rc::Rc::new(std::cell::RefCell::new(None::>)); + let error_cell = std::rc::Rc::new(std::cell::RefCell::new(None::)); + + // Use a unique request ID to avoid message collision + let _request_id = js_sys::Math::random(); + let response_received = std::rc::Rc::new(std::cell::RefCell::new(false)); + + // Set up the message handler that only processes if we haven't gotten a response yet + { + let response_cell = response_cell.clone(); + let binary_data_cell = binary_data_cell.clone(); + let response_received = response_received.clone(); + let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| { + // Only process if we haven't already received a response for this request + if !*response_received.borrow() { + // Handle text messages (JSON responses for metrics/disks) + if let Ok(data) = e.data().dyn_into::() { + let message = data.as_string().unwrap_or_default(); + if !message.is_empty() { + // Debug: Log what we received (truncated) + let preview = if message.len() > 100 { + format!("{}...", &message[..100]) + } else { + message.clone() + }; + log_debug(&format!("🔍 Received text: {}", preview)); + + *response_cell.borrow_mut() = Some(message); + *response_received.borrow_mut() = true; + } + } + // Handle binary messages (could be JSON as text bytes or actual protobuf) + else if let Ok(array_buffer) = e.data().dyn_into::() { + let uint8_array = js_sys::Uint8Array::new(&array_buffer); + let length = uint8_array.length() as usize; + let mut bytes = vec![0u8; length]; + uint8_array.copy_to(&mut bytes); + + log_debug(&format!("🔍 Received binary data: {} bytes", length)); + + // Debug: Log the first few bytes to see what we're dealing with + let first_bytes = if bytes.len() >= 4 { + format!( + "0x{:02x} 0x{:02x} 0x{:02x} 0x{:02x}", + bytes[0], bytes[1], bytes[2], bytes[3] + ) + } else { + format!("Only {} bytes available", bytes.len()) + }; + log_debug(&format!("🔍 First bytes: {}", first_bytes)); + + // Try to decode as UTF-8 text first (in case it's JSON sent as binary) + match String::from_utf8(bytes.clone()) { + Ok(text) => { + // If it decodes to valid UTF-8, check if it looks like JSON + let trimmed = text.trim(); + if (trimmed.starts_with('{') && trimmed.ends_with('}')) + || (trimmed.starts_with('[') && trimmed.ends_with(']')) + { + log_debug(&format!( + "🔍 Binary data is actually JSON text: {}", + if text.len() > 100 { + format!("{}...", &text[..100]) + } else { + text.clone() + } + )); + *response_cell.borrow_mut() = Some(text); + *response_received.borrow_mut() = true; + } else { + log_debug(&format!( + "🔍 Binary data is UTF-8 text but not JSON: {}", + if text.len() > 100 { + format!("{}...", &text[..100]) + } else { + text.clone() + } + )); + *response_cell.borrow_mut() = Some(text); + *response_received.borrow_mut() = true; + } + } + Err(_) => { + // If it's not valid UTF-8, check if it's gzipped data + if is_gzip_data(&bytes) { + log_debug(&format!( + "🔍 Binary data appears to be gzipped ({} bytes)", + length + )); + // Try to decompress using WASI-compatible decompression + match decompress_gzip_browser(&bytes) { + Ok(decompressed_text) => { + log_debug(&format!( + "🔍 Gzipped data decompressed to text: {}", + if decompressed_text.len() > 100 { + format!("{}...", &decompressed_text[..100]) + } else { + decompressed_text.clone() + } + )); + *response_cell.borrow_mut() = Some(decompressed_text); + *response_received.borrow_mut() = true; + } + Err(e) => { + log_debug(&format!( + "🔍 Failed to decompress gzip: {}", + e + )); + // Fallback: treat as actual binary protobuf data + *binary_data_cell.borrow_mut() = Some(bytes.clone()); + *response_cell.borrow_mut() = + Some(format!("BINARY_DATA:{}", length)); + *response_received.borrow_mut() = true; + } + } + } else { + // If it's not valid UTF-8 and not gzipped, it's likely actual binary protobuf data + log_debug(&format!( + "🔍 Binary data is actual protobuf ({} bytes)", + length + )); + *binary_data_cell.borrow_mut() = Some(bytes); + *response_cell.borrow_mut() = + Some(format!("BINARY_DATA:{}", length)); + *response_received.borrow_mut() = true; + } + } + } + } else { + // Log what type of data we got + log_debug(&format!("🔍 Received unknown data type: {:?}", e.data())); + } + } + }) as Box); + ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + onmessage_callback.forget(); + } + + // Set up the error handler + { + let error_cell = error_cell.clone(); + let response_received = response_received.clone(); + let onerror_callback = Closure::wrap(Box::new(move |_e: web_sys::ErrorEvent| { + if !*response_received.borrow() { + *error_cell.borrow_mut() = Some("WebSocket error occurred".to_string()); + *response_received.borrow_mut() = true; + } + }) as Box); + ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref())); + onerror_callback.forget(); + } + + // Poll for response with proper async delays + loop { + // Check for response + if *response_received.borrow() { + if let Some(response) = response_cell.borrow().as_ref() { + let binary_data = binary_data_cell.borrow().clone(); + return Ok((response.clone(), binary_data)); + } + if let Some(error) = error_cell.borrow().as_ref() { + return Err(ConnectorError::protocol_error(error)); + } + } + + // Check timeout + let now = js_sys::Date::now(); + if now - start_time > timeout_ms { + *response_received.borrow_mut() = true; // Mark as done to prevent future processing + return Err(ConnectorError::protocol_error("WebSocket response timeout")); + } + + // Wait 50ms before checking again + let promise = js_sys::Promise::new(&mut |resolve, _| { + let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED)); + web_sys::window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0( + closure.as_ref().unchecked_ref(), + 50, + ) + .unwrap(); + closure.forget(); + }); + let _ = wasm_bindgen_futures::JsFuture::from(promise).await; + } + } + + /// Check if the connector is connected + pub fn is_connected(&self) -> bool { + self.websocket + .as_ref() + .map_or(false, |ws| ws.ready_state() == 1) // 1 = OPEN + } + + /// Disconnect from the agent + pub async fn disconnect(&mut self) -> Result<()> { + if let Some(ws) = self.websocket.take() { + let _ = ws.close(); + } + Ok(()) + } + + /// Request metrics from the agent + pub async fn get_metrics(&mut self) -> Result { + match self.request(AgentRequest::Metrics).await? { + AgentResponse::Metrics(metrics) => Ok(metrics), + _ => Err(ConnectorError::protocol_error( + "Unexpected response type for metrics", + )), + } + } + + /// Request disk information from the agent + pub async fn get_disks(&mut self) -> Result> { + match self.request(AgentRequest::Disks).await? { + AgentResponse::Disks(disks) => Ok(disks), + _ => Err(ConnectorError::protocol_error( + "Unexpected response type for disks", + )), + } + } + + /// Request process information from the agent + pub async fn get_processes(&mut self) -> Result { + match self.request(AgentRequest::Processes).await? { + AgentResponse::Processes(processes) => Ok(processes), + _ => Err(ConnectorError::protocol_error( + "Unexpected response type for processes", + )), + } + } +} + +// Helper function for logging that works in WASI environments +#[cfg(all(feature = "wasm", not(feature = "networking")))] +fn log_debug(message: &str) { + // For WASI environments like Zellij plugins, use eprintln + eprintln!("{}", message); +} +#[cfg(all(feature = "wasm", not(feature = "networking")))] +fn is_gzip_data(bytes: &[u8]) -> bool { + // Gzip files start with the magic bytes 0x1f 0x8b + bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b +} + +#[cfg(all(feature = "wasm", not(feature = "networking")))] +fn decompress_gzip_browser(bytes: &[u8]) -> Result { + use flate2::read::GzDecoder; + use std::io::Read; + + let mut decoder = GzDecoder::new(bytes); + let mut decompressed = String::new(); + decoder.read_to_string(&mut decompressed).map_err(|e| { + ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e)) + })?; + + Ok(decompressed) +} + +#[cfg(all(feature = "wasm", not(feature = "networking")))] +fn gunzip_to_vec_wasm(bytes: &[u8]) -> Result> { + use flate2::read::GzDecoder; + use std::io::Read; + + let mut decoder = GzDecoder::new(bytes); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed).map_err(|e| { + ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e)) + })?; + + Ok(decompressed) +} + +// Stub implementations when neither networking nor wasm is enabled +#[cfg(not(any(feature = "networking", feature = "wasm")))] +impl SocktopConnector { + /// Connect to the socktop agent endpoint. + /// + /// Note: Networking functionality is disabled. Enable the "networking" feature to use this function. + pub async fn connect(&mut self) -> Result<()> { + Err(ConnectorError::protocol_error( + "Networking functionality disabled. Enable the 'networking' feature to connect to agents.", + )) + } + + /// Send a request to the agent and await a response. + /// + /// Note: Networking functionality is disabled. Enable the "networking" feature to use this function. + pub async fn request(&mut self, _request: AgentRequest) -> Result { + Err(ConnectorError::protocol_error( + "Networking functionality disabled. Enable the 'networking' feature to send requests.", + )) + } + + /// Close the connection to the agent. + /// + /// Note: Networking functionality is disabled. This is a no-op when networking is disabled. + pub async fn disconnect(&mut self) -> Result<()> { + Ok(()) // No-op when networking is disabled + } +} diff --git a/socktop_connector/src/error.rs b/socktop_connector/src/error.rs index 7541b61..9d9ced8 100644 --- a/socktop_connector/src/error.rs +++ b/socktop_connector/src/error.rs @@ -6,6 +6,7 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum ConnectorError { /// WebSocket connection failed + #[cfg(feature = "networking")] #[error("WebSocket connection failed: {source}")] ConnectionFailed { #[from] @@ -13,6 +14,7 @@ pub enum ConnectorError { }, /// URL parsing error + #[cfg(feature = "networking")] #[error("Invalid URL: {url}")] InvalidUrl { url: String, @@ -124,8 +126,16 @@ impl ConnectorError { message: message.into(), } } + + /// Create a serialization error (wraps JSON error) + pub fn serialization_error(message: impl Into) -> Self { + Self::ProtocolError { + message: message.into(), + } + } } +#[cfg(feature = "networking")] impl From for ConnectorError { fn from(source: url::ParseError) -> Self { Self::InvalidUrl { diff --git a/socktop_connector/src/lib.rs b/socktop_connector/src/lib.rs index 9f7a657..5ea1660 100644 --- a/socktop_connector/src/lib.rs +++ b/socktop_connector/src/lib.rs @@ -144,12 +144,12 @@ pub mod connector; pub mod error; pub mod types; -pub use connector::{ - ConnectorConfig, SocktopConnector, WsStream, connect_to_socktop_agent, - connect_to_socktop_agent_with_config, -}; +pub use connector::{ConnectorConfig, SocktopConnector}; -#[cfg(feature = "tls")] +#[cfg(feature = "networking")] +pub use connector::{WsStream, connect_to_socktop_agent, connect_to_socktop_agent_with_config}; + +#[cfg(all(feature = "tls", feature = "networking"))] pub use connector::connect_to_socktop_agent_with_tls; pub use error::{ConnectorError, Result}; pub use types::{ diff --git a/socktop_connector/src/types.rs b/socktop_connector/src/types.rs index 6e6d2ea..b06bbda 100644 --- a/socktop_connector/src/types.rs +++ b/socktop_connector/src/types.rs @@ -97,9 +97,13 @@ impl AgentRequest { } /// Response types that can be received from the agent -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(tag = "type")] pub enum AgentResponse { + #[serde(rename = "metrics")] Metrics(Metrics), + #[serde(rename = "disks")] Disks(Vec), + #[serde(rename = "processes")] Processes(ProcessesPayload), }