Housekeeping and QOL
Non-functional update:
- Refactor the stream-of-consciousness connector module into separate files.
- Combine equivalent functions used in the networking and wasm features.
- Cleanups and version bumps.
parent cea133b7da
commit 08f248c696

Cargo.lock (generated): 4 lines changed
@@ -2182,7 +2182,7 @@ dependencies = [
 
 [[package]]
 name = "socktop_agent"
-version = "1.40.67"
+version = "1.40.7"
 dependencies = [
  "anyhow",
  "assert_cmd",
@@ -2213,7 +2213,7 @@ dependencies = [
 
 [[package]]
 name = "socktop_connector"
-version = "0.1.5"
+version = "0.1.6"
 dependencies = [
  "flate2",
  "futures-util",
@@ -1,6 +1,6 @@
 [package]
 name = "socktop_agent"
-version = "1.40.67"
+version = "1.40.7"
 authors = ["Jason Witty <jasonpwitty+socktop@proton.me>"]
 description = "Socktop agent daemon. Serves host metrics over WebSocket."
 edition = "2024"
@@ -1,6 +1,6 @@
 [package]
 name = "socktop_connector"
-version = "0.1.5"
+version = "0.1.6"
 edition = "2024"
 license = "MIT"
 description = "WebSocket connector library for socktop agent communication"
@@ -40,8 +40,8 @@ rustls-pemfile = { version = "2.1", optional = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 
-# Compression - only for networking
-flate2 = { version = "1.0", optional = true }
+# Compression - used in both networking and WASM modes
+flate2 = "1.0"
 
 # Protobuf - always available
 prost = { workspace = true }
@@ -55,6 +55,6 @@ protoc-bin-vendored = "3.0"
 
 [features]
 default = ["networking", "tls"]
-networking = ["tokio-tungstenite", "tokio", "futures-util", "url", "flate2"]
+networking = ["tokio-tungstenite", "tokio", "futures-util", "url"]
 tls = ["networking", "rustls", "rustls-pemfile"]
-wasm = ["wasm-bindgen", "wasm-bindgen-futures", "js-sys", "web-sys", "flate2"] # WASM-compatible networking with compression
+wasm = ["wasm-bindgen", "wasm-bindgen-futures", "js-sys", "web-sys"] # WASM-compatible networking with compression
@@ -340,7 +340,7 @@ The library provides flexible configuration through the `ConnectorConfig` builde
 
 **Note**: Hostname verification only applies to TLS connections (`wss://`). Non-TLS connections (`ws://`) don't use certificates, so hostname verification is not applicable.
 
-## WASM Compatibility
+## WASM Compatibility (experimental)
 
 `socktop_connector` provides **full WebSocket support** for WebAssembly (WASM) environments, including complete networking functionality with automatic compression and protobuf decoding.
 
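For reference, a minimal sketch of calling the TLS convenience helper that this commit relocates into `connector_impl.rs`. The URL and CA path are placeholders, a tokio runtime is assumed, and `ConnectorError` is assumed to implement `std::error::Error` so `?` can convert it into a boxed error.

```rust
use socktop_connector::{connect_to_socktop_agent_with_tls, AgentRequest, AgentResponse};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder URL and CA path; hostname verification stays enabled (wss:// only).
    let mut connector = connect_to_socktop_agent_with_tls(
        "wss://agent.example:3000/ws",
        "/etc/socktop/agent-ca.pem",
        true, // verify_hostname
    )
    .await?;

    // Request a metrics snapshot over the pinned TLS connection.
    if let AgentResponse::Metrics(m) = connector.request(AgentRequest::Metrics).await? {
        println!("agent hostname: {}", m.hostname);
    }
    Ok(())
}
```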
socktop_connector/src/config.rs (new file, 48 lines)

//! Configuration for socktop WebSocket connections.

/// Configuration for connecting to a socktop agent.
#[derive(Debug, Clone)]
pub struct ConnectorConfig {
    pub url: String,
    pub tls_ca_path: Option<String>,
    pub verify_hostname: bool,
    pub ws_protocols: Option<Vec<String>>,
    pub ws_version: Option<String>,
}

impl ConnectorConfig {
    /// Create a new connector configuration with the given URL.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            tls_ca_path: None,
            verify_hostname: false,
            ws_protocols: None,
            ws_version: None,
        }
    }

    /// Set the path to a custom TLS CA certificate file.
    pub fn with_tls_ca(mut self, ca_path: impl Into<String>) -> Self {
        self.tls_ca_path = Some(ca_path.into());
        self
    }

    /// Enable or disable hostname verification for TLS connections.
    pub fn with_hostname_verification(mut self, verify: bool) -> Self {
        self.verify_hostname = verify;
        self
    }

    /// Set WebSocket sub-protocols to negotiate.
    pub fn with_protocols(mut self, protocols: Vec<String>) -> Self {
        self.ws_protocols = Some(protocols);
        self
    }

    /// Set WebSocket protocol version (default is "13").
    pub fn with_version(mut self, version: impl Into<String>) -> Self {
        self.ws_version = Some(version.into());
        self
    }
}
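A short sketch of the new `ConnectorConfig` builder in use; every value below is a placeholder.

```rust
use socktop_connector::ConnectorConfig;

// Each with_* call consumes the config and returns the updated value.
fn example_config() -> ConnectorConfig {
    ConnectorConfig::new("wss://agent.example:3000/ws")
        .with_tls_ca("/etc/socktop/agent-ca.pem")
        .with_hostname_verification(true)
        .with_protocols(vec!["socktop".to_string()])
        .with_version("13")
}
```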
@@ -1,13 +1,35 @@
 //! WebSocket connector for communicating with socktop agents.
 
-#[cfg(feature = "networking")]
-use flate2::bufread::GzDecoder;
+// WebSocket state constants
+#[cfg(feature = "wasm")]
+#[allow(dead_code)]
+const WEBSOCKET_CONNECTING: u16 = 0;
+#[cfg(feature = "wasm")]
+#[allow(dead_code)]
+const WEBSOCKET_OPEN: u16 = 1;
+#[cfg(feature = "wasm")]
+#[allow(dead_code)]
+const WEBSOCKET_CLOSING: u16 = 2;
+#[cfg(feature = "wasm")]
+#[allow(dead_code)]
+const WEBSOCKET_CLOSED: u16 = 3;
+
+// Gzip magic header constants
+const GZIP_MAGIC_1: u8 = 0x1f;
+const GZIP_MAGIC_2: u8 = 0x8b;
+
+// Shared imports for both networking and WASM
+#[cfg(any(feature = "networking", feature = "wasm"))]
+use flate2::read::GzDecoder;
+#[cfg(any(feature = "networking", feature = "wasm"))]
+use std::io::Read;
+#[cfg(any(feature = "networking", feature = "wasm"))]
+use prost::Message as ProstMessage;
+
 #[cfg(feature = "networking")]
 use futures_util::{SinkExt, StreamExt};
 #[cfg(feature = "networking")]
-use prost::Message as _;
-#[cfg(feature = "networking")]
-use std::io::Read;
+use std::io::BufReader;
 #[cfg(feature = "networking")]
 use tokio::net::TcpStream;
 #[cfg(feature = "networking")]
@@ -24,8 +46,6 @@ use web_sys::WebSocket
 #[cfg(all(feature = "wasm", not(feature = "networking")))]
 use pb::Processes;
 #[cfg(all(feature = "wasm", not(feature = "networking")))]
-use prost::Message as ProstMessage;
-#[cfg(all(feature = "wasm", not(feature = "networking")))]
 use wasm_bindgen::{JsCast, JsValue, closure::Closure};
 
 #[cfg(feature = "tls")]
@@ -39,7 +59,7 @@ use rustls::{DigitallySignedStruct, SignatureScheme};
 #[cfg(feature = "tls")]
 use rustls_pemfile::Item;
 #[cfg(feature = "tls")]
-use std::{fs::File, io::BufReader, sync::Arc};
+use std::{fs::File, sync::Arc};
 #[cfg(feature = "tls")]
 use tokio_tungstenite::{Connector, connect_async_tls_with_config};
 
@@ -349,7 +369,7 @@ async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
     }
     match ws.next().await {
         Some(Ok(Message::Binary(b))) => {
-            gunzip_to_string(&b).and_then(|s| serde_json::from_str::<Metrics>(&s).ok())
+            gunzip_to_string(&b).ok().and_then(|s| serde_json::from_str::<Metrics>(&s).ok())
         }
         Some(Ok(Message::Text(json))) => serde_json::from_str::<Metrics>(&json).ok(),
         _ => None,
@@ -364,7 +384,7 @@ async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
     }
     match ws.next().await {
         Some(Ok(Message::Binary(b))) => {
-            gunzip_to_string(&b).and_then(|s| serde_json::from_str::<Vec<DiskInfo>>(&s).ok())
+            gunzip_to_string(&b).ok().and_then(|s| serde_json::from_str::<Vec<DiskInfo>>(&s).ok())
         }
         Some(Ok(Message::Text(json))) => serde_json::from_str::<Vec<DiskInfo>>(&json).ok(),
         _ => None,
@@ -384,7 +404,7 @@ async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
     match ws.next().await {
         Some(Ok(Message::Binary(b))) => {
             let gz = is_gzip(&b);
-            let data = if gz { gunzip_to_vec(&b)? } else { b };
+            let data = if gz { gunzip_to_vec(&b).ok()? } else { b };
             match pb::Processes::decode(data.as_slice()) {
                 Ok(pb) => {
                     let rows: Vec<ProcessInfo> = pb
@@ -420,25 +440,32 @@ async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
 }
 
 // Decompress a gzip-compressed binary frame into a String.
-#[cfg(feature = "networking")]
-fn gunzip_to_string(bytes: &[u8]) -> Option<String> {
-    let mut dec = GzDecoder::new(bytes);
-    let mut out = String::new();
-    dec.read_to_string(&mut out).ok()?;
-    Some(out)
+/// Unified gzip decompression to string for both networking and WASM
+#[cfg(any(feature = "networking", feature = "wasm"))]
+fn gunzip_to_string(bytes: &[u8]) -> Result<String> {
+    let mut decoder = GzDecoder::new(bytes);
+    let mut decompressed = String::new();
+    decoder.read_to_string(&mut decompressed).map_err(|e| {
+        ConnectorError::protocol_error(format!("Gzip decompression failed: {e}"))
+    })?;
+    Ok(decompressed)
 }
 
-#[cfg(feature = "networking")]
-fn gunzip_to_vec(bytes: &[u8]) -> Option<Vec<u8>> {
-    let mut dec = GzDecoder::new(bytes);
-    let mut out = Vec::new();
-    dec.read_to_end(&mut out).ok()?;
-    Some(out)
+/// Unified gzip decompression to bytes for both networking and WASM
+#[cfg(any(feature = "networking", feature = "wasm"))]
+fn gunzip_to_vec(bytes: &[u8]) -> Result<Vec<u8>> {
+    let mut decoder = GzDecoder::new(bytes);
+    let mut decompressed = Vec::new();
+    decoder.read_to_end(&mut decompressed).map_err(|e| {
+        ConnectorError::protocol_error(format!("Gzip decompression failed: {e}"))
+    })?;
+    Ok(decompressed)
 }
 
-#[cfg(feature = "networking")]
+/// Unified gzip detection for both networking and WASM
+#[cfg(any(feature = "networking", feature = "wasm"))]
 fn is_gzip(bytes: &[u8]) -> bool {
-    bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
+    bytes.len() >= 2 && bytes[0] == GZIP_MAGIC_1 && bytes[1] == GZIP_MAGIC_2
 }
 
 /// Convenience function to create a connector and connect in one step.
@@ -522,7 +549,7 @@ impl SocktopConnector {
     /// Connect to the agent using WASM WebSocket
     pub async fn connect(&mut self) -> Result<()> {
         let websocket = WebSocket::new(&self.config.url).map_err(|e| {
-            ConnectorError::protocol_error(&format!("Failed to create WebSocket: {:?}", e))
+            ConnectorError::protocol_error(format!("Failed to create WebSocket: {e:?}"))
        })?;
 
        // Set binary type for proper message handling
@@ -536,15 +563,15 @@ impl SocktopConnector {
        loop {
            let ready_state = websocket.ready_state();
 
-            if ready_state == 1 {
+            if ready_state == WEBSOCKET_OPEN {
                // OPEN - connection is ready
                break;
-            } else if ready_state == 3 {
+            } else if ready_state == WEBSOCKET_CLOSED {
                // CLOSED
                return Err(ConnectorError::protocol_error(
                    "WebSocket connection closed",
                ));
-            } else if ready_state == 2 {
+            } else if ready_state == WEBSOCKET_CLOSING {
                // CLOSING
                return Err(ConnectorError::protocol_error("WebSocket is closing"));
            }
@@ -589,7 +616,7 @@ impl SocktopConnector {
 
        // Send request
        ws.send_with_str(&request_string).map_err(|e| {
-            ConnectorError::protocol_error(&format!("Failed to send message: {:?}", e))
+            ConnectorError::protocol_error(format!("Failed to send message: {e:?}"))
        })?;
 
        // Wait for response using JavaScript Promise
@@ -616,7 +643,7 @@ impl SocktopConnector {
                    mem_used: 0,
                    swap_total: 0,
                    swap_used: 0,
-                    hostname: format!("Binary protobuf data ({} bytes)", byte_count),
+                    hostname: format!("Binary protobuf data ({byte_count} bytes)"),
                    cpu_temp_c: None,
                    disks: vec![],
                    networks: vec![],
@@ -628,9 +655,8 @@ impl SocktopConnector {
            } else {
                // Try to parse as JSON (fallback)
                let metrics: Metrics = serde_json::from_str(&response).map_err(|e| {
-                    ConnectorError::serialization_error(&format!(
-                        "Failed to parse metrics: {}",
-                        e
+                    ConnectorError::serialization_error(format!(
+                        "Failed to parse metrics: {e}"
                    ))
                })?;
                Ok(AgentResponse::Metrics(metrics))
@@ -638,7 +664,7 @@ impl SocktopConnector {
            }
            AgentRequest::Disks => {
                let disks: Vec<DiskInfo> = serde_json::from_str(&response).map_err(|e| {
-                    ConnectorError::serialization_error(&format!("Failed to parse disks: {}", e))
+                    ConnectorError::serialization_error(format!("Failed to parse disks: {e}"))
                })?;
                Ok(AgentResponse::Disks(disks))
            }
@@ -658,9 +684,9 @@ impl SocktopConnector {
                if let Some(ref data) = binary_data {
                    log_debug(&format!("🔍 Binary data size: {} bytes", data.len()));
                    // Check if it's gzipped data and decompress it first
-                    if is_gzip_data(data) {
+                    if is_gzip(data) {
                        log_debug("🔍 Process data is gzipped, decompressing...");
-                        match gunzip_to_vec_wasm(data) {
+                        match gunzip_to_vec(data) {
                            Ok(decompressed_bytes) => {
                                log_debug(&format!(
                                    "🔍 Successfully decompressed {} bytes, now decoding protobuf...",
@@ -697,16 +723,14 @@ impl SocktopConnector {
                            }
                            Err(e) => {
                                log_debug(&format!(
-                                    "❌ Failed to decode decompressed protobuf: {}",
-                                    e
+                                    "❌ Failed to decode decompressed protobuf: {e}"
                                ));
                            }
                        }
                    }
                    Err(e) => {
                        log_debug(&format!(
-                            "❌ Failed to decompress gzipped process data: {}",
-                            e
+                            "❌ Failed to decompress gzipped process data: {e}"
                        ));
                    }
                }
@@ -750,16 +774,16 @@ impl SocktopConnector {
                            top_processes: processes,
                            process_count: protobuf_processes.process_count as usize,
                        };
-                        return Ok(AgentResponse::Processes(processes_payload));
+                        Ok(AgentResponse::Processes(processes_payload))
                    }
                    Err(e) => {
-                        log_debug(&format!("❌ Failed to decode protobuf: {}", e));
+                        log_debug(&format!("❌ Failed to decode protobuf: {e}"));
                        // Fallback to empty processes
                        let processes = ProcessesPayload {
                            top_processes: vec![],
                            process_count: 0,
                        };
-                        return Ok(AgentResponse::Processes(processes));
+                        Ok(AgentResponse::Processes(processes))
                    }
                }
            } else {
@@ -770,15 +794,14 @@ impl SocktopConnector {
                    top_processes: vec![],
                    process_count: 0,
                };
-                return Ok(AgentResponse::Processes(processes));
+                Ok(AgentResponse::Processes(processes))
            }
        } else {
            // Try to parse as JSON (fallback)
            let processes: ProcessesPayload =
                serde_json::from_str(&response).map_err(|e| {
-                    ConnectorError::serialization_error(&format!(
-                        "Failed to parse processes: {}",
-                        e
+                    ConnectorError::serialization_error(format!(
+                        "Failed to parse processes: {e}"
                    ))
                })?;
            Ok(AgentResponse::Processes(processes))
@@ -823,7 +846,7 @@ impl SocktopConnector {
                } else {
                    message.clone()
                };
-                log_debug(&format!("🔍 Received text: {}", preview));
+                log_debug(&format!("🔍 Received text: {preview}"));
 
                *response_cell.borrow_mut() = Some(message);
                *response_received.borrow_mut() = true;
@@ -836,7 +859,7 @@ impl SocktopConnector {
                let mut bytes = vec![0u8; length];
                uint8_array.copy_to(&mut bytes);
 
-                log_debug(&format!("🔍 Received binary data: {} bytes", length));
+                log_debug(&format!("🔍 Received binary data: {length} bytes"));
 
                // Debug: Log the first few bytes to see what we're dealing with
                let first_bytes = if bytes.len() >= 4 {
@@ -847,7 +870,7 @@ impl SocktopConnector {
                } else {
                    format!("Only {} bytes available", bytes.len())
                };
-                log_debug(&format!("🔍 First bytes: {}", first_bytes));
+                log_debug(&format!("🔍 First bytes: {first_bytes}"));
 
                // Try to decode as UTF-8 text first (in case it's JSON sent as binary)
                match String::from_utf8(bytes.clone()) {
@@ -882,13 +905,12 @@ impl SocktopConnector {
                    }
                    Err(_) => {
                        // If it's not valid UTF-8, check if it's gzipped data
-                        if is_gzip_data(&bytes) {
+                        if is_gzip(&bytes) {
                            log_debug(&format!(
-                                "🔍 Binary data appears to be gzipped ({} bytes)",
-                                length
+                                "🔍 Binary data appears to be gzipped ({length} bytes)"
                            ));
-                            // Try to decompress using WASI-compatible decompression
-                            match decompress_gzip_browser(&bytes) {
+                            // Try to decompress using unified gzip decompression
+                            match gunzip_to_string(&bytes) {
                                Ok(decompressed_text) => {
                                    log_debug(&format!(
                                        "🔍 Gzipped data decompressed to text: {}",
@@ -903,25 +925,23 @@ impl SocktopConnector {
                                }
                                Err(e) => {
                                    log_debug(&format!(
-                                        "🔍 Failed to decompress gzip: {}",
-                                        e
+                                        "🔍 Failed to decompress gzip: {e}"
                                    ));
                                    // Fallback: treat as actual binary protobuf data
                                    *binary_data_cell.borrow_mut() = Some(bytes.clone());
                                    *response_cell.borrow_mut() =
-                                        Some(format!("BINARY_DATA:{}", length));
+                                        Some(format!("BINARY_DATA:{length}"));
                                    *response_received.borrow_mut() = true;
                                }
                            }
                        } else {
                            // If it's not valid UTF-8 and not gzipped, it's likely actual binary protobuf data
                            log_debug(&format!(
-                                "🔍 Binary data is actual protobuf ({} bytes)",
-                                length
+                                "🔍 Binary data is actual protobuf ({length} bytes)"
                            ));
                            *binary_data_cell.borrow_mut() = Some(bytes);
                            *response_cell.borrow_mut() =
-                                Some(format!("BINARY_DATA:{}", length));
+                                Some(format!("BINARY_DATA:{length}"));
                            *response_received.borrow_mut() = true;
                        }
                    }
@@ -990,7 +1010,7 @@ impl SocktopConnector {
    pub fn is_connected(&self) -> bool {
        self.websocket
            .as_ref()
-            .map_or(false, |ws| ws.ready_state() == 1) // 1 = OPEN
+            .is_some_and(|ws| ws.ready_state() == WEBSOCKET_OPEN)
    }
 
    /// Disconnect from the agent
@@ -1033,43 +1053,17 @@ impl SocktopConnector {
 }
 
 // Helper function for logging that works in WASI environments
-#[cfg(all(feature = "wasm", not(feature = "networking")))]
+/// Unified debug logging for both networking and WASM modes
+#[cfg(any(feature = "networking", feature = "wasm"))]
+#[allow(dead_code)]
 fn log_debug(message: &str) {
-    // For WASI environments like Zellij plugins, use eprintln
-    eprintln!("{}", message);
-}
-#[cfg(all(feature = "wasm", not(feature = "networking")))]
-fn is_gzip_data(bytes: &[u8]) -> bool {
-    // Gzip files start with the magic bytes 0x1f 0x8b
-    bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
-}
-
-#[cfg(all(feature = "wasm", not(feature = "networking")))]
-fn decompress_gzip_browser(bytes: &[u8]) -> Result<String> {
-    use flate2::read::GzDecoder;
-    use std::io::Read;
-
-    let mut decoder = GzDecoder::new(bytes);
-    let mut decompressed = String::new();
-    decoder.read_to_string(&mut decompressed).map_err(|e| {
-        ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e))
-    })?;
-
-    Ok(decompressed)
-}
-
-#[cfg(all(feature = "wasm", not(feature = "networking")))]
-fn gunzip_to_vec_wasm(bytes: &[u8]) -> Result<Vec<u8>> {
-    use flate2::read::GzDecoder;
-    use std::io::Read;
-
-    let mut decoder = GzDecoder::new(bytes);
-    let mut decompressed = Vec::new();
-    decoder.read_to_end(&mut decompressed).map_err(|e| {
-        ConnectorError::protocol_error(&format!("Gzip decompression failed: {}", e))
-    })?;
-
-    Ok(decompressed)
-}
+    #[cfg(feature = "networking")]
+    if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") {
+        eprintln!("{message}");
+    }
+
+    #[cfg(all(feature = "wasm", not(feature = "networking")))]
+    eprintln!("{message}");
+}
 
 // Stub implementations when neither networking nor wasm is enabled
socktop_connector/src/connector_impl.rs (new file, 261 lines)

//! Modular SocktopConnector implementation using networking and WASM modules.

use crate::config::ConnectorConfig;
use crate::error::{ConnectorError, Result};
use crate::{AgentRequest, AgentResponse};

#[cfg(feature = "networking")]
use crate::networking::{
    WsStream, connect_to_agent, request_disks, request_metrics, request_processes,
};

#[cfg(all(feature = "wasm", not(feature = "networking")))]
use crate::wasm::{connect_to_agent, send_request_and_wait};

#[cfg(all(feature = "wasm", not(feature = "networking")))]
use crate::{DiskInfo, Metrics, ProcessesPayload};

#[cfg(all(feature = "wasm", not(feature = "networking")))]
use web_sys::WebSocket;

/// Main connector for communicating with socktop agents
pub struct SocktopConnector {
    pub config: ConnectorConfig,
    #[cfg(feature = "networking")]
    stream: Option<WsStream>,
    #[cfg(all(feature = "wasm", not(feature = "networking")))]
    websocket: Option<WebSocket>,
}

impl SocktopConnector {
    /// Create a new connector with the given configuration
    pub fn new(config: ConnectorConfig) -> Self {
        Self {
            config,
            #[cfg(feature = "networking")]
            stream: None,
            #[cfg(all(feature = "wasm", not(feature = "networking")))]
            websocket: None,
        }
    }
}

#[cfg(feature = "networking")]
impl SocktopConnector {
    /// Connect to the agent
    pub async fn connect(&mut self) -> Result<()> {
        let stream = connect_to_agent(&self.config).await?;
        self.stream = Some(stream);
        Ok(())
    }

    /// Send a request to the agent and get the response
    pub async fn request(&mut self, request: AgentRequest) -> Result<AgentResponse> {
        let stream = self.stream.as_mut().ok_or(ConnectorError::NotConnected)?;

        match request {
            AgentRequest::Metrics => {
                let metrics = request_metrics(stream)
                    .await
                    .ok_or_else(|| ConnectorError::invalid_response("Failed to get metrics"))?;
                Ok(AgentResponse::Metrics(metrics))
            }
            AgentRequest::Disks => {
                let disks = request_disks(stream)
                    .await
                    .ok_or_else(|| ConnectorError::invalid_response("Failed to get disks"))?;
                Ok(AgentResponse::Disks(disks))
            }
            AgentRequest::Processes => {
                let processes = request_processes(stream)
                    .await
                    .ok_or_else(|| ConnectorError::invalid_response("Failed to get processes"))?;
                Ok(AgentResponse::Processes(processes))
            }
        }
    }

    /// Check if the connector is connected
    pub fn is_connected(&self) -> bool {
        self.stream.is_some()
    }

    /// Disconnect from the agent
    pub async fn disconnect(&mut self) -> Result<()> {
        if let Some(mut stream) = self.stream.take() {
            let _ = stream.close(None).await;
        }
        Ok(())
    }
}

// WASM WebSocket implementation
#[cfg(all(feature = "wasm", not(feature = "networking")))]
impl SocktopConnector {
    /// Connect to the agent using WASM WebSocket
    pub async fn connect(&mut self) -> Result<()> {
        let websocket = connect_to_agent(&self.config).await?;
        self.websocket = Some(websocket);
        Ok(())
    }

    /// Send a request to the agent and get the response
    pub async fn request(&mut self, request: AgentRequest) -> Result<AgentResponse> {
        let ws = self
            .websocket
            .as_ref()
            .ok_or(ConnectorError::NotConnected)?;

        send_request_and_wait(ws, request).await
    }

    /// Check if the connector is connected
    pub fn is_connected(&self) -> bool {
        use crate::utils::WEBSOCKET_OPEN;
        self.websocket
            .as_ref()
            .is_some_and(|ws| ws.ready_state() == WEBSOCKET_OPEN)
    }

    /// Disconnect from the agent
    pub async fn disconnect(&mut self) -> Result<()> {
        if let Some(ws) = self.websocket.take() {
            let _ = ws.close();
        }
        Ok(())
    }

    /// Request metrics from the agent
    pub async fn get_metrics(&mut self) -> Result<Metrics> {
        match self.request(AgentRequest::Metrics).await? {
            AgentResponse::Metrics(metrics) => Ok(metrics),
            _ => Err(ConnectorError::protocol_error(
                "Unexpected response type for metrics",
            )),
        }
    }

    /// Request disk information from the agent
    pub async fn get_disks(&mut self) -> Result<Vec<DiskInfo>> {
        match self.request(AgentRequest::Disks).await? {
            AgentResponse::Disks(disks) => Ok(disks),
            _ => Err(ConnectorError::protocol_error(
                "Unexpected response type for disks",
            )),
        }
    }

    /// Request process information from the agent
    pub async fn get_processes(&mut self) -> Result<ProcessesPayload> {
        match self.request(AgentRequest::Processes).await? {
            AgentResponse::Processes(processes) => Ok(processes),
            _ => Err(ConnectorError::protocol_error(
                "Unexpected response type for processes",
            )),
        }
    }
}

// Stub implementations when neither networking nor wasm is enabled
#[cfg(not(any(feature = "networking", feature = "wasm")))]
impl SocktopConnector {
    /// Connect to the socktop agent endpoint.
    ///
    /// Note: Networking functionality is disabled. Enable the "networking" feature to use this function.
    pub async fn connect(&mut self) -> Result<()> {
        Err(ConnectorError::protocol_error(
            "Networking functionality disabled. Enable the 'networking' feature to connect to agents.",
        ))
    }

    /// Send a request to the agent and await a response.
    ///
    /// Note: Networking functionality is disabled. Enable the "networking" feature to use this function.
    pub async fn request(&mut self, _request: AgentRequest) -> Result<AgentResponse> {
        Err(ConnectorError::protocol_error(
            "Networking functionality disabled. Enable the 'networking' feature to send requests.",
        ))
    }

    /// Close the connection to the agent.
    ///
    /// Note: Networking functionality is disabled. This is a no-op when networking is disabled.
    pub async fn disconnect(&mut self) -> Result<()> {
        Ok(()) // No-op when networking is disabled
    }
}

/// Convenience function to create a connector and connect in one step.
///
/// This function is for non-TLS WebSocket connections (`ws://`). Since there's no
/// certificate involved, hostname verification is not applicable.
///
/// For TLS connections with certificate pinning, use `connect_to_socktop_agent_with_tls()`.
#[cfg(feature = "networking")]
pub async fn connect_to_socktop_agent(url: impl Into<String>) -> Result<SocktopConnector> {
    let config = ConnectorConfig::new(url);
    let mut connector = SocktopConnector::new(config);
    connector.connect().await?;
    Ok(connector)
}

/// Convenience function to create a connector with TLS and connect in one step.
///
/// This function enables TLS with certificate pinning using the provided CA certificate.
/// The `verify_hostname` parameter controls whether the server's hostname is verified
/// against the certificate (recommended for production, can be disabled for testing).
#[cfg(feature = "tls")]
#[cfg(feature = "networking")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub async fn connect_to_socktop_agent_with_tls(
    url: impl Into<String>,
    ca_path: impl Into<String>,
    verify_hostname: bool,
) -> Result<SocktopConnector> {
    let config = ConnectorConfig::new(url)
        .with_tls_ca(ca_path)
        .with_hostname_verification(verify_hostname);
    let mut connector = SocktopConnector::new(config);
    connector.connect().await?;
    Ok(connector)
}

/// Convenience function to create a connector with custom WebSocket protocol configuration.
///
/// This function allows you to specify WebSocket protocol version and sub-protocols.
/// Most users should use the simpler `connect_to_socktop_agent()` function instead.
///
/// # Example
/// ```no_run
/// use socktop_connector::connect_to_socktop_agent_with_config;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let connector = connect_to_socktop_agent_with_config(
///     "ws://localhost:3000/ws",
///     Some(vec!["socktop".to_string()]), // WebSocket sub-protocols
///     Some("13".to_string()),            // WebSocket version (13 is standard)
/// ).await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "networking")]
pub async fn connect_to_socktop_agent_with_config(
    url: impl Into<String>,
    protocols: Option<Vec<String>>,
    version: Option<String>,
) -> Result<SocktopConnector> {
    let mut config = ConnectorConfig::new(url);

    if let Some(protocols) = protocols {
        config = config.with_protocols(protocols);
    }

    if let Some(version) = version {
        config = config.with_version(version);
    }

    let mut connector = SocktopConnector::new(config);
    connector.connect().await?;
    Ok(connector)
}
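A hedged end-to-end sketch against the native (`networking`) half of the new connector; the URL is a placeholder and, as above, `ConnectorError` is assumed to implement `std::error::Error`.

```rust
use socktop_connector::{connect_to_socktop_agent, AgentRequest, AgentResponse};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Plain ws:// connection, so no certificate or hostname verification is involved.
    let mut connector = connect_to_socktop_agent("ws://localhost:3000/ws").await?;

    if let AgentResponse::Disks(disks) = connector.request(AgentRequest::Disks).await? {
        println!("agent reports {} disks", disks.len());
    }

    connector.disconnect().await?;
    Ok(())
}
```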
@@ -140,19 +140,43 @@
 
 #![cfg_attr(docsrs, feature(doc_cfg))]
 
-pub mod connector;
+// Core modules
+pub mod config;
 pub mod error;
 pub mod types;
+pub mod utils;
 
-pub use connector::{ConnectorConfig, SocktopConnector};
+// Implementation modules
 #[cfg(feature = "networking")]
-pub use connector::{WsStream, connect_to_socktop_agent, connect_to_socktop_agent_with_config};
+pub mod networking;
 
-#[cfg(all(feature = "tls", feature = "networking"))]
-pub use connector::connect_to_socktop_agent_with_tls;
+#[cfg(feature = "wasm")]
+pub mod wasm;
 
+// Main connector implementation
+pub mod connector_impl;
+
+// Re-export the main types
+pub use config::ConnectorConfig;
+pub use connector_impl::SocktopConnector;
 pub use error::{ConnectorError, Result};
 pub use types::{
     AgentRequest, AgentResponse, DiskInfo, GpuInfo, Metrics, NetworkInfo, ProcessInfo,
     ProcessesPayload,
 };
+
+// Re-export convenience functions
+#[cfg(feature = "networking")]
+pub use connector_impl::{connect_to_socktop_agent, connect_to_socktop_agent_with_config};
+
+#[cfg(all(feature = "tls", feature = "networking"))]
+pub use connector_impl::connect_to_socktop_agent_with_tls;
+
+#[cfg(feature = "networking")]
+pub use networking::WsStream;
+
+// Protobuf types for internal use
+#[cfg(any(feature = "networking", feature = "wasm"))]
+pub mod pb {
+    include!(concat!(env!("OUT_DIR"), "/socktop.rs"));
+}
socktop_connector/src/networking/connection.rs (new file, 185 lines)

//! WebSocket connection handling for native (non-WASM) environments.

use crate::config::ConnectorConfig;
use crate::error::{ConnectorError, Result};

use std::io::BufReader;
use std::sync::Arc;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
use url::Url;

#[cfg(feature = "tls")]
use {
    rustls::{self, ClientConfig},
    rustls::{
        DigitallySignedStruct, RootCertStore, SignatureScheme,
        client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
        crypto::ring,
        pki_types::{CertificateDer, ServerName, UnixTime},
    },
    rustls_pemfile::Item,
    std::fs::File,
    tokio_tungstenite::Connector,
};

pub type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;

/// Connect to the agent and return the WS stream
pub async fn connect_to_agent(config: &ConnectorConfig) -> Result<WsStream> {
    #[cfg(feature = "tls")]
    ensure_crypto_provider();

    let mut u = Url::parse(&config.url)?;
    if let Some(ca_path) = &config.tls_ca_path {
        if u.scheme() == "ws" {
            let _ = u.set_scheme("wss");
        }
        return connect_with_ca_and_config(u.as_str(), ca_path, config).await;
    }
    // No TLS - hostname verification is not applicable
    connect_without_ca_and_config(u.as_str(), config).await
}

async fn connect_without_ca_and_config(url: &str, config: &ConnectorConfig) -> Result<WsStream> {
    let mut req = url.into_client_request()?;

    // Apply WebSocket protocol configuration
    if let Some(version) = &config.ws_version {
        req.headers_mut().insert(
            "Sec-WebSocket-Version",
            version
                .parse()
                .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket version"))?,
        );
    }

    if let Some(protocols) = &config.ws_protocols {
        let protocols_str = protocols.join(", ");
        req.headers_mut().insert(
            "Sec-WebSocket-Protocol",
            protocols_str
                .parse()
                .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket protocols"))?,
        );
    }

    let (ws, _) = connect_async(req).await?;
    Ok(ws)
}

#[cfg(feature = "tls")]
async fn connect_with_ca_and_config(
    url: &str,
    ca_path: &str,
    config: &ConnectorConfig,
) -> Result<WsStream> {
    // Initialize the crypto provider for rustls
    let _ = rustls::crypto::ring::default_provider().install_default();

    let mut root = RootCertStore::empty();
    let mut reader = BufReader::new(File::open(ca_path)?);
    let mut der_certs = Vec::new();
    while let Ok(Some(item)) = rustls_pemfile::read_one(&mut reader) {
        if let Item::X509Certificate(der) = item {
            der_certs.push(der);
        }
    }
    root.add_parsable_certificates(der_certs);

    let mut cfg = ClientConfig::builder()
        .with_root_certificates(root)
        .with_no_client_auth();

    let mut req = url.into_client_request()?;

    // Apply WebSocket protocol configuration
    if let Some(version) = &config.ws_version {
        req.headers_mut().insert(
            "Sec-WebSocket-Version",
            version
                .parse()
                .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket version"))?,
        );
    }

    if let Some(protocols) = &config.ws_protocols {
        let protocols_str = protocols.join(", ");
        req.headers_mut().insert(
            "Sec-WebSocket-Protocol",
            protocols_str
                .parse()
                .map_err(|_| ConnectorError::protocol_error("Invalid WebSocket protocols"))?,
        );
    }

    if !config.verify_hostname {
        #[derive(Debug)]
        struct NoVerify;
        impl ServerCertVerifier for NoVerify {
            fn verify_server_cert(
                &self,
                _end_entity: &CertificateDer<'_>,
                _intermediates: &[CertificateDer<'_>],
                _server_name: &ServerName,
                _ocsp_response: &[u8],
                _now: UnixTime,
            ) -> std::result::Result<ServerCertVerified, rustls::Error> {
                Ok(ServerCertVerified::assertion())
            }
            fn verify_tls12_signature(
                &self,
                _message: &[u8],
                _cert: &CertificateDer<'_>,
                _dss: &DigitallySignedStruct,
            ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
                Ok(HandshakeSignatureValid::assertion())
            }
            fn verify_tls13_signature(
                &self,
                _message: &[u8],
                _cert: &CertificateDer<'_>,
                _dss: &DigitallySignedStruct,
            ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
                Ok(HandshakeSignatureValid::assertion())
            }
            fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
                vec![
                    SignatureScheme::ECDSA_NISTP256_SHA256,
                    SignatureScheme::ED25519,
                    SignatureScheme::RSA_PSS_SHA256,
                ]
            }
        }
        cfg.dangerous().set_certificate_verifier(Arc::new(NoVerify));
        eprintln!(
            "socktop_connector: hostname verification disabled (default). Set SOCKTOP_VERIFY_NAME=1 to enable strict SAN checking."
        );
    }
    let cfg = Arc::new(cfg);
    let (ws, _) = tokio_tungstenite::connect_async_tls_with_config(
        req,
        None,
        config.verify_hostname,
        Some(Connector::Rustls(cfg)),
    )
    .await?;
    Ok(ws)
}

#[cfg(not(feature = "tls"))]
async fn connect_with_ca_and_config(
    _url: &str,
    _ca_path: &str,
    _config: &ConnectorConfig,
) -> Result<WsStream> {
    Err(ConnectorError::tls_error(
        "TLS support not compiled in",
        std::io::Error::new(std::io::ErrorKind::Unsupported, "TLS not available"),
    ))
}

#[cfg(feature = "tls")]
fn ensure_crypto_provider() {
    let _ = ring::default_provider().install_default();
}
socktop_connector/src/networking/mod.rs (new file, 7 lines)

//! Networking module for native WebSocket connections.

pub mod connection;
pub mod requests;

pub use connection::*;
pub use requests::*;
socktop_connector/src/networking/requests.rs (new file, 84 lines)

//! WebSocket request handlers for native (non-WASM) environments.

use crate::networking::WsStream;
use crate::utils::{gunzip_to_string, gunzip_to_vec, is_gzip};
use crate::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload, pb};

use futures_util::{SinkExt, StreamExt};
use prost::Message as ProstMessage;
use tokio_tungstenite::tungstenite::Message;

/// Send a "get_metrics" request and await a single JSON reply
pub async fn request_metrics(ws: &mut WsStream) -> Option<Metrics> {
    if ws.send(Message::Text("get_metrics".into())).await.is_err() {
        return None;
    }
    match ws.next().await {
        Some(Ok(Message::Binary(b))) => gunzip_to_string(&b)
            .ok()
            .and_then(|s| serde_json::from_str::<Metrics>(&s).ok()),
        Some(Ok(Message::Text(json))) => serde_json::from_str::<Metrics>(&json).ok(),
        _ => None,
    }
}

/// Send a "get_disks" request and await a JSON Vec<DiskInfo>
pub async fn request_disks(ws: &mut WsStream) -> Option<Vec<DiskInfo>> {
    if ws.send(Message::Text("get_disks".into())).await.is_err() {
        return None;
    }
    match ws.next().await {
        Some(Ok(Message::Binary(b))) => gunzip_to_string(&b)
            .ok()
            .and_then(|s| serde_json::from_str::<Vec<DiskInfo>>(&s).ok()),
        Some(Ok(Message::Text(json))) => serde_json::from_str::<Vec<DiskInfo>>(&json).ok(),
        _ => None,
    }
}

/// Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped)
pub async fn request_processes(ws: &mut WsStream) -> Option<ProcessesPayload> {
    if ws
        .send(Message::Text("get_processes".into()))
        .await
        .is_err()
    {
        return None;
    }
    match ws.next().await {
        Some(Ok(Message::Binary(b))) => {
            let gz = is_gzip(&b);
            let data = if gz { gunzip_to_vec(&b).ok()? } else { b };
            match pb::Processes::decode(data.as_slice()) {
                Ok(pb) => {
                    let rows: Vec<ProcessInfo> = pb
                        .rows
                        .into_iter()
                        .map(|p: pb::Process| ProcessInfo {
                            pid: p.pid,
                            name: p.name,
                            cpu_usage: p.cpu_usage,
                            mem_bytes: p.mem_bytes,
                        })
                        .collect();
                    Some(ProcessesPayload {
                        process_count: pb.process_count as usize,
                        top_processes: rows,
                    })
                }
                Err(e) => {
                    if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") {
                        eprintln!("protobuf decode failed: {e}");
                    }
                    // Fallback: maybe it's JSON (bytes already decompressed if gz)
                    match String::from_utf8(data) {
                        Ok(s) => serde_json::from_str::<ProcessesPayload>(&s).ok(),
                        Err(_) => None,
                    }
                }
            }
        }
        Some(Ok(Message::Text(json))) => serde_json::from_str::<ProcessesPayload>(&json).ok(),
        _ => None,
    }
}
socktop_connector/src/utils.rs (new file, 67 lines)

//! Shared utilities for both networking and WASM implementations.

#[cfg(any(feature = "networking", feature = "wasm"))]
use flate2::read::GzDecoder;
#[cfg(any(feature = "networking", feature = "wasm"))]
use std::io::Read;

use crate::error::{ConnectorError, Result};

// WebSocket state constants
#[cfg(feature = "wasm")]
#[allow(dead_code)]
pub const WEBSOCKET_CONNECTING: u16 = 0;
#[cfg(feature = "wasm")]
#[allow(dead_code)]
pub const WEBSOCKET_OPEN: u16 = 1;
#[cfg(feature = "wasm")]
#[allow(dead_code)]
pub const WEBSOCKET_CLOSING: u16 = 2;
#[cfg(feature = "wasm")]
#[allow(dead_code)]
pub const WEBSOCKET_CLOSED: u16 = 3;

// Gzip magic header constants
pub const GZIP_MAGIC_1: u8 = 0x1f;
pub const GZIP_MAGIC_2: u8 = 0x8b;

/// Unified gzip decompression to string for both networking and WASM
#[cfg(any(feature = "networking", feature = "wasm"))]
pub fn gunzip_to_string(bytes: &[u8]) -> Result<String> {
    let mut decoder = GzDecoder::new(bytes);
    let mut decompressed = String::new();
    decoder
        .read_to_string(&mut decompressed)
        .map_err(|e| ConnectorError::protocol_error(format!("Gzip decompression failed: {e}")))?;
    Ok(decompressed)
}

/// Unified gzip decompression to bytes for both networking and WASM
#[cfg(any(feature = "networking", feature = "wasm"))]
pub fn gunzip_to_vec(bytes: &[u8]) -> Result<Vec<u8>> {
    let mut decoder = GzDecoder::new(bytes);
    let mut decompressed = Vec::new();
    decoder
        .read_to_end(&mut decompressed)
        .map_err(|e| ConnectorError::protocol_error(format!("Gzip decompression failed: {e}")))?;
    Ok(decompressed)
}

/// Unified gzip detection for both networking and WASM
#[cfg(any(feature = "networking", feature = "wasm"))]
pub fn is_gzip(bytes: &[u8]) -> bool {
    bytes.len() >= 2 && bytes[0] == GZIP_MAGIC_1 && bytes[1] == GZIP_MAGIC_2
}

/// Unified debug logging for both networking and WASM modes
#[cfg(any(feature = "networking", feature = "wasm"))]
#[allow(dead_code)]
pub fn log_debug(message: &str) {
    #[cfg(feature = "networking")]
    if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") {
        eprintln!("{message}");
    }

    #[cfg(all(feature = "wasm", not(feature = "networking")))]
    eprintln!("{message}");
}
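An illustrative round-trip for the shared gzip helpers, written as if it sat in a `#[cfg(test)]` module next to `utils.rs`; it is not a test that ships with this commit, and the payload is made up. It relies only on `flate2`'s `GzEncoder`, which is already a dependency of the crate.

```rust
// Sketch only: assumes it lives alongside utils.rs so the helpers are in scope.
#[cfg(test)]
mod tests {
    use super::{gunzip_to_string, is_gzip};
    use flate2::{Compression, write::GzEncoder};
    use std::io::Write;

    #[test]
    fn gzip_roundtrip() {
        let payload = br#"{"hostname":"demo"}"#;

        // Compress a made-up JSON payload with flate2's encoder.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(payload).expect("write to Vec cannot fail");
        let compressed = encoder.finish().expect("finish gzip stream");

        // The shared helpers should spot the 0x1f 0x8b magic and invert the compression.
        assert!(is_gzip(&compressed));
        let text = gunzip_to_string(&compressed).expect("valid gzip stream");
        assert_eq!(text.as_bytes(), &payload[..]);
    }
}
```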
66
socktop_connector/src/wasm/connection.rs
Normal file
@ -0,0 +1,66 @@
//! WebSocket connection handling for WASM environments.

use crate::config::ConnectorConfig;
use crate::error::{ConnectorError, Result};
use crate::utils::{WEBSOCKET_CLOSED, WEBSOCKET_CLOSING, WEBSOCKET_OPEN};

use wasm_bindgen::JsCast;
use wasm_bindgen::prelude::*;
use web_sys::WebSocket;

/// Connect to the agent using WASM WebSocket
pub async fn connect_to_agent(config: &ConnectorConfig) -> Result<WebSocket> {
    let websocket = WebSocket::new(&config.url).map_err(|e| {
        ConnectorError::protocol_error(format!("Failed to create WebSocket: {e:?}"))
    })?;

    // Set binary type for proper message handling
    websocket.set_binary_type(web_sys::BinaryType::Arraybuffer);

    // Wait for connection to be ready with proper async delays
    let start_time = js_sys::Date::now();
    let timeout_ms = 10000.0; // 10 second timeout (increased from 5)

    // Poll connection status until ready or timeout
    loop {
        let ready_state = websocket.ready_state();

        if ready_state == WEBSOCKET_OPEN {
            // OPEN - connection is ready
            break;
        } else if ready_state == WEBSOCKET_CLOSED {
            // CLOSED
            return Err(ConnectorError::protocol_error(
                "WebSocket connection closed",
            ));
        } else if ready_state == WEBSOCKET_CLOSING {
            // CLOSING
            return Err(ConnectorError::protocol_error("WebSocket is closing"));
        }

        // Check timeout
        let now = js_sys::Date::now();
        if now - start_time > timeout_ms {
            return Err(ConnectorError::protocol_error(
                "WebSocket connection timeout",
            ));
        }

        // Proper async delay using setTimeout Promise
        let promise = js_sys::Promise::new(&mut |resolve, _| {
            let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED));
            web_sys::window()
                .unwrap()
                .set_timeout_with_callback_and_timeout_and_arguments_0(
                    closure.as_ref().unchecked_ref(),
                    100, // 100ms delay between polls
                )
                .unwrap();
            closure.forget();
        });

        let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
    }

    Ok(websocket)
}
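A hedged usage sketch (not in this diff; the wrapper name is illustrative) showing how a browser caller might pair `connect_to_agent` with the request handler added in `requests.rs` below:

#[cfg(feature = "wasm")]
pub async fn fetch_metrics_once(config: &ConnectorConfig) -> Result<crate::AgentResponse> {
    // Resolves once the socket reports WEBSOCKET_OPEN, or errors on close/timeout.
    let ws = connect_to_agent(config).await?;
    // Issue a single Metrics request over the open socket (requests.rs, below).
    crate::wasm::requests::send_request_and_wait(&ws, crate::AgentRequest::Metrics).await
}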
7
socktop_connector/src/wasm/mod.rs
Normal file
@ -0,0 +1,7 @@
//! WASM module for browser WebSocket connections.

pub mod connection;
pub mod requests;

pub use connection::*;
pub use requests::*;
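Presumably the crate root gates this module on the `wasm` feature; the exact `lib.rs` wiring is outside this diff, but it would look roughly like:

// lib.rs (sketch; not shown in this commit)
#[cfg(feature = "wasm")]
pub mod wasm;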
398
socktop_connector/src/wasm/requests.rs
Normal file
@ -0,0 +1,398 @@
//! WebSocket request handlers for WASM environments.

use crate::error::{ConnectorError, Result};
use crate::pb::Processes;
use crate::utils::{gunzip_to_string, gunzip_to_vec, is_gzip, log_debug};
use crate::{AgentRequest, AgentResponse, DiskInfo, Metrics, ProcessInfo, ProcessesPayload};

use prost::Message as ProstMessage;
use std::cell::RefCell;
use std::rc::Rc;
use wasm_bindgen::JsCast;
use wasm_bindgen::prelude::*;
use web_sys::WebSocket;

/// Send a request and wait for response with binary data handling
pub async fn send_request_and_wait(
    websocket: &WebSocket,
    request: AgentRequest,
) -> Result<AgentResponse> {
    // Use the legacy string format that the agent expects
    let request_string = request.to_legacy_string();

    // Send request
    websocket
        .send_with_str(&request_string)
        .map_err(|e| ConnectorError::protocol_error(format!("Failed to send message: {e:?}")))?;

    // Wait for response using JavaScript Promise
    let (response, binary_data) = wait_for_response_with_binary(websocket).await?;

    // Parse the response based on the request type
    match request {
        AgentRequest::Metrics => {
            // Check if this is binary data (protobuf from agent)
            if response.starts_with("BINARY_DATA:") {
                // Extract the byte count
                let byte_count: usize = response
                    .strip_prefix("BINARY_DATA:")
                    .unwrap_or("0")
                    .parse()
                    .unwrap_or(0);

                // For now, return a placeholder metrics response indicating binary data received
                // TODO: Implement proper protobuf decoding for binary data
                let placeholder_metrics = Metrics {
                    cpu_total: 0.0,
                    cpu_per_core: vec![0.0],
                    mem_total: 0,
                    mem_used: 0,
                    swap_total: 0,
                    swap_used: 0,
                    hostname: format!("Binary protobuf data ({byte_count} bytes)"),
                    cpu_temp_c: None,
                    disks: vec![],
                    networks: vec![],
                    top_processes: vec![],
                    gpus: None,
                    process_count: None,
                };
                Ok(AgentResponse::Metrics(placeholder_metrics))
            } else {
                // Try to parse as JSON (fallback)
                let metrics: Metrics = serde_json::from_str(&response).map_err(|e| {
                    ConnectorError::serialization_error(format!("Failed to parse metrics: {e}"))
                })?;
                Ok(AgentResponse::Metrics(metrics))
            }
        }
        AgentRequest::Disks => {
            let disks: Vec<DiskInfo> = serde_json::from_str(&response).map_err(|e| {
                ConnectorError::serialization_error(format!("Failed to parse disks: {e}"))
            })?;
            Ok(AgentResponse::Disks(disks))
        }
        AgentRequest::Processes => {
            log_debug(&format!(
                "🔍 Processing process request - response: {}",
                if response.len() > 100 {
                    format!("{}...", &response[..100])
                } else {
                    response.clone()
                }
            ));
            log_debug(&format!(
                "🔍 Binary data available: {}",
                binary_data.is_some()
            ));
            if let Some(ref data) = binary_data {
                log_debug(&format!("🔍 Binary data size: {} bytes", data.len()));
                // Check if it's gzipped data and decompress it first
                if is_gzip(data) {
                    log_debug("🔍 Process data is gzipped, decompressing...");
                    match gunzip_to_vec(data) {
                        Ok(decompressed_bytes) => {
                            log_debug(&format!(
                                "🔍 Successfully decompressed {} bytes, now decoding protobuf...",
                                decompressed_bytes.len()
                            ));
                            // Now decode the decompressed bytes as protobuf
                            match <Processes as ProstMessage>::decode(decompressed_bytes.as_slice())
                            {
                                Ok(protobuf_processes) => {
                                    log_debug(&format!(
                                        "✅ Successfully decoded {} processes from gzipped protobuf",
                                        protobuf_processes.rows.len()
                                    ));

                                    // Convert protobuf processes to ProcessInfo structs
                                    let processes: Vec<ProcessInfo> = protobuf_processes
                                        .rows
                                        .into_iter()
                                        .map(|p| ProcessInfo {
                                            pid: p.pid,
                                            name: p.name,
                                            cpu_usage: p.cpu_usage,
                                            mem_bytes: p.mem_bytes,
                                        })
                                        .collect();

                                    let processes_payload = ProcessesPayload {
                                        top_processes: processes,
                                        process_count: protobuf_processes.process_count as usize,
                                    };
                                    return Ok(AgentResponse::Processes(processes_payload));
                                }
                                Err(e) => {
                                    log_debug(&format!(
                                        "❌ Failed to decode decompressed protobuf: {e}"
                                    ));
                                }
                            }
                        }
                        Err(e) => {
                            log_debug(&format!(
                                "❌ Failed to decompress gzipped process data: {e}"
                            ));
                        }
                    }
                }
            }

            // Check if this is binary data (protobuf from agent)
            if response.starts_with("BINARY_DATA:") {
                // Extract the binary data size and decode protobuf
                let byte_count_str = response.strip_prefix("BINARY_DATA:").unwrap_or("0");
                let _byte_count: usize = byte_count_str.parse().unwrap_or(0);

                // Check if we have the actual binary data
                if let Some(binary_bytes) = binary_data {
                    log_debug(&format!(
                        "🔧 Decoding {} bytes of protobuf process data",
                        binary_bytes.len()
                    ));

                    // Try to decode the protobuf data using the prost Message trait
                    match <Processes as ProstMessage>::decode(&binary_bytes[..]) {
                        Ok(protobuf_processes) => {
                            log_debug(&format!(
                                "✅ Successfully decoded {} processes from protobuf",
                                protobuf_processes.rows.len()
                            ));

                            // Convert protobuf processes to ProcessInfo structs
                            let processes: Vec<ProcessInfo> = protobuf_processes
                                .rows
                                .into_iter()
                                .map(|p| ProcessInfo {
                                    pid: p.pid,
                                    name: p.name,
                                    cpu_usage: p.cpu_usage,
                                    mem_bytes: p.mem_bytes,
                                })
                                .collect();

                            let processes_payload = ProcessesPayload {
                                top_processes: processes,
                                process_count: protobuf_processes.process_count as usize,
                            };
                            Ok(AgentResponse::Processes(processes_payload))
                        }
                        Err(e) => {
                            log_debug(&format!("❌ Failed to decode protobuf: {e}"));
                            // Fallback to empty processes
                            let processes = ProcessesPayload {
                                top_processes: vec![],
                                process_count: 0,
                            };
                            Ok(AgentResponse::Processes(processes))
                        }
                    }
                } else {
                    log_debug(
                        "❌ Binary data indicator received but no actual binary data preserved",
                    );
                    let processes = ProcessesPayload {
                        top_processes: vec![],
                        process_count: 0,
                    };
                    Ok(AgentResponse::Processes(processes))
                }
            } else {
                // Try to parse as JSON (fallback)
                let processes: ProcessesPayload = serde_json::from_str(&response).map_err(|e| {
                    ConnectorError::serialization_error(format!("Failed to parse processes: {e}"))
                })?;
                Ok(AgentResponse::Processes(processes))
            }
        }
    }
}
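The two `Processes` branches above perform the same gzip-then-protobuf conversion; a hedged sketch (not part of this commit) of how that step could be factored into one helper, using only types and functions already imported in this file:

// Hypothetical helper; illustrates the shared decode path, not a change in this commit.
fn decode_process_payload(bytes: &[u8]) -> Result<ProcessesPayload> {
    // Decompress only when the gzip magic bytes are present.
    let raw = if is_gzip(bytes) { gunzip_to_vec(bytes)? } else { bytes.to_vec() };
    let decoded = <Processes as ProstMessage>::decode(raw.as_slice()).map_err(|e| {
        ConnectorError::serialization_error(format!("Failed to decode protobuf: {e}"))
    })?;
    Ok(ProcessesPayload {
        top_processes: decoded
            .rows
            .into_iter()
            .map(|p| ProcessInfo {
                pid: p.pid,
                name: p.name,
                cpu_usage: p.cpu_usage,
                mem_bytes: p.mem_bytes,
            })
            .collect(),
        process_count: decoded.process_count as usize,
    })
}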
async fn wait_for_response_with_binary(websocket: &WebSocket) -> Result<(String, Option<Vec<u8>>)> {
    let start_time = js_sys::Date::now();
    let timeout_ms = 10000.0; // 10 second timeout

    // Store the response in a shared location
    let response_cell = Rc::new(RefCell::new(None::<String>));
    let binary_data_cell = Rc::new(RefCell::new(None::<Vec<u8>>));
    let error_cell = Rc::new(RefCell::new(None::<String>));

    // Use a unique request ID to avoid message collision
    let _request_id = js_sys::Math::random();
    let response_received = Rc::new(RefCell::new(false));

    // Set up the message handler that only processes if we haven't gotten a response yet
    {
        let response_cell = response_cell.clone();
        let binary_data_cell = binary_data_cell.clone();
        let response_received = response_received.clone();
        let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {
            // Only process if we haven't already received a response for this request
            if !*response_received.borrow() {
                // Handle text messages (JSON responses for metrics/disks)
                if let Ok(data) = e.data().dyn_into::<js_sys::JsString>() {
                    let message = data.as_string().unwrap_or_default();
                    if !message.is_empty() {
                        // Debug: Log what we received (truncated)
                        let preview = if message.len() > 100 {
                            format!("{}...", &message[..100])
                        } else {
                            message.clone()
                        };
                        log_debug(&format!("🔍 Received text: {preview}"));

                        *response_cell.borrow_mut() = Some(message);
                        *response_received.borrow_mut() = true;
                    }
                }
                // Handle binary messages (could be JSON as text bytes or actual protobuf)
                else if let Ok(array_buffer) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
                    let uint8_array = js_sys::Uint8Array::new(&array_buffer);
                    let length = uint8_array.length() as usize;
                    let mut bytes = vec![0u8; length];
                    uint8_array.copy_to(&mut bytes);

                    log_debug(&format!("🔍 Received binary data: {length} bytes"));

                    // Debug: Log the first few bytes to see what we're dealing with
                    let first_bytes = if bytes.len() >= 4 {
                        format!(
                            "0x{:02x} 0x{:02x} 0x{:02x} 0x{:02x}",
                            bytes[0], bytes[1], bytes[2], bytes[3]
                        )
                    } else {
                        format!("Only {} bytes available", bytes.len())
                    };
                    log_debug(&format!("🔍 First bytes: {first_bytes}"));

                    // Try to decode as UTF-8 text first (in case it's JSON sent as binary)
                    match String::from_utf8(bytes.clone()) {
                        Ok(text) => {
                            // If it decodes to valid UTF-8, check if it looks like JSON
                            let trimmed = text.trim();
                            if (trimmed.starts_with('{') && trimmed.ends_with('}'))
                                || (trimmed.starts_with('[') && trimmed.ends_with(']'))
                            {
                                log_debug(&format!(
                                    "🔍 Binary data is actually JSON text: {}",
                                    if text.len() > 100 {
                                        format!("{}...", &text[..100])
                                    } else {
                                        text.clone()
                                    }
                                ));
                                *response_cell.borrow_mut() = Some(text);
                                *response_received.borrow_mut() = true;
                            } else {
                                log_debug(&format!(
                                    "🔍 Binary data is UTF-8 text but not JSON: {}",
                                    if text.len() > 100 {
                                        format!("{}...", &text[..100])
                                    } else {
                                        text.clone()
                                    }
                                ));
                                *response_cell.borrow_mut() = Some(text);
                                *response_received.borrow_mut() = true;
                            }
                        }
                        Err(_) => {
                            // If it's not valid UTF-8, check if it's gzipped data
                            if is_gzip(&bytes) {
                                log_debug(&format!(
                                    "🔍 Binary data appears to be gzipped ({length} bytes)"
                                ));
                                // Try to decompress using unified gzip decompression
                                match gunzip_to_string(&bytes) {
                                    Ok(decompressed_text) => {
                                        log_debug(&format!(
                                            "🔍 Gzipped data decompressed to text: {}",
                                            if decompressed_text.len() > 100 {
                                                format!("{}...", &decompressed_text[..100])
                                            } else {
                                                decompressed_text.clone()
                                            }
                                        ));
                                        *response_cell.borrow_mut() = Some(decompressed_text);
                                        *response_received.borrow_mut() = true;
                                    }
                                    Err(e) => {
                                        log_debug(&format!("🔍 Failed to decompress gzip: {e}"));
                                        // Fallback: treat as actual binary protobuf data
                                        *binary_data_cell.borrow_mut() = Some(bytes.clone());
                                        *response_cell.borrow_mut() =
                                            Some(format!("BINARY_DATA:{length}"));
                                        *response_received.borrow_mut() = true;
                                    }
                                }
                            } else {
                                // If it's not valid UTF-8 and not gzipped, it's likely actual binary protobuf data
                                log_debug(&format!(
                                    "🔍 Binary data is actual protobuf ({length} bytes)"
                                ));
                                *binary_data_cell.borrow_mut() = Some(bytes);
                                *response_cell.borrow_mut() = Some(format!("BINARY_DATA:{length}"));
                                *response_received.borrow_mut() = true;
                            }
                        }
                    }
                } else {
                    // Log what type of data we got
                    log_debug(&format!("🔍 Received unknown data type: {:?}", e.data()));
                }
            }
        }) as Box<dyn FnMut(_)>);
        websocket.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
        onmessage_callback.forget();
    }

    // Set up the error handler
    {
        let error_cell = error_cell.clone();
        let response_received = response_received.clone();
        let onerror_callback = Closure::wrap(Box::new(move |_e: web_sys::ErrorEvent| {
            if !*response_received.borrow() {
                *error_cell.borrow_mut() = Some("WebSocket error occurred".to_string());
                *response_received.borrow_mut() = true;
            }
        }) as Box<dyn FnMut(_)>);
        websocket.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
        onerror_callback.forget();
    }

    // Poll for response with proper async delays
    loop {
        // Check for response
        if *response_received.borrow() {
            if let Some(response) = response_cell.borrow().as_ref() {
                let binary_data = binary_data_cell.borrow().clone();
                return Ok((response.clone(), binary_data));
            }
            if let Some(error) = error_cell.borrow().as_ref() {
                return Err(ConnectorError::protocol_error(error));
            }
        }

        // Check timeout
        let now = js_sys::Date::now();
        if now - start_time > timeout_ms {
            *response_received.borrow_mut() = true; // Mark as done to prevent future processing
            return Err(ConnectorError::protocol_error("WebSocket response timeout"));
        }

        // Wait 50ms before checking again
        let promise = js_sys::Promise::new(&mut |resolve, _| {
            let closure = Closure::once(move || resolve.call0(&JsValue::UNDEFINED));
            web_sys::window()
                .unwrap()
                .set_timeout_with_callback_and_timeout_and_arguments_0(
                    closure.as_ref().unchecked_ref(),
                    50,
                )
                .unwrap();
            closure.forget();
        });
        let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
    }
}
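The onmessage handler above settles each frame in a fixed order: UTF-8 text first, then gzip, then raw protobuf. A hedged sketch (not in this commit; names are illustrative) of that classification as a standalone function, using the shared utils helpers:

// Hypothetical classifier mirroring the branch order in wait_for_response_with_binary.
enum WirePayload {
    Text(String),
    GzippedText(String),
    Protobuf(Vec<u8>),
}

fn classify_binary(bytes: Vec<u8>) -> WirePayload {
    match String::from_utf8(bytes) {
        // 1. Valid UTF-8 (JSON or plain text) is forwarded as text.
        Ok(text) => WirePayload::Text(text),
        Err(err) => {
            let bytes = err.into_bytes();
            if is_gzip(&bytes) {
                // 2. Gzipped payloads are decompressed to text when possible.
                match gunzip_to_string(&bytes) {
                    Ok(text) => WirePayload::GzippedText(text),
                    Err(_) => WirePayload::Protobuf(bytes),
                }
            } else {
                // 3. Everything else is treated as raw protobuf bytes.
                WirePayload::Protobuf(bytes)
            }
        }
    }
}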