code refactory

This commit is contained in:
rustdesk 2022-01-15 21:33:20 +08:00
parent 6bd730bc67
commit 9651666d41

View File

@ -4,7 +4,7 @@ use hbb_common::{
config::Config,
fs,
futures::{SinkExt, StreamExt},
protobuf, sleep, timeout,
sleep, timeout,
tokio::{
net::TcpStream,
sync::mpsc,
@ -61,7 +61,6 @@ pub struct Connection {
disable_clipboard: bool, // by peer
disable_audio: bool, // by peer
tx_input: SyncSender<MessageInput>, // handle input messages
rx_input_res: Receiver<bool>,
}
impl Subscriber for ConnInner {
@ -111,8 +110,8 @@ impl Connection {
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
let (tx_input, rx_input) = sync_channel(1);
let (tx_input_res, rx_input_res) = sync_channel(1);
let tx_cloned = tx.clone();
let mut conn = Self {
inner: ConnInner {
id,
@ -141,7 +140,6 @@ impl Connection {
disable_audio: false,
disable_clipboard: false,
tx_input,
rx_input_res,
};
tokio::spawn(async move {
if let Err(err) = start_ipc(rx_to_cm, tx_from_cm).await {
@ -172,7 +170,7 @@ impl Connection {
},
);
let handler_input = std::thread::spawn(move || Self::handle_input(rx_input, tx_input_res));
let handler_input = std::thread::spawn(move || Self::handle_input(rx_input, tx_cloned));
loop {
tokio::select! {
@ -314,7 +312,15 @@ impl Connection {
}
}
video_service::notify_video_frame_feched(id, None);
super::video_service::update_test_latency(id, 0);
super::video_service::update_image_quality(id, None);
if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await {
conn.on_close(&err.to_string(), false);
}
conn.tx_input.send(MessageInput::Exit).ok();
// join at the end so that not blocking video
if let Err(e) = handler_input.join() {
log::error!("Failed to join input thread, {:?}", e);
} else {
@ -323,16 +329,9 @@ impl Connection {
let _ = crate::platform::block_input(false);
crate::platform::toggle_blank_screen(false);
video_service::notify_video_frame_feched(id, None);
super::video_service::update_test_latency(id, 0);
super::video_service::update_image_quality(id, None);
if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await {
conn.on_close(&err.to_string(), false);
}
}
fn handle_input(receiver: Receiver<MessageInput>, tx_input_res: SyncSender<bool>) {
fn handle_input(receiver: Receiver<MessageInput>, tx: Sender) {
let mut block_input_mode = false;
let (tx_blank, rx_blank) = sync_channel(1);
@ -347,34 +346,26 @@ impl Connection {
MessageInput::BlockOn => {
if crate::platform::block_input(true) {
block_input_mode = true;
tx_input_res.send(true).ok();
} else {
tx_input_res.send(false).ok();
Self::send_option_error(&tx, "Failed to turn on block input mode");
}
}
MessageInput::BlockOff => {
if crate::platform::block_input(false) {
block_input_mode = false;
tx_input_res.send(true).ok();
} else {
tx_input_res.send(false).ok();
Self::send_option_error(&tx, "Failed to turn off block input mode");
}
}
MessageInput::PrivacyOn => {
if crate::platform::block_input(true) {
block_input_mode = true;
tx_input_res.send(true).ok();
} else {
tx_input_res.send(false).ok();
}
tx_blank.send(MessageInput::PrivacyOn).ok();
}
MessageInput::PrivacyOff => {
if crate::platform::block_input(false) {
block_input_mode = false;
tx_input_res.send(true).ok();
} else {
tx_input_res.send(false).ok();
}
tx_blank.send(MessageInput::PrivacyOff).ok();
}
@ -651,16 +642,15 @@ impl Connection {
self.send(msg_out).await;
}
async fn send_option_error<T: std::string::ToString>(&mut self, err: T, opt: OptionMessage) {
fn send_option_error<T: std::string::ToString>(s: &Sender, err: T) {
let mut msg_out = Message::new();
let mut res = OptionResponse::new();
let mut misc = Misc::new();
res.opt = protobuf::MessageField::some(opt);
res.error = err.to_string();
misc.set_option_response(res);
msg_out.set_misc(misc);
self.send(msg_out).await;
s.send((Instant::now(), Arc::new(msg_out))).ok();
}
async fn on_message(&mut self, msg: Message) -> bool {
@ -1006,22 +996,10 @@ impl Connection {
BoolOption::Yes => {
self.privacy_mode = true;
self.tx_input.send(MessageInput::PrivacyOn).ok();
if self.rx_input_res.recv().unwrap() {
log::info!("Privacy mode on");
} else {
log::error!("Failed to trun on privacy mode");
self.send_option_error("Failed to turn on privacy mode", o.clone())
.await;
}
}
BoolOption::No => {
self.privacy_mode = false;
self.tx_input.send(MessageInput::PrivacyOff).ok();
if self.rx_input_res.recv().unwrap() {
log::info!("Privacy mode off");
} else {
log::warn!("Failed to trun off privacy mode")
}
}
_ => {}
}
@ -1032,21 +1010,9 @@ impl Connection {
match q {
BoolOption::Yes => {
self.tx_input.send(MessageInput::BlockOn).ok();
if self.rx_input_res.recv().unwrap() {
log::info!("Block input mode on");
} else {
log::error!("Failed to trun on block input mode");
self.send_option_error("Failed to turn on block input mode", o.clone())
.await;
}
}
BoolOption::No => {
self.tx_input.send(MessageInput::BlockOff).ok();
if self.rx_input_res.recv().unwrap() {
log::info!("Block input mode off");
} else {
log::error!("Failed to trun off block input mode");
}
}
_ => {}
}