socktop-webterm/src/lib.rs
jasonwitty 12f2d6e6af
Some checks failed
Build and Deploy to K3s / deploy (push) Blocked by required conditions
Build and Deploy to K3s / build-and-push (push) Has been cancelled
- modernize packages and rust edition. - increase timeout for rollout -
increment cargo version
2025-11-28 14:43:49 -08:00

495 lines
15 KiB
Rust

// Copyright (c) 2019 Fabian Freyer <fabian.freyer@physik.tu-berlin.de>.
// Copyright (c) 2024 Jason Witty <jasonpwitty+socktop@proton.me>.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// 2: Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors
// may be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
use actix::prelude::*;
use actix::{Actor, StreamHandler};
use actix_web::{web, App, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use std::io::{Read, Write};
use std::process::Command;
use std::time::{Duration, Instant};
use bytes::Bytes;
use handlebars::Handlebars;
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use serde_json::json;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
const IDLE_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes
const IDLE_CHECK_INTERVAL: Duration = Duration::from_secs(30); // Check every 30 seconds
mod event;
mod terminado;
use event::{ChildDied, TerminadoMessage, IO};
/// Actix WebSocket actor
pub struct Websocket {
cons: Option<Addr<Terminal>>,
hb: Instant,
command: Option<Command>,
}
impl Actor for Websocket {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
// Start heartbeat
self.hb(ctx);
let command = self
.command
.take()
.expect("command was None at start of WebSocket.");
// Start PTY
self.cons = Some(Terminal::new(ctx.address(), command).start());
log::trace!("Started WebSocket");
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
log::trace!("Stopping WebSocket");
if let Some(_cons) = self.cons.take() {
log::info!("WebSocket disconnecting, Terminal will timeout if idle");
}
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
log::trace!("Stopped WebSocket");
}
}
impl Handler<IO> for Websocket {
type Result = ();
fn handle(&mut self, msg: IO, ctx: &mut <Self as Actor>::Context) {
log::trace!("Websocket <- Terminal : {:?}", msg);
ctx.binary(msg.0);
}
}
impl Handler<TerminadoMessage> for Websocket {
type Result = ();
fn handle(&mut self, msg: TerminadoMessage, ctx: &mut <Self as Actor>::Context) {
log::trace!("Websocket <- Terminal : {:?}", msg);
match msg {
TerminadoMessage::Stdout(_) => {
let json = serde_json::to_string(&msg);
if let Ok(json) = json {
ctx.text(json);
}
}
_ => log::error!(
r#"Invalid event::TerminadoMessage to Websocket: only "stdout" supported"#
),
}
}
}
impl Websocket {
pub fn new(command: Command) -> Self {
Self {
hb: Instant::now(),
cons: None,
command: Some(command),
}
}
fn hb(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
log::warn!("Client heartbeat timeout, disconnecting.");
ctx.stop();
return;
}
ctx.ping(b"");
});
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Websocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let cons: &mut Addr<Terminal> = match self.cons {
Some(ref mut c) => c,
None => {
log::error!("Terminal died, closing websocket.");
ctx.stop();
return;
}
};
let msg = match msg {
Ok(msg) => msg,
Err(e) => {
log::error!("WebSocket protocol error: {}", e);
ctx.stop();
return;
}
};
match msg {
ws::Message::Ping(msg) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
ws::Message::Pong(_) => self.hb = Instant::now(),
ws::Message::Text(t) => {
// Attempt to parse the message as JSON.
if let Ok(tmsg) = TerminadoMessage::from_json(t.as_ref()) {
cons.do_send(tmsg);
} else {
// Otherwise, it's just byte data.
cons.do_send(IO::from(t.to_string()));
}
}
ws::Message::Binary(b) => cons.do_send(IO::from(b)),
ws::Message::Close(_) => ctx.stop(),
ws::Message::Nop | ws::Message::Continuation(_) => {}
};
}
}
impl Handler<ChildDied> for Websocket {
type Result = ();
fn handle(&mut self, _msg: ChildDied, ctx: &mut <Self as Actor>::Context) {
log::trace!("Websocket <- ChildDied");
ctx.close(None);
ctx.stop();
}
}
/// Represents a PTY backend with attached child
pub struct Terminal {
pty_master: Option<Box<dyn portable_pty::MasterPty + Send>>,
pty_writer: Option<Box<dyn Write + Send>>,
child: Option<Box<dyn portable_pty::Child + Send>>,
ws: Addr<Websocket>,
command: Command,
last_activity: Instant,
idle_timeout: Duration,
}
impl Terminal {
pub fn new(ws: Addr<Websocket>, command: Command) -> Self {
Self {
pty_master: None,
pty_writer: None,
child: None,
ws,
command,
last_activity: Instant::now(),
idle_timeout: IDLE_TIMEOUT,
}
}
}
impl Actor for Terminal {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
log::info!("Started Terminal");
let pty_system = native_pty_system();
let pty_pair = match pty_system.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
}) {
Ok(pair) => pair,
Err(e) => {
log::error!("Unable to open PTY: {:?}", e);
ctx.stop();
return;
}
};
let mut cmd_builder = CommandBuilder::new(self.command.get_program());
for arg in self.command.get_args() {
cmd_builder.arg(arg);
}
for (key, val) in self.command.get_envs() {
if let Some(val) = val {
cmd_builder.env(key, val);
}
}
let child = match pty_pair.slave.spawn_command(cmd_builder) {
Ok(child) => child,
Err(e) => {
log::error!("Unable to spawn child: {:?}", e);
ctx.stop();
return;
}
};
log::info!("Spawned new child process");
// Get reader and writer
let reader = match pty_pair.master.try_clone_reader() {
Ok(r) => r,
Err(e) => {
log::error!("Unable to clone reader: {:?}", e);
ctx.stop();
return;
}
};
let writer = match pty_pair.master.take_writer() {
Ok(w) => w,
Err(e) => {
log::error!("Unable to get writer: {:?}", e);
ctx.stop();
return;
}
};
self.pty_master = Some(pty_pair.master);
self.pty_writer = Some(writer);
self.child = Some(child);
// Spawn blocking thread to read from PTY
let ws = self.ws.clone();
std::thread::spawn(move || {
let mut reader = reader;
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => {
log::info!("PTY reader reached EOF");
break;
}
Ok(n) => {
let data = Bytes::copy_from_slice(&buf[..n]);
ws.do_send(TerminadoMessage::Stdout(IO(data)));
}
Err(e) => {
log::error!("Error reading from PTY: {}", e);
break;
}
}
}
});
// Start idle timeout checker
ctx.run_interval(IDLE_CHECK_INTERVAL, |act, ctx| {
let idle_duration = Instant::now().duration_since(act.last_activity);
if idle_duration >= act.idle_timeout {
log::info!(
"Terminal idle timeout reached ({:?} idle), stopping session",
idle_duration
);
ctx.stop();
}
});
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
log::info!("Stopping Terminal");
if let Some(mut child) = self.child.take() {
let _ = child.kill();
let _ = child.wait();
}
// Notify the websocket that the child died.
self.ws.do_send(ChildDied());
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
log::info!("Stopped Terminal");
}
}
impl Handler<IO> for Terminal {
type Result = ();
fn handle(&mut self, msg: IO, ctx: &mut <Self as Actor>::Context) {
// Reset idle timer on activity
self.last_activity = Instant::now();
let writer = match &mut self.pty_writer {
Some(w) => w,
None => {
log::error!("PTY writer died, stopping Terminal.");
ctx.stop();
return;
}
};
if let Err(e) = writer.write_all(&msg.0) {
log::error!("Could not write to PTY: {}", e);
ctx.stop();
return;
}
log::trace!("Websocket -> Terminal : {:?}", msg);
}
}
impl Handler<TerminadoMessage> for Terminal {
type Result = ();
fn handle(&mut self, msg: TerminadoMessage, ctx: &mut <Self as Actor>::Context) {
log::trace!("Websocket -> Terminal : {:?}", msg);
match msg {
TerminadoMessage::Stdin(io) => {
// Reset idle timer on user input
self.last_activity = Instant::now();
let writer = match &mut self.pty_writer {
Some(w) => w,
None => {
log::error!("PTY writer died, stopping Terminal.");
ctx.stop();
return;
}
};
if let Err(e) = writer.write_all(&io.0) {
log::error!("Could not write to PTY: {}", e);
ctx.stop();
}
}
TerminadoMessage::Resize { rows, cols } => {
// Reset idle timer on resize (user interaction)
self.last_activity = Instant::now();
// Ignore zero-sized resizes
if rows == 0 || cols == 0 {
log::trace!(
"Ignoring zero-sized resize: cols = {}, rows = {}",
cols,
rows
);
return;
}
log::info!("Resize: cols = {}, rows = {}", cols, rows);
let pty = match &mut self.pty_master {
Some(p) => p,
None => {
log::error!("PTY died, stopping Terminal.");
ctx.stop();
return;
}
};
if let Err(e) = pty.resize(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
}) {
log::error!("Resize failed: {}", e);
ctx.stop();
}
}
TerminadoMessage::Stdout(_) => {
log::error!("Invalid Terminado Message: Stdout cannot go to PTY")
}
};
}
}
/// Trait to extend an [actix_web::App] by serving a web terminal.
pub trait WebTermExt {
/// Serve the websocket for the webterm
fn webterm_socket<F>(self, endpoint: &str, handler: F) -> Self
where
F: Clone + Fn(&actix_web::HttpRequest) -> Command + 'static;
fn webterm_ui(self, endpoint: &str, webterm_socket_endpoint: &str, static_path: &str) -> Self;
}
impl<T> WebTermExt for App<T>
where
T: actix_web::dev::ServiceFactory<
actix_web::dev::ServiceRequest,
Config = (),
Error = actix_web::Error,
InitError = (),
>,
{
fn webterm_socket<F>(self, endpoint: &str, handler: F) -> Self
where
F: Clone + Fn(&actix_web::HttpRequest) -> Command + 'static,
{
self.route(
endpoint,
web::get().to(move |req: HttpRequest, stream: web::Payload| {
let cmd = handler(&req);
async move { ws::start(Websocket::new(cmd), &req, stream) }
}),
)
}
fn webterm_ui(self, endpoint: &str, webterm_socket_endpoint: &str, static_path: &str) -> Self {
let mut handlebars = Handlebars::new();
handlebars
.register_template_file("term", "./templates/term.html")
.unwrap();
let handlebars_ref = web::Data::new(handlebars);
let static_path = static_path.to_owned();
let webterm_socket_endpoint = webterm_socket_endpoint.to_owned();
self.app_data(handlebars_ref.clone()).route(
endpoint,
web::get().to(move |hb: web::Data<Handlebars<'static>>| {
let websocket_path = webterm_socket_endpoint.clone();
let static_path_clone = static_path.clone();
async move {
let data = json!({
"websocket_path": websocket_path,
"static_path": static_path_clone,
});
let body = hb.render("term", &data).unwrap();
HttpResponse::Ok().body(body)
}
}),
)
}
}