diff --git a/Cargo.lock b/Cargo.lock index b4a47f4..934e637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,6 +591,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.2" @@ -1322,6 +1328,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nom" version = "7.1.3" @@ -1506,6 +1518,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -1608,6 +1630,122 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = "protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + [[package]] name = "quote" version = "1.0.40" @@ -2042,11 +2180,15 @@ version = "0.1.11" dependencies = [ "anyhow", "assert_cmd", + "bytes", "chrono", "crossterm 0.27.0", "flate2", "futures", "futures-util", + "prost", + "prost-build", + "protoc-bin-vendored", "ratatui", "rustls 0.23.31", "rustls-pemfile", @@ -2065,6 +2207,7 @@ dependencies = [ "assert_cmd", "axum", "axum-server", + "bytes", "flate2", "futures", "futures-util", @@ -2073,6 +2216,10 @@ dependencies = [ "nvml-wrapper", "once_cell", "openssl", + "prost", + "prost-build", + "prost-types", + "protoc-bin-vendored", "rustls 0.23.31", "rustls-pemfile", "serde", @@ -2080,6 +2227,7 @@ dependencies = [ "sysinfo", "tempfile", "tokio", + "tonic-build", "tracing", "tracing-subscriber", "tungstenite 0.27.0", @@ -2330,6 +2478,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index eb286dd..fa97049 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,16 @@ chrono = { version = "0.4", features = ["serde"] } # web server (remote-agent) axum = { version = "0.7", features = ["ws"] } + +# protobuf +prost = "0.13" +prost-types = "0.13" +bytes = "1" + +[profile.release] +# Favor smaller, simpler binaries with good runtime perf +lto = "thin" +codegen-units = 1 +panic = "abort" +opt-level = 3 +strip = "symbols" \ No newline at end of file diff --git a/nohup.out b/nohup.out new file mode 100644 index 0000000..86587c9 --- /dev/null +++ b/nohup.out @@ -0,0 +1,8 @@ +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +Error: Address already in use (os error 98) +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8433/ws +Error: Address already in use (os error 98) +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws +socktop_agent: TLS enabled. Listening on wss://0.0.0.0:8443/ws diff --git a/proto/processes.proto b/proto/processes.proto new file mode 100644 index 0000000..631e162 --- /dev/null +++ b/proto/processes.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package socktop; + +// All running processes. Sorting is done client-side. +message Processes { + uint64 process_count = 1; // total processes in the system + repeated Process rows = 2; // all processes +} + +message Process { + uint32 pid = 1; + string name = 2; + float cpu_usage = 3; // 0..100 + uint64 mem_bytes = 4; // RSS bytes +} diff --git a/socktop/Cargo.toml b/socktop/Cargo.toml index cb9c5ce..526001a 100644 --- a/socktop/Cargo.toml +++ b/socktop/Cargo.toml @@ -21,6 +21,12 @@ anyhow = { workspace = true } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } rustls = "0.23" rustls-pemfile = "2.1" +prost = { workspace = true } +bytes = { workspace = true } [dev-dependencies] -assert_cmd = "2.0" \ No newline at end of file +assert_cmd = "2.0" + +[build-dependencies] +prost-build = "0.13" +protoc-bin-vendored = "3" \ No newline at end of file diff --git a/socktop/build.rs b/socktop/build.rs new file mode 100644 index 0000000..0cf4635 --- /dev/null +++ b/socktop/build.rs @@ -0,0 +1,8 @@ +fn main() { + let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc"); + std::env::set_var("PROTOC", protoc); + let mut cfg = prost_build::Config::new(); + cfg.out_dir(std::env::var("OUT_DIR").unwrap()); + cfg.compile_protos(&["../proto/processes.proto"], &["../proto"]) + .expect("compile protos"); +} diff --git a/socktop/src/ws.rs b/socktop/src/ws.rs index 70a2db7..7f85e9a 100644 --- a/socktop/src/ws.rs +++ b/socktop/src/ws.rs @@ -2,19 +2,24 @@ use flate2::bufread::GzDecoder; use futures_util::{SinkExt, StreamExt}; +use prost::Message as _; use rustls::{ClientConfig, RootCertStore}; use rustls_pemfile::Item; -use std::io::{Read}; +use std::io::Read; use std::{fs::File, io::BufReader, sync::Arc}; use tokio::net::TcpStream; -use tokio::time::{interval, Duration}; use tokio_tungstenite::{ connect_async, connect_async_tls_with_config, tungstenite::client::IntoClientRequest, tungstenite::Message, Connector, MaybeTlsStream, WebSocketStream, }; use url::Url; -use crate::types::{DiskInfo, Metrics, ProcessesPayload}; +use crate::types::{DiskInfo, Metrics, ProcessInfo, ProcessesPayload}; + +mod pb { + // generated by build.rs + include!(concat!(env!("OUT_DIR"), "/socktop.rs")); +} pub type WsStream = WebSocketStream>; @@ -56,7 +61,6 @@ async fn connect_with_ca(url: &str, ca_path: &str) -> Result Option { if ws.send(Message::Text("get_metrics".into())).await.is_err() { @@ -79,6 +83,16 @@ fn gunzip_to_string(bytes: &[u8]) -> Option { Some(out) } +fn gunzip_to_vec(bytes: &[u8]) -> Option> { + let mut dec = GzDecoder::new(bytes); + let mut out = Vec::new(); + dec.read_to_end(&mut out).ok()?; + Some(out) +} + +fn is_gzip(bytes: &[u8]) -> bool { + bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b +} // Suppress dead_code until these are wired into the app #[allow(dead_code)] pub enum Payload { @@ -87,23 +101,6 @@ pub enum Payload { Processes(ProcessesPayload), } -#[allow(dead_code)] -fn parse_any_payload(json: &str) -> Result { - if let Ok(m) = serde_json::from_str::(json) { - return Ok(Payload::Metrics(m)); - } - if let Ok(d) = serde_json::from_str::>(json) { - return Ok(Payload::Disks(d)); - } - if let Ok(p) = serde_json::from_str::(json) { - return Ok(Payload::Processes(p)); - } - Err(serde_json::Error::io(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "unknown payload", - ))) -} - // Send a "get_disks" request and await a JSON Vec pub async fn request_disks(ws: &mut WsStream) -> Option> { if ws.send(Message::Text("get_disks".into())).await.is_err() { @@ -118,7 +115,7 @@ pub async fn request_disks(ws: &mut WsStream) -> Option> { } } -// Send a "get_processes" request and await a JSON ProcessesPayload +// Send a "get_processes" request and await a ProcessesPayload decoded from protobuf (binary, may be gzipped) pub async fn request_processes(ws: &mut WsStream) -> Option { if ws .send(Message::Text("get_processes".into())) @@ -129,68 +126,38 @@ pub async fn request_processes(ws: &mut WsStream) -> Option { } match ws.next().await { Some(Ok(Message::Binary(b))) => { - gunzip_to_string(&b).and_then(|s| serde_json::from_str::(&s).ok()) + let gz = is_gzip(&b); + let data = if gz { gunzip_to_vec(&b)? } else { b }; + match pb::Processes::decode(data.as_slice()) { + Ok(pb) => { + let rows: Vec = pb + .rows + .into_iter() + .map(|p: pb::Process| ProcessInfo { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + Some(ProcessesPayload { + process_count: pb.process_count as usize, + top_processes: rows, + }) + } + Err(e) => { + if std::env::var("SOCKTOP_DEBUG").ok().as_deref() == Some("1") { + eprintln!("protobuf decode failed: {e}"); + } + // Fallback: maybe it's JSON (bytes already decompressed if gz) + match String::from_utf8(data) { + Ok(s) => serde_json::from_str::(&s).ok(), + Err(_) => None, + } + } + } } Some(Ok(Message::Text(json))) => serde_json::from_str::(&json).ok(), _ => None, } } - -#[allow(dead_code)] -pub async fn start_ws_polling(mut ws: WsStream) { - let mut t_fast = interval(Duration::from_millis(500)); - let mut t_procs = interval(Duration::from_secs(2)); - let mut t_disks = interval(Duration::from_secs(5)); - - let _ = ws.send(Message::Text("get_metrics".into())).await; - let _ = ws.send(Message::Text("get_processes".into())).await; - let _ = ws.send(Message::Text("get_disks".into())).await; - - loop { - tokio::select! { - _ = t_fast.tick() => { - let _ = ws.send(Message::Text("get_metrics".into())).await; - } - _ = t_procs.tick() => { - let _ = ws.send(Message::Text("get_processes".into())).await; - } - _ = t_disks.tick() => { - let _ = ws.send(Message::Text("get_disks".into())).await; - } - maybe = ws.next() => { - let Some(result) = maybe else { break; }; - let Ok(msg) = result else { break; }; - match msg { - Message::Binary(b) => { - if let Some(json) = gunzip_to_string(&b) { - if let Ok(payload) = parse_any_payload(&json) { - match payload { - Payload::Metrics(_m) => { - // update your app state with fast metrics - } - Payload::Disks(_d) => { - // update your app state with disks - } - Payload::Processes(_p) => { - // update your app state with processes - } - } - } - } - } - Message::Text(s) => { - if let Ok(payload) = parse_any_payload(&s) { - match payload { - Payload::Metrics(_m) => {} - Payload::Disks(_d) => {} - Payload::Processes(_p) => {} - } - } - } - Message::Close(_) => break, - _ => {} - } - } - } - } -} diff --git a/socktop/tests/ws_probe.rs b/socktop/tests/ws_probe.rs index 523466a..1c7a04f 100644 --- a/socktop/tests/ws_probe.rs +++ b/socktop/tests/ws_probe.rs @@ -17,9 +17,7 @@ async fn probe_ws_endpoints() { // Optional pinned CA for WSS/self-signed setups let tls_ca = std::env::var("SOCKTOP_TLS_CA").ok(); - let mut ws = connect(&url, tls_ca.as_deref()) - .await - .expect("connect ws"); + let mut ws = connect(&url, tls_ca.as_deref()).await.expect("connect ws"); // Should get fast metrics quickly let m = request_metrics(&mut ws).await; diff --git a/socktop_agent/Cargo.toml b/socktop_agent/Cargo.toml index f9bca56..69b1a24 100644 --- a/socktop_agent/Cargo.toml +++ b/socktop_agent/Cargo.toml @@ -27,6 +27,14 @@ rustls-pemfile = "2.1" openssl = { version = "0.10", features = ["vendored"] } # for cross‑platform self‑signed generation anyhow = "1" hostname = "0.3" +bytes = { workspace = true } +prost = { workspace = true } + +[build-dependencies] +prost-build = "0.13" +prost-types = { workspace = true } +tonic-build = { version = "0.12", default-features = false, optional = true } +protoc-bin-vendored = "3" [dev-dependencies] assert_cmd = "2.0" tempfile = "3.10" \ No newline at end of file diff --git a/socktop_agent/build.rs b/socktop_agent/build.rs new file mode 100644 index 0000000..366d490 --- /dev/null +++ b/socktop_agent/build.rs @@ -0,0 +1,11 @@ +fn main() { + // Ensure protoc exists (vendored for reproducible builds) + let protoc = protoc_bin_vendored::protoc_bin_path().expect("protoc"); + std::env::set_var("PROTOC", protoc); + + // Compile protobuf definitions for processes + let mut cfg = prost_build::Config::new(); + cfg.out_dir(std::env::var("OUT_DIR").unwrap()); + cfg.compile_protos(&["../proto/processes.proto"], &["../proto"]) + .expect("compile protos"); +} diff --git a/socktop_agent/src/main.rs b/socktop_agent/src/main.rs index 7d4a22d..0835baf 100644 --- a/socktop_agent/src/main.rs +++ b/socktop_agent/src/main.rs @@ -3,6 +3,7 @@ mod gpu; mod metrics; +mod proto; mod sampler; mod state; mod types; diff --git a/socktop_agent/src/metrics.rs b/socktop_agent/src/metrics.rs index a1fc744..8d5a023 100644 --- a/socktop_agent/src/metrics.rs +++ b/socktop_agent/src/metrics.rs @@ -236,9 +236,9 @@ fn read_proc_jiffies(pid: u32) -> Option { Some(utime.saturating_add(stime)) } -/// Collect top processes (Linux variant): compute CPU% via /proc jiffies delta. +/// Collect all processes (Linux): compute CPU% via /proc jiffies delta; sorting moved to client. #[cfg(target_os = "linux")] -pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { +pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { // Fresh view to avoid lingering entries and select "no tasks" (no per-thread rows). let mut sys = System::new(); sys.refresh_processes_specifics( @@ -291,7 +291,7 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay .collect(); return ProcessesPayload { process_count: total_count, - top_processes: top_k_sorted(procs, k), + top_processes: procs, }; } @@ -317,13 +317,13 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay ProcessesPayload { process_count: total_count, - top_processes: top_k_sorted(procs, k), + top_processes: procs, } } -/// Collect top processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. +/// Collect all processes (non-Linux): use sysinfo's internal CPU% by doing a double refresh. #[cfg(not(target_os = "linux"))] -pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPayload { +pub async fn collect_processes_all(state: &AppState) -> ProcessesPayload { use tokio::time::sleep; let mut sys = state.sys.lock().await; @@ -354,8 +354,6 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay mem_bytes: p.memory(), }) .collect(); - - procs = top_k_sorted(procs, k); ProcessesPayload { process_count: total_count, top_processes: procs, @@ -363,19 +361,4 @@ pub async fn collect_processes_top_k(state: &AppState, k: usize) -> ProcessesPay } // Small helper to select and sort top-k by cpu -fn top_k_sorted(mut v: Vec, k: usize) -> Vec { - if v.len() > k { - v.select_nth_unstable_by(k, |a, b| { - b.cpu_usage - .partial_cmp(&a.cpu_usage) - .unwrap_or(std::cmp::Ordering::Equal) - }); - v.truncate(k); - } - v.sort_by(|a, b| { - b.cpu_usage - .partial_cmp(&a.cpu_usage) - .unwrap_or(std::cmp::Ordering::Equal) - }); - v -} +// Client now handles sorting/pagination. diff --git a/socktop_agent/src/proto.rs b/socktop_agent/src/proto.rs index 3ff466f..13c6f6a 100644 --- a/socktop_agent/src/proto.rs +++ b/socktop_agent/src/proto.rs @@ -1,32 +1,5 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Metrics { - pub ts_unix_ms: i64, - pub host: String, - pub uptime_secs: u64, - pub cpu_overall: f32, - pub cpu_per_core: Vec, - pub load_avg: (f64, f64, f64), - pub mem_total_mb: u64, - pub mem_used_mb: u64, - pub swap_total_mb: u64, - pub swap_used_mb: u64, - pub net_aggregate: NetTotals, - pub top_processes: Vec, +// Generated protobuf modules live under OUT_DIR; include them here. +// This module will expose socktop::Processes and socktop::Process types. +pub mod pb { + include!(concat!(env!("OUT_DIR"), "/socktop.rs")); } - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NetTotals { - pub rx_bytes: u64, - pub tx_bytes: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Proc { - pub pid: i32, - pub name: String, - pub cpu: f32, - pub mem_mb: u64, - pub status: String, -} \ No newline at end of file diff --git a/socktop_agent/src/ws.rs b/socktop_agent/src/ws.rs index 343489e..07cee80 100644 --- a/socktop_agent/src/ws.rs +++ b/socktop_agent/src/ws.rs @@ -10,7 +10,8 @@ use futures_util::StreamExt; use std::collections::HashMap; use std::io::Write; -use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_top_k}; +use crate::metrics::{collect_disks, collect_fast_metrics, collect_processes_all}; +use crate::proto::pb; use crate::state::AppState; pub async fn ws_handler( @@ -44,8 +45,39 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { let _ = send_json(&mut socket, &d).await; } Message::Text(ref text) if text == "get_processes" => { - let p = collect_processes_top_k(&state, 50).await; - let _ = send_json(&mut socket, &p).await; + let payload = collect_processes_all(&state).await; + // Map to protobuf message + let rows: Vec = payload + .top_processes + .into_iter() + .map(|p| pb::Process { + pid: p.pid, + name: p.name, + cpu_usage: p.cpu_usage, + mem_bytes: p.mem_bytes, + }) + .collect(); + let pb = pb::Processes { + process_count: payload.process_count as u64, + rows, + }; + let mut buf = Vec::with_capacity(8 * 1024); + if prost::Message::encode(&pb, &mut buf).is_err() { + let _ = socket.send(Message::Close(None)).await; + } else { + // compress if large + if buf.len() <= 768 { + let _ = socket.send(Message::Binary(buf)).await; + } else { + let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); + if enc.write_all(&buf).is_ok() { + let bin = enc.finish().unwrap_or(buf); + let _ = socket.send(Message::Binary(bin)).await; + } else { + let _ = socket.send(Message::Binary(buf)).await; + } + } + } } Message::Close(_) => break, _ => {}