lan_discovery_WOL: lan discovery almost done

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou 2022-07-13 11:18:53 +08:00
parent 7e0f7be95c
commit cbb34fb021
2 changed files with 85 additions and 38 deletions

View File

@ -37,6 +37,7 @@ fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result<Socket,
addr,
socket.recv_buffer_size()
);
socket.set_nonblocking(true)?;
socket.bind(&addr.into())?;
Ok(socket)
}

View File

@ -1,7 +1,4 @@
use crate::{
ipc::get_id,
server::{check_zombie, new as new_server, ServerPtr},
};
use crate::server::{check_zombie, new as new_server, ServerPtr};
use hbb_common::{
allow_err,
anyhow::bail,
@ -13,6 +10,7 @@ use hbb_common::{
sleep, socket_client,
tokio::{
self, select,
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
time::{interval, Duration},
},
udp::FramedSocket,
@ -20,6 +18,7 @@ use hbb_common::{
};
use serde_derive::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
@ -548,14 +547,20 @@ pub fn get_broadcast_port() -> u16 {
(RENDEZVOUS_PORT + 3) as _
}
#[derive(Default, Serialize, Deserialize, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct DiscoveryPeer {
id: String,
mac: String,
ip: String,
mac_ips: HashMap<String, String>,
username: String,
hostname: String,
platform: String,
online: bool,
}
impl DiscoveryPeer {
fn is_same_peer(&self, other: &DiscoveryPeer) -> bool {
self.id == other.id && self.username == other.username
}
}
pub fn get_mac(ip: &IpAddr) -> String {
@ -690,9 +695,9 @@ fn send_query() -> ResultType<Vec<UdpSocket>> {
fn wait_response(
socket: UdpSocket,
timeout: Option<std::time::Duration>,
) -> ResultType<Vec<DiscoveryPeer>> {
tx: UnboundedSender<DiscoveryPeer>,
) -> ResultType<()> {
let mut last_recv_time = Instant::now();
let mut peers = Vec::new();
socket.set_read_timeout(timeout)?;
loop {
@ -709,20 +714,18 @@ fn wait_response(
"".to_owned()
};
let is_self = if !mac.is_empty() {
p.mac == mac
} else {
p.id == get_id()
};
if !is_self {
peers.push(DiscoveryPeer {
if mac != p.mac {
allow_err!(tx.send(DiscoveryPeer {
id: p.id.clone(),
mac: p.mac.clone(),
ip: addr.to_string(),
mac_ips: HashMap::from([(
p.mac.clone(),
addr.ip().to_string()
)]),
username: p.username.clone(),
hostname: p.hostname.clone(),
platform: p.platform.clone(),
});
online: true,
}));
}
}
}
@ -734,40 +737,83 @@ fn wait_response(
break;
}
}
Ok(peers)
Ok(())
}
pub fn discover() -> ResultType<()> {
let sockets = send_query()?;
let mut join_handles = Vec::new();
fn spawn_wait_responses(sockets: Vec<UdpSocket>) -> UnboundedReceiver<DiscoveryPeer> {
let (tx, rx) = unbounded_channel::<_>();
for socket in sockets {
let handle = std::thread::spawn(move || {
wait_response(socket, Some(std::time::Duration::from_millis(10)))
let tx_clone = tx.clone();
std::thread::spawn(move || {
allow_err!(wait_response(
socket,
Some(std::time::Duration::from_millis(10)),
tx_clone
));
});
join_handles.push(handle);
}
rx
}
// to-do: load saved peers, and update incrementally (then we can see offline)
async fn handle_received_peers(mut rx: UnboundedReceiver<DiscoveryPeer>) -> ResultType<()> {
let mut peers: Vec<DiscoveryPeer> = match serde_json::from_str(&config::LanPeers::load().peers)
{
Ok(p) => p,
_ => Vec::new(),
};
peers.iter_mut().for_each(|peer| {
peer.online = false;
});
for handle in join_handles {
match handle.join() {
Ok(Ok(mut tmp)) => {
peers.append(&mut tmp);
}
Ok(Err(e)) => {
log::error!("Failed lan discove {e}");
}
Err(e) => {
log::error!("Failed join lan discove thread {e:?}");
// handle received peers
let mut response_set = HashSet::new();
let mut last_write_time = Instant::now() - std::time::Duration::from_secs(4);
loop {
tokio::select! {
data = rx.recv() => match data {
Some(peer) => {
let in_response_set = !response_set.insert(peer.id.clone());
let mut pre_found = false;
for peer1 in &mut peers {
if peer1.is_same_peer(&peer) {
if in_response_set {
peer1.mac_ips.extend(peer.mac_ips.clone());
peer1.hostname = peer.hostname.clone();
peer1.platform = peer.platform.clone();
peer1.online = true;
} else {
*peer1 = peer.clone();
}
pre_found = true;
break
}
}
if !pre_found {
peers.push(peer);
}
if last_write_time.elapsed().as_millis() > 300 {
config::LanPeers::store(serde_json::to_string(&peers)?);
last_write_time = Instant::now();
}
}
None => {
break
}
}
}
}
log::info!("discover ping done");
config::LanPeers::store(serde_json::to_string(&peers)?);
Ok(())
}
#[tokio::main(flavor = "current_thread")]
pub async fn discover() -> ResultType<()> {
let sockets = send_query()?;
let rx = spawn_wait_responses(sockets);
handle_received_peers(rx).await?;
log::info!("discover ping done");
Ok(())
}