feat(connector): implement gzipped protobuf support for WASM and fix all warnings
parent e4186a7ec0
commit d97f7507e8

@@ -1,6 +1,6 @@
[package]
name = "socktop_connector"
version = "0.1.3"
version = "0.1.5"
edition = "2024"
license = "MIT"
description = "WebSocket connector library for socktop agent communication"
@@ -11,33 +11,42 @@ keywords = ["monitoring", "websocket", "metrics", "system"]
categories = ["network-programming", "development-tools"]
documentation = "https://docs.rs/socktop_connector"

[lib]
crate-type = ["cdylib", "rlib"]

# docs.rs specific metadata
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
# WebSocket client
tokio-tungstenite = { workspace = true }
tokio = { workspace = true }
futures-util = { workspace = true }
url = { workspace = true }
# WebSocket client - only for non-WASM targets
tokio-tungstenite = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }
url = { workspace = true, optional = true }

# WASM WebSocket support
wasm-bindgen = { version = "0.2", optional = true }
wasm-bindgen-futures = { version = "0.4", optional = true }
js-sys = { version = "0.3", optional = true }
web-sys = { version = "0.3", features = ["WebSocket", "MessageEvent", "ErrorEvent", "CloseEvent", "BinaryType", "Window", "console"], optional = true }

# TLS support
rustls = { version = "0.23", features = ["ring"], optional = true }
rustls-pemfile = { version = "2.1", optional = true }

# Serialization
# Serialization - always available
serde = { workspace = true }
serde_json = { workspace = true }

# Compression
flate2 = "1.0"
# Compression - only for networking
flate2 = { version = "1.0", optional = true }

# Protobuf
# Protobuf - always available
prost = { workspace = true }

# Error handling
# Error handling - always available
thiserror = "2.0"

[build-dependencies]
@@ -45,6 +54,7 @@ prost-build = "0.13"
protoc-bin-vendored = "3.0"

[features]
default = ["tls"]
tls = ["rustls", "rustls-pemfile"]
wasm = [] # WASM-compatible feature set (no TLS)
default = ["networking", "tls"]
networking = ["tokio-tungstenite", "tokio", "futures-util", "url", "flate2"]
tls = ["networking", "rustls", "rustls-pemfile"]
wasm = ["wasm-bindgen", "wasm-bindgen-futures", "js-sys", "web-sys", "flate2"] # WASM-compatible networking with compression

@@ -26,10 +26,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Make a request to get metrics
match connector.request(AgentRequest::Metrics).await {
Ok(response) => {
println!("Successfully received response: {:?}", response);
println!("Successfully received response: {response:?}");
}
Err(e) => {
println!("Request failed: {}", e);
println!("Request failed: {e}");
}
}

@@ -1,16 +1,33 @@
//! WebSocket connector for communicating with socktop agents.

#[cfg(feature = "networking")]
use flate2::bufread::GzDecoder;
#[cfg(feature = "networking")]
use futures_util::{SinkExt, StreamExt};
#[cfg(feature = "networking")]
use prost::Message as _;
#[cfg(feature = "networking")]
use std::io::Read;
#[cfg(feature = "networking")]
use tokio::net::TcpStream;
#[cfg(feature = "networking")]
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message,
tungstenite::client::IntoClientRequest,
};
#[cfg(feature = "networking")]
use url::Url;

#[cfg(feature = "wasm")]
use web_sys::WebSocket;

#[cfg(all(feature = "wasm", not(feature = "networking")))]
use pb::Processes;
#[cfg(all(feature = "wasm", not(feature = "networking")))]
use prost::Message as ProstMessage;
#[cfg(all(feature = "wasm", not(feature = "networking")))]
use wasm_bindgen::{JsCast, JsValue, closure::Closure};

#[cfg(feature = "tls")]
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
#[cfg(feature = "tls")]
@@ -27,9 +44,9 @@ use std::{fs::File, io::BufReader, sync::Arc};
use tokio_tungstenite::{Connector, connect_async_tls_with_config};

use crate::error::{ConnectorError, Result};
use crate::types::{AgentRequest, AgentResponse, DiskInfo, Metrics, ProcessInfo, ProcessesPayload};

#[cfg(feature = "tls")]
use crate::types::{AgentRequest, AgentResponse};
#[cfg(any(feature = "networking", feature = "wasm"))]
use crate::types::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload};
#[cfg(feature = "tls")]
fn ensure_crypto_provider() {
use std::sync::Once;
static INIT: Once = Once::new();
@@ -38,11 +55,13 @@ fn ensure_crypto_provider() {
});
}

#[cfg(any(feature = "networking", feature = "wasm"))]
mod pb {
// generated by build.rs
include!(concat!(env!("OUT_DIR"), "/socktop.rs"));
}
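The `pb` module pulls in Rust types that the build script emits from the crate's protobuf schema; the `[build-dependencies]` listed earlier (`prost-build`, `protoc-bin-vendored`) exist for exactly this. The build script itself is not part of this diff; a typical setup looks roughly like the sketch below (the `proto/socktop.proto` path is an assumption):

// build.rs (sketch, not from this commit)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use the vendored protoc binary so no system-wide install is required.
    let protoc = protoc_bin_vendored::protoc_bin_path()?;
    // std::env::set_var is unsafe under edition 2024, so it is scoped explicitly.
    unsafe { std::env::set_var("PROTOC", protoc) };
    // A `package socktop;` declaration in the schema yields $OUT_DIR/socktop.rs for the include! above.
    prost_build::compile_protos(&["proto/socktop.proto"], &["proto"])?;
    Ok(())
}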

#[cfg(feature = "networking")]
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

/// Configuration for connecting to a socktop agent
@@ -89,10 +108,16 @@ impl ConnectorConfig {
}
}

/// A WebSocket connector for communicating with socktop agents
/// A WebSocket connector for communicating with socktop agents.
/// When the `networking` feature is disabled, the connector struct is available
/// for type compatibility but networking methods will return errors.
pub struct SocktopConnector {
config: ConnectorConfig,
#[cfg(feature = "networking")]
stream: Option<WsStream>,
#[cfg(feature = "wasm")]
#[allow(dead_code)] // Used in WASM builds
websocket: Option<WebSocket>,
}

impl SocktopConnector {
@@ -100,10 +125,16 @@ impl SocktopConnector {
pub fn new(config: ConnectorConfig) -> Self {
Self {
config,
#[cfg(feature = "networking")]
stream: None,
#[cfg(feature = "wasm")]
websocket: None,
}
}
}

#[cfg(feature = "networking")]
impl SocktopConnector {
/// Connect to the agent
pub async fn connect(&mut self) -> Result<()> {
let stream = connect_to_agent(&self.config).await?;
@@ -152,6 +183,7 @@ impl SocktopConnector {
}

// Connect to the agent and return the WS stream
#[cfg(feature = "networking")]
async fn connect_to_agent(config: &ConnectorConfig) -> Result<WsStream> {
#[cfg(feature = "tls")]
ensure_crypto_provider();
@@ -167,6 +199,7 @@ async fn connect_to_agent(config: &ConnectorConfig) -> Result<WsStream> {
connect_without_ca_and_config(u.as_str(), config).await
}

#[cfg(feature = "networking")]
async fn connect_without_ca_and_config(url: &str, config: &ConnectorConfig) -> Result<WsStream> {
let mut req = url.into_client_request()?;

@@ -195,6 +228,7 @@ async fn connect_without_ca_and_config(url: &str, config: &ConnectorConfig) -> R
}

#[cfg(feature = "tls")]
#[cfg(feature = "networking")]
async fn connect_with_ca_and_config(
url: &str,
ca_path: &str,
@@ -294,6 +328,7 @@ async fn connect_with_ca_and_config(
}

#[cfg(not(feature = "tls"))]
#[cfg(feature = "networking")]
async fn connect_with_ca_and_config(
_url: &str,
_ca_path: &str,
@@ -306,6 +341,7 @@ async fn connect_with_ca_and_config(
}

// Send a "get_metrics" request and await a single JSON reply
#[cfg(feature = "networking")]
async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
if ws.send(Message::Text("get_metrics".into())).await.is_err() {
return None;
@@ -320,6 +356,7 @@ async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
}

// Send a "get_disks" request and await a JSON Vec<DiskInfo>
#[cfg(feature = "networking")]
async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
if ws.send(Message::Text("get_disks".into())).await.is_err() {
return None;
@@ -334,6 +371,7 @@ async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
}

// Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped)
#[cfg(feature = "networking")]
async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
if ws
.send(Message::Text("get_processes".into()))
@@ -381,6 +419,7 @@ async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
}

// Decompress a gzip-compressed binary frame into a String.
#[cfg(feature = "networking")]
fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
let mut dec = GzDecoder::new(bytes);
let mut out = String::new();
@@ -388,6 +427,7 @@ fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
Some(out)
}

#[cfg(feature = "networking")]
fn gunzip_to_vec(bytes: &[u8]) -> Option<Vec<u8>> {
let mut dec = GzDecoder::new(bytes);
let mut out = Vec::new();
@@ -395,6 +435,7 @@ fn gunzip_to_vec(bytes: &[u8]) -> Option<Vec<u8>> {
Some(out)
}

#[cfg(feature = "networking")]
fn is_gzip(bytes: &[u8]) -> bool {
bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
}
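Together these helpers let `request_processes` accept either a raw protobuf frame or a gzip-compressed one: the magic-byte check picks the path and decompression happens before decoding. A condensed sketch of that decode step (not verbatim from the diff; error handling collapsed to `Option` like the helpers above):

#[cfg(feature = "networking")]
fn decode_process_frame(bytes: &[u8]) -> Option<pb::Processes> {
    use prost::Message as _;
    // Inflate gzip frames (0x1f 0x8b magic) first; pass raw protobuf through untouched.
    let raw = if is_gzip(bytes) {
        gunzip_to_vec(bytes)?
    } else {
        bytes.to_vec()
    };
    pb::Processes::decode(raw.as_slice()).ok()
}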
@@ -405,6 +446,7 @@ fn is_gzip(bytes: &[u8]) -> bool {
/// certificate involved, hostname verification is not applicable.
///
/// For TLS connections with certificate pinning, use `connect_to_socktop_agent_with_tls()`.
#[cfg(feature = "networking")]
pub async fn connect_to_socktop_agent(url: impl Into<String>) -> Result<SocktopConnector> {
let config = ConnectorConfig::new(url);
let mut connector = SocktopConnector::new(config);
@@ -418,6 +460,7 @@ pub async fn connect_to_socktop_agent(url: impl Into<String>) -> Result<SocktopC
/// The `verify_hostname` parameter controls whether the server's hostname is verified
/// against the certificate (recommended for production, can be disabled for testing).
#[cfg(feature = "tls")]
#[cfg(feature = "networking")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub async fn connect_to_socktop_agent_with_tls(
url: impl Into<String>,
@@ -451,6 +494,7 @@ pub async fn connect_to_socktop_agent_with_tls(
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "networking")]
pub async fn connect_to_socktop_agent_with_config(
url: impl Into<String>,
protocols: Option<Vec<String>>,
@@ -470,3 +514,588 @@ pub async fn connect_to_socktop_agent_with_config(
connector.connect().await?;
Ok(connector)
}

// WASM WebSocket implementation
#[cfg(all(feature = "wasm", not(feature = "networking")))]
impl SocktopConnector {
/// Connect to the agent using WASM WebSocket
pub async fn connect(&mut self) -> Result<()> {
let websocket = WebSocket::new(&self.config.url).map_err(|e| {
ConnectorError::protocol_error(&format!("Failed to create WebSocket: {:?}", e))
})?;

// Set binary type for proper message handling
websocket.set_binary_type(web_sys::BinaryType::Arraybuffer);

// Wait for connection to be ready with proper async delays
let start_time = js_sys::Date::now();
let timeout_ms = 10000.0; // 10 second timeout (increased from 5)

// Poll connection status until ready or timeout
loop {
let ready_state = websocket.ready_state();

if ready_state == 1 {
// OPEN - connection is ready
break;
} else if ready_state == 3 {
// CLOSED
return Err(ConnectorError::protocol_error(
"WebSocket connection closed",
));
} else if ready_state == 2 {
// CLOSING
return Err(ConnectorError::protocol_error("WebSocket is closing"));
}

// Check timeout
let now = js_sys::Date::now();
if now - start_time > timeout_ms {
return Err(ConnectorError::protocol_error(
"WebSocket connection timeout",
));
}

// Proper async delay using setTimeout Promise
let promise = js_sys::Promise::new(&mut |resolve, _| {
let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED));
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(
closure.as_ref().unchecked_ref(),
100, // 100ms delay between polls
)
.unwrap();
closure.forget();
});

let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
}

self.websocket = Some(websocket);
Ok(())
}

/// Send a request to the agent and get the response
pub async fn request(&mut self, request: AgentRequest) -> Result<AgentResponse> {
let ws = self
.websocket
.as_ref()
.ok_or(ConnectorError::NotConnected)?;

// Use the legacy string format that the agent expects
let request_string = request.to_legacy_string();

// Send request
ws.send_with_str(&request_string).map_err(|e| {
ConnectorError::protocol_error(&format!("Failed to send message: {:?}", e))
})?;

// Wait for response using JavaScript Promise
let (response, binary_data) = self.wait_for_response_with_binary().await?;

// Parse the response based on the request type
match request {
AgentRequest::Metrics => {
// Check if this is binary data (protobuf from agent)
if response.starts_with("BINARY_DATA:") {
// Extract the byte count
let byte_count: usize = response
.strip_prefix("BINARY_DATA:")
.unwrap_or("0")
.parse()
.unwrap_or(0);

// For now, return a placeholder metrics response indicating binary data received
// TODO: Implement proper protobuf decoding for binary data
let placeholder_metrics = Metrics {
cpu_total: 0.0,
cpu_per_core: vec![0.0],
mem_total: 0,
mem_used: 0,
swap_total: 0,
swap_used: 0,
hostname: format!("Binary protobuf data ({} bytes)", byte_count),
cpu_temp_c: None,
disks: vec![],
networks: vec![],
top_processes: vec![],
gpus: None,
process_count: None,
};
Ok(AgentResponse::Metrics(placeholder_metrics))
} else {
// Try to parse as JSON (fallback)
let metrics: Metrics = serde_json::from_str(&response).map_err(|e| {
ConnectorError::serialization_error(&format!(
"Failed to parse metrics: {}",
e
))
})?;
Ok(AgentResponse::Metrics(metrics))
}
}
AgentRequest::Disks => {
let disks: Vec<DiskInfo> = serde_json::from_str(&response).map_err(|e| {
ConnectorError::serialization_error(&format!("Failed to parse disks: {}", e))
})?;
Ok(AgentResponse::Disks(disks))
}
AgentRequest::Processes => {
log_debug(&format!(
"🔍 Processing process request - response: {}",
if response.len() > 100 {
format!("{}...", &response[..100])
} else {
response.clone()
}
));
log_debug(&format!(
"🔍 Binary data available: {}",
binary_data.is_some()
));
if let Some(ref data) = binary_data {
log_debug(&format!("🔍 Binary data size: {} bytes", data.len()));
// Check if it's gzipped data and decompress it first
if is_gzip_data(data) {
log_debug("🔍 Process data is gzipped, decompressing...");
match gunzip_to_vec_wasm(data) {
Ok(decompressed_bytes) => {
log_debug(&format!(
"🔍 Successfully decompressed {} bytes, now decoding protobuf...",
decompressed_bytes.len()
));
// Now decode the decompressed bytes as protobuf
match <Processes as ProstMessage>::decode(
decompressed_bytes.as_slice(),
) {
Ok(protobuf_processes) => {
log_debug(&format!(
"✅ Successfully decoded {} processes from gzipped protobuf",
protobuf_processes.rows.len()
));

// Convert protobuf processes to ProcessInfo structs
let processes: Vec<ProcessInfo> = protobuf_processes
.rows
.into_iter()
.map(|p| ProcessInfo {
pid: p.pid,
name: p.name,
cpu_usage: p.cpu_usage,
mem_bytes: p.mem_bytes,
})
.collect();

let processes_payload = ProcessesPayload {
top_processes: processes,
process_count: protobuf_processes.process_count
as usize,
};
return Ok(AgentResponse::Processes(processes_payload));
}
Err(e) => {
log_debug(&format!(
"❌ Failed to decode decompressed protobuf: {}",
e
));
}
}
}
Err(e) => {
log_debug(&format!(
"❌ Failed to decompress gzipped process data: {}",
e
));
}
}
}
}

// Check if this is binary data (protobuf from agent)
if response.starts_with("BINARY_DATA:") {
// Extract the binary data size and decode protobuf
let byte_count_str = response.strip_prefix("BINARY_DATA:").unwrap_or("0");
let _byte_count: usize = byte_count_str.parse().unwrap_or(0);

// Check if we have the actual binary data
if let Some(binary_bytes) = binary_data {
log_debug(&format!(
"🔧 Decoding {} bytes of protobuf process data",
binary_bytes.len()
));

// Try to decode the protobuf data using the prost Message trait
match <Processes as ProstMessage>::decode(&binary_bytes[..]) {
Ok(protobuf_processes) => {
log_debug(&format!(
"✅ Successfully decoded {} processes from protobuf",
protobuf_processes.rows.len()
));

// Convert protobuf processes to ProcessInfo structs
let processes: Vec<ProcessInfo> = protobuf_processes
.rows
.into_iter()
.map(|p| ProcessInfo {
pid: p.pid,
name: p.name,
cpu_usage: p.cpu_usage,
mem_bytes: p.mem_bytes,
})
.collect();

let processes_payload = ProcessesPayload {
top_processes: processes,
process_count: protobuf_processes.process_count as usize,
};
return Ok(AgentResponse::Processes(processes_payload));
}
Err(e) => {
log_debug(&format!("❌ Failed to decode protobuf: {}", e));
// Fallback to empty processes
let processes = ProcessesPayload {
top_processes: vec![],
process_count: 0,
};
return Ok(AgentResponse::Processes(processes));
}
}
} else {
log_debug(
"❌ Binary data indicator received but no actual binary data preserved",
);
let processes = ProcessesPayload {
top_processes: vec![],
process_count: 0,
};
return Ok(AgentResponse::Processes(processes));
}
} else {
// Try to parse as JSON (fallback)
let processes: ProcessesPayload =
serde_json::from_str(&response).map_err(|e| {
ConnectorError::serialization_error(&format!(
"Failed to parse processes: {}",
e
))
})?;
Ok(AgentResponse::Processes(processes))
}
}
}
}

async fn wait_for_response_with_binary(&self) -> Result<(String, Option<Vec<u8>>)> {
let ws = self
.websocket
.as_ref()
.ok_or(ConnectorError::NotConnected)?;

let start_time = js_sys::Date::now();
let timeout_ms = 10000.0; // 10 second timeout

// Store the response in a shared location
let response_cell = std::rc::Rc::new(std::cell::RefCell::new(None::<String>));
let binary_data_cell = std::rc::Rc::new(std::cell::RefCell::new(None::<Vec<u8>>));
let error_cell = std::rc::Rc::new(std::cell::RefCell::new(None::<String>));

// Use a unique request ID to avoid message collision
let _request_id = js_sys::Math::random();
let response_received = std::rc::Rc::new(std::cell::RefCell::new(false));

// Set up the message handler that only processes if we haven't gotten a response yet
{
let response_cell = response_cell.clone();
let binary_data_cell = binary_data_cell.clone();
let response_received = response_received.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {
// Only process if we haven't already received a response for this request
if !*response_received.borrow() {
// Handle text messages (JSON responses for metrics/disks)
if let Ok(data) = e.data().dyn_into::<js_sys::JsString>() {
let message = data.as_string().unwrap_or_default();
if !message.is_empty() {
// Debug: Log what we received (truncated)
let preview = if message.len() > 100 {
format!("{}...", &message[..100])
} else {
message.clone()
};
log_debug(&format!("🔍 Received text: {}", preview));

*response_cell.borrow_mut() = Some(message);
*response_received.borrow_mut() = true;
}
}
// Handle binary messages (could be JSON as text bytes or actual protobuf)
else if let Ok(array_buffer) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let uint8_array = js_sys::Uint8Array::new(&array_buffer);
let length = uint8_array.length() as usize;
let mut bytes = vec![0u8; length];
uint8_array.copy_to(&mut bytes);

log_debug(&format!("🔍 Received binary data: {} bytes", length));

// Debug: Log the first few bytes to see what we're dealing with
let first_bytes = if bytes.len() >= 4 {
format!(
"0x{:02x} 0x{:02x} 0x{:02x} 0x{:02x}",
bytes[0], bytes[1], bytes[2], bytes[3]
)
} else {
format!("Only {} bytes available", bytes.len())
};
log_debug(&format!("🔍 First bytes: {}", first_bytes));

// Try to decode as UTF-8 text first (in case it's JSON sent as binary)
match String::from_utf8(bytes.clone()) {
Ok(text) => {
// If it decodes to valid UTF-8, check if it looks like JSON
let trimmed = text.trim();
if (trimmed.starts_with('{') && trimmed.ends_with('}'))
|| (trimmed.starts_with('[') && trimmed.ends_with(']'))
{
log_debug(&format!(
"🔍 Binary data is actually JSON text: {}",
if text.len() > 100 {
format!("{}...", &text[..100])
} else {
text.clone()
}
));
*response_cell.borrow_mut() = Some(text);
*response_received.borrow_mut() = true;
} else {
log_debug(&format!(
"🔍 Binary data is UTF-8 text but not JSON: {}",
if text.len() > 100 {
format!("{}...", &text[..100])
} else {
text.clone()
}
));
*response_cell.borrow_mut() = Some(text);
*response_received.borrow_mut() = true;
}
}
Err(_) => {
// If it's not valid UTF-8, check if it's gzipped data
if is_gzip_data(&bytes) {
log_debug(&format!(
"🔍 Binary data appears to be gzipped ({} bytes)",
length
));
// Try to decompress using WASI-compatible decompression
match decompress_gzip_browser(&bytes) {
Ok(decompressed_text) => {
log_debug(&format!(
"🔍 Gzipped data decompressed to text: {}",
if decompressed_text.len() > 100 {
format!("{}...", &decompressed_text[..100])
} else {
decompressed_text.clone()
}
));
*response_cell.borrow_mut() = Some(decompressed_text);
*response_received.borrow_mut() = true;
}
Err(e) => {
log_debug(&format!(
"🔍 Failed to decompress gzip: {}",
e
));
// Fallback: treat as actual binary protobuf data
*binary_data_cell.borrow_mut() = Some(bytes.clone());
*response_cell.borrow_mut() =
Some(format!("BINARY_DATA:{}", length));
*response_received.borrow_mut() = true;
}
}
} else {
// If it's not valid UTF-8 and not gzipped, it's likely actual binary protobuf data
log_debug(&format!(
"🔍 Binary data is actual protobuf ({} bytes)",
length
));
*binary_data_cell.borrow_mut() = Some(bytes);
*response_cell.borrow_mut() =
Some(format!("BINARY_DATA:{}", length));
*response_received.borrow_mut() = true;
}
}
}
} else {
// Log what type of data we got
log_debug(&format!("🔍 Received unknown data type: {:?}", e.data()));
}
}
}) as Box<dyn FnMut(_)>);
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();
}

// Set up the error handler
{
let error_cell = error_cell.clone();
let response_received = response_received.clone();
let onerror_callback = Closure::wrap(Box::new(move |_e: web_sys::ErrorEvent| {
if !*response_received.borrow() {
*error_cell.borrow_mut() = Some("WebSocket error occurred".to_string());
*response_received.borrow_mut() = true;
}
}) as Box<dyn FnMut(_)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
}

// Poll for response with proper async delays
loop {
// Check for response
if *response_received.borrow() {
if let Some(response) = response_cell.borrow().as_ref() {
let binary_data = binary_data_cell.borrow().clone();
return Ok((response.clone(), binary_data));
}
if let Some(error) = error_cell.borrow().as_ref() {
return Err(ConnectorError::protocol_error(error));
}
}

// Check timeout
let now = js_sys::Date::now();
if now - start_time > timeout_ms {
*response_received.borrow_mut() = true; // Mark as done to prevent future processing
return Err(ConnectorError::protocol_error("WebSocket response timeout"));
}

// Wait 50ms before checking again
let promise = js_sys::Promise::new(&mut |resolve, _| {
let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED));
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(
closure.as_ref().unchecked_ref(),
50,
)
.unwrap();
closure.forget();
});
let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
}
}

/// Check if the connector is connected
pub fn is_connected(&self) -> bool {
self.websocket
.as_ref()
.map_or(false, |ws| ws.ready_state() == 1) // 1 = OPEN
}

/// Disconnect from the agent
pub async fn disconnect(&mut self) -> Result<()> {
if let Some(ws) = self.websocket.take() {
let _ = ws.close();
}
Ok(())
}

/// Request metrics from the agent
pub async fn get_metrics(&mut self) -> Result<Metrics> {
match self.request(AgentRequest::Metrics).await? {
AgentResponse::Metrics(metrics) => Ok(metrics),
_ => Err(ConnectorError::protocol_error(
"Unexpected response type for metrics",
)),
}
}

/// Request disk information from the agent
pub async fn get_disks(&mut self) -> Result<Vec<DiskInfo>> {
match self.request(AgentRequest::Disks).await? {
AgentResponse::Disks(disks) => Ok(disks),
_ => Err(ConnectorError::protocol_error(
"Unexpected response type for disks",
)),
}
}

/// Request process information from the agent
pub async fn get_processes(&mut self) -> Result<ProcessesPayload> {
match self.request(AgentRequest::Processes).await? {
AgentResponse::Processes(processes) => Ok(processes),
_ => Err(ConnectorError::protocol_error(
"Unexpected response type for processes",
)),
}
}
}

// Helper function for logging that works in WASI environments
#[cfg(all(feature = "wasm", not(feature = "networking")))]
fn log_debug(message: &str) {
// For WASI environments like Zellij plugins, use eprintln
eprintln!("{}", message);
}
#[cfg(all(feature = "wasm", not(feature = "networking")))]
fn is_gzip_data(bytes: &[u8]) -> bool {
// Gzip files start with the magic bytes 0x1f 0x8b
bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
}

#[cfg(all(feature = "wasm", not(feature = "networking")))]
fn decompress_gzip_browser(bytes: &[u8]) -> Result<String> {
use flate2::read::GzDecoder;
use std::io::Read;

let mut decoder = GzDecoder::new(bytes);
let mut decompressed = String::new();
decoder.read_to_string(&mut decompressed).map_err(|e| {
ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e))
})?;

Ok(decompressed)
}

#[cfg(all(feature = "wasm", not(feature = "networking")))]
fn gunzip_to_vec_wasm(bytes: &[u8]) -> Result<Vec<u8>> {
use flate2::read::GzDecoder;
use std::io::Read;

let mut decoder = GzDecoder::new(bytes);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).map_err(|e| {
ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e))
})?;

Ok(decompressed)
}
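Because these two helpers are plain flate2 calls with no browser APIs involved, they can be unit-tested in isolation. A small round-trip test sketch (the test module name and sample payload are assumptions, not part of this commit):

#[cfg(all(test, feature = "wasm", not(feature = "networking")))]
mod gzip_roundtrip_tests {
    use super::*;
    use std::io::Write;

    #[test]
    fn gunzip_recovers_original_bytes() {
        let payload = b"hello socktop".to_vec();
        // Compress with flate2's writer-side encoder...
        let mut enc = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
        enc.write_all(&payload).unwrap();
        let gz = enc.finish().unwrap();
        // ...then confirm the helpers detect the gzip magic and undo the compression.
        assert!(is_gzip_data(&gz));
        assert_eq!(gunzip_to_vec_wasm(&gz).unwrap(), payload);
    }
}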

// Stub implementations when neither networking nor wasm is enabled
#[cfg(not(any(feature = "networking", feature = "wasm")))]
impl SocktopConnector {
/// Connect to the socktop agent endpoint.
///
/// Note: Networking functionality is disabled. Enable the "networking" feature to use this function.
pub async fn connect(&mut self) -> Result<()> {
Err(ConnectorError::protocol_error(
"Networking functionality disabled. Enable the 'networking' feature to connect to agents.",
))
}

/// Send a request to the agent and await a response.
///
/// Note: Networking functionality is disabled. Enable the "networking" feature to use this function.
pub async fn request(&mut self, _request: AgentRequest) -> Result<AgentResponse> {
Err(ConnectorError::protocol_error(
"Networking functionality disabled. Enable the 'networking' feature to send requests.",
))
}

/// Close the connection to the agent.
///
/// Note: Networking functionality is disabled. This is a no-op when networking is disabled.
pub async fn disconnect(&mut self) -> Result<()> {
Ok(()) // No-op when networking is disabled
}
}
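For callers, these stubs keep the core connect/request/disconnect surface compiling under every feature combination; a missing feature only shows up as a runtime error. A hypothetical caller-side sketch (the URL and handling are illustrative only):

async fn try_connect() -> Result<()> {
    let mut connector = SocktopConnector::new(ConnectorConfig::new("ws://agent.local:3000"));
    if let Err(e) = connector.connect().await {
        // With neither `networking` nor `wasm` enabled, this reports the
        // "Networking functionality disabled" protocol error from the stub above.
        eprintln!("connect failed: {e}");
    }
    Ok(())
}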

@@ -6,6 +6,7 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum ConnectorError {
/// WebSocket connection failed
#[cfg(feature = "networking")]
#[error("WebSocket connection failed: {source}")]
ConnectionFailed {
#[from]
@@ -13,6 +14,7 @@ pub enum ConnectorError {
},

/// URL parsing error
#[cfg(feature = "networking")]
#[error("Invalid URL: {url}")]
InvalidUrl {
url: String,
@@ -124,8 +126,16 @@ impl ConnectorError {
message: message.into(),
}
}

/// Create a serialization error (wraps JSON error)
pub fn serialization_error(message: impl Into<String>) -> Self {
Self::ProtocolError {
message: message.into(),
}
}
}

#[cfg(feature = "networking")]
impl From<url::ParseError> for ConnectorError {
fn from(source: url::ParseError) -> Self {
Self::InvalidUrl {

@@ -144,12 +144,12 @@ pub mod connector;
pub mod error;
pub mod types;

pub use connector::{
ConnectorConfig, SocktopConnector, WsStream, connect_to_socktop_agent,
connect_to_socktop_agent_with_config,
};
pub use connector::{ConnectorConfig, SocktopConnector};

#[cfg(feature = "tls")]
#[cfg(feature = "networking")]
pub use connector::{WsStream, connect_to_socktop_agent, connect_to_socktop_agent_with_config};

#[cfg(all(feature = "tls", feature = "networking"))]
pub use connector::connect_to_socktop_agent_with_tls;
pub use error::{ConnectorError, Result};
pub use types::{

@@ -97,9 +97,13 @@ impl AgentRequest {
}

/// Response types that can be received from the agent
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type")]
pub enum AgentResponse {
#[serde(rename = "metrics")]
Metrics(Metrics),
#[serde(rename = "disks")]
Disks(Vec<DiskInfo>),
#[serde(rename = "processes")]
Processes(ProcessesPayload),
}
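Since the types module is always available, both the native and WASM paths hand the same typed response around, and consumers dispatch on it with a plain match. A small illustrative sketch (connector setup omitted; assumes an already-connected `SocktopConnector`):

async fn print_summary(connector: &mut SocktopConnector) -> Result<()> {
    match connector.request(AgentRequest::Metrics).await? {
        AgentResponse::Metrics(m) => println!("host {} cpu {:.1}%", m.hostname, m.cpu_total),
        AgentResponse::Disks(disks) => println!("{} disks reported", disks.len()),
        AgentResponse::Processes(p) => println!("{} processes", p.process_count),
    }
    Ok(())
}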