// Copyright (c) 2019 Fabian Freyer . // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // // 1. Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // // 2: Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // // 3. Neither the name of the copyright holder nor the names of its contributors // may be used to endorse or promote products derived from this software // without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. #[macro_use] extern crate serde_json; #[macro_use] extern crate log; use actix::prelude::*; use actix::{Actor, StreamHandler}; use actix_web::{web, App, HttpRequest, HttpResponse}; use actix_web_actors::ws; use std::io::Write; use std::process::Command; use std::time::{Duration, Instant}; use tokio_codec::{BytesCodec, Decoder, FramedRead}; use tokio_pty_process::{AsyncPtyMaster, AsyncPtyMasterWriteHalf, Child, CommandExt}; use handlebars::Handlebars; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); const IDLE_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes const IDLE_CHECK_INTERVAL: Duration = Duration::from_secs(30); // Check every 30 seconds mod event; mod terminado; /// Actix WebSocket actor pub struct Websocket { cons: Option>, hb: Instant, command: Option, } impl Actor for Websocket { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { // Start heartbeat self.hb(ctx); let command = self .command .take() .expect("command was None at start of WebSocket."); // Start PTY self.cons = Some(Terminal::new(ctx.address(), command).start()); trace!("Started WebSocket"); } fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { trace!("Stopping WebSocket"); // When the WebSocket disconnects, the Terminal's idle timeout will // automatically clean up the PTY session after IDLE_TIMEOUT (5 minutes). // This prevents "grey goo" accumulation of orphaned terminal processes // while giving reconnecting clients a grace period. if let Some(_cons) = self.cons.take() { info!("WebSocket disconnecting, Terminal will timeout if idle"); } Running::Stop } fn stopped(&mut self, _ctx: &mut Self::Context) { trace!("Stopped WebSocket"); } } impl Handler for Websocket { type Result = (); fn handle(&mut self, msg: event::IO, ctx: &mut ::Context) { trace!("Websocket <- Terminal : {:?}", msg); ctx.binary(msg); } } impl Handler for Websocket { type Result = (); fn handle(&mut self, msg: event::TerminadoMessage, ctx: &mut ::Context) { trace!("Websocket <- Terminal : {:?}", msg); match msg { event::TerminadoMessage::Stdout(_) => { let json = serde_json::to_string(&msg); if let Ok(json) = json { ctx.text(json); } } _ => error!(r#"Invalid event::TerminadoMessage to Websocket: only "stdout" supported"#), } } } impl Websocket { pub fn new(command: Command) -> Self { Self { hb: Instant::now(), cons: None, command: Some(command), } } fn hb(&self, ctx: &mut ::Context) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { warn!("Client heartbeat timeout, disconnecting."); ctx.stop(); return; } ctx.ping(""); }); } } impl StreamHandler for Websocket { fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { let cons: &mut Addr = match self.cons { Some(ref mut c) => c, None => { error!("Terminalole died, closing websocket."); ctx.stop(); return; } }; match msg { ws::Message::Ping(msg) => { self.hb = Instant::now(); ctx.pong(&msg); } ws::Message::Pong(_) => self.hb = Instant::now(), ws::Message::Text(t) => { // Attempt to parse the message as JSON. if let Ok(tmsg) = event::TerminadoMessage::from_json(&t) { cons.do_send(tmsg); } else { // Otherwise, it's just byte data. cons.do_send(event::IO::from(t)); } } ws::Message::Binary(b) => cons.do_send(event::IO::from(b)), ws::Message::Close(_) => ctx.stop(), ws::Message::Nop => {} }; } } impl Handler for Websocket { type Result = (); fn handle(&mut self, _msg: event::ChildDied, ctx: &mut ::Context) { trace!("Websocket <- ChildDied"); ctx.close(None); ctx.stop(); } } /// Represents a PTY backenActix WebSocket actor.d with attached child pub struct Terminal { pty_write: Option, child: Option, ws: Addr, command: Command, last_activity: Instant, idle_timeout: Duration, } impl Terminal { pub fn new(ws: Addr, command: Command) -> Self { Self { pty_write: None, child: None, ws, command, last_activity: Instant::now(), idle_timeout: IDLE_TIMEOUT, } } } impl StreamHandler<::Item, ::Error> for Terminal { fn handle(&mut self, msg: ::Item, _ctx: &mut Self::Context) { self.ws .do_send(event::TerminadoMessage::Stdout(event::IO(msg))); } } impl Actor for Terminal { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { info!("Started Terminal"); let pty = match AsyncPtyMaster::open() { Err(e) => { error!("Unable to open PTY: {:?}", e); ctx.stop(); return; } Ok(pty) => pty, }; let child = match self.command.spawn_pty_async(&pty) { Err(e) => { error!("Unable to spawn child: {:?}", e); ctx.stop(); return; } Ok(child) => child, }; info!("Spawned new child process with PID {}", child.id()); let (pty_read, mut pty_write) = pty.split(); // Set a sensible default PTY size immediately after splitting the PTY. // This avoids sending an initial 0x0 resize to the backend which can // cause panics in terminal UI libraries like ratatui. // // We use the Resize helper which accepts a mutable reference to the // write-half of the PTY and block until the resize completes. let _ = event::Resize::new(&mut pty_write, 24, 80).wait(); self.pty_write = Some(pty_write); self.child = Some(child); Self::add_stream(FramedRead::new(pty_read, BytesCodec::new()), ctx); // Start idle timeout checker ctx.run_interval(IDLE_CHECK_INTERVAL, |act, ctx| { let idle_duration = Instant::now().duration_since(act.last_activity); if idle_duration >= act.idle_timeout { info!( "Terminal idle timeout reached ({:?} idle), stopping session", idle_duration ); ctx.stop(); } }); } fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { info!("Stopping Terminal"); let child = self.child.take(); if child.is_none() { // Great, child is already dead! return Running::Stop; } let mut child = child.unwrap(); match child.kill() { Ok(()) => match child.wait() { Ok(exit) => info!("Child died: {:?}", exit), Err(e) => error!("Child wouldn't die: {}", e), }, Err(e) => error!("Could not kill child with PID {}: {}", child.id(), e), }; // Notify the websocket that the child died. self.ws.do_send(event::ChildDied()); Running::Stop } fn stopped(&mut self, _ctx: &mut Self::Context) { info!("Stopped Terminal"); } } impl Handler for Terminal { type Result = (); fn handle(&mut self, msg: event::IO, ctx: &mut ::Context) { // Reset idle timer on activity self.last_activity = Instant::now(); let pty = match self.pty_write { Some(ref mut p) => p, None => { error!("Write half of PTY died, stopping Terminal."); ctx.stop(); return; } }; if let Err(e) = pty.write(msg.as_ref()) { error!("Could not write to PTY: {}", e); ctx.stop(); } trace!("Websocket -> Terminal : {:?}", msg); } } impl Handler for Terminal { type Result = (); fn handle(&mut self, msg: event::TerminadoMessage, ctx: &mut ::Context) { let pty = match self.pty_write { Some(ref mut p) => p, None => { error!("Write half of PTY died, stopping Terminal."); ctx.stop(); return; } }; trace!("Websocket -> Terminal : {:?}", msg); match msg { event::TerminadoMessage::Stdin(io) => { // Reset idle timer on user input self.last_activity = Instant::now(); if let Err(e) = pty.write(io.as_ref()) { error!("Could not write to PTY: {}", e); ctx.stop(); } } event::TerminadoMessage::Resize { rows, cols } => { // Reset idle timer on resize (user interaction) self.last_activity = Instant::now(); // Ignore zero-sized resizes which can cause panics in backends // such as ratatui when they receive a Rect with width or height 0. if rows == 0 || cols == 0 { trace!( "Ignoring zero-sized resize: cols = {}, rows = {}", cols, rows ); return; } info!("Resize: cols = {}, rows = {}", cols, rows); if let Err(e) = event::Resize::new(pty, rows, cols).wait() { error!("Resize failed: {}", e); ctx.stop(); } } event::TerminadoMessage::Stdout(_) => { error!("Invalid Terminado Message: Stdin cannot go to PTY") } }; } } /// Trait to extend an [actix_web::App] by serving a web terminal. pub trait WebTermExt { /// Serve the websocket for the webterm fn webterm_socket(self: Self, endpoint: &str, handler: F) -> Self where F: Clone + Fn(&actix_web::HttpRequest) -> Command + 'static; fn webterm_ui( self: Self, endpoint: &str, webterm_socket_endpoint: &str, static_path: &str, ) -> Self; } impl WebTermExt for App where B: actix_web::body::MessageBody, T: actix_service::NewService< Config = (), Request = actix_web::dev::ServiceRequest, Response = actix_web::dev::ServiceResponse, Error = actix_web::Error, InitError = (), >, { fn webterm_socket(self: Self, endpoint: &str, handler: F) -> Self where F: Clone + Fn(&actix_web::HttpRequest) -> Command + 'static, { self.route( endpoint, web::get().to(move |req: HttpRequest, stream: web::Payload| { ws::start(Websocket::new(handler(&req)), &req, stream) }), ) } fn webterm_ui( self: Self, endpoint: &str, webterm_socket_endpoint: &str, static_path: &str, ) -> Self { let mut handlebars = Handlebars::new(); handlebars .register_templates_directory(".html", "./templates") .unwrap(); let handlebars_ref = web::Data::new(handlebars); let static_path = static_path.to_owned(); let webterm_socket_endpoint = webterm_socket_endpoint.to_owned(); self.register_data(handlebars_ref.clone()).route( endpoint, web::get().to(move |hb: web::Data| { let data = json!({ "websocket_path": webterm_socket_endpoint, "static_path": static_path, }); let body = hb.render("term", &data).unwrap(); HttpResponse::Ok().body(body) }), ) } }