From 9651666d41238c7c9c51576338df4d53bab3a2f8 Mon Sep 17 00:00:00 2001 From: rustdesk Date: Sat, 15 Jan 2022 21:33:20 +0800 Subject: [PATCH] code refactory --- src/server/connection.rs | 66 ++++++++++------------------------------ 1 file changed, 16 insertions(+), 50 deletions(-) diff --git a/src/server/connection.rs b/src/server/connection.rs index 3912f6722..6b3c203ac 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -4,7 +4,7 @@ use hbb_common::{ config::Config, fs, futures::{SinkExt, StreamExt}, - protobuf, sleep, timeout, + sleep, timeout, tokio::{ net::TcpStream, sync::mpsc, @@ -61,7 +61,6 @@ pub struct Connection { disable_clipboard: bool, // by peer disable_audio: bool, // by peer tx_input: SyncSender, // handle input messages - rx_input_res: Receiver, } impl Subscriber for ConnInner { @@ -111,8 +110,8 @@ impl Connection { let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc)>(); let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc)>(); let (tx_input, rx_input) = sync_channel(1); - let (tx_input_res, rx_input_res) = sync_channel(1); + let tx_cloned = tx.clone(); let mut conn = Self { inner: ConnInner { id, @@ -141,7 +140,6 @@ impl Connection { disable_audio: false, disable_clipboard: false, tx_input, - rx_input_res, }; tokio::spawn(async move { if let Err(err) = start_ipc(rx_to_cm, tx_from_cm).await { @@ -172,7 +170,7 @@ impl Connection { }, ); - let handler_input = std::thread::spawn(move || Self::handle_input(rx_input, tx_input_res)); + let handler_input = std::thread::spawn(move || Self::handle_input(rx_input, tx_cloned)); loop { tokio::select! { @@ -314,7 +312,15 @@ impl Connection { } } + video_service::notify_video_frame_feched(id, None); + super::video_service::update_test_latency(id, 0); + super::video_service::update_image_quality(id, None); + if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await { + conn.on_close(&err.to_string(), false); + } + conn.tx_input.send(MessageInput::Exit).ok(); + // join at the end so that not blocking video if let Err(e) = handler_input.join() { log::error!("Failed to join input thread, {:?}", e); } else { @@ -323,16 +329,9 @@ impl Connection { let _ = crate::platform::block_input(false); crate::platform::toggle_blank_screen(false); - - video_service::notify_video_frame_feched(id, None); - super::video_service::update_test_latency(id, 0); - super::video_service::update_image_quality(id, None); - if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await { - conn.on_close(&err.to_string(), false); - } } - fn handle_input(receiver: Receiver, tx_input_res: SyncSender) { + fn handle_input(receiver: Receiver, tx: Sender) { let mut block_input_mode = false; let (tx_blank, rx_blank) = sync_channel(1); @@ -347,34 +346,26 @@ impl Connection { MessageInput::BlockOn => { if crate::platform::block_input(true) { block_input_mode = true; - tx_input_res.send(true).ok(); } else { - tx_input_res.send(false).ok(); + Self::send_option_error(&tx, "Failed to turn on block input mode"); } } MessageInput::BlockOff => { if crate::platform::block_input(false) { block_input_mode = false; - tx_input_res.send(true).ok(); } else { - tx_input_res.send(false).ok(); + Self::send_option_error(&tx, "Failed to turn off block input mode"); } } MessageInput::PrivacyOn => { if crate::platform::block_input(true) { block_input_mode = true; - tx_input_res.send(true).ok(); - } else { - tx_input_res.send(false).ok(); } tx_blank.send(MessageInput::PrivacyOn).ok(); } MessageInput::PrivacyOff => { if crate::platform::block_input(false) { block_input_mode = false; - tx_input_res.send(true).ok(); - } else { - tx_input_res.send(false).ok(); } tx_blank.send(MessageInput::PrivacyOff).ok(); } @@ -651,16 +642,15 @@ impl Connection { self.send(msg_out).await; } - async fn send_option_error(&mut self, err: T, opt: OptionMessage) { + fn send_option_error(s: &Sender, err: T) { let mut msg_out = Message::new(); let mut res = OptionResponse::new(); let mut misc = Misc::new(); - res.opt = protobuf::MessageField::some(opt); res.error = err.to_string(); misc.set_option_response(res); msg_out.set_misc(misc); - self.send(msg_out).await; + s.send((Instant::now(), Arc::new(msg_out))).ok(); } async fn on_message(&mut self, msg: Message) -> bool { @@ -1006,22 +996,10 @@ impl Connection { BoolOption::Yes => { self.privacy_mode = true; self.tx_input.send(MessageInput::PrivacyOn).ok(); - if self.rx_input_res.recv().unwrap() { - log::info!("Privacy mode on"); - } else { - log::error!("Failed to trun on privacy mode"); - self.send_option_error("Failed to turn on privacy mode", o.clone()) - .await; - } } BoolOption::No => { self.privacy_mode = false; self.tx_input.send(MessageInput::PrivacyOff).ok(); - if self.rx_input_res.recv().unwrap() { - log::info!("Privacy mode off"); - } else { - log::warn!("Failed to trun off privacy mode") - } } _ => {} } @@ -1032,21 +1010,9 @@ impl Connection { match q { BoolOption::Yes => { self.tx_input.send(MessageInput::BlockOn).ok(); - if self.rx_input_res.recv().unwrap() { - log::info!("Block input mode on"); - } else { - log::error!("Failed to trun on block input mode"); - self.send_option_error("Failed to turn on block input mode", o.clone()) - .await; - } } BoolOption::No => { self.tx_input.send(MessageInput::BlockOff).ok(); - if self.rx_input_res.recv().unwrap() { - log::info!("Block input mode off"); - } else { - log::error!("Failed to trun off block input mode"); - } } _ => {} }