use std::sync::{Arc, RwLock}; use crate::client::*; use hbb_common::{ allow_err, bail, config::READ_TIMEOUT, futures::{SinkExt, StreamExt}, log, message_proto::*, protobuf::Message as _, rendezvous_proto::ConnType, tcp, timeout, tokio::{self, net::TcpStream, sync::mpsc}, tokio_util::codec::{BytesCodec, Framed}, ResultType, Stream, }; fn run_rdp(port: u16) { std::process::Command::new("cmdkey") .arg("/delete:localhost") .output() .ok(); let username = std::env::var("rdp_username").unwrap_or_default(); let password = std::env::var("rdp_password").unwrap_or_default(); if !username.is_empty() || !password.is_empty() { let mut args = vec!["/generic:localhost".to_owned()]; if !username.is_empty() { args.push(format!("/user:{}", username)); } if !password.is_empty() { args.push(format!("/pass:{}", password)); } println!("{:?}", args); std::process::Command::new("cmdkey") .args(&args) .output() .ok(); } std::process::Command::new("mstsc") .arg(format!("/v:localhost:{}", port)) .spawn() .ok(); } pub async fn listen( id: String, password: String, port: i32, interface: impl Interface, ui_receiver: mpsc::UnboundedReceiver, key: &str, token: &str, lc: Arc>, remote_host: String, remote_port: i32, ) -> ResultType<()> { let listener = tcp::new_listener(format!("0.0.0.0:{}", port), true).await?; let addr = listener.local_addr()?; log::info!("listening on port {:?}", addr); let is_rdp = port == 0; if is_rdp { run_rdp(addr.port()); } let mut ui_receiver = ui_receiver; loop { tokio::select! { Ok((forward, addr)) = listener.accept() => { log::info!("new connection from {:?}", addr); lc.write().unwrap().port_forward = (remote_host.clone(), remote_port); let id = id.clone(); let password = password.clone(); let mut forward = Framed::new(forward, BytesCodec::new()); match connect_and_login(&id, &password, &mut ui_receiver, interface.clone(), &mut forward, key, token, is_rdp).await { Ok(Some(stream)) => { let interface = interface.clone(); tokio::spawn(async move { if let Err(err) = run_forward(forward, stream).await { interface.msgbox("error", "Error", &err.to_string(), ""); } log::info!("connection from {:?} closed", addr); }); } Err(err) => { interface.on_establish_connection_error(err.to_string()); } _ => {} } } d = ui_receiver.recv() => { match d { Some(Data::Close) => { break; } Some(Data::NewRDP) => { println!("receive run_rdp from ui_receiver"); run_rdp(addr.port()); } _ => {} } } } } Ok(()) } async fn connect_and_login( id: &str, password: &str, ui_receiver: &mut mpsc::UnboundedReceiver, interface: impl Interface, forward: &mut Framed, key: &str, token: &str, is_rdp: bool, ) -> ResultType> { let conn_type = if is_rdp { ConnType::RDP } else { ConnType::PORT_FORWARD }; let (mut stream, direct, _pk) = Client::start(id, key, token, conn_type, interface.clone()).await?; interface.update_direct(Some(direct)); let mut buffer = Vec::new(); let mut received = false; loop { tokio::select! { res = timeout(READ_TIMEOUT, stream.next()) => match res { Err(_) => { bail!("Timeout"); } Ok(Some(Ok(bytes))) => { if !received { received = true; interface.update_received(true); } let msg_in = Message::parse_from_bytes(&bytes)?; match msg_in.union { Some(message::Union::Hash(hash)) => { interface.handle_hash(password, hash, &mut stream).await; } Some(message::Union::LoginResponse(lr)) => match lr.union { Some(login_response::Union::Error(err)) => { if !interface.handle_login_error(&err) { return Ok(None); } } Some(login_response::Union::PeerInfo(pi)) => { interface.handle_peer_info(pi); break; } _ => {} } Some(message::Union::TestDelay(t)) => { interface.handle_test_delay(t, &mut stream).await; } _ => {} } } Ok(Some(Err(err))) => { bail!("Connection closed: {}", err); } _ => { bail!("Reset by the peer"); } }, d = ui_receiver.recv() => { match d { Some(Data::Login((os_username, os_password, password, remember))) => { interface.handle_login_from_ui(os_username, os_password, password, remember, &mut stream).await; } Some(Data::Message(msg)) => { allow_err!(stream.send(&msg).await); } _ => {} } }, res = forward.next() => { if let Some(Ok(bytes)) = res { buffer.extend(bytes); } else { return Ok(None); } }, } } stream.set_raw(); if !buffer.is_empty() { allow_err!(stream.send_bytes(buffer.into()).await); } Ok(Some(stream)) } async fn run_forward(forward: Framed, stream: Stream) -> ResultType<()> { log::info!("new port forwarding connection started"); let mut forward = forward; let mut stream = stream; loop { tokio::select! { res = forward.next() => { if let Some(Ok(bytes)) = res { allow_err!(stream.send_bytes(bytes.into()).await); } else { break; } }, res = stream.next() => { if let Some(Ok(bytes)) = res { allow_err!(forward.send(bytes).await); } else { break; } }, } } Ok(()) }