From 7fcb3d70bb987e4b92e1d2ca2c226e500be8cab8 Mon Sep 17 00:00:00 2001 From: dignow Date: Sat, 30 Sep 2023 22:07:14 +0800 Subject: [PATCH] fix, reconnect deadlock, introduce connection round control Signed-off-by: dignow --- src/client/io_loop.rs | 21 ++++++-- src/flutter.rs | 3 +- src/plugin/native_handlers/session.rs | 9 ++-- src/ui_session_interface.rs | 73 +++++++++++++++++++++++++-- 4 files changed, 94 insertions(+), 12 deletions(-) diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 254f72583..3e9465368 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -106,7 +106,7 @@ impl Remote { } } - pub async fn io_loop(&mut self, key: &str, token: &str) { + pub async fn io_loop(&mut self, key: &str, token: &str, round: u32) { let mut last_recv_time = Instant::now(); let mut received = false; let conn_type = if self.handler.is_file_transfer() { @@ -125,6 +125,11 @@ impl Remote { .await { Ok((mut peer, direct, pk)) => { + self.handler + .connection_round_state + .lock() + .unwrap() + .set_connected(); self.handler.set_connection_type(peer.is_secured(), direct); // flutter -> connection_ready self.handler.update_direct(Some(direct)); if conn_type == ConnType::DEFAULT_CONN { @@ -245,11 +250,21 @@ impl Remote { self.handler.on_establish_connection_error(err.to_string()); } } + // set_disconnected_ok is used to check if new connection round is started. + let set_disconnected_ok = self + .handler + .connection_round_state + .lock() + .unwrap() + .set_disconnected(round); + #[cfg(not(any(target_os = "android", target_os = "ios")))] - Client::try_stop_clipboard(&self.handler.session_id); + if set_disconnected_ok { + Client::try_stop_clipboard(&self.handler.session_id); + } #[cfg(windows)] - { + if set_disconnected_ok { let conn_id = self.client_conn_id; ContextSend::proc(|context: &mut CliprdrClientContext| -> u32 { empty_clipboard(context, conn_id); diff --git a/src/flutter.rs b/src/flutter.rs index 4440fe0c8..0f57e0a73 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -805,7 +805,8 @@ pub fn session_start_( if !is_pre_added { let session = session.clone(); std::thread::spawn(move || { - io_loop(session); + let round = session.connection_round_state.lock().unwrap().new_round(); + io_loop(session, round); }); } Ok(()) diff --git a/src/plugin/native_handlers/session.rs b/src/plugin/native_handlers/session.rs index fda07cd17..52edd3300 100644 --- a/src/plugin/native_handlers/session.rs +++ b/src/plugin/native_handlers/session.rs @@ -8,8 +8,10 @@ use std::{ use flutter_rust_bridge::StreamSink; use crate::{ - define_method_prefix, flutter::FlutterHandler, flutter_ffi::EventToUI, - plugin::MSG_TO_UI_TYPE_PLUGIN_EVENT, ui_session_interface::Session, + define_method_prefix, + flutter::FlutterHandler, + flutter_ffi::EventToUI, + ui_session_interface::{ConnectionState, Session}, }; const MSG_TO_UI_TYPE_SESSION_CREATED: &str = "session_created"; @@ -61,7 +63,8 @@ impl PluginNativeHandler for PluginNativeSessionHandler { let sessions = SESSION_HANDLER.sessions.read().unwrap(); for session in sessions.iter() { if session.id == id { - crate::ui_session_interface::io_loop(session.clone()); + let round = session.connection_round_state.lock().unwrap().new_round(); + crate::ui_session_interface::io_loop(session.clone(), round); } } } diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 5d3ecb7a6..9a07782b9 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -62,6 +62,7 @@ pub struct Session { pub server_file_transfer_enabled: Arc>, pub server_clipboard_enabled: Arc>, pub last_change_display: Arc>, + pub connection_round_state: Arc>, } #[derive(Clone)] @@ -79,6 +80,56 @@ pub struct ChangeDisplayRecord { height: i32, } +enum ConnectionState { + Connecting, + Connected, + Disconnected, +} + +/// ConnectionRoundState is used to control the reconnecting logic. +pub struct ConnectionRoundState { + round: u32, + state: ConnectionState, +} + +impl ConnectionRoundState { + pub fn new_round(&mut self) -> u32 { + self.round += 1; + self.state = ConnectionState::Connecting; + self.round + } + + pub fn set_connected(&mut self) { + self.state = ConnectionState::Connected; + } + + pub fn is_round_gt(&self, round: u32) -> bool { + if round == u32::MAX && self.round == 0 { + true + } else { + round < self.round + } + } + + pub fn set_disconnected(&mut self, round: u32) -> bool { + if self.is_round_gt(round) { + false + } else { + self.state = ConnectionState::Disconnected; + true + } + } +} + +impl Default for ConnectionRoundState { + fn default() -> Self { + Self { + round: 0, + state: ConnectionState::Connecting, + } + } +} + impl Default for ChangeDisplayRecord { fn default() -> Self { Self { @@ -833,16 +884,28 @@ impl Session { } pub fn reconnect(&self, force_relay: bool) { - self.send(Data::Close); + // 1. If current session is connecting, do not reconnect. + // 2. If the connection is established, send `Data::Close`. + // 3. If the connection is disconnected, do nothing. + let mut connection_round_state_lock = self.connection_round_state.lock().unwrap(); + match connection_round_state_lock.state { + ConnectionState::Connecting => return, + ConnectionState::Connected => self.send(Data::Close), + ConnectionState::Disconnected => {} + } + let round = connection_round_state_lock.new_round(); + drop(connection_round_state_lock); + let cloned = self.clone(); // override only if true if true == force_relay { cloned.lc.write().unwrap().force_relay = true; } let mut lock = self.thread.lock().unwrap(); - lock.take().map(|t| t.join()); + // No need to join the previous thread, because it will exit automatically. + // And the previous thread will not change important states. *lock = Some(std::thread::spawn(move || { - io_loop(cloned); + io_loop(cloned, round); })); } @@ -1283,7 +1346,7 @@ impl Session { } #[tokio::main(flavor = "current_thread")] -pub async fn io_loop(handler: Session) { +pub async fn io_loop(handler: Session, round: u32) { // It is ok to call this function multiple times. #[cfg(target_os = "windows")] if !handler.is_file_transfer() && !handler.is_port_forward() { @@ -1402,7 +1465,7 @@ pub async fn io_loop(handler: Session) { frame_count, decode_fps, ); - remote.io_loop(&key, &token).await; + remote.io_loop(&key, &token, round).await; remote.sync_jobs_status_to_local().await; }