From 3e590b8212090095a5134639c21d9ded4922cad3 Mon Sep 17 00:00:00 2001 From: open-trade Date: Tue, 4 Jan 2022 19:49:44 +0800 Subject: [PATCH] refactor to_socket_addr and dns_check --- libs/hbb_common/src/config.rs | 4 +-- libs/hbb_common/src/lib.rs | 13 ++------- libs/hbb_common/src/socket_client.rs | 34 +++++++++++++++++----- libs/hbb_common/src/udp.rs | 2 +- src/common.rs | 7 +++-- src/ipc.rs | 4 +-- src/rendezvous_mediator.rs | 43 ++++++++++++---------------- 7 files changed, 57 insertions(+), 50 deletions(-) diff --git a/libs/hbb_common/src/config.rs b/libs/hbb_common/src/config.rs index 49b12f4b0..fa2db12cd 100644 --- a/libs/hbb_common/src/config.rs +++ b/libs/hbb_common/src/config.rs @@ -349,7 +349,7 @@ impl Config { format!("{}:0", BIND_INTERFACE).parse().unwrap() } - pub fn get_rendezvous_server() -> SocketAddr { + pub async fn get_rendezvous_server() -> SocketAddr { let mut rendezvous_server = Self::get_option("custom-rendezvous-server"); if rendezvous_server.is_empty() { rendezvous_server = CONFIG2.write().unwrap().rendezvous_server.clone(); @@ -363,7 +363,7 @@ impl Config { if !rendezvous_server.contains(":") { rendezvous_server = format!("{}:{}", rendezvous_server, RENDEZVOUS_PORT); } - if let Ok(addr) = crate::to_socket_addr(&rendezvous_server) { + if let Ok(addr) = crate::to_socket_addr(&rendezvous_server).await { addr } else { Self::get_any_listen_addr() diff --git a/libs/hbb_common/src/lib.rs b/libs/hbb_common/src/lib.rs index dd685f77b..367507243 100644 --- a/libs/hbb_common/src/lib.rs +++ b/libs/hbb_common/src/lib.rs @@ -9,15 +9,15 @@ pub use protobuf; use std::{ fs::File, io::{self, BufRead}, - net::{Ipv4Addr, SocketAddr, SocketAddrV4, ToSocketAddrs}, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, path::Path, time::{self, SystemTime, UNIX_EPOCH}, }; pub use tokio; pub use tokio_util; +pub mod socket_client; pub mod tcp; pub mod udp; -pub mod socket_client; pub use env_logger; pub use log; pub mod bytes_codec; @@ -27,6 +27,7 @@ pub use anyhow::{self, bail}; pub use futures_util; pub mod config; pub mod fs; +pub use socket_client::to_socket_addr; pub use sodiumoxide; pub use tokio_socks; @@ -149,14 +150,6 @@ pub fn get_version_from_url(url: &str) -> String { "".to_owned() } -pub fn to_socket_addr(host: &str) -> ResultType { - let addrs: Vec = host.to_socket_addrs()?.collect(); - if addrs.is_empty() { - bail!("Failed to solve {}", host); - } - Ok(addrs[0]) -} - pub fn gen_version() { let mut file = File::create("./src/version.rs").unwrap(); for line in read_lines("Cargo.toml").unwrap() { diff --git a/libs/hbb_common/src/socket_client.rs b/libs/hbb_common/src/socket_client.rs index 2fd9bcff1..e38ea5ed6 100644 --- a/libs/hbb_common/src/socket_client.rs +++ b/libs/hbb_common/src/socket_client.rs @@ -1,5 +1,5 @@ use crate::{ - config::{Config, NetworkType}, + config::{Config, NetworkType, RENDEZVOUS_TIMEOUT}, tcp::FramedStream, udp::FramedSocket, ResultType, @@ -47,15 +47,35 @@ pub async fn connect_tcp<'t, T: IntoTargetAddr<'t>>( } } -pub async fn connect_udp<'t, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>( +fn native_to_socket_addr(host: &str) -> ResultType { + use std::net::ToSocketAddrs; + let addrs: Vec = host.to_socket_addrs()?.collect(); + if addrs.is_empty() { + bail!("Failed to solve {}", host); + } + Ok(addrs[0]) +} + +pub async fn to_socket_addr(host: &str) -> ResultType { + Ok( + new_udp(host, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT) + .await? + .1, + ) +} + +pub async fn new_udp<'t, T1: IntoTargetAddr<'t> + std::fmt::Display, T2: ToSocketAddrs>( target: T1, local: T2, ms_timeout: u64, -) -> ResultType<(FramedSocket, Option)> { +) -> ResultType<(FramedSocket, SocketAddr)> { match Config::get_socks() { - None => Ok((FramedSocket::new(local).await?, None)), + None => Ok(( + FramedSocket::new(local).await?, + native_to_socket_addr(&target.to_string())?, + )), Some(conf) => { - let (socket, addr) = FramedSocket::connect( + let (socket, addr) = FramedSocket::new_proxy( conf.proxy.as_str(), target, local, @@ -64,12 +84,12 @@ pub async fn connect_udp<'t, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>( ms_timeout, ) .await?; - Ok((socket, Some(addr))) + Ok((socket, addr)) } } } -pub async fn reconnect_udp(local: T) -> ResultType> { +pub async fn rebind(local: T) -> ResultType> { match Config::get_network_type() { NetworkType::Direct => Ok(Some(FramedSocket::new(local).await?)), _ => Ok(None), diff --git a/libs/hbb_common/src/udp.rs b/libs/hbb_common/src/udp.rs index 719cea076..f5d088623 100644 --- a/libs/hbb_common/src/udp.rs +++ b/libs/hbb_common/src/udp.rs @@ -49,7 +49,7 @@ impl FramedSocket { bail!("could not resolve to any address"); } - pub async fn connect<'a, 't, P: ToProxyAddrs, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>( + pub async fn new_proxy<'a, 't, P: ToProxyAddrs, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>( proxy: P, target: T1, local: T2, diff --git a/src/common.rs b/src/common.rs index dcef673ed..03256ce4c 100644 --- a/src/common.rs +++ b/src/common.rs @@ -415,12 +415,13 @@ pub fn is_modifier(evt: &KeyEvent) -> bool { } } -pub fn test_if_valid_server(host: String) -> String { +#[tokio::main(flavor = "current_thread")] +pub async fn test_if_valid_server(host: String) -> String { let mut host = host; if !host.contains(":") { host = format!("{}:{}", host, 0); } - match hbb_common::to_socket_addr(&host) { + match hbb_common::to_socket_addr(&host).await { Err(err) => err.to_string(), Ok(_) => "".to_owned(), } @@ -443,7 +444,7 @@ async fn _check_software_update() -> hbb_common::ResultType<()> { sleep(3.).await; let rendezvous_server = get_rendezvous_server(1_000).await; - let (mut socket, _) = socket_client::connect_udp( + let (mut socket, _) = socket_client::new_udp( rendezvous_server, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT, diff --git a/src/ipc.rs b/src/ipc.rs index 0474e457f..cd69ccfb3 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -202,7 +202,7 @@ async fn handle(data: Data, stream: &mut Connection) { } else if name == "salt" { value = Some(Config::get_salt()); } else if name == "rendezvous_server" { - value = Some(Config::get_rendezvous_server().to_string()); + value = Some(Config::get_rendezvous_server().await.to_string()); } else { value = None; } @@ -403,7 +403,7 @@ pub async fn get_rendezvous_server(ms_timeout: u64) -> SocketAddr { return v; } } - return Config::get_rendezvous_server(); + return Config::get_rendezvous_server().await; } async fn get_options_(ms_timeout: u64) -> ResultType> { diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 128cef6c2..123a1e016 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -99,18 +99,13 @@ impl RendezvousMediator { rendezvous_servers, last_id_pk_registry: "".to_owned(), }; - let mut host_addr = rz.addr; - allow_err!(rz.dns_check(&mut host_addr)); - - let bind_addr = Config::get_any_listen_addr(); - let target = format!("{}:{}", host, RENDEZVOUS_PORT); - let (mut socket, target_addr) = - socket_client::connect_udp(target, bind_addr, RENDEZVOUS_TIMEOUT).await?; - if let Some(addr) = target_addr { - rz.addr = addr; - } else { - rz.addr = host_addr; - } + let (mut socket, target_addr) = socket_client::new_udp( + crate::check_port(&host, RENDEZVOUS_PORT), + Config::get_any_listen_addr(), + RENDEZVOUS_TIMEOUT, + ) + .await?; + rz.addr = target_addr; const TIMER_OUT: Duration = Duration::from_secs(1); let mut timer = interval(TIMER_OUT); let mut last_timer = SystemTime::UNIX_EPOCH; @@ -228,16 +223,14 @@ impl RendezvousMediator { break; } if rz.addr.port() == 0 { - // tcp is established to help connecting socks5 - allow_err!(rz.dns_check(&mut host_addr)); - if host_addr.port() == 0 { + allow_err!(rz.dns_check().await); + if rz.addr.port() == 0 { continue; } else { // have to do this for osx, to avoid "Can't assign requested address" // when socket created before OS network ready - if let Some(s) = socket_client::reconnect_udp(bind_addr).await? { + if let Some(s) = socket_client::rebind(Config::get_any_listen_addr()).await? { socket = s; - rz.addr = host_addr; }; } } @@ -258,12 +251,11 @@ impl RendezvousMediator { Config::update_latency(&host, -1); old_latency = 0; if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL { - if let Ok(_) = rz.dns_check(&mut host_addr) { + if let Ok(_) = rz.dns_check().await { // in some case of network reconnect (dial IP network), // old UDP socket not work any more after network recover - if let Some(s) = socket_client::reconnect_udp(bind_addr).await? { + if let Some(s) = socket_client::rebind(Config::get_any_listen_addr()).await? { socket = s; - rz.addr = host_addr; }; } last_dns_check = now; @@ -280,8 +272,9 @@ impl RendezvousMediator { Ok(()) } - fn dns_check(&self, addr: &mut SocketAddr) -> ResultType<()> { - *addr = hbb_common::to_socket_addr(&crate::check_port(&self.host, RENDEZVOUS_PORT))?; + async fn dns_check(&mut self) -> ResultType<()> { + self.addr = + hbb_common::to_socket_addr(&crate::check_port(&self.host, RENDEZVOUS_PORT)).await?; log::debug!("Lookup dns of {}", self.host); Ok(()) } @@ -317,7 +310,7 @@ impl RendezvousMediator { ); let mut socket = socket_client::connect_tcp( - format!("{}:{}", self.host, RENDEZVOUS_PORT), + self.addr, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT, ) @@ -345,7 +338,7 @@ impl RendezvousMediator { let peer_addr = AddrMangle::decode(&fla.socket_addr); log::debug!("Handle intranet from {:?}", peer_addr); let mut socket = socket_client::connect_tcp( - format!("{}:{}", self.host, RENDEZVOUS_PORT), + self.addr, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT, ) @@ -389,7 +382,7 @@ impl RendezvousMediator { log::debug!("Punch hole to {:?}", peer_addr); let mut socket = { let socket = socket_client::connect_tcp( - format!("{}:{}", self.host, RENDEZVOUS_PORT), + self.addr, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT, )