fix: Query online, remove loop retry (#9326)

Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
fufesou 2024-09-11 17:17:32 +08:00 committed by GitHub
parent cbca0eb340
commit 2e81bcb447
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 44 additions and 63 deletions

View File

@ -3414,7 +3414,6 @@ pub mod peer_online {
tcp::FramedStream,
ResultType,
};
use std::time::Instant;
pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) {
let test = false;
@ -3424,29 +3423,14 @@ pub mod peer_online {
let offlines = onlines.drain((onlines.len() / 2)..).collect();
f(onlines, offlines)
} else {
let query_begin = Instant::now();
let query_timeout = std::time::Duration::from_millis(3_000);
loop {
match query_online_states_(&ids, query_timeout).await {
Ok((onlines, offlines)) => {
f(onlines, offlines);
break;
}
Err(e) => {
log::debug!("{}", &e);
}
match query_online_states_(&ids, query_timeout).await {
Ok((onlines, offlines)) => {
f(onlines, offlines);
}
if query_begin.elapsed() > query_timeout {
log::debug!(
"query onlines timeout {:?} ({:?})",
query_begin.elapsed(),
query_timeout
);
break;
Err(e) => {
log::debug!("query onlines, {}", &e);
}
sleep(1.5).await;
}
}
}
@ -3470,8 +3454,6 @@ pub mod peer_online {
ids: &Vec<String>,
timeout: std::time::Duration,
) -> ResultType<(Vec<String>, Vec<String>)> {
let query_begin = Instant::now();
let mut msg_out = RendezvousMessage::new();
msg_out.set_online_request(OnlineRequest {
id: Config::get_id(),
@ -3479,24 +3461,28 @@ pub mod peer_online {
..Default::default()
});
loop {
let mut socket = match create_online_stream().await {
Ok(s) => s,
Err(e) => {
log::debug!("Failed to create peers online stream, {e}");
return Ok((vec![], ids.clone()));
}
};
// TODO: Use long connections to avoid socket creation
// If we use a Arc<Mutex<Option<FramedStream>>> to hold and reuse the previous socket,
// we may face the following error:
// An established connection was aborted by the software in your host machine. (os error 10053)
if let Err(e) = socket.send(&msg_out).await {
log::debug!("Failed to send peers online states query, {e}");
let mut socket = match create_online_stream().await {
Ok(s) => s,
Err(e) => {
log::debug!("Failed to create peers online stream, {e}");
return Ok((vec![], ids.clone()));
}
if let Some(msg_in) =
crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await
};
// TODO: Use long connections to avoid socket creation
// If we use a Arc<Mutex<Option<FramedStream>>> to hold and reuse the previous socket,
// we may face the following error:
// An established connection was aborted by the software in your host machine. (os error 10053)
if let Err(e) = socket.send(&msg_out).await {
log::debug!("Failed to send peers online states query, {e}");
return Ok((vec![], ids.clone()));
}
// Retry for 2 times to get the online response
for _ in 0..2 {
if let Some(msg_in) = crate::common::get_next_nonkeyexchange_msg(
&mut socket,
Some(timeout.as_millis() as _),
)
.await
{
match msg_in.union {
Some(rendezvous_message::Union::OnlineResponse(online_response)) => {
@ -3522,13 +3508,9 @@ pub mod peer_online {
// TODO: Make sure socket closed?
bail!("Online stream receives None");
}
if query_begin.elapsed() > timeout {
bail!("Try query onlines timeout {:?}", &timeout);
}
sleep(300.0).await;
}
bail!("Failed to query online states, no online response");
}
#[cfg(test)]

View File

@ -2057,18 +2057,18 @@ pub mod sessions {
pub(super) mod async_tasks {
use hbb_common::{
bail,
tokio::{
self, select,
sync::mpsc::{unbounded_channel, UnboundedSender},
},
tokio::{self, select},
ResultType,
};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
sync::{
mpsc::{sync_channel, SyncSender},
Arc, Mutex,
},
};
type TxQueryOnlines = UnboundedSender<Vec<String>>;
type TxQueryOnlines = SyncSender<Vec<String>>;
lazy_static::lazy_static! {
static ref TX_QUERY_ONLINES: Arc<Mutex<Option<TxQueryOnlines>>> = Default::default();
}
@ -2085,20 +2085,18 @@ pub(super) mod async_tasks {
#[tokio::main(flavor = "current_thread")]
async fn start_flutter_async_runner_() {
let (tx_onlines, mut rx_onlines) = unbounded_channel::<Vec<String>>();
// Only one task is allowed to run at the same time.
let (tx_onlines, rx_onlines) = sync_channel::<Vec<String>>(1);
TX_QUERY_ONLINES.lock().unwrap().replace(tx_onlines);
loop {
select! {
ids = rx_onlines.recv() => {
match ids {
Some(_ids) => {
crate::client::peer_online::query_online_states(_ids, handle_query_onlines).await
}
None => {
break;
}
}
match rx_onlines.recv() {
Ok(ids) => {
crate::client::peer_online::query_online_states(ids, handle_query_onlines).await
}
_ => {
// unreachable!
break;
}
}
}
@ -2106,7 +2104,8 @@ pub(super) mod async_tasks {
pub fn query_onlines(ids: Vec<String>) -> ResultType<()> {
if let Some(tx) = TX_QUERY_ONLINES.lock().unwrap().as_ref() {
let _ = tx.send(ids)?;
// Ignore if the channel is full.
let _ = tx.try_send(ids)?;
} else {
bail!("No tx_query_onlines");
}