mirror of
https://github.com/rustdesk/rustdesk.git
synced 2025-01-23 10:56:31 +08:00
9ab5512bfa
Signed-off-by: fufesou <linlong1266@gmail.com>
855 lines
31 KiB
Rust
855 lines
31 KiB
Rust
use std::{
|
|
net::SocketAddr,
|
|
sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
Arc,
|
|
},
|
|
time::Instant,
|
|
};
|
|
|
|
use uuid::Uuid;
|
|
|
|
use hbb_common::{
|
|
allow_err,
|
|
anyhow::{self, bail},
|
|
config::{self, Config, CONNECT_TIMEOUT, READ_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT},
|
|
futures::future::join_all,
|
|
log,
|
|
protobuf::Message as _,
|
|
proxy::Proxy,
|
|
rendezvous_proto::*,
|
|
sleep,
|
|
socket_client::{self, connect_tcp, is_ipv4},
|
|
tcp::FramedStream,
|
|
tokio::{
|
|
self, select,
|
|
sync::Mutex,
|
|
time::{interval, Duration},
|
|
},
|
|
udp::FramedSocket,
|
|
AddrMangle, IntoTargetAddr, ResultType, TargetAddr,
|
|
};
|
|
|
|
use crate::{
|
|
check_port,
|
|
server::{check_zombie, new as new_server, ServerPtr},
|
|
};
|
|
|
|
type Message = RendezvousMessage;
|
|
|
|
const TIMER_OUT: Duration = Duration::from_secs(1);
|
|
const DEFAULT_KEEP_ALIVE: i32 = 60_000;
|
|
|
|
lazy_static::lazy_static! {
|
|
static ref SOLVING_PK_MISMATCH: Arc<Mutex<String>> = Default::default();
|
|
}
|
|
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
|
|
static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
|
|
|
|
#[derive(Clone)]
|
|
pub struct RendezvousMediator {
|
|
addr: TargetAddr<'static>,
|
|
host: String,
|
|
host_prefix: String,
|
|
keep_alive: i32,
|
|
}
|
|
|
|
impl RendezvousMediator {
|
|
pub fn restart() {
|
|
SHOULD_EXIT.store(true, Ordering::SeqCst);
|
|
MANUAL_RESTARTED.store(true, Ordering::SeqCst);
|
|
log::info!("server restart");
|
|
}
|
|
|
|
pub async fn start_all() {
|
|
if config::is_outgoing_only() {
|
|
loop {
|
|
sleep(1.).await;
|
|
}
|
|
}
|
|
crate::hbbs_http::sync::start();
|
|
let mut nat_tested = false;
|
|
check_zombie();
|
|
let server = new_server();
|
|
if Config::get_nat_type() == NatType::UNKNOWN_NAT as i32 {
|
|
crate::test_nat_type();
|
|
nat_tested = true;
|
|
}
|
|
if config::option2bool("stop-service", &Config::get_option("stop-service")) {
|
|
crate::test_rendezvous_server();
|
|
}
|
|
let server_cloned = server.clone();
|
|
tokio::spawn(async move {
|
|
direct_server(server_cloned).await;
|
|
});
|
|
#[cfg(not(any(target_os = "android", target_os = "ios")))]
|
|
if crate::platform::is_installed() {
|
|
std::thread::spawn(move || {
|
|
allow_err!(super::lan::start_listening());
|
|
});
|
|
}
|
|
// It is ok to run xdesktop manager when the headless function is not allowed.
|
|
#[cfg(target_os = "linux")]
|
|
if crate::is_server() {
|
|
crate::platform::linux_desktop_manager::start_xdesktop();
|
|
}
|
|
loop {
|
|
let conn_start_time = Instant::now();
|
|
*SOLVING_PK_MISMATCH.lock().await = "".to_owned();
|
|
if !config::option2bool("stop-service", &Config::get_option("stop-service"))
|
|
&& !crate::platform::installing_service()
|
|
{
|
|
if !nat_tested {
|
|
crate::test_nat_type();
|
|
nat_tested = true;
|
|
}
|
|
let mut futs = Vec::new();
|
|
let servers = Config::get_rendezvous_servers();
|
|
SHOULD_EXIT.store(false, Ordering::SeqCst);
|
|
MANUAL_RESTARTED.store(false, Ordering::SeqCst);
|
|
for host in servers.clone() {
|
|
let server = server.clone();
|
|
futs.push(tokio::spawn(async move {
|
|
if let Err(err) = Self::start(server, host).await {
|
|
log::error!("rendezvous mediator error: {err}");
|
|
}
|
|
// SHOULD_EXIT here is to ensure once one exits, the others also exit.
|
|
SHOULD_EXIT.store(true, Ordering::SeqCst);
|
|
}));
|
|
}
|
|
join_all(futs).await;
|
|
} else {
|
|
server.write().unwrap().close_connections();
|
|
}
|
|
Config::reset_online();
|
|
if !MANUAL_RESTARTED.load(Ordering::SeqCst) {
|
|
let elapsed = conn_start_time.elapsed().as_millis() as u64;
|
|
if elapsed < CONNECT_TIMEOUT {
|
|
sleep(((CONNECT_TIMEOUT - elapsed) / 1000) as _).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn get_host_prefix(host: &str) -> String {
|
|
host.split(".")
|
|
.next()
|
|
.map(|x| {
|
|
if x.parse::<i32>().is_ok() {
|
|
host.to_owned()
|
|
} else {
|
|
x.to_owned()
|
|
}
|
|
})
|
|
.unwrap_or(host.to_owned())
|
|
}
|
|
|
|
pub async fn start_udp(server: ServerPtr, host: String) -> ResultType<()> {
|
|
let host = check_port(&host, RENDEZVOUS_PORT);
|
|
let (mut socket, mut addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?;
|
|
let mut rz = Self {
|
|
addr: addr.clone(),
|
|
host: host.clone(),
|
|
host_prefix: Self::get_host_prefix(&host),
|
|
keep_alive: DEFAULT_KEEP_ALIVE,
|
|
};
|
|
|
|
let mut timer = crate::rustdesk_interval(interval(TIMER_OUT));
|
|
const MIN_REG_TIMEOUT: i64 = 3_000;
|
|
const MAX_REG_TIMEOUT: i64 = 30_000;
|
|
let mut reg_timeout = MIN_REG_TIMEOUT;
|
|
const MAX_FAILS1: i64 = 2;
|
|
const MAX_FAILS2: i64 = 4;
|
|
const DNS_INTERVAL: i64 = 60_000;
|
|
let mut fails = 0;
|
|
let mut last_register_resp: Option<Instant> = None;
|
|
let mut last_register_sent: Option<Instant> = None;
|
|
let mut last_dns_check = Instant::now();
|
|
let mut old_latency = 0;
|
|
let mut ema_latency = 0;
|
|
loop {
|
|
let mut update_latency = || {
|
|
last_register_resp = Some(Instant::now());
|
|
fails = 0;
|
|
reg_timeout = MIN_REG_TIMEOUT;
|
|
let mut latency = last_register_sent
|
|
.map(|x| x.elapsed().as_micros() as i64)
|
|
.unwrap_or(0);
|
|
last_register_sent = None;
|
|
if latency < 0 || latency > 1_000_000 {
|
|
return;
|
|
}
|
|
if ema_latency == 0 {
|
|
ema_latency = latency;
|
|
} else {
|
|
ema_latency = latency / 30 + (ema_latency * 29 / 30);
|
|
latency = ema_latency;
|
|
}
|
|
let mut n = latency / 5;
|
|
if n < 3000 {
|
|
n = 3000;
|
|
}
|
|
if (latency - old_latency).abs() > n || old_latency <= 0 {
|
|
Config::update_latency(&host, latency);
|
|
log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.);
|
|
old_latency = latency;
|
|
}
|
|
};
|
|
select! {
|
|
n = socket.next() => {
|
|
match n {
|
|
Some(Ok((bytes, _))) => {
|
|
if let Ok(msg) = Message::parse_from_bytes(&bytes) {
|
|
rz.handle_resp(msg.union, Sink::Framed(&mut socket, &addr), &server, &mut update_latency).await?;
|
|
} else {
|
|
log::debug!("Non-protobuf message bytes received: {:?}", bytes);
|
|
}
|
|
},
|
|
Some(Err(e)) => bail!("Failed to receive next {}", e), // maybe socks5 tcp disconnected
|
|
None => {
|
|
bail!("Socket receive none. Maybe socks5 server is down.");
|
|
},
|
|
}
|
|
},
|
|
_ = timer.tick() => {
|
|
if SHOULD_EXIT.load(Ordering::SeqCst) {
|
|
break;
|
|
}
|
|
let now = Some(Instant::now());
|
|
let expired = last_register_resp.map(|x| x.elapsed().as_millis() as i64 >= REG_INTERVAL).unwrap_or(true);
|
|
let timeout = last_register_sent.map(|x| x.elapsed().as_millis() as i64 >= reg_timeout).unwrap_or(false);
|
|
// temporarily disable exponential backoff for android before we add wakeup trigger to force connect in android
|
|
#[cfg(not(any(target_os = "android", target_os = "ios")))]
|
|
if crate::using_public_server() { // only turn on this for public server, may help DDNS self-hosting user.
|
|
if timeout && reg_timeout < MAX_REG_TIMEOUT {
|
|
reg_timeout += MIN_REG_TIMEOUT;
|
|
}
|
|
}
|
|
if timeout || (last_register_sent.is_none() && expired) {
|
|
if timeout {
|
|
fails += 1;
|
|
if fails >= MAX_FAILS2 {
|
|
Config::update_latency(&host, -1);
|
|
old_latency = 0;
|
|
if last_dns_check.elapsed().as_millis() as i64 > DNS_INTERVAL {
|
|
// in some case of network reconnect (dial IP network),
|
|
// old UDP socket not work any more after network recover
|
|
if let Some((s, new_addr)) = socket_client::rebind_udp_for(&rz.host).await? {
|
|
socket = s;
|
|
rz.addr = new_addr.clone();
|
|
addr = new_addr;
|
|
}
|
|
last_dns_check = Instant::now();
|
|
}
|
|
} else if fails >= MAX_FAILS1 {
|
|
Config::update_latency(&host, 0);
|
|
old_latency = 0;
|
|
}
|
|
}
|
|
rz.register_peer(Sink::Framed(&mut socket, &addr)).await?;
|
|
last_register_sent = now;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[inline]
|
|
async fn handle_resp(
|
|
&mut self,
|
|
msg: Option<rendezvous_message::Union>,
|
|
sink: Sink<'_>,
|
|
server: &ServerPtr,
|
|
update_latency: &mut impl FnMut(),
|
|
) -> ResultType<()> {
|
|
match msg {
|
|
Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => {
|
|
update_latency();
|
|
if rpr.request_pk {
|
|
log::info!("request_pk received from {}", self.host);
|
|
self.register_pk(sink).await?;
|
|
}
|
|
}
|
|
Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => {
|
|
update_latency();
|
|
match rpr.result.enum_value() {
|
|
Ok(register_pk_response::Result::OK) => {
|
|
Config::set_key_confirmed(true);
|
|
Config::set_host_key_confirmed(&self.host_prefix, true);
|
|
*SOLVING_PK_MISMATCH.lock().await = "".to_owned();
|
|
}
|
|
Ok(register_pk_response::Result::UUID_MISMATCH) => {
|
|
self.handle_uuid_mismatch(sink).await?;
|
|
}
|
|
_ => {
|
|
log::error!("unknown RegisterPkResponse");
|
|
}
|
|
}
|
|
if rpr.keep_alive > 0 {
|
|
self.keep_alive = rpr.keep_alive * 1000;
|
|
log::info!("keep_alive: {}ms", self.keep_alive);
|
|
}
|
|
}
|
|
Some(rendezvous_message::Union::PunchHole(ph)) => {
|
|
let rz = self.clone();
|
|
let server = server.clone();
|
|
tokio::spawn(async move {
|
|
allow_err!(rz.handle_punch_hole(ph, server).await);
|
|
});
|
|
}
|
|
Some(rendezvous_message::Union::RequestRelay(rr)) => {
|
|
let rz = self.clone();
|
|
let server = server.clone();
|
|
tokio::spawn(async move {
|
|
allow_err!(rz.handle_request_relay(rr, server).await);
|
|
});
|
|
}
|
|
Some(rendezvous_message::Union::FetchLocalAddr(fla)) => {
|
|
let rz = self.clone();
|
|
let server = server.clone();
|
|
tokio::spawn(async move {
|
|
allow_err!(rz.handle_intranet(fla, server).await);
|
|
});
|
|
}
|
|
Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
|
|
let v0 = Config::get_rendezvous_servers();
|
|
Config::set_option(
|
|
"rendezvous-servers".to_owned(),
|
|
cu.rendezvous_servers.join(","),
|
|
);
|
|
Config::set_serial(cu.serial);
|
|
if v0 != Config::get_rendezvous_servers() {
|
|
Self::restart();
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn start_tcp(server: ServerPtr, host: String) -> ResultType<()> {
|
|
let host = check_port(&host, RENDEZVOUS_PORT);
|
|
let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?;
|
|
let key = crate::get_key(true).await;
|
|
crate::secure_tcp(&mut conn, &key).await?;
|
|
let mut rz = Self {
|
|
addr: conn.local_addr().into_target_addr()?,
|
|
host: host.clone(),
|
|
host_prefix: Self::get_host_prefix(&host),
|
|
keep_alive: DEFAULT_KEEP_ALIVE,
|
|
};
|
|
let mut timer = crate::rustdesk_interval(interval(TIMER_OUT));
|
|
let mut last_register_sent: Option<Instant> = None;
|
|
let mut last_recv_msg = Instant::now();
|
|
// we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here.
|
|
Config::set_host_key_confirmed(&host, false);
|
|
loop {
|
|
let mut update_latency = || {
|
|
let latency = last_register_sent
|
|
.map(|x| x.elapsed().as_micros() as i64)
|
|
.unwrap_or(0);
|
|
Config::update_latency(&host, latency);
|
|
log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.);
|
|
};
|
|
select! {
|
|
res = conn.next() => {
|
|
last_recv_msg = Instant::now();
|
|
let bytes = res.ok_or_else(|| anyhow::anyhow!("Rendezvous connection is reset by the peer"))??;
|
|
if bytes.is_empty() {
|
|
conn.send_bytes(bytes::Bytes::new()).await?;
|
|
continue; // heartbeat
|
|
}
|
|
let msg = Message::parse_from_bytes(&bytes)?;
|
|
rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
|
|
}
|
|
_ = timer.tick() => {
|
|
if SHOULD_EXIT.load(Ordering::SeqCst) {
|
|
break;
|
|
}
|
|
// https://www.emqx.com/en/blog/mqtt-keep-alive
|
|
if last_recv_msg.elapsed().as_millis() as u64 > rz.keep_alive as u64 * 3 / 2 {
|
|
bail!("Rendezvous connection is timeout");
|
|
}
|
|
if (!Config::get_key_confirmed() ||
|
|
!Config::get_host_key_confirmed(&host)) &&
|
|
last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL) >= REG_INTERVAL {
|
|
rz.register_pk(Sink::Stream(&mut conn)).await?;
|
|
last_register_sent = Some(Instant::now());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn start(server: ServerPtr, host: String) -> ResultType<()> {
|
|
log::info!("start rendezvous mediator of {}", host);
|
|
//If the investment agent type is http or https, then tcp forwarding is enabled.
|
|
let is_http_proxy = if let Some(conf) = Config::get_socks() {
|
|
let proxy = Proxy::from_conf(&conf, None)?;
|
|
proxy.is_http_or_https()
|
|
} else {
|
|
false
|
|
};
|
|
if (cfg!(debug_assertions) && option_env!("TEST_TCP").is_some()) || is_http_proxy {
|
|
Self::start_tcp(server, host).await
|
|
} else {
|
|
Self::start_udp(server, host).await
|
|
}
|
|
}
|
|
|
|
async fn handle_request_relay(&self, rr: RequestRelay, server: ServerPtr) -> ResultType<()> {
|
|
self.create_relay(
|
|
rr.socket_addr.into(),
|
|
rr.relay_server,
|
|
rr.uuid,
|
|
server,
|
|
rr.secure,
|
|
false,
|
|
)
|
|
.await
|
|
}
|
|
|
|
async fn create_relay(
|
|
&self,
|
|
socket_addr: Vec<u8>,
|
|
relay_server: String,
|
|
uuid: String,
|
|
server: ServerPtr,
|
|
secure: bool,
|
|
initiate: bool,
|
|
) -> ResultType<()> {
|
|
let peer_addr = AddrMangle::decode(&socket_addr);
|
|
log::info!(
|
|
"create_relay requested from {:?}, relay_server: {}, uuid: {}, secure: {}",
|
|
peer_addr,
|
|
relay_server,
|
|
uuid,
|
|
secure,
|
|
);
|
|
|
|
let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
|
|
|
let mut msg_out = Message::new();
|
|
let mut rr = RelayResponse {
|
|
socket_addr: socket_addr.into(),
|
|
version: crate::VERSION.to_owned(),
|
|
..Default::default()
|
|
};
|
|
if initiate {
|
|
rr.uuid = uuid.clone();
|
|
rr.relay_server = relay_server.clone();
|
|
rr.set_id(Config::get_id());
|
|
}
|
|
msg_out.set_relay_response(rr);
|
|
socket.send(&msg_out).await?;
|
|
crate::create_relay_connection(
|
|
server,
|
|
relay_server,
|
|
uuid,
|
|
peer_addr,
|
|
secure,
|
|
is_ipv4(&self.addr),
|
|
)
|
|
.await;
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_intranet(&self, fla: FetchLocalAddr, server: ServerPtr) -> ResultType<()> {
|
|
let relay_server = self.get_relay_server(fla.relay_server.clone());
|
|
// nat64, go relay directly, because current hbbs will crash if demangle ipv6 address
|
|
if is_ipv4(&self.addr) && !config::is_disable_tcp_listen() && !Config::is_proxy() {
|
|
if let Err(err) = self
|
|
.handle_intranet_(fla.clone(), server.clone(), relay_server.clone())
|
|
.await
|
|
{
|
|
log::debug!("Failed to handle intranet: {:?}, will try relay", err);
|
|
} else {
|
|
return Ok(());
|
|
}
|
|
}
|
|
let uuid = Uuid::new_v4().to_string();
|
|
self.create_relay(
|
|
fla.socket_addr.into(),
|
|
relay_server,
|
|
uuid,
|
|
server,
|
|
true,
|
|
true,
|
|
)
|
|
.await
|
|
}
|
|
|
|
async fn handle_intranet_(
|
|
&self,
|
|
fla: FetchLocalAddr,
|
|
server: ServerPtr,
|
|
relay_server: String,
|
|
) -> ResultType<()> {
|
|
let peer_addr = AddrMangle::decode(&fla.socket_addr);
|
|
log::debug!("Handle intranet from {:?}", peer_addr);
|
|
let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
|
let local_addr = socket.local_addr();
|
|
// we saw invalid local_addr while using proxy, local_addr.ip() == "::1"
|
|
let local_addr: SocketAddr =
|
|
format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?;
|
|
let mut msg_out = Message::new();
|
|
msg_out.set_local_addr(LocalAddr {
|
|
id: Config::get_id(),
|
|
socket_addr: AddrMangle::encode(peer_addr).into(),
|
|
local_addr: AddrMangle::encode(local_addr).into(),
|
|
relay_server,
|
|
version: crate::VERSION.to_owned(),
|
|
..Default::default()
|
|
});
|
|
let bytes = msg_out.write_to_bytes()?;
|
|
socket.send_raw(bytes).await?;
|
|
crate::accept_connection(server.clone(), socket, peer_addr, true).await;
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> {
|
|
let relay_server = self.get_relay_server(ph.relay_server);
|
|
if ph.nat_type.enum_value() == Ok(NatType::SYMMETRIC)
|
|
|| Config::get_nat_type() == NatType::SYMMETRIC as i32
|
|
|| config::is_disable_tcp_listen()
|
|
{
|
|
let uuid = Uuid::new_v4().to_string();
|
|
return self
|
|
.create_relay(
|
|
ph.socket_addr.into(),
|
|
relay_server,
|
|
uuid,
|
|
server,
|
|
true,
|
|
true,
|
|
)
|
|
.await;
|
|
}
|
|
let peer_addr = AddrMangle::decode(&ph.socket_addr);
|
|
log::debug!("Punch hole to {:?}", peer_addr);
|
|
let mut socket = {
|
|
let socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
|
let local_addr = socket.local_addr();
|
|
// key important here for punch hole to tell my gateway incoming peer is safe.
|
|
// it can not be async here, because local_addr can not be reused, we must close the connection before use it again.
|
|
allow_err!(socket_client::connect_tcp_local(peer_addr, Some(local_addr), 30).await);
|
|
socket
|
|
};
|
|
let mut msg_out = Message::new();
|
|
use hbb_common::protobuf::Enum;
|
|
let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT);
|
|
msg_out.set_punch_hole_sent(PunchHoleSent {
|
|
socket_addr: ph.socket_addr,
|
|
id: Config::get_id(),
|
|
relay_server,
|
|
nat_type: nat_type.into(),
|
|
version: crate::VERSION.to_owned(),
|
|
..Default::default()
|
|
});
|
|
let bytes = msg_out.write_to_bytes()?;
|
|
socket.send_raw(bytes).await?;
|
|
crate::accept_connection(server.clone(), socket, peer_addr, true).await;
|
|
Ok(())
|
|
}
|
|
|
|
async fn register_pk(&mut self, socket: Sink<'_>) -> ResultType<()> {
|
|
let mut msg_out = Message::new();
|
|
let pk = Config::get_key_pair().1;
|
|
let uuid = hbb_common::get_uuid();
|
|
let id = Config::get_id();
|
|
msg_out.set_register_pk(RegisterPk {
|
|
id,
|
|
uuid: uuid.into(),
|
|
pk: pk.into(),
|
|
..Default::default()
|
|
});
|
|
socket.send(&msg_out).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_uuid_mismatch(&mut self, socket: Sink<'_>) -> ResultType<()> {
|
|
{
|
|
let mut solving = SOLVING_PK_MISMATCH.lock().await;
|
|
if solving.is_empty() || *solving == self.host {
|
|
log::info!("UUID_MISMATCH received from {}", self.host);
|
|
Config::set_key_confirmed(false);
|
|
Config::update_id();
|
|
*solving = self.host.clone();
|
|
} else {
|
|
return Ok(());
|
|
}
|
|
}
|
|
self.register_pk(socket).await
|
|
}
|
|
|
|
async fn register_peer(&mut self, socket: Sink<'_>) -> ResultType<()> {
|
|
let solving = SOLVING_PK_MISMATCH.lock().await;
|
|
if !(solving.is_empty() || *solving == self.host) {
|
|
return Ok(());
|
|
}
|
|
drop(solving);
|
|
if !Config::get_key_confirmed() || !Config::get_host_key_confirmed(&self.host_prefix) {
|
|
log::info!(
|
|
"register_pk of {} due to key not confirmed",
|
|
self.host_prefix
|
|
);
|
|
return self.register_pk(socket).await;
|
|
}
|
|
let id = Config::get_id();
|
|
log::trace!(
|
|
"Register my id {:?} to rendezvous server {:?}",
|
|
id,
|
|
self.addr,
|
|
);
|
|
let mut msg_out = Message::new();
|
|
let serial = Config::get_serial();
|
|
msg_out.set_register_peer(RegisterPeer {
|
|
id,
|
|
serial,
|
|
..Default::default()
|
|
});
|
|
socket.send(&msg_out).await?;
|
|
Ok(())
|
|
}
|
|
|
|
fn get_relay_server(&self, provided_by_rendezvous_server: String) -> String {
|
|
let mut relay_server = Config::get_option("relay-server");
|
|
if relay_server.is_empty() {
|
|
relay_server = provided_by_rendezvous_server;
|
|
}
|
|
if relay_server.is_empty() {
|
|
relay_server = crate::increase_port(&self.host, 1);
|
|
}
|
|
relay_server
|
|
}
|
|
}
|
|
|
|
fn get_direct_port() -> i32 {
|
|
let mut port = Config::get_option("direct-access-port")
|
|
.parse::<i32>()
|
|
.unwrap_or(0);
|
|
if port <= 0 {
|
|
port = RENDEZVOUS_PORT + 2;
|
|
}
|
|
port
|
|
}
|
|
|
|
async fn direct_server(server: ServerPtr) {
|
|
let mut listener = None;
|
|
let mut port = 0;
|
|
loop {
|
|
let disabled = Config::get_option("direct-server").is_empty()
|
|
|| !Config::get_option("stop-service").is_empty();
|
|
if !disabled && listener.is_none() {
|
|
port = get_direct_port();
|
|
match hbb_common::tcp::listen_any(port as _).await {
|
|
Ok(l) => {
|
|
listener = Some(l);
|
|
log::info!(
|
|
"Direct server listening on: {:?}",
|
|
listener.as_ref().map(|l| l.local_addr())
|
|
);
|
|
}
|
|
Err(err) => {
|
|
// to-do: pass to ui
|
|
log::error!(
|
|
"Failed to start direct server on port: {}, error: {}",
|
|
port,
|
|
err
|
|
);
|
|
loop {
|
|
if port != get_direct_port() {
|
|
break;
|
|
}
|
|
sleep(1.).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if let Some(l) = listener.as_mut() {
|
|
if disabled || port != get_direct_port() {
|
|
log::info!("Exit direct access listen");
|
|
listener = None;
|
|
continue;
|
|
}
|
|
if let Ok(Ok((stream, addr))) = hbb_common::timeout(1000, l.accept()).await {
|
|
stream.set_nodelay(true).ok();
|
|
log::info!("direct access from {}", addr);
|
|
let local_addr = stream
|
|
.local_addr()
|
|
.unwrap_or(Config::get_any_listen_addr(true));
|
|
let server = server.clone();
|
|
tokio::spawn(async move {
|
|
allow_err!(
|
|
crate::server::create_tcp_connection(
|
|
server,
|
|
hbb_common::Stream::from(stream, local_addr),
|
|
addr,
|
|
false,
|
|
)
|
|
.await
|
|
);
|
|
});
|
|
} else {
|
|
sleep(0.1).await;
|
|
}
|
|
} else {
|
|
sleep(1.).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) {
|
|
let test = false;
|
|
if test {
|
|
sleep(1.5).await;
|
|
let mut onlines = ids;
|
|
let offlines = onlines.drain((onlines.len() / 2)..).collect();
|
|
f(onlines, offlines)
|
|
} else {
|
|
let query_begin = Instant::now();
|
|
let query_timeout = std::time::Duration::from_millis(3_000);
|
|
loop {
|
|
if SHOULD_EXIT.load(Ordering::SeqCst) {
|
|
break;
|
|
}
|
|
match query_online_states_(&ids, query_timeout).await {
|
|
Ok((onlines, offlines)) => {
|
|
f(onlines, offlines);
|
|
break;
|
|
}
|
|
Err(e) => {
|
|
log::debug!("{}", &e);
|
|
}
|
|
}
|
|
|
|
if query_begin.elapsed() > query_timeout {
|
|
log::debug!(
|
|
"query onlines timeout {:?} ({:?})",
|
|
query_begin.elapsed(),
|
|
query_timeout
|
|
);
|
|
break;
|
|
}
|
|
|
|
sleep(1.5).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn create_online_stream() -> ResultType<FramedStream> {
|
|
let (rendezvous_server, _servers, _contained) =
|
|
crate::get_rendezvous_server(READ_TIMEOUT).await;
|
|
let tmp: Vec<&str> = rendezvous_server.split(":").collect();
|
|
if tmp.len() != 2 {
|
|
bail!("Invalid server address: {}", rendezvous_server);
|
|
}
|
|
let port: u16 = tmp[1].parse()?;
|
|
if port == 0 {
|
|
bail!("Invalid server address: {}", rendezvous_server);
|
|
}
|
|
let online_server = format!("{}:{}", tmp[0], port - 1);
|
|
connect_tcp(online_server, CONNECT_TIMEOUT).await
|
|
}
|
|
|
|
async fn query_online_states_(
|
|
ids: &Vec<String>,
|
|
timeout: std::time::Duration,
|
|
) -> ResultType<(Vec<String>, Vec<String>)> {
|
|
let query_begin = Instant::now();
|
|
|
|
let mut msg_out = RendezvousMessage::new();
|
|
msg_out.set_online_request(OnlineRequest {
|
|
id: Config::get_id(),
|
|
peers: ids.clone(),
|
|
..Default::default()
|
|
});
|
|
|
|
loop {
|
|
if SHOULD_EXIT.load(Ordering::SeqCst) {
|
|
// No need to care about onlines
|
|
return Ok((Vec::new(), Vec::new()));
|
|
}
|
|
|
|
let mut socket = match create_online_stream().await {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
log::debug!("Failed to create peers online stream, {e}");
|
|
return Ok((vec![], ids.clone()));
|
|
}
|
|
};
|
|
if let Err(e) = socket.send(&msg_out).await {
|
|
log::debug!("Failed to send peers online states query, {e}");
|
|
return Ok((vec![], ids.clone()));
|
|
}
|
|
if let Some(msg_in) = crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await {
|
|
match msg_in.union {
|
|
Some(rendezvous_message::Union::OnlineResponse(online_response)) => {
|
|
let states = online_response.states;
|
|
let mut onlines = Vec::new();
|
|
let mut offlines = Vec::new();
|
|
for i in 0..ids.len() {
|
|
// bytes index from left to right
|
|
let bit_value = 0x01 << (7 - i % 8);
|
|
if (states[i / 8] & bit_value) == bit_value {
|
|
onlines.push(ids[i].clone());
|
|
} else {
|
|
offlines.push(ids[i].clone());
|
|
}
|
|
}
|
|
return Ok((onlines, offlines));
|
|
}
|
|
_ => {
|
|
// ignore
|
|
}
|
|
}
|
|
} else {
|
|
// TODO: Make sure socket closed?
|
|
bail!("Online stream receives None");
|
|
}
|
|
|
|
if query_begin.elapsed() > timeout {
|
|
bail!("Try query onlines timeout {:?}", &timeout);
|
|
}
|
|
|
|
sleep(300.0).await;
|
|
}
|
|
}
|
|
|
|
enum Sink<'a> {
|
|
Framed(&'a mut FramedSocket, &'a TargetAddr<'a>),
|
|
Stream(&'a mut FramedStream),
|
|
}
|
|
|
|
impl Sink<'_> {
|
|
async fn send(self, msg: &Message) -> ResultType<()> {
|
|
match self {
|
|
Sink::Framed(socket, addr) => socket.send(msg, addr.to_owned()).await,
|
|
Sink::Stream(stream) => stream.send(msg).await,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use hbb_common::tokio;
|
|
|
|
#[tokio::test]
|
|
async fn test_query_onlines() {
|
|
super::query_online_states(
|
|
vec![
|
|
"152183996".to_owned(),
|
|
"165782066".to_owned(),
|
|
"155323351".to_owned(),
|
|
"460952777".to_owned(),
|
|
],
|
|
|onlines: Vec<String>, offlines: Vec<String>| {
|
|
println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines);
|
|
},
|
|
)
|
|
.await;
|
|
}
|
|
}
|