From f7b35defc9c57198db1e3c26311b4d1735c12345 Mon Sep 17 00:00:00 2001 From: rustdesk Date: Tue, 9 Jan 2024 22:41:11 +0800 Subject: [PATCH] finish tcp rendezvous keep alive logic following mqtt, but defined by server so that it can be easily to be controlled at server side. --- libs/hbb_common/protos/rendezvous.proto | 1 + libs/hbb_common/src/tcp.rs | 3 ++ src/common.rs | 2 +- src/rendezvous_mediator.rs | 50 +++++++++++++++++++++++-- 4 files changed, 51 insertions(+), 5 deletions(-) diff --git a/libs/hbb_common/protos/rendezvous.proto b/libs/hbb_common/protos/rendezvous.proto index 5d57b083d..fac9aa433 100644 --- a/libs/hbb_common/protos/rendezvous.proto +++ b/libs/hbb_common/protos/rendezvous.proto @@ -73,6 +73,7 @@ message RegisterPkResponse { SERVER_ERROR = 7; } Result result = 1; + int32 keep_alive = 2; } message PunchHoleResponse { diff --git a/libs/hbb_common/src/tcp.rs b/libs/hbb_common/src/tcp.rs index 351955825..88ff43a22 100644 --- a/libs/hbb_common/src/tcp.rs +++ b/libs/hbb_common/src/tcp.rs @@ -330,6 +330,9 @@ impl Encrypt { } pub fn dec(&mut self, bytes: &mut BytesMut) -> Result<(), Error> { + if bytes.is_empty() { + return Ok(()); + } self.2 += 1; let nonce = FramedStream::get_nonce(self.2); match secretbox::open(bytes, &nonce, &self.0) { diff --git a/src/common.rs b/src/common.rs index b5af73d1a..09ebb6ba2 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1278,7 +1278,7 @@ pub async fn secure_tcp(conn: &mut FramedStream, key: &str) -> ResultType<()> { }); timeout(CONNECT_TIMEOUT, conn.send(&msg_out)).await??; conn.set_key(key); - log::info!("Token secured"); + log::info!("Connection secured"); } _ => {} } diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 1a9b33897..50d9ac1e8 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -37,22 +37,26 @@ use crate::{ type Message = RendezvousMessage; const TIMER_OUT: Duration = Duration::from_secs(1); +const DEFAULT_KEEP_ALIVE: i32 = 60_000; lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Arc> = Default::default(); } static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); +static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false); #[derive(Clone)] pub struct RendezvousMediator { addr: TargetAddr<'static>, host: String, host_prefix: String, + keep_alive: i32, } impl RendezvousMediator { pub fn restart() { SHOULD_EXIT.store(true, Ordering::SeqCst); + MANUAL_RESTARTED.store(true, Ordering::SeqCst); log::info!("server restart"); } @@ -83,7 +87,7 @@ impl RendezvousMediator { #[cfg(not(any(feature = "flatpak", feature = "appimage")))] crate::platform::linux_desktop_manager::start_xdesktop(); loop { - Config::reset_online(); + let conn_start_time = Instant::now(); *SOLVING_PK_MISMATCH.lock().await = "".to_owned(); if Config::get_option("stop-service").is_empty() && !crate::platform::installing_service() @@ -95,6 +99,7 @@ impl RendezvousMediator { let mut futs = Vec::new(); let servers = Config::get_rendezvous_servers(); SHOULD_EXIT.store(false, Ordering::SeqCst); + MANUAL_RESTARTED.store(false, Ordering::SeqCst); for host in servers.clone() { let server = server.clone(); futs.push(tokio::spawn(async move { @@ -109,7 +114,13 @@ impl RendezvousMediator { } else { server.write().unwrap().close_connections(); } - sleep(1.).await; + Config::reset_online(); + if !MANUAL_RESTARTED.load(Ordering::SeqCst) { + let elapsed = conn_start_time.elapsed().as_millis() as u64; + if elapsed < CONNECT_TIMEOUT { + sleep(((CONNECT_TIMEOUT - elapsed) / 1000) as _).await; + } + } } // It should be better to call stop_xdesktop. // But for server, it also is Ok without calling this method. @@ -136,6 +147,7 @@ impl RendezvousMediator { addr: addr.clone(), host: host.clone(), host_prefix, + keep_alive: DEFAULT_KEEP_ALIVE, }; let mut timer = interval(TIMER_OUT); @@ -264,6 +276,10 @@ impl RendezvousMediator { log::error!("unknown RegisterPkResponse"); } } + if rpr.keep_alive > 0 { + self.keep_alive = rpr.keep_alive * 1000; + log::info!("keep_alive: {}ms", self.keep_alive); + } } Some(rendezvous_message::Union::PunchHole(ph)) => { let rz = self.clone(); @@ -310,13 +326,29 @@ impl RendezvousMediator { addr: conn.local_addr().into_target_addr()?, host: host.clone(), host_prefix: host.clone(), + keep_alive: DEFAULT_KEEP_ALIVE, }; let mut timer = interval(TIMER_OUT); + let mut last_register_sent: Option = None; + let mut last_recv_msg = Instant::now(); + // we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here. + Config::set_host_key_confirmed(&host, false); loop { - let mut update_latency = || {}; + let mut update_latency = || { + let latency = last_register_sent + .map(|x| x.elapsed().as_micros() as i64) + .unwrap_or(0); + Config::update_latency(&host, latency); + log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.); + }; select! { res = conn.next() => { - let bytes = res.ok_or_else(|| anyhow::anyhow!("rendezvous server disconnected"))??; + last_recv_msg = Instant::now(); + let bytes = res.ok_or_else(|| anyhow::anyhow!("Rendezvous connection is reset by the peer"))??; + if bytes.is_empty() { + conn.send_bytes(bytes::Bytes::new()).await?; + continue; // heartbeat + } let msg = Message::parse_from_bytes(&bytes)?; rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await? } @@ -324,6 +356,16 @@ impl RendezvousMediator { if SHOULD_EXIT.load(Ordering::SeqCst) { break; } + // https://www.emqx.com/en/blog/mqtt-keep-alive + if last_recv_msg.elapsed().as_millis() as u64 > rz.keep_alive as u64 * 3 / 2 { + bail!("Rendezvous connection is timeout"); + } + if (!Config::get_key_confirmed() || + !Config::get_host_key_confirmed(&host)) && + last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL) >= REG_INTERVAL { + rz.register_pk(Sink::Stream(&mut conn)).await?; + last_register_sent = Some(Instant::now()); + } } } }