rustdesk/src/port_forward.rs
21pages 2e16a2be56
fix port forward 2fa (#6956)
Signed-off-by: 21pages <pages21@163.com>
2024-01-22 19:57:23 +08:00

218 lines
7.3 KiB
Rust

use std::sync::{Arc, RwLock};
use crate::client::*;
use hbb_common::{
allow_err, bail,
config::READ_TIMEOUT,
futures::{SinkExt, StreamExt},
log,
message_proto::*,
protobuf::Message as _,
rendezvous_proto::ConnType,
tcp, timeout,
tokio::{self, net::TcpStream, sync::mpsc},
tokio_util::codec::{BytesCodec, Framed},
ResultType, Stream,
};
fn run_rdp(port: u16) {
std::process::Command::new("cmdkey")
.arg("/delete:localhost")
.output()
.ok();
let username = std::env::var("rdp_username").unwrap_or_default();
let password = std::env::var("rdp_password").unwrap_or_default();
if !username.is_empty() || !password.is_empty() {
let mut args = vec!["/generic:localhost".to_owned()];
if !username.is_empty() {
args.push(format!("/user:{}", username));
}
if !password.is_empty() {
args.push(format!("/pass:{}", password));
}
println!("{:?}", args);
std::process::Command::new("cmdkey")
.args(&args)
.output()
.ok();
}
std::process::Command::new("mstsc")
.arg(format!("/v:localhost:{}", port))
.spawn()
.ok();
}
pub async fn listen(
id: String,
password: String,
port: i32,
interface: impl Interface,
ui_receiver: mpsc::UnboundedReceiver<Data>,
key: &str,
token: &str,
lc: Arc<RwLock<LoginConfigHandler>>,
remote_host: String,
remote_port: i32,
) -> ResultType<()> {
let listener = tcp::new_listener(format!("0.0.0.0:{}", port), true).await?;
let addr = listener.local_addr()?;
log::info!("listening on port {:?}", addr);
let is_rdp = port == 0;
if is_rdp {
run_rdp(addr.port());
}
let mut ui_receiver = ui_receiver;
loop {
tokio::select! {
Ok((forward, addr)) = listener.accept() => {
log::info!("new connection from {:?}", addr);
lc.write().unwrap().port_forward = (remote_host.clone(), remote_port);
let id = id.clone();
let password = password.clone();
let mut forward = Framed::new(forward, BytesCodec::new());
match connect_and_login(&id, &password, &mut ui_receiver, interface.clone(), &mut forward, key, token, is_rdp).await {
Ok(Some(stream)) => {
let interface = interface.clone();
tokio::spawn(async move {
if let Err(err) = run_forward(forward, stream).await {
interface.msgbox("error", "Error", &err.to_string(), "");
}
log::info!("connection from {:?} closed", addr);
});
}
Err(err) => {
interface.on_establish_connection_error(err.to_string());
}
_ => {}
}
}
d = ui_receiver.recv() => {
match d {
Some(Data::Close) => {
break;
}
Some(Data::NewRDP) => {
println!("receive run_rdp from ui_receiver");
run_rdp(addr.port());
}
_ => {}
}
}
}
}
Ok(())
}
async fn connect_and_login(
id: &str,
password: &str,
ui_receiver: &mut mpsc::UnboundedReceiver<Data>,
interface: impl Interface,
forward: &mut Framed<TcpStream, BytesCodec>,
key: &str,
token: &str,
is_rdp: bool,
) -> ResultType<Option<Stream>> {
let conn_type = if is_rdp {
ConnType::RDP
} else {
ConnType::PORT_FORWARD
};
let (mut stream, direct, _pk) =
Client::start(id, key, token, conn_type, interface.clone()).await?;
interface.update_direct(Some(direct));
let mut buffer = Vec::new();
let mut received = false;
loop {
tokio::select! {
res = timeout(READ_TIMEOUT, stream.next()) => match res {
Err(_) => {
bail!("Timeout");
}
Ok(Some(Ok(bytes))) => {
if !received {
received = true;
interface.update_received(true);
}
let msg_in = Message::parse_from_bytes(&bytes)?;
match msg_in.union {
Some(message::Union::Hash(hash)) => {
interface.handle_hash(password, hash, &mut stream).await;
}
Some(message::Union::LoginResponse(lr)) => match lr.union {
Some(login_response::Union::Error(err)) => {
if !interface.handle_login_error(&err) {
return Ok(None);
}
}
Some(login_response::Union::PeerInfo(pi)) => {
interface.handle_peer_info(pi);
break;
}
_ => {}
}
Some(message::Union::TestDelay(t)) => {
interface.handle_test_delay(t, &mut stream).await;
}
_ => {}
}
}
Ok(Some(Err(err))) => {
bail!("Connection closed: {}", err);
}
_ => {
bail!("Reset by the peer");
}
},
d = ui_receiver.recv() => {
match d {
Some(Data::Login((os_username, os_password, password, remember))) => {
interface.handle_login_from_ui(os_username, os_password, password, remember, &mut stream).await;
}
Some(Data::Message(msg)) => {
allow_err!(stream.send(&msg).await);
}
_ => {}
}
},
res = forward.next() => {
if let Some(Ok(bytes)) = res {
buffer.extend(bytes);
} else {
return Ok(None);
}
},
}
}
stream.set_raw();
if !buffer.is_empty() {
allow_err!(stream.send_bytes(buffer.into()).await);
}
Ok(Some(stream))
}
async fn run_forward(forward: Framed<TcpStream, BytesCodec>, stream: Stream) -> ResultType<()> {
log::info!("new port forwarding connection started");
let mut forward = forward;
let mut stream = stream;
loop {
tokio::select! {
res = forward.next() => {
if let Some(Ok(bytes)) = res {
allow_err!(stream.send_bytes(bytes.into()).await);
} else {
break;
}
},
res = stream.next() => {
if let Some(Ok(bytes)) = res {
allow_err!(forward.send(bytes).await);
} else {
break;
}
},
}
}
Ok(())
}