use std::{ net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, }, time::Instant, }; use uuid::Uuid; use hbb_common::tcp::FramedStream; use hbb_common::{ allow_err, anyhow::bail, config::{Config, CONNECT_TIMEOUT, READ_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT}, futures::future::join_all, log, protobuf::Message as _, rendezvous_proto::*, sleep, socket_client::{self, is_ipv4}, tokio::{ self, select, time::{interval, Duration}, }, udp::FramedSocket, AddrMangle, ResultType, }; use crate::server::{check_zombie, new as new_server, ServerPtr}; type Message = RendezvousMessage; lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Arc> = Default::default(); } static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); #[derive(Clone)] pub struct RendezvousMediator { addr: hbb_common::tokio_socks::TargetAddr<'static>, host: String, host_prefix: String, last_id_pk_registry: String, } impl RendezvousMediator { pub fn restart() { SHOULD_EXIT.store(true, Ordering::SeqCst); log::info!("server restart"); } pub async fn start_all() { crate::hbbs_http::sync::start(); let mut nat_tested = false; check_zombie(); let server = new_server(); if Config::get_nat_type() == NatType::UNKNOWN_NAT as i32 { crate::test_nat_type(); nat_tested = true; } if !Config::get_option("stop-service").is_empty() { crate::test_rendezvous_server(); } let server_cloned = server.clone(); tokio::spawn(async move { direct_server(server_cloned).await; }); #[cfg(not(any(target_os = "android", target_os = "ios")))] if crate::platform::is_installed() { std::thread::spawn(move || { allow_err!(super::lan::start_listening()); }); } // It is ok to run xdesktop manager when the headless function is not allowed. #[cfg(all(target_os = "linux", feature = "linux_headless"))] #[cfg(not(any(feature = "flatpak", feature = "appimage")))] crate::platform::linux_desktop_manager::start_xdesktop(); loop { Config::reset_online(); if Config::get_option("stop-service").is_empty() && !crate::platform::installing_service() { if !nat_tested { crate::test_nat_type(); nat_tested = true; } let mut futs = Vec::new(); let servers = Config::get_rendezvous_servers(); SHOULD_EXIT.store(false, Ordering::SeqCst); for host in servers.clone() { let server = server.clone(); futs.push(tokio::spawn(async move { if let Err(err) = Self::start(server, host).await { log::error!("rendezvous mediator error: {err}"); } // SHOULD_EXIT here is to ensure once one exits, the others also exit. SHOULD_EXIT.store(true, Ordering::SeqCst); })); } join_all(futs).await; } else { server.write().unwrap().close_connections(); } sleep(1.).await; } // It should be better to call stop_xdesktop. // But for server, it also is Ok without calling this method. // #[cfg(all(target_os = "linux", feature = "linux_headless"))] // #[cfg(not(any(feature = "flatpak", feature = "appimage")))] // crate::platform::linux_desktop_manager::stop_xdesktop(); } pub async fn start(server: ServerPtr, host: String) -> ResultType<()> { log::info!("start rendezvous mediator of {}", host); let host_prefix: String = host .split(".") .next() .map(|x| { if x.parse::().is_ok() { host.clone() } else { x.to_string() } }) .unwrap_or(host.to_owned()); let host = crate::check_port(&host, RENDEZVOUS_PORT); let (mut socket, addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?; let mut rz = Self { addr: addr, host: host.clone(), host_prefix, last_id_pk_registry: "".to_owned(), }; const TIMER_OUT: Duration = Duration::from_secs(1); let mut timer = interval(TIMER_OUT); let mut last_timer: Option = None; const REG_TIMEOUT: i64 = 3_000; const MAX_FAILS1: i64 = 3; const MAX_FAILS2: i64 = 6; const DNS_INTERVAL: i64 = 60_000; let mut fails = 0; let mut last_register_resp: Option = None; let mut last_register_sent: Option = None; let mut last_dns_check = Instant::now(); let mut old_latency = 0; let mut ema_latency = 0; loop { let mut update_latency = || { last_register_resp = Some(Instant::now()); fails = 0; let mut latency = last_register_sent .map(|x| x.elapsed().as_micros() as i64) .unwrap_or(0); if latency < 0 || latency > 1_000_000 { return; } if ema_latency == 0 { ema_latency = latency; } else { ema_latency = latency / 30 + (ema_latency * 29 / 30); latency = ema_latency; } let mut n = latency / 5; if n < 3000 { n = 3000; } if (latency - old_latency).abs() > n || old_latency <= 0 { Config::update_latency(&host, latency); log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.); old_latency = latency; } }; select! { n = socket.next() => { match n { Some(Ok((bytes, _))) => { if let Ok(msg_in) = Message::parse_from_bytes(&bytes) { match msg_in.union { Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => { update_latency(); if rpr.request_pk { log::info!("request_pk received from {}", host); allow_err!(rz.register_pk(&mut socket).await); continue; } } Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => { update_latency(); match rpr.result.enum_value() { Ok(register_pk_response::Result::OK) => { Config::set_key_confirmed(true); Config::set_host_key_confirmed(&rz.host_prefix, true); *SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned(); } Ok(register_pk_response::Result::UUID_MISMATCH) => { allow_err!(rz.handle_uuid_mismatch(&mut socket).await); } _ => { log::error!("unknown RegisterPkResponse"); } } } Some(rendezvous_message::Union::PunchHole(ph)) => { let rz = rz.clone(); let server = server.clone(); tokio::spawn(async move { allow_err!(rz.handle_punch_hole(ph, server).await); }); } Some(rendezvous_message::Union::RequestRelay(rr)) => { let rz = rz.clone(); let server = server.clone(); tokio::spawn(async move { allow_err!(rz.handle_request_relay(rr, server).await); }); } Some(rendezvous_message::Union::FetchLocalAddr(fla)) => { let rz = rz.clone(); let server = server.clone(); tokio::spawn(async move { allow_err!(rz.handle_intranet(fla, server).await); }); } Some(rendezvous_message::Union::ConfigureUpdate(cu)) => { let v0 = Config::get_rendezvous_servers(); Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(",")); Config::set_serial(cu.serial); if v0 != Config::get_rendezvous_servers() { Self::restart(); } } _ => {} } } else { log::debug!("Non-protobuf message bytes received: {:?}", bytes); } }, Some(Err(e)) => bail!("Failed to receive next {}", e), // maybe socks5 tcp disconnected None => { bail!("Socket receive none. Maybe socks5 server is down."); }, } }, _ = timer.tick() => { if SHOULD_EXIT.load(Ordering::SeqCst) { break; } let now = Some(Instant::now()); if last_timer.map(|x| x.elapsed() < TIMER_OUT).unwrap_or(false) { // a workaround of tokio timer bug continue; } last_timer = now; let elapsed_resp = last_register_resp.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL); let timeout = (elapsed_resp - last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL)) > REG_TIMEOUT; if timeout || elapsed_resp >= REG_INTERVAL { allow_err!(rz.register_peer(&mut socket).await); last_register_sent = now; if timeout { fails += 1; if fails > MAX_FAILS2 { Config::update_latency(&host, -1); old_latency = 0; if last_dns_check.elapsed().as_millis() as i64 > DNS_INTERVAL { // in some case of network reconnect (dial IP network), // old UDP socket not work any more after network recover if let Some((s, addr)) = socket_client::rebind_udp_for(&rz.host).await? { socket = s; rz.addr = addr; } last_dns_check = Instant::now(); } } else if fails > MAX_FAILS1 { Config::update_latency(&host, 0); old_latency = 0; } } } } } } Ok(()) } async fn handle_request_relay(&self, rr: RequestRelay, server: ServerPtr) -> ResultType<()> { self.create_relay( rr.socket_addr.into(), rr.relay_server, rr.uuid, server, rr.secure, false, ) .await } async fn create_relay( &self, socket_addr: Vec, relay_server: String, uuid: String, server: ServerPtr, secure: bool, initiate: bool, ) -> ResultType<()> { let peer_addr = AddrMangle::decode(&socket_addr); log::info!( "create_relay requested from {:?}, relay_server: {}, uuid: {}, secure: {}", peer_addr, relay_server, uuid, secure, ); let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; let mut msg_out = Message::new(); let mut rr = RelayResponse { socket_addr: socket_addr.into(), version: crate::VERSION.to_owned(), ..Default::default() }; if initiate { rr.uuid = uuid.clone(); rr.relay_server = relay_server.clone(); rr.set_id(Config::get_id()); } msg_out.set_relay_response(rr); socket.send(&msg_out).await?; crate::create_relay_connection( server, relay_server, uuid, peer_addr, secure, is_ipv4(&self.addr), ) .await; Ok(()) } async fn handle_intranet(&self, fla: FetchLocalAddr, server: ServerPtr) -> ResultType<()> { let relay_server = self.get_relay_server(fla.relay_server); if !is_ipv4(&self.addr) { // nat64, go relay directly, because current hbbs will crash if demangle ipv6 address let uuid = Uuid::new_v4().to_string(); return self .create_relay( fla.socket_addr.into(), relay_server, uuid, server, true, true, ) .await; } let peer_addr = AddrMangle::decode(&fla.socket_addr); log::debug!("Handle intranet from {:?}", peer_addr); let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; let local_addr = socket.local_addr(); let local_addr: SocketAddr = format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?; let mut msg_out = Message::new(); msg_out.set_local_addr(LocalAddr { id: Config::get_id(), socket_addr: AddrMangle::encode(peer_addr).into(), local_addr: AddrMangle::encode(local_addr).into(), relay_server, version: crate::VERSION.to_owned(), ..Default::default() }); let bytes = msg_out.write_to_bytes()?; socket.send_raw(bytes).await?; crate::accept_connection(server.clone(), socket, peer_addr, true).await; Ok(()) } async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> { let relay_server = self.get_relay_server(ph.relay_server); if ph.nat_type.enum_value() == Ok(NatType::SYMMETRIC) || Config::get_nat_type() == NatType::SYMMETRIC as i32 { let uuid = Uuid::new_v4().to_string(); return self .create_relay( ph.socket_addr.into(), relay_server, uuid, server, true, true, ) .await; } let peer_addr = AddrMangle::decode(&ph.socket_addr); log::debug!("Punch hole to {:?}", peer_addr); let mut socket = { let socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; let local_addr = socket.local_addr(); // key important here for punch hole to tell my gateway incoming peer is safe. // it can not be async here, because local_addr can not be reused, we must close the connection before use it again. allow_err!(socket_client::connect_tcp_local(peer_addr, Some(local_addr), 30).await); socket }; let mut msg_out = Message::new(); use hbb_common::protobuf::Enum; let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT); msg_out.set_punch_hole_sent(PunchHoleSent { socket_addr: ph.socket_addr, id: Config::get_id(), relay_server, nat_type: nat_type.into(), version: crate::VERSION.to_owned(), ..Default::default() }); let bytes = msg_out.write_to_bytes()?; socket.send_raw(bytes).await?; crate::accept_connection(server.clone(), socket, peer_addr, true).await; Ok(()) } async fn register_pk(&mut self, socket: &mut FramedSocket) -> ResultType<()> { let mut msg_out = Message::new(); let pk = Config::get_key_pair().1; let uuid = hbb_common::get_uuid(); let id = Config::get_id(); self.last_id_pk_registry = id.clone(); msg_out.set_register_pk(RegisterPk { id, uuid: uuid.into(), pk: pk.into(), ..Default::default() }); socket.send(&msg_out, self.addr.to_owned()).await?; Ok(()) } async fn handle_uuid_mismatch(&mut self, socket: &mut FramedSocket) -> ResultType<()> { if self.last_id_pk_registry != Config::get_id() { return Ok(()); } { let mut solving = SOLVING_PK_MISMATCH.lock().unwrap(); if solving.is_empty() || *solving == self.host { log::info!("UUID_MISMATCH received from {}", self.host); Config::set_key_confirmed(false); Config::update_id(); *solving = self.host.clone(); } else { return Ok(()); } } self.register_pk(socket).await } async fn register_peer(&mut self, socket: &mut FramedSocket) -> ResultType<()> { if !SOLVING_PK_MISMATCH.lock().unwrap().is_empty() { return Ok(()); } if !Config::get_key_confirmed() || !Config::get_host_key_confirmed(&self.host_prefix) { log::info!( "register_pk of {} due to key not confirmed", self.host_prefix ); return self.register_pk(socket).await; } let id = Config::get_id(); log::trace!( "Register my id {:?} to rendezvous server {:?}", id, self.addr, ); let mut msg_out = Message::new(); let serial = Config::get_serial(); msg_out.set_register_peer(RegisterPeer { id, serial, ..Default::default() }); socket.send(&msg_out, self.addr.to_owned()).await?; Ok(()) } fn get_relay_server(&self, provided_by_rendezvous_server: String) -> String { let mut relay_server = Config::get_option("relay-server"); if relay_server.is_empty() { relay_server = provided_by_rendezvous_server; } if relay_server.is_empty() { relay_server = crate::increase_port(&self.host, 1); } relay_server } } fn get_direct_port() -> i32 { let mut port = Config::get_option("direct-access-port") .parse::() .unwrap_or(0); if port <= 0 { port = RENDEZVOUS_PORT + 2; } port } async fn direct_server(server: ServerPtr) { let mut listener = None; let mut port = 0; loop { let disabled = Config::get_option("direct-server").is_empty() || !Config::get_option("stop-service").is_empty(); if !disabled && listener.is_none() { port = get_direct_port(); match hbb_common::tcp::listen_any(port as _).await { Ok(l) => { listener = Some(l); log::info!( "Direct server listening on: {:?}", listener.as_ref().map(|l| l.local_addr()) ); } Err(err) => { // to-do: pass to ui log::error!( "Failed to start direct server on port: {}, error: {}", port, err ); loop { if port != get_direct_port() { break; } sleep(1.).await; } } } } if let Some(l) = listener.as_mut() { if disabled || port != get_direct_port() { log::info!("Exit direct access listen"); listener = None; continue; } if let Ok(Ok((stream, addr))) = hbb_common::timeout(1000, l.accept()).await { stream.set_nodelay(true).ok(); log::info!("direct access from {}", addr); let local_addr = stream .local_addr() .unwrap_or(Config::get_any_listen_addr(true)); let server = server.clone(); tokio::spawn(async move { allow_err!( crate::server::create_tcp_connection( server, hbb_common::Stream::from(stream, local_addr), addr, false, ) .await ); }); } else { sleep(0.1).await; } } else { sleep(1.).await; } } } #[tokio::main(flavor = "current_thread")] pub async fn query_online_states, Vec)>(ids: Vec, f: F) { let test = false; if test { sleep(1.5).await; let mut onlines = ids; let offlines = onlines.drain((onlines.len() / 2)..).collect(); f(onlines, offlines) } else { let query_begin = Instant::now(); let query_timeout = std::time::Duration::from_millis(3_000); loop { if SHOULD_EXIT.load(Ordering::SeqCst) { break; } match query_online_states_(&ids, query_timeout).await { Ok((onlines, offlines)) => { f(onlines, offlines); break; } Err(e) => { log::debug!("{}", &e); } } if query_begin.elapsed() > query_timeout { log::debug!("query onlines timeout {:?}", query_timeout); break; } sleep(1.5).await; } } } async fn create_online_stream() -> ResultType { let (rendezvous_server, _servers, _contained) = crate::get_rendezvous_server(READ_TIMEOUT).await; let tmp: Vec<&str> = rendezvous_server.split(":").collect(); if tmp.len() != 2 { bail!("Invalid server address: {}", rendezvous_server); } let port: u16 = tmp[1].parse()?; if port == 0 { bail!("Invalid server address: {}", rendezvous_server); } let online_server = format!("{}:{}", tmp[0], port - 1); socket_client::connect_tcp(online_server, CONNECT_TIMEOUT).await } async fn query_online_states_( ids: &Vec, timeout: std::time::Duration, ) -> ResultType<(Vec, Vec)> { let query_begin = Instant::now(); let mut msg_out = RendezvousMessage::new(); msg_out.set_online_request(OnlineRequest { id: Config::get_id(), peers: ids.clone(), ..Default::default() }); loop { if SHOULD_EXIT.load(Ordering::SeqCst) { // No need to care about onlines return Ok((Vec::new(), Vec::new())); } let mut socket = create_online_stream().await?; socket.send(&msg_out).await?; if let Some(msg_in) = crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await { match msg_in.union { Some(rendezvous_message::Union::OnlineResponse(online_response)) => { let states = online_response.states; let mut onlines = Vec::new(); let mut offlines = Vec::new(); for i in 0..ids.len() { // bytes index from left to right let bit_value = 0x01 << (7 - i % 8); if (states[i / 8] & bit_value) == bit_value { onlines.push(ids[i].clone()); } else { offlines.push(ids[i].clone()); } } return Ok((onlines, offlines)); } _ => { // ignore } } } else { // TODO: Make sure socket closed? bail!("Online stream receives None"); } if query_begin.elapsed() > timeout { bail!("Try query onlines timeout {:?}", &timeout); } sleep(300.0).await; } } #[cfg(test)] mod tests { #[test] fn test_query_onlines() { super::query_online_states( vec![ "152183996".to_owned(), "165782066".to_owned(), "155323351".to_owned(), "460952777".to_owned(), ], |onlines: Vec, offlines: Vec| { println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines); }, ); } }