Reference: Usage as a lib #8

- Implement protocol versioning
- migrate to thisError
- general error handling improvements in socktop_connector lib
- improve documentation
- increment  version
This commit is contained in:
jasonwitty 2025-09-07 18:55:23 -07:00
parent ffc246b705
commit 06cd6d0c82
9 changed files with 462 additions and 67 deletions

4
Cargo.lock generated
View File

@ -2213,9 +2213,8 @@ dependencies = [
[[package]] [[package]]
name = "socktop_connector" name = "socktop_connector"
version = "0.1.0" version = "0.1.2"
dependencies = [ dependencies = [
"anyhow",
"flate2", "flate2",
"futures-util", "futures-util",
"prost", "prost",
@ -2225,6 +2224,7 @@ dependencies = [
"rustls-pemfile", "rustls-pemfile",
"serde", "serde",
"serde_json", "serde_json",
"thiserror 2.0.12",
"tokio", "tokio",
"tokio-tungstenite 0.24.0", "tokio-tungstenite 0.24.0",
"url", "url",

View File

@ -207,7 +207,7 @@ socktop -t /path/to/cert.pem wss://HOST:8443/ws
Intervals (client-driven): Intervals (client-driven):
- Fast metrics: ~500 ms - Fast metrics: ~500 ms
- Processes: ~2 s (top 50) - Processes: ~2 s
- Disks: ~5 s - Disks: ~5 s
The agent stays idle unless queried. When queried, it collects just whats needed. The agent stays idle unless queried. When queried, it collects just whats needed.

0
socktop/src/ws.rs Normal file
View File

View File

@ -28,6 +28,10 @@ Environment toggles:
- SOCKTOP_AGENT_PROCESSES_TTL_MS=1000 - SOCKTOP_AGENT_PROCESSES_TTL_MS=1000
- SOCKTOP_AGENT_DISKS_TTL_MS=1000 - SOCKTOP_AGENT_DISKS_TTL_MS=1000
*NOTE ON ENV vars*
Generally these have been added for debugging purposes. you do not need to configure them, default values are tuned and GPU will deisable itself after the first poll if not available.
Systemd unit example & full docs: Systemd unit example & full docs:
https://github.com/jasonwitty/socktop https://github.com/jasonwitty/socktop

View File

@ -1,6 +1,6 @@
[package] [package]
name = "socktop_connector" name = "socktop_connector"
version = "0.1.0" version = "0.1.2"
edition = "2024" edition = "2024"
license = "MIT" license = "MIT"
description = "WebSocket connector library for socktop agent communication" description = "WebSocket connector library for socktop agent communication"
@ -38,7 +38,7 @@ flate2 = "1.0"
prost = { workspace = true } prost = { workspace = true }
# Error handling # Error handling
anyhow = { workspace = true } thiserror = "2.0"
[build-dependencies] [build-dependencies]
prost-build = "0.13" prost-build = "0.13"

View File

@ -6,6 +6,8 @@ A WebSocket connector library for communicating with socktop agents.
`socktop_connector` provides a high-level, type-safe interface for connecting to socktop agents over WebSocket connections. It handles connection management, TLS certificate pinning, compression, and protocol buffer decoding automatically. `socktop_connector` provides a high-level, type-safe interface for connecting to socktop agents over WebSocket connections. It handles connection management, TLS certificate pinning, compression, and protocol buffer decoding automatically.
The library is designed for professional use with structured error handling that allows you to pattern match on specific error types, making it easy to implement robust error recovery and monitoring strategies.
## Features ## Features
- **WebSocket Communication**: Support for both `ws://` and `wss://` connections - **WebSocket Communication**: Support for both `ws://` and `wss://` connections
@ -14,7 +16,7 @@ A WebSocket connector library for communicating with socktop agents.
- **Type Safety**: Strongly typed requests and responses - **Type Safety**: Strongly typed requests and responses
- **Automatic Compression**: Handles gzip compression/decompression transparently - **Automatic Compression**: Handles gzip compression/decompression transparently
- **Protocol Buffer Support**: Decodes binary process data automatically - **Protocol Buffer Support**: Decodes binary process data automatically
- **Error Handling**: Comprehensive error handling with detailed error messages - **Error Handling**: Comprehensive error handling with structured error types for pattern matching
## Connection Types ## Connection Types
@ -80,6 +82,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
``` ```
### Error Handling with Pattern Matching
Take advantage of structured error types for robust error handling:
```rust
use socktop_connector::{connect_to_socktop_agent, ConnectorError, AgentRequest};
#[tokio::main]
async fn main() {
// Handle connection errors specifically
let mut connector = match connect_to_socktop_agent("ws://localhost:3000/ws").await {
Ok(conn) => conn,
Err(ConnectorError::WebSocketError(e)) => {
eprintln!("Failed to connect to WebSocket: {}", e);
return;
}
Err(ConnectorError::UrlError(e)) => {
eprintln!("Invalid URL provided: {}", e);
return;
}
Err(e) => {
eprintln!("Connection failed: {}", e);
return;
}
};
// Handle request errors specifically
match connector.request(AgentRequest::Metrics).await {
Ok(response) => println!("Success: {:?}", response),
Err(ConnectorError::JsonError(e)) => {
eprintln!("Failed to parse server response: {}", e);
}
Err(ConnectorError::WebSocketError(e)) => {
eprintln!("Communication error: {}", e);
}
Err(e) => eprintln!("Request failed: {}", e),
}
}
```
### TLS with Certificate Pinning ### TLS with Certificate Pinning
```rust ```rust
@ -127,12 +169,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
``` ```
### WebSocket Protocol Configuration
For version compatibility (if applies), you can configure WebSocket protocol version and sub-protocols:
```rust
use socktop_connector::{ConnectorConfig, SocktopConnector, connect_to_socktop_agent_with_config};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Method 1: Using the convenience function
let connector = connect_to_socktop_agent_with_config(
"ws://localhost:3000/ws",
Some(vec!["socktop".to_string(), "v1".to_string()]), // Sub-protocols
Some("13".to_string()), // WebSocket version (13 is standard)
).await?;
// Method 2: Using ConnectorConfig builder
let config = ConnectorConfig::new("ws://localhost:3000/ws")
.with_protocols(vec!["socktop".to_string()])
.with_version("13");
let mut connector = SocktopConnector::new(config);
connector.connect().await?;
Ok(())
}
```
**Note:** WebSocket version 13 is the current standard and is used by default. The sub-protocols feature is useful for protocol negotiation with servers that support multiple protocols.
## Continuous Updates ## Continuous Updates
The socktop agent provides real-time system metrics. Each request returns the current snapshot, but you can implement continuous monitoring by making requests in a loop: The socktop agent provides real-time system metrics. Each request returns the current snapshot, but you can implement continuous monitoring by making requests in a loop:
```rust ```rust
use socktop_connector::{connect_to_socktop_agent, AgentRequest, AgentResponse}; use socktop_connector::{connect_to_socktop_agent, AgentRequest, AgentResponse, ConnectorError};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
#[tokio::main] #[tokio::main]
@ -156,7 +228,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
Err(e) => { Err(e) => {
eprintln!("Error getting metrics: {}", e); eprintln!("Error getting metrics: {}", e);
break;
// You can pattern match on specific error types for different handling
match e {
socktop_connector::ConnectorError::WebSocketError(_) => {
eprintln!("Connection lost, attempting to reconnect...");
// Implement reconnection logic here
break;
}
socktop_connector::ConnectorError::JsonError(_) => {
eprintln!("Data parsing error, continuing...");
// Continue with next iteration for transient parsing errors
}
_ => {
eprintln!("Other error, stopping monitoring");
break;
}
}
} }
_ => unreachable!(), _ => unreachable!(),
} }
@ -185,9 +273,12 @@ fn format_bytes(bytes: u64) -> String {
The socktop agent implements intelligent caching to avoid overwhelming the system: The socktop agent implements intelligent caching to avoid overwhelming the system:
- **Metrics**: Cached for ~250ms by default (fast-changing data like CPU, memory) - **Metrics**: Cached for ~250ms by default (cheap / fast-changing data like CPU, memory)
- **Processes**: Cached for ~1500ms by default (moderately changing data) - **Processes**: Cached for ~1500ms by default (exppensive / moderately changing data)
- **Disks**: Cached for ~1000ms by default (slowly changing data) - **Disks**: Cached for ~1000ms by default (cheap / slowly changing data)
These values have been generally tuned in advance. You should not need to override them. The reason for this cache is for the use case that multiple clients are requesting data. In general a single client should never really hit a cached response since the polling rates are slower that the cache intervals. Cache intervals have been tuned based on how much work the agent has to do in the case of reloading fresh data.
This means: This means:
@ -252,6 +343,8 @@ The library provides flexible configuration through the `ConnectorConfig` builde
- `with_hostname_verification(bool)` - Control hostname verification for TLS connections - `with_hostname_verification(bool)` - Control hostname verification for TLS connections
- `true` (recommended): Verify the server hostname matches the certificate - `true` (recommended): Verify the server hostname matches the certificate
- `false`: Skip hostname verification (useful for localhost or IP-based connections) - `false`: Skip hostname verification (useful for localhost or IP-based connections)
- `with_protocols(Vec<String>)` - Set WebSocket sub-protocols for protocol negotiation
- `with_version(String)` - Set WebSocket protocol version (default is "13", the current standard)
**Note**: Hostname verification only applies to TLS connections (`wss://`). Non-TLS connections (`ws://`) don't use certificates, so hostname verification is not applicable. **Note**: Hostname verification only applies to TLS connections (`wss://`). Non-TLS connections (`ws://`) don't use certificates, so hostname verification is not applicable.
@ -272,7 +365,7 @@ tokio = { version = "1", features = ["rt", "time", "macros"] }
# Note: "net" feature not needed in WASM - WebSocket connections use browser APIs # Note: "net" feature not needed in WASM - WebSocket connections use browser APIs
``` ```
### Limitations ### WASM Limitations
- **No TLS support**: `wss://` connections are not available - **No TLS support**: `wss://` connections are not available
- **No certificate pinning**: TLS-related features are disabled - **No certificate pinning**: TLS-related features are disabled
- **Browser WebSocket API**: Uses browser's native WebSocket implementation - **Browser WebSocket API**: Uses browser's native WebSocket implementation
@ -300,9 +393,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
## Security Considerations ## Security Considerations
- **Production TLS**: Always enable hostname verification (`verify_hostname: true`) for production - **Production TLS**: You can hostname verification (`verify_hostname: true`) for production systems, This will add an additional level of production of verifying the hostname against the certificate. Generally this is to stop a man in the middle attack, but since it will be the client who is fooled and not the server, the risk and likelyhood of this use case is rather low. Which is why this is disabled by default.
- **Development/Testing**: You may disable hostname verification for localhost or IP addresses - **Certificate Pinning**: Use `with_tls_ca()` for self-signed certificates, the socktop agent will generate certificates on start. see main readme for more details.
- **Certificate Pinning**: Use `with_tls_ca()` for self-signed certificates
- **Non-TLS**: Use only for development or trusted networks - **Non-TLS**: Use only for development or trusted networks
## Environment Variables ## Environment Variables
@ -311,12 +403,77 @@ Currently no environment variables are used. All configuration is done through t
## Error Handling ## Error Handling
The library uses `anyhow::Error` for error handling, providing detailed error messages for common failure scenarios: The library uses structured error types via `thiserror` for comprehensive error handling. You can pattern match on specific error types:
- Connection failures ```rust
- TLS certificate validation errors use socktop_connector::{connect_to_socktop_agent, ConnectorError, AgentRequest};
- Protocol errors
- Parsing errors #[tokio::main]
async fn main() {
match connect_to_socktop_agent("invalid://url").await {
Ok(mut connector) => {
// Handle successful connection
match connector.request(AgentRequest::Metrics).await {
Ok(response) => println!("Got response: {:?}", response),
Err(ConnectorError::WebSocketError(e)) => {
eprintln!("WebSocket communication failed: {}", e);
}
Err(ConnectorError::JsonError(e)) => {
eprintln!("Failed to parse response: {}", e);
}
Err(e) => eprintln!("Other error: {}", e),
}
}
Err(ConnectorError::UrlError(e)) => {
eprintln!("Invalid URL: {}", e);
}
Err(ConnectorError::WebSocketError(e)) => {
eprintln!("Failed to connect: {}", e);
}
Err(ConnectorError::TlsError(msg)) => {
eprintln!("TLS error: {}", msg);
}
Err(e) => {
eprintln!("Connection failed: {}", e);
}
}
}
```
### Error Types
The `ConnectorError` enum provides specific variants for different error conditions:
- `ConnectorError::WebSocketError` - WebSocket connection or communication errors
- `ConnectorError::TlsError` - TLS-related errors (certificate validation, etc.)
- `ConnectorError::UrlError` - URL parsing errors
- `ConnectorError::JsonError` - JSON serialization/deserialization errors
- `ConnectorError::ProtocolError` - Protocol-level errors
- `ConnectorError::CompressionError` - Gzip compression/decompression errors
- `ConnectorError::IoError` - I/O errors
- `ConnectorError::Other` - Other errors with descriptive messages
All errors implement `std::error::Error` so they work seamlessly with `Box<dyn std::error::Error>`, `anyhow`, and other error handling crates.
### Migration from Generic Errors
If you were previously using the library with generic error handling, your existing code will continue to work:
```rust
// This continues to work as before
async fn my_function() -> Result<(), Box<dyn std::error::Error>> {
let mut connector = connect_to_socktop_agent("ws://localhost:3000/ws").await?;
let response = connector.request(AgentRequest::Metrics).await?;
Ok(())
}
// But now you can also use structured error handling for better control
async fn improved_function() -> Result<(), ConnectorError> {
let mut connector = connect_to_socktop_agent("ws://localhost:3000/ws").await?;
let response = connector.request(AgentRequest::Metrics).await?;
Ok(())
}
```
## License ## License

View File

@ -17,6 +17,7 @@ use tokio_tungstenite::{
}; };
use url::Url; use url::Url;
use crate::error::{ConnectorError, Result};
use crate::types::{AgentRequest, AgentResponse, DiskInfo, Metrics, ProcessInfo, ProcessesPayload}; use crate::types::{AgentRequest, AgentResponse, DiskInfo, Metrics, ProcessInfo, ProcessesPayload};
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
@ -41,6 +42,8 @@ pub struct ConnectorConfig {
pub url: String, pub url: String,
pub tls_ca_path: Option<String>, pub tls_ca_path: Option<String>,
pub verify_hostname: bool, pub verify_hostname: bool,
pub ws_protocols: Option<Vec<String>>,
pub ws_version: Option<String>,
} }
impl ConnectorConfig { impl ConnectorConfig {
@ -49,6 +52,8 @@ impl ConnectorConfig {
url: url.into(), url: url.into(),
tls_ca_path: None, tls_ca_path: None,
verify_hostname: false, verify_hostname: false,
ws_protocols: None,
ws_version: None,
} }
} }
@ -61,6 +66,18 @@ impl ConnectorConfig {
self.verify_hostname = verify; self.verify_hostname = verify;
self self
} }
/// Set WebSocket sub-protocols to negotiate
pub fn with_protocols(mut self, protocols: Vec<String>) -> Self {
self.ws_protocols = Some(protocols);
self
}
/// Set WebSocket protocol version (default is "13")
pub fn with_version(mut self, version: impl Into<String>) -> Self {
self.ws_version = Some(version.into());
self
}
} }
/// A WebSocket connector for communicating with socktop agents /// A WebSocket connector for communicating with socktop agents
@ -79,39 +96,33 @@ impl SocktopConnector {
} }
/// Connect to the agent /// Connect to the agent
pub async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> { pub async fn connect(&mut self) -> Result<()> {
let stream = connect_to_agent( let stream = connect_to_agent(&self.config).await?;
&self.config.url,
self.config.tls_ca_path.as_deref(),
self.config.verify_hostname,
)
.await?;
self.stream = Some(stream); self.stream = Some(stream);
Ok(()) Ok(())
} }
/// Send a request to the agent and get the response /// Send a request to the agent and get the response
pub async fn request( pub async fn request(&mut self, request: AgentRequest) -> Result<AgentResponse> {
&mut self, let stream = self.stream.as_mut().ok_or(ConnectorError::NotConnected)?;
request: AgentRequest,
) -> Result<AgentResponse, Box<dyn std::error::Error>> {
let stream = self.stream.as_mut().ok_or("Not connected")?;
match request { match request {
AgentRequest::Metrics => { AgentRequest::Metrics => {
let metrics = request_metrics(stream) let metrics = request_metrics(stream)
.await .await
.ok_or("Failed to get metrics")?; .ok_or_else(|| ConnectorError::invalid_response("Failed to get metrics"))?;
Ok(AgentResponse::Metrics(metrics)) Ok(AgentResponse::Metrics(metrics))
} }
AgentRequest::Disks => { AgentRequest::Disks => {
let disks = request_disks(stream).await.ok_or("Failed to get disks")?; let disks = request_disks(stream)
.await
.ok_or_else(|| ConnectorError::invalid_response("Failed to get disks"))?;
Ok(AgentResponse::Disks(disks)) Ok(AgentResponse::Disks(disks))
} }
AgentRequest::Processes => { AgentRequest::Processes => {
let processes = request_processes(stream) let processes = request_processes(stream)
.await .await
.ok_or("Failed to get processes")?; .ok_or_else(|| ConnectorError::invalid_response("Failed to get processes"))?;
Ok(AgentResponse::Processes(processes)) Ok(AgentResponse::Processes(processes))
} }
} }
@ -123,7 +134,7 @@ impl SocktopConnector {
} }
/// Disconnect from the agent /// Disconnect from the agent
pub async fn disconnect(&mut self) -> Result<(), Box<dyn std::error::Error>> { pub async fn disconnect(&mut self) -> Result<()> {
if let Some(mut stream) = self.stream.take() { if let Some(mut stream) = self.stream.take() {
let _ = stream.close(None).await; let _ = stream.close(None).await;
} }
@ -132,32 +143,54 @@ impl SocktopConnector {
} }
// Connect to the agent and return the WS stream // Connect to the agent and return the WS stream
async fn connect_to_agent( async fn connect_to_agent(config: &ConnectorConfig) -> Result<WsStream> {
url: &str,
tls_ca: Option<&str>,
verify_hostname: bool,
) -> Result<WsStream, Box<dyn std::error::Error>> {
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
ensure_crypto_provider(); ensure_crypto_provider();
let mut u = Url::parse(url)?; let mut u = Url::parse(&config.url)?;
if let Some(ca_path) = tls_ca { if let Some(ca_path) = &config.tls_ca_path {
if u.scheme() == "ws" { if u.scheme() == "ws" {
let _ = u.set_scheme("wss"); let _ = u.set_scheme("wss");
} }
return connect_with_ca(u.as_str(), ca_path, verify_hostname).await; return connect_with_ca_and_config(u.as_str(), ca_path, config).await;
} }
// No TLS - hostname verification is not applicable // No TLS - hostname verification is not applicable
let (ws, _) = connect_async(u.as_str()).await?; connect_without_ca_and_config(u.as_str(), config).await
}
async fn connect_without_ca_and_config(url: &str, config: &ConnectorConfig) -> Result<WsStream> {
let mut req = url.into_client_request()?;
// Apply WebSocket protocol configuration
if let Some(version) = &config.ws_version {
req.headers_mut().insert(
"Sec-WebSocket-Version",
version
.parse()
.map_err(|_| ConnectorError::protocol_error("Invalid WebSocket version"))?,
);
}
if let Some(protocols) = &config.ws_protocols {
let protocols_str = protocols.join(", ");
req.headers_mut().insert(
"Sec-WebSocket-Protocol",
protocols_str
.parse()
.map_err(|_| ConnectorError::protocol_error("Invalid WebSocket protocols"))?,
);
}
let (ws, _) = connect_async(req).await?;
Ok(ws) Ok(ws)
} }
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
async fn connect_with_ca( async fn connect_with_ca_and_config(
url: &str, url: &str,
ca_path: &str, ca_path: &str,
verify_hostname: bool, config: &ConnectorConfig,
) -> Result<WsStream, Box<dyn std::error::Error>> { ) -> Result<WsStream> {
// Initialize the crypto provider for rustls // Initialize the crypto provider for rustls
let _ = rustls::crypto::ring::default_provider().install_default(); let _ = rustls::crypto::ring::default_provider().install_default();
@ -175,8 +208,29 @@ async fn connect_with_ca(
.with_root_certificates(root) .with_root_certificates(root)
.with_no_client_auth(); .with_no_client_auth();
let req = url.into_client_request()?; let mut req = url.into_client_request()?;
if !verify_hostname {
// Apply WebSocket protocol configuration
if let Some(version) = &config.ws_version {
req.headers_mut().insert(
"Sec-WebSocket-Version",
version
.parse()
.map_err(|_| ConnectorError::protocol_error("Invalid WebSocket version"))?,
);
}
if let Some(protocols) = &config.ws_protocols {
let protocols_str = protocols.join(", ");
req.headers_mut().insert(
"Sec-WebSocket-Protocol",
protocols_str
.parse()
.map_err(|_| ConnectorError::protocol_error("Invalid WebSocket protocols"))?,
);
}
if !config.verify_hostname {
#[derive(Debug)] #[derive(Debug)]
struct NoVerify; struct NoVerify;
impl ServerCertVerifier for NoVerify { impl ServerCertVerifier for NoVerify {
@ -187,7 +241,7 @@ async fn connect_with_ca(
_server_name: &ServerName, _server_name: &ServerName,
_ocsp_response: &[u8], _ocsp_response: &[u8],
_now: UnixTime, _now: UnixTime,
) -> Result<ServerCertVerified, rustls::Error> { ) -> std::result::Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion()) Ok(ServerCertVerified::assertion())
} }
fn verify_tls12_signature( fn verify_tls12_signature(
@ -195,7 +249,7 @@ async fn connect_with_ca(
_message: &[u8], _message: &[u8],
_cert: &CertificateDer<'_>, _cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct, _dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> { ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion()) Ok(HandshakeSignatureValid::assertion())
} }
fn verify_tls13_signature( fn verify_tls13_signature(
@ -203,7 +257,7 @@ async fn connect_with_ca(
_message: &[u8], _message: &[u8],
_cert: &CertificateDer<'_>, _cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct, _dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> { ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion()) Ok(HandshakeSignatureValid::assertion())
} }
fn supported_verify_schemes(&self) -> Vec<SignatureScheme> { fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
@ -220,18 +274,26 @@ async fn connect_with_ca(
); );
} }
let cfg = Arc::new(cfg); let cfg = Arc::new(cfg);
let (ws, _) = let (ws, _) = connect_async_tls_with_config(
connect_async_tls_with_config(req, None, verify_hostname, Some(Connector::Rustls(cfg))) req,
.await?; None,
config.verify_hostname,
Some(Connector::Rustls(cfg)),
)
.await?;
Ok(ws) Ok(ws)
} }
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
async fn connect_with_ca( async fn connect_with_ca_and_config(
_url: &str, _url: &str,
_ca_path: &str, _ca_path: &str,
) -> Result<WsStream, Box<dyn std::error::Error>> { _config: &ConnectorConfig,
Err("TLS support not compiled in".into()) ) -> Result<WsStream> {
Err(ConnectorError::tls_error(
"TLS support not compiled in",
std::io::Error::new(std::io::ErrorKind::Unsupported, "TLS not available"),
))
} }
// Send a "get_metrics" request and await a single JSON reply // Send a "get_metrics" request and await a single JSON reply
@ -334,9 +396,7 @@ fn is_gzip(bytes: &[u8]) -> bool {
/// certificate involved, hostname verification is not applicable. /// certificate involved, hostname verification is not applicable.
/// ///
/// For TLS connections with certificate pinning, use `connect_to_socktop_agent_with_tls()`. /// For TLS connections with certificate pinning, use `connect_to_socktop_agent_with_tls()`.
pub async fn connect_to_socktop_agent( pub async fn connect_to_socktop_agent(url: impl Into<String>) -> Result<SocktopConnector> {
url: impl Into<String>,
) -> Result<SocktopConnector, Box<dyn std::error::Error>> {
let config = ConnectorConfig::new(url); let config = ConnectorConfig::new(url);
let mut connector = SocktopConnector::new(config); let mut connector = SocktopConnector::new(config);
connector.connect().await?; connector.connect().await?;
@ -354,7 +414,7 @@ pub async fn connect_to_socktop_agent_with_tls(
url: impl Into<String>, url: impl Into<String>,
ca_path: impl Into<String>, ca_path: impl Into<String>,
verify_hostname: bool, verify_hostname: bool,
) -> Result<SocktopConnector, Box<dyn std::error::Error>> { ) -> Result<SocktopConnector> {
let config = ConnectorConfig::new(url) let config = ConnectorConfig::new(url)
.with_tls_ca(ca_path) .with_tls_ca(ca_path)
.with_hostname_verification(verify_hostname); .with_hostname_verification(verify_hostname);
@ -362,3 +422,42 @@ pub async fn connect_to_socktop_agent_with_tls(
connector.connect().await?; connector.connect().await?;
Ok(connector) Ok(connector)
} }
/// Convenience function to create a connector with custom WebSocket protocol configuration.
///
/// This function allows you to specify WebSocket protocol version and sub-protocols.
/// Most users should use the simpler `connect_to_socktop_agent()` function instead.
///
/// # Example
/// ```no_run
/// use socktop_connector::connect_to_socktop_agent_with_config;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let connector = connect_to_socktop_agent_with_config(
/// "ws://localhost:3000/ws",
/// Some(vec!["socktop".to_string()]), // WebSocket sub-protocols
/// Some("13".to_string()), // WebSocket version (13 is standard)
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn connect_to_socktop_agent_with_config(
url: impl Into<String>,
protocols: Option<Vec<String>>,
version: Option<String>,
) -> Result<SocktopConnector> {
let mut config = ConnectorConfig::new(url);
if let Some(protocols) = protocols {
config = config.with_protocols(protocols);
}
if let Some(version) = version {
config = config.with_version(version);
}
let mut connector = SocktopConnector::new(config);
connector.connect().await?;
Ok(connector)
}

View File

@ -0,0 +1,136 @@
//! Error types for socktop_connector
use thiserror::Error;
/// Errors that can occur when using socktop_connector
#[derive(Error, Debug)]
pub enum ConnectorError {
/// WebSocket connection failed
#[error("WebSocket connection failed: {source}")]
ConnectionFailed {
#[from]
source: tokio_tungstenite::tungstenite::Error,
},
/// URL parsing error
#[error("Invalid URL: {url}")]
InvalidUrl {
url: String,
#[source]
source: url::ParseError,
},
/// TLS certificate error
#[error("TLS certificate error: {message}")]
TlsError {
message: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// Certificate file not found or invalid
#[error("Certificate file error at '{path}': {message}")]
CertificateError { path: String, message: String },
/// Invalid server response format
#[error("Invalid response from server: {message}")]
InvalidResponse { message: String },
/// JSON parsing error
#[error("JSON parsing error: {source}")]
JsonError {
#[from]
source: serde_json::Error,
},
/// Request/response protocol error
#[error("Protocol error: {message}")]
ProtocolError { message: String },
/// Connection is not established
#[error("Not connected to server")]
NotConnected,
/// Connection was closed unexpectedly
#[error("Connection closed: {reason}")]
ConnectionClosed { reason: String },
/// IO error (network, file system, etc.)
#[error("IO error: {source}")]
IoError {
#[from]
source: std::io::Error,
},
/// Compression/decompression error
#[error("Compression error: {message}")]
CompressionError { message: String },
/// Protocol Buffer parsing error
#[error("Protocol buffer error: {source}")]
ProtobufError {
#[from]
source: prost::DecodeError,
},
}
/// Result type alias for connector operations
pub type Result<T> = std::result::Result<T, ConnectorError>;
impl ConnectorError {
/// Create a TLS error with context
pub fn tls_error(
message: impl Into<String>,
source: impl std::error::Error + Send + Sync + 'static,
) -> Self {
Self::TlsError {
message: message.into(),
source: Box::new(source),
}
}
/// Create a certificate error
pub fn certificate_error(path: impl Into<String>, message: impl Into<String>) -> Self {
Self::CertificateError {
path: path.into(),
message: message.into(),
}
}
/// Create a protocol error
pub fn protocol_error(message: impl Into<String>) -> Self {
Self::ProtocolError {
message: message.into(),
}
}
/// Create an invalid response error
pub fn invalid_response(message: impl Into<String>) -> Self {
Self::InvalidResponse {
message: message.into(),
}
}
/// Create a connection closed error
pub fn connection_closed(reason: impl Into<String>) -> Self {
Self::ConnectionClosed {
reason: reason.into(),
}
}
/// Create a compression error
pub fn compression_error(message: impl Into<String>) -> Self {
Self::CompressionError {
message: message.into(),
}
}
}
impl From<url::ParseError> for ConnectorError {
fn from(source: url::ParseError) -> Self {
Self::InvalidUrl {
url: "unknown".to_string(), // We don't have the URL in the error context
source,
}
}
}

View File

@ -141,16 +141,15 @@
#![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(docsrs, feature(doc_cfg))]
pub mod connector; pub mod connector;
pub mod error;
pub mod types; pub mod types;
pub use connector::{ pub use connector::{
ConnectorConfig, SocktopConnector, WsStream, connect_to_socktop_agent, ConnectorConfig, SocktopConnector, WsStream, connect_to_socktop_agent,
connect_to_socktop_agent_with_tls, connect_to_socktop_agent_with_config, connect_to_socktop_agent_with_tls,
}; };
pub use error::{ConnectorError, Result};
pub use types::{ pub use types::{
AgentRequest, AgentResponse, DiskInfo, GpuInfo, Metrics, NetworkInfo, ProcessInfo, AgentRequest, AgentResponse, DiskInfo, GpuInfo, Metrics, NetworkInfo, ProcessInfo,
ProcessesPayload, ProcessesPayload,
}; };
/// Re-export commonly used error type
pub use anyhow::Error;