From 4f3e8f6443ff5ad40a54b13e64aacb68f607495a Mon Sep 17 00:00:00 2001 From: Fabian Freyer Date: Sat, 23 Mar 2019 00:49:09 -0400 Subject: [PATCH] refactor: move things into event module --- src/event.rs | 68 +++++++++++++++++++++++++++++++++ src/main.rs | 98 +++++++++++------------------------------------- src/terminado.rs | 2 +- 3 files changed, 90 insertions(+), 78 deletions(-) create mode 100644 src/event.rs diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 0000000..91eb907 --- /dev/null +++ b/src/event.rs @@ -0,0 +1,68 @@ +use actix::Message; +use actix_web::Binary; +use futures::{Future, Poll}; +use libc::c_ushort; +use tokio_pty_process::PtyMaster; + +pub use crate::terminado::TerminadoMessage; + +use tokio_codec::{BytesCodec, Decoder}; +type BytesMut = ::Item; + +pub struct Resize { + pty: T, + rows: c_ushort, + cols: c_ushort, +} + +impl Resize { + pub fn new(pty: T, rows: c_ushort, cols: c_ushort) -> Self { + Self { pty, rows, cols } + } +} + +impl Future for Resize { + type Item = (); + type Error = std::io::Error; + + fn poll(&mut self) -> Poll { + self.pty.resize(self.rows, self.cols) + } +} + +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct IO(pub BytesMut); + +impl Message for IO { + type Result = (); +} + +impl Into for IO { + fn into(self) -> Binary { + self.0.into() + } +} + +impl AsRef<[u8]> for IO { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +impl From for IO { + fn from(b: Binary) -> Self { + Self(b.as_ref().into()) + } +} + +impl From for IO { + fn from(s: String) -> Self { + Self(s.into()) + } +} + +impl From<&str> for IO { + fn from(s: &str) -> Self { + Self(s.into()) + } +} diff --git a/src/main.rs b/src/main.rs index 119691e..cb14709 100644 --- a/src/main.rs +++ b/src/main.rs @@ -42,63 +42,22 @@ extern crate log; extern crate pretty_env_logger; use actix::*; -use actix_web::{fs::NamedFile, fs::StaticFiles, server, ws, App, Binary, HttpRequest, Result}; +use actix_web::{fs::NamedFile, fs::StaticFiles, server, ws, App, HttpRequest, Result}; use futures::prelude::*; -use libc::c_ushort; - use std::io::Write; use std::process::Command; use std::time::{Duration, Instant}; use tokio_codec::{BytesCodec, Decoder, FramedRead}; -use tokio_pty_process::{AsyncPtyMaster, AsyncPtyMasterWriteHalf, Child, CommandExt, PtyMaster}; +use tokio_pty_process::{AsyncPtyMaster, AsyncPtyMasterWriteHalf, Child, CommandExt}; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); -type BytesMut = ::Item; - +mod event; mod terminado; -use crate::terminado::TerminadoMessage; - -#[derive(Debug, Eq, PartialEq, Clone)] -pub struct IO(BytesMut); - -impl Message for IO { - type Result = (); -} - -impl Into for IO { - fn into(self) -> Binary { - self.0.into() - } -} - -impl AsRef<[u8]> for IO { - fn as_ref(&self) -> &[u8] { - self.0.as_ref() - } -} - -impl From for IO { - fn from(b: Binary) -> Self { - Self(b.as_ref().into()) - } -} - -impl From for IO { - fn from(s: String) -> Self { - Self(s.into()) - } -} - -impl From<&str> for IO { - fn from(s: &str) -> Self { - Self(s.into()) - } -} struct Websocket { cons: Option>, @@ -129,29 +88,29 @@ impl Actor for Websocket { } } -impl Handler for Websocket { +impl Handler for Websocket { type Result = (); - fn handle(&mut self, msg: IO, ctx: &mut ::Context) { + fn handle(&mut self, msg: event::IO, ctx: &mut ::Context) { trace!("Websocket <- Terminal : {:?}", msg); ctx.binary(msg); } } -impl Handler for Websocket { +impl Handler for Websocket { type Result = (); - fn handle(&mut self, msg: TerminadoMessage, ctx: &mut ::Context) { + fn handle(&mut self, msg: event::TerminadoMessage, ctx: &mut ::Context) { trace!("Websocket <- Terminal : {:?}", msg); match msg { - TerminadoMessage::Stdout(_) => { + event::TerminadoMessage::Stdout(_) => { let json = serde_json::to_string(&msg); if let Ok(json) = json { ctx.text(json); } } - _ => error!(r#"Invalid TerminadoMessage to Websocket: only "stdout" supported"#), + _ => error!(r#"Invalid event::TerminadoMessage to Websocket: only "stdout" supported"#), } } } @@ -200,14 +159,14 @@ impl StreamHandler for Websocket { ws::Message::Pong(_) => self.hb = Instant::now(), ws::Message::Text(t) => { // Attempt to parse the message as JSON. - if let Ok(tmsg) = TerminadoMessage::from_json(&t) { + if let Ok(tmsg) = event::TerminadoMessage::from_json(&t) { cons.do_send(tmsg); } else { // Otherwise, it's just byte data. - cons.do_send(IO::from(t)); + cons.do_send(event::IO::from(t)); } } - ws::Message::Binary(b) => cons.do_send(IO::from(b)), + ws::Message::Binary(b) => cons.do_send(event::IO::from(b)), ws::Message::Close(_) => ctx.stop(), }; } @@ -231,7 +190,7 @@ impl Terminal { impl StreamHandler<::Item, ::Error> for Terminal { fn handle(&mut self, msg: ::Item, _ctx: &mut Self::Context) { - self.ws.do_send(TerminadoMessage::Stdout(IO(msg))); + self.ws.do_send(event::TerminadoMessage::Stdout(event::IO(msg))); } } @@ -296,10 +255,10 @@ impl Actor for Terminal { } } -impl Handler for Terminal { +impl Handler for Terminal { type Result = (); - fn handle(&mut self, msg: IO, ctx: &mut ::Context) { + fn handle(&mut self, msg: event::IO, ctx: &mut ::Context) { let pty = match self.pty_write { Some(ref mut p) => p, None => { @@ -318,25 +277,10 @@ impl Handler for Terminal { } } -struct Resize { - pty: T, - rows: c_ushort, - cols: c_ushort, -} - -impl Future for Resize { - type Item = (); - type Error = std::io::Error; - - fn poll(&mut self) -> Poll { - self.pty.resize(self.rows, self.cols) - } -} - -impl Handler for Terminal { +impl Handler for Terminal { type Result = (); - fn handle(&mut self, msg: TerminadoMessage, ctx: &mut ::Context) { + fn handle(&mut self, msg: event::TerminadoMessage, ctx: &mut ::Context) { let pty = match self.pty_write { Some(ref mut p) => p, None => { @@ -348,20 +292,20 @@ impl Handler for Terminal { trace!("Websocket -> Terminal : {:?}", msg); match msg { - TerminadoMessage::Stdin(io) => { + event::TerminadoMessage::Stdin(io) => { if let Err(e) = pty.write(io.as_ref()) { error!("Could not write to PTY: {}", e); ctx.stop(); } } - TerminadoMessage::Resize { cols, rows } => { + event::TerminadoMessage::Resize { cols, rows } => { info!("Resize: cols = {}, rows = {}", cols, rows); - if let Err(e) = (Resize { pty, cols, rows }).wait() { + if let Err(e) = event::Resize::new(pty, cols, rows).wait() { error!("Resize failed: {}", e); ctx.stop(); } } - TerminadoMessage::Stdout(_) => { + event::TerminadoMessage::Stdout(_) => { error!("Invalid Terminado Message: Stdin cannot go to PTY") } }; diff --git a/src/terminado.rs b/src/terminado.rs index 042ad68..41bd660 100644 --- a/src/terminado.rs +++ b/src/terminado.rs @@ -8,7 +8,7 @@ use serde::ser::SerializeSeq; use serde::{Serialize, Serializer}; use serde_json; -use crate::IO; +use crate::event::IO; impl Message for TerminadoMessage { type Result = ();