debug, query_onlines_block_thread

Signed-off-by: dignow <linlong1265@gmail.com>
This commit is contained in:
dignow 2023-11-16 10:15:51 +08:00
parent 5a51284550
commit 02bc5e3111
3 changed files with 40 additions and 30 deletions

View File

@ -82,7 +82,7 @@ class _PeersViewState extends State<_PeersView> with WindowListener {
final _curPeers = <String>{}; final _curPeers = <String>{};
var _lastChangeTime = DateTime.now(); var _lastChangeTime = DateTime.now();
var _lastQueryPeers = <String>{}; var _lastQueryPeers = <String>{};
var _lastQueryTime = DateTime.now().subtract(const Duration(hours: 1)); var _lastQueryTime = DateTime.now().add(const Duration(seconds: 30));
var _queryCount = 0; var _queryCount = 0;
var _exit = false; var _exit = false;
@ -272,8 +272,7 @@ class _PeersViewState extends State<_PeersView> with WindowListener {
if (_queryCount < _maxQueryCount) { if (_queryCount < _maxQueryCount) {
if (now.difference(_lastQueryTime) >= _queryInterval) { if (now.difference(_lastQueryTime) >= _queryInterval) {
if (_curPeers.isNotEmpty) { if (_curPeers.isNotEmpty) {
platformFFI.ffiBind bind.queryOnlines(ids: _curPeers.toList(growable: false));
.queryOnlines(ids: _curPeers.toList(growable: false));
_lastQueryTime = DateTime.now(); _lastQueryTime = DateTime.now();
_queryCount += 1; _queryCount += 1;
} }
@ -287,7 +286,7 @@ class _PeersViewState extends State<_PeersView> with WindowListener {
_queryOnlines(bool isLoadEvent) { _queryOnlines(bool isLoadEvent) {
if (_curPeers.isNotEmpty) { if (_curPeers.isNotEmpty) {
platformFFI.ffiBind.queryOnlines(ids: _curPeers.toList(growable: false)); bind.queryOnlines(ids: _curPeers.toList(growable: false));
_lastQueryPeers = {..._curPeers}; _lastQueryPeers = {..._curPeers};
if (isLoadEvent) { if (isLoadEvent) {
_lastChangeTime = DateTime.now(); _lastChangeTime = DateTime.now();

View File

@ -1602,7 +1602,7 @@ pub(super) mod async_tasks {
use hbb_common::{ use hbb_common::{
bail, bail,
tokio::{ tokio::{
select, self, select,
sync::mpsc::{unbounded_channel, UnboundedSender}, sync::mpsc::{unbounded_channel, UnboundedSender},
}, },
ResultType, ResultType,
@ -1617,17 +1617,28 @@ pub(super) mod async_tasks {
static ref TX_QUERY_ONLINES: Arc<Mutex<Option<TxQueryOnlines>>> = Default::default(); static ref TX_QUERY_ONLINES: Arc<Mutex<Option<TxQueryOnlines>>> = Default::default();
} }
#[inline]
pub fn start_flutter_async_runner() { pub fn start_flutter_async_runner() {
std::thread::spawn(|| async { std::thread::spawn(start_flutter_async_runner_);
}
#[allow(dead_code)]
pub fn stop_flutter_async_runner() {
let _ = TX_QUERY_ONLINES.lock().unwrap().take();
}
#[tokio::main(flavor = "current_thread")]
async fn start_flutter_async_runner_() {
let (tx_onlines, mut rx_onlines) = unbounded_channel::<Vec<String>>(); let (tx_onlines, mut rx_onlines) = unbounded_channel::<Vec<String>>();
TX_QUERY_ONLINES.lock().unwrap().replace(tx_onlines); TX_QUERY_ONLINES.lock().unwrap().replace(tx_onlines);
loop { loop {
select! { select! {
ids = rx_onlines.recv() => { ids = rx_onlines.recv() => {
match ids { match ids {
Some(_ids) => { Some(_ids) => {
#[cfg(not(any(target_os = "ios")))] #[cfg(not(any(target_os = "ios")))]
crate::rendezvous_mediator::query_online_states(_ids, handle_query_onlines) crate::rendezvous_mediator::query_online_states(_ids, handle_query_onlines).await
} }
None => { None => {
break; break;
@ -1636,17 +1647,11 @@ pub(super) mod async_tasks {
} }
} }
} }
});
}
#[allow(dead_code)]
pub fn stop_flutter_async_runner() {
let _ = TX_QUERY_ONLINES.lock().unwrap().take();
} }
pub fn query_onlines(ids: Vec<String>) -> ResultType<()> { pub fn query_onlines(ids: Vec<String>) -> ResultType<()> {
if let Some(tx) = TX_QUERY_ONLINES.lock().unwrap().as_ref() { if let Some(tx) = TX_QUERY_ONLINES.lock().unwrap().as_ref() {
tx.send(ids)?; let _ = tx.send(ids)?;
} else { } else {
bail!("No tx_query_onlines"); bail!("No tx_query_onlines");
} }

View File

@ -572,7 +572,6 @@ async fn direct_server(server: ServerPtr) {
} }
} }
#[tokio::main(flavor = "current_thread")]
pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) { pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) {
let test = false; let test = false;
if test { if test {
@ -598,7 +597,11 @@ pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<S
} }
if query_begin.elapsed() > query_timeout { if query_begin.elapsed() > query_timeout {
log::debug!("query onlines timeout {:?} ({:?})", query_begin.elapsed(), query_timeout); log::debug!(
"query onlines timeout {:?} ({:?})",
query_begin.elapsed(),
query_timeout
);
break; break;
} }
@ -679,8 +682,10 @@ async fn query_online_states_(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[test] use hbb_common::tokio;
fn test_query_onlines() {
#[tokio::test]
async fn test_query_onlines() {
super::query_online_states( super::query_online_states(
vec![ vec![
"152183996".to_owned(), "152183996".to_owned(),
@ -691,6 +696,7 @@ mod tests {
|onlines: Vec<String>, offlines: Vec<String>| { |onlines: Vec<String>, offlines: Vec<String>| {
println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines); println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines);
}, },
); )
.await;
} }
} }