From 7e93a5d3cfbe0254dcbf7aba9759e3173e97a3df Mon Sep 17 00:00:00 2001 From: rustdesk Date: Sun, 7 Jan 2024 19:01:35 +0800 Subject: [PATCH] refactor to prepare for tcp rendezvous --- src/client.rs | 85 +------------- src/common.rs | 90 ++++++++++++++- src/rendezvous_mediator.rs | 225 ++++++++++++++++++++++++------------- 3 files changed, 238 insertions(+), 162 deletions(-) diff --git a/src/client.rs b/src/client.rs index 725e61908..d35f8da1d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -41,7 +41,7 @@ use hbb_common::{ rendezvous_proto::*, socket_client, sodiumoxide::base64, - sodiumoxide::crypto::{box_, secretbox, sign}, + sodiumoxide::crypto::sign, tcp::FramedStream, timeout, tokio::time::Duration, @@ -57,7 +57,7 @@ use scrap::{ use crate::{ check_port, common::input::{MOUSE_BUTTON_LEFT, MOUSE_BUTTON_RIGHT, MOUSE_TYPE_DOWN, MOUSE_TYPE_UP}, - is_keyboard_mode_supported, + create_symmetric_key_msg, decode_id_pk, get_rs_pk, is_keyboard_mode_supported, secure_tcp, ui_session_interface::{InvokeUiSession, Session}, }; @@ -311,7 +311,7 @@ impl Client { if !key.is_empty() && !token.is_empty() { // mainly for the security of token - allow_err!(secure_punch_connection(&mut socket, key).await); + allow_err!(secure_tcp(&mut socket, key).await); } let start = std::time::Instant::now(); @@ -620,7 +620,7 @@ impl Client { if !key.is_empty() && !token.is_empty() { // mainly for the security of token - allow_err!(secure_punch_connection(&mut socket, key).await); + allow_err!(secure_tcp(&mut socket, key).await); } ipv4 = socket.local_addr().is_ipv4(); @@ -2996,80 +2996,3 @@ pub fn check_if_retry(msgtype: &str, title: &str, text: &str, retry_for_relay: b && !text.to_lowercase().contains("manually") && !text.to_lowercase().contains("not allowed"))) } - -#[inline] -fn get_pk(pk: &[u8]) -> Option<[u8; 32]> { - if pk.len() == 32 { - let mut tmp = [0u8; 32]; - tmp[..].copy_from_slice(&pk); - Some(tmp) - } else { - None - } -} - -#[inline] -fn get_rs_pk(str_base64: &str) -> Option { - if let Ok(pk) = crate::decode64(str_base64) { - get_pk(&pk).map(|x| sign::PublicKey(x)) - } else { - None - } -} - -fn decode_id_pk(signed: &[u8], key: &sign::PublicKey) -> ResultType<(String, [u8; 32])> { - let res = IdPk::parse_from_bytes( - &sign::verify(signed, key).map_err(|_| anyhow!("Signature mismatch"))?, - )?; - if let Some(pk) = get_pk(&res.pk) { - Ok((res.id, pk)) - } else { - bail!("Wrong their public length"); - } -} - -fn create_symmetric_key_msg(their_pk_b: [u8; 32]) -> (Bytes, Bytes, secretbox::Key) { - let their_pk_b = box_::PublicKey(their_pk_b); - let (our_pk_b, out_sk_b) = box_::gen_keypair(); - let key = secretbox::gen_key(); - let nonce = box_::Nonce([0u8; box_::NONCEBYTES]); - let sealed_key = box_::seal(&key.0, &nonce, &their_pk_b, &out_sk_b); - (Vec::from(our_pk_b.0).into(), sealed_key.into(), key) -} - -async fn secure_punch_connection(conn: &mut FramedStream, key: &str) -> ResultType<()> { - let rs_pk = get_rs_pk(key); - let Some(rs_pk) = rs_pk else { - bail!("Handshake failed: invalid public key from rendezvous server"); - }; - match timeout(READ_TIMEOUT, conn.next()).await? { - Some(Ok(bytes)) => { - if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { - match msg_in.union { - Some(rendezvous_message::Union::KeyExchange(ex)) => { - if ex.keys.len() != 1 { - bail!("Handshake failed: invalid key exchange message"); - } - let their_pk_b = sign::verify(&ex.keys[0], &rs_pk) - .map_err(|_| anyhow!("Signature mismatch in key exchange"))?; - let (asymmetric_value, symmetric_value, key) = create_symmetric_key_msg( - get_pk(&their_pk_b) - .context("Wrong their public length in key exchange")?, - ); - let mut msg_out = RendezvousMessage::new(); - msg_out.set_key_exchange(KeyExchange { - keys: vec![asymmetric_value, symmetric_value], - ..Default::default() - }); - timeout(CONNECT_TIMEOUT, conn.send(&msg_out)).await??; - conn.set_key(key); - log::info!("Token secured"); - } - _ => {} - } - } - } - _ => {} - } - Ok(()) -} diff --git a/src/common.rs b/src/common.rs index 7e872ff7e..b5af73d1a 100644 --- a/src/common.rs +++ b/src/common.rs @@ -127,6 +127,9 @@ impl ClipboardContext { use hbb_common::compress::decompress; use hbb_common::{ allow_err, + anyhow::{anyhow, Context}, + bail, + bytes::Bytes, compress::compress as compress_func, config::{self, Config, CONNECT_TIMEOUT, READ_TIMEOUT}, get_version_number, log, @@ -135,8 +138,9 @@ use hbb_common::{ protobuf::Message as _, rendezvous_proto::*, socket_client, + sodiumoxide::crypto::{box_, secretbox, sign}, tcp::FramedStream, - tokio, ResultType, + timeout, tokio, ResultType, }; // #[cfg(any(target_os = "android", target_os = "ios", feature = "cli"))] use hbb_common::{config::RENDEZVOUS_PORT, futures::future::join_all}; @@ -1076,7 +1080,10 @@ pub fn make_privacy_mode_msg_with_details( } #[inline] -pub fn make_privacy_mode_msg(state: back_notification::PrivacyModeState, impl_key: String) -> Message { +pub fn make_privacy_mode_msg( + state: back_notification::PrivacyModeState, + impl_key: String, +) -> Message { make_privacy_mode_msg_with_details(state, "".to_owned(), impl_key) } @@ -1165,7 +1172,7 @@ pub async fn get_key(sync: bool) -> String { let mut options = crate::ipc::get_options_async().await; options.remove("key").unwrap_or_default() }; - if key.is_empty() && !option_env!("RENDEZVOUS_SERVER").unwrap_or("").is_empty() { + if key.is_empty() { key = config::RS_PUB_KEY.to_owned(); } key @@ -1244,3 +1251,80 @@ pub fn check_process(arg: &str, same_uid: bool) -> bool { } false } + +pub async fn secure_tcp(conn: &mut FramedStream, key: &str) -> ResultType<()> { + let rs_pk = get_rs_pk(key); + let Some(rs_pk) = rs_pk else { + bail!("Handshake failed: invalid public key from rendezvous server"); + }; + match timeout(READ_TIMEOUT, conn.next()).await? { + Some(Ok(bytes)) => { + if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { + match msg_in.union { + Some(rendezvous_message::Union::KeyExchange(ex)) => { + if ex.keys.len() != 1 { + bail!("Handshake failed: invalid key exchange message"); + } + let their_pk_b = sign::verify(&ex.keys[0], &rs_pk) + .map_err(|_| anyhow!("Signature mismatch in key exchange"))?; + let (asymmetric_value, symmetric_value, key) = create_symmetric_key_msg( + get_pk(&their_pk_b) + .context("Wrong their public length in key exchange")?, + ); + let mut msg_out = RendezvousMessage::new(); + msg_out.set_key_exchange(KeyExchange { + keys: vec![asymmetric_value, symmetric_value], + ..Default::default() + }); + timeout(CONNECT_TIMEOUT, conn.send(&msg_out)).await??; + conn.set_key(key); + log::info!("Token secured"); + } + _ => {} + } + } + } + _ => {} + } + Ok(()) +} + +#[inline] +fn get_pk(pk: &[u8]) -> Option<[u8; 32]> { + if pk.len() == 32 { + let mut tmp = [0u8; 32]; + tmp[..].copy_from_slice(&pk); + Some(tmp) + } else { + None + } +} + +#[inline] +pub fn get_rs_pk(str_base64: &str) -> Option { + if let Ok(pk) = crate::decode64(str_base64) { + get_pk(&pk).map(|x| sign::PublicKey(x)) + } else { + None + } +} + +pub fn decode_id_pk(signed: &[u8], key: &sign::PublicKey) -> ResultType<(String, [u8; 32])> { + let res = IdPk::parse_from_bytes( + &sign::verify(signed, key).map_err(|_| anyhow!("Signature mismatch"))?, + )?; + if let Some(pk) = get_pk(&res.pk) { + Ok((res.id, pk)) + } else { + bail!("Wrong their public length"); + } +} + +pub fn create_symmetric_key_msg(their_pk_b: [u8; 32]) -> (Bytes, Bytes, secretbox::Key) { + let their_pk_b = box_::PublicKey(their_pk_b); + let (our_pk_b, out_sk_b) = box_::gen_keypair(); + let key = secretbox::gen_key(); + let nonce = box_::Nonce([0u8; box_::NONCEBYTES]); + let sealed_key = box_::seal(&key.0, &nonce, &their_pk_b, &out_sk_b); + (Vec::from(our_pk_b.0).into(), sealed_key.into(), key) +} diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index a2b5f8101..d7c8788a8 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -9,29 +9,34 @@ use std::{ use uuid::Uuid; -use hbb_common::tcp::FramedStream; use hbb_common::{ allow_err, - anyhow::bail, + anyhow::{self, bail}, config::{Config, CONNECT_TIMEOUT, READ_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT}, futures::future::join_all, log, protobuf::Message as _, rendezvous_proto::*, sleep, - socket_client::{self, is_ipv4}, + socket_client::{self, connect_tcp, is_ipv4}, + tcp::FramedStream, tokio::{ self, select, time::{interval, Duration}, }, udp::FramedSocket, - AddrMangle, ResultType, + AddrMangle, IntoTargetAddr, ResultType, TargetAddr, }; -use crate::server::{check_zombie, new as new_server, ServerPtr}; +use crate::{ + check_port, + server::{check_zombie, new as new_server, ServerPtr}, +}; type Message = RendezvousMessage; +const TIMER_OUT: Duration = Duration::from_secs(1); + lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Arc> = Default::default(); } @@ -39,7 +44,7 @@ static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); #[derive(Clone)] pub struct RendezvousMediator { - addr: hbb_common::tokio_socks::TargetAddr<'static>, + addr: TargetAddr<'static>, host: String, host_prefix: String, last_id_pk_registry: String, @@ -112,8 +117,7 @@ impl RendezvousMediator { // crate::platform::linux_desktop_manager::stop_xdesktop(); } - pub async fn start(server: ServerPtr, host: String) -> ResultType<()> { - log::info!("start rendezvous mediator of {}", host); + pub async fn start_udp(server: ServerPtr, host: String) -> ResultType<()> { let host_prefix: String = host .split(".") .next() @@ -125,16 +129,15 @@ impl RendezvousMediator { } }) .unwrap_or(host.to_owned()); - let host = crate::check_port(&host, RENDEZVOUS_PORT); + let host = check_port(&host, RENDEZVOUS_PORT); let (mut socket, addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?; let mut rz = Self { - addr: addr, + addr: addr.clone(), host: host.clone(), host_prefix, last_id_pk_registry: "".to_owned(), }; - const TIMER_OUT: Duration = Duration::from_secs(1); let mut timer = interval(TIMER_OUT); let mut last_timer: Option = None; const REG_TIMEOUT: i64 = 3_000; @@ -177,63 +180,8 @@ impl RendezvousMediator { n = socket.next() => { match n { Some(Ok((bytes, _))) => { - if let Ok(msg_in) = Message::parse_from_bytes(&bytes) { - match msg_in.union { - Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => { - update_latency(); - if rpr.request_pk { - log::info!("request_pk received from {}", host); - allow_err!(rz.register_pk(&mut socket).await); - continue; - } - } - Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => { - update_latency(); - match rpr.result.enum_value() { - Ok(register_pk_response::Result::OK) => { - Config::set_key_confirmed(true); - Config::set_host_key_confirmed(&rz.host_prefix, true); - *SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned(); - } - Ok(register_pk_response::Result::UUID_MISMATCH) => { - allow_err!(rz.handle_uuid_mismatch(&mut socket).await); - } - _ => { - log::error!("unknown RegisterPkResponse"); - } - } - } - Some(rendezvous_message::Union::PunchHole(ph)) => { - let rz = rz.clone(); - let server = server.clone(); - tokio::spawn(async move { - allow_err!(rz.handle_punch_hole(ph, server).await); - }); - } - Some(rendezvous_message::Union::RequestRelay(rr)) => { - let rz = rz.clone(); - let server = server.clone(); - tokio::spawn(async move { - allow_err!(rz.handle_request_relay(rr, server).await); - }); - } - Some(rendezvous_message::Union::FetchLocalAddr(fla)) => { - let rz = rz.clone(); - let server = server.clone(); - tokio::spawn(async move { - allow_err!(rz.handle_intranet(fla, server).await); - }); - } - Some(rendezvous_message::Union::ConfigureUpdate(cu)) => { - let v0 = Config::get_rendezvous_servers(); - Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(",")); - Config::set_serial(cu.serial); - if v0 != Config::get_rendezvous_servers() { - Self::restart(); - } - } - _ => {} - } + if let Ok(msg) = Message::parse_from_bytes(&bytes) { + rz.handle_resp(msg.union, Sink::Framed(&mut socket, &addr), &server, &mut update_latency).await?; } else { log::debug!("Non-protobuf message bytes received: {:?}", bytes); } @@ -257,7 +205,7 @@ impl RendezvousMediator { let elapsed_resp = last_register_resp.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL); let timeout = (elapsed_resp - last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL)) > REG_TIMEOUT; if timeout || elapsed_resp >= REG_INTERVAL { - allow_err!(rz.register_peer(&mut socket).await); + allow_err!(rz.register_peer(Sink::Framed(&mut socket, &addr)).await); last_register_sent = now; if timeout { fails += 1; @@ -285,6 +233,113 @@ impl RendezvousMediator { Ok(()) } + #[inline] + async fn handle_resp( + &mut self, + msg: Option, + sink: Sink<'_>, + server: &ServerPtr, + update_latency: &mut impl FnMut(), + ) -> ResultType<()> { + match msg { + Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => { + update_latency(); + if rpr.request_pk { + log::info!("request_pk received from {}", self.host); + allow_err!(self.register_pk(sink).await); + } + } + Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => { + update_latency(); + match rpr.result.enum_value() { + Ok(register_pk_response::Result::OK) => { + Config::set_key_confirmed(true); + Config::set_host_key_confirmed(&self.host_prefix, true); + *SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned(); + } + Ok(register_pk_response::Result::UUID_MISMATCH) => { + allow_err!(self.handle_uuid_mismatch(sink).await); + } + _ => { + log::error!("unknown RegisterPkResponse"); + } + } + } + Some(rendezvous_message::Union::PunchHole(ph)) => { + let rz = self.clone(); + let server = server.clone(); + tokio::spawn(async move { + allow_err!(rz.handle_punch_hole(ph, server).await); + }); + } + Some(rendezvous_message::Union::RequestRelay(rr)) => { + let rz = self.clone(); + let server = server.clone(); + tokio::spawn(async move { + allow_err!(rz.handle_request_relay(rr, server).await); + }); + } + Some(rendezvous_message::Union::FetchLocalAddr(fla)) => { + let rz = self.clone(); + let server = server.clone(); + tokio::spawn(async move { + allow_err!(rz.handle_intranet(fla, server).await); + }); + } + Some(rendezvous_message::Union::ConfigureUpdate(cu)) => { + let v0 = Config::get_rendezvous_servers(); + Config::set_option( + "rendezvous-servers".to_owned(), + cu.rendezvous_servers.join(","), + ); + Config::set_serial(cu.serial); + if v0 != Config::get_rendezvous_servers() { + Self::restart(); + } + } + _ => {} + } + Ok(()) + } + + pub async fn start_tcp(server: ServerPtr, host: String) -> ResultType<()> { + let mut conn = connect_tcp(check_port(&host, RENDEZVOUS_PORT), CONNECT_TIMEOUT).await?; + let key = crate::get_key(true).await; + crate::secure_tcp(&mut conn, &key).await?; + let mut rz = Self { + addr: conn.local_addr().into_target_addr()?, + host: host.clone(), + host_prefix: host.clone(), + last_id_pk_registry: "".to_owned(), + }; + let mut timer = interval(TIMER_OUT); + loop { + let mut update_latency = || {}; + select! { + res = conn.next() => { + let bytes = res.ok_or_else(|| anyhow::anyhow!("rendezvous server disconnected"))??; + let msg = Message::parse_from_bytes(&bytes)?; + rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await? + } + _ = timer.tick() => { + if SHOULD_EXIT.load(Ordering::SeqCst) { + break; + } + } + } + } + Ok(()) + } + + pub async fn start(server: ServerPtr, host: String) -> ResultType<()> { + log::info!("start rendezvous mediator of {}", host); + if cfg!(debug_assertions) && option_env!("TEST_TCP").is_some() { + Self::start_tcp(server, host).await + } else { + Self::start_udp(server, host).await + } + } + async fn handle_request_relay(&self, rr: RequestRelay, server: ServerPtr) -> ResultType<()> { self.create_relay( rr.socket_addr.into(), @@ -315,7 +370,7 @@ impl RendezvousMediator { secure, ); - let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; + let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; let mut msg_out = Message::new(); let mut rr = RelayResponse { @@ -360,7 +415,7 @@ impl RendezvousMediator { } let peer_addr = AddrMangle::decode(&fla.socket_addr); log::debug!("Handle intranet from {:?}", peer_addr); - let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; + let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; let local_addr = socket.local_addr(); let local_addr: SocketAddr = format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?; @@ -399,7 +454,7 @@ impl RendezvousMediator { let peer_addr = AddrMangle::decode(&ph.socket_addr); log::debug!("Punch hole to {:?}", peer_addr); let mut socket = { - let socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; + let socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; let local_addr = socket.local_addr(); // key important here for punch hole to tell my gateway incoming peer is safe. // it can not be async here, because local_addr can not be reused, we must close the connection before use it again. @@ -423,7 +478,7 @@ impl RendezvousMediator { Ok(()) } - async fn register_pk(&mut self, socket: &mut FramedSocket) -> ResultType<()> { + async fn register_pk(&mut self, socket: Sink<'_>) -> ResultType<()> { let mut msg_out = Message::new(); let pk = Config::get_key_pair().1; let uuid = hbb_common::get_uuid(); @@ -435,11 +490,11 @@ impl RendezvousMediator { pk: pk.into(), ..Default::default() }); - socket.send(&msg_out, self.addr.to_owned()).await?; + socket.send(&msg_out).await?; Ok(()) } - async fn handle_uuid_mismatch(&mut self, socket: &mut FramedSocket) -> ResultType<()> { + async fn handle_uuid_mismatch(&mut self, socket: Sink<'_>) -> ResultType<()> { if self.last_id_pk_registry != Config::get_id() { return Ok(()); } @@ -457,7 +512,7 @@ impl RendezvousMediator { self.register_pk(socket).await } - async fn register_peer(&mut self, socket: &mut FramedSocket) -> ResultType<()> { + async fn register_peer(&mut self, socket: Sink<'_>) -> ResultType<()> { if !SOLVING_PK_MISMATCH.lock().unwrap().is_empty() { return Ok(()); } @@ -481,7 +536,7 @@ impl RendezvousMediator { serial, ..Default::default() }); - socket.send(&msg_out, self.addr.to_owned()).await?; + socket.send(&msg_out).await?; Ok(()) } @@ -622,7 +677,7 @@ async fn create_online_stream() -> ResultType { bail!("Invalid server address: {}", rendezvous_server); } let online_server = format!("{}:{}", tmp[0], port - 1); - socket_client::connect_tcp(online_server, CONNECT_TIMEOUT).await + connect_tcp(online_server, CONNECT_TIMEOUT).await } async fn query_online_states_( @@ -680,6 +735,20 @@ async fn query_online_states_( } } +enum Sink<'a> { + Framed(&'a mut FramedSocket, &'a TargetAddr<'a>), + Stream(&'a mut FramedStream), +} + +impl Sink<'_> { + async fn send(self, msg: &Message) -> ResultType<()> { + match self { + Sink::Framed(socket, addr) => socket.send(msg, addr.to_owned()).await, + Sink::Stream(stream) => stream.send(msg).await, + } + } +} + #[cfg(test)] mod tests { use hbb_common::tokio;