Merge pull request #1390 from Heap-Hop/master

Update port-forward
This commit is contained in:
RustDesk 2022-08-29 19:54:14 +08:00 committed by GitHub
commit 8858d0342d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 179 additions and 33 deletions

View File

@ -57,7 +57,7 @@ class _FileManagerTabPageState extends State<FileManagerTabPage> {
} else if (call.method == "onDestroy") {
tabController.state.value.tabs.forEach((tab) {
print("executing onDestroy hook, closing ${tab.label}}");
final tag = tab.label;
final tag = 'ft_${tab.label}';
ffi(tag).close().then((_) {
Get.delete<FFI>(tag: tag);
});

View File

@ -48,7 +48,7 @@ class _PortForwardPageState extends State<PortForwardPage>
void initState() {
super.initState();
_ffi = FFI();
// _ffi.connect(widget.id, isPortForward: true);
_ffi.connect(widget.id, isPortForward: true);
Get.put(_ffi, tag: 'pf_${widget.id}');
if (!Platform.isLinux) {
Wakelock.enable();
@ -190,8 +190,8 @@ class _PortForwardPageState extends State<PortForwardPage>
remotePort != null &&
(remoteHostController.text.isEmpty ||
remoteHostController.text.trim().isNotEmpty)) {
await bind.mainAddPortForward(
id: widget.id,
await bind.sessionAddPortForward(
id: 'pf_${widget.id}',
localPort: localPort,
remoteHost: remoteHostController.text.trim().isEmpty
? 'localhost'
@ -261,8 +261,8 @@ class _PortForwardPageState extends State<PortForwardPage>
child: IconButton(
icon: const Icon(Icons.close),
onPressed: () async {
await bind.mainRemovePortForward(
id: widget.id, localPort: pf.localPort);
await bind.sessionRemovePortForward(
id: 'pf_${widget.id}', localPort: pf.localPort);
refreshTunnelConfig();
},
),

View File

@ -26,7 +26,7 @@ class _PortForwardTabPageState extends State<PortForwardTabPage> {
_PortForwardTabPageState(Map<String, dynamic> params) {
tabController.add(TabInfo(
key: params['id'] + params['isRDP'].toString(),
key: params['id'],
label: params['id'],
selectedIcon: selectedIcon,
unselectedIcon: unselectedIcon,
@ -61,7 +61,7 @@ class _PortForwardTabPageState extends State<PortForwardTabPage> {
} else if (call.method == "onDestroy") {
tabController.state.value.tabs.forEach((tab) {
print("executing onDestroy hook, closing ${tab.label}}");
final tag = tab.label;
final tag = 'pf_${tab.label}';
ffi(tag).close().then((_) {
Get.delete<FFI>(tag: tag);
});

View File

@ -539,6 +539,38 @@ impl Session {
],
);
}
pub fn remove_port_forward(&mut self, port: i32) {
let mut config = self.load_config();
config.port_forwards = config
.port_forwards
.drain(..)
.filter(|x| x.0 != port)
.collect();
self.save_config(&config);
self.send(Data::RemovePortForward(port));
}
pub fn add_port_forward(&mut self, port: i32, remote_host: String, remote_port: i32) {
let mut config = self.load_config();
if config
.port_forwards
.iter()
.filter(|x| x.0 == port)
.next()
.is_some()
{
return;
}
let pf = (port, remote_host, remote_port);
config.port_forwards.push(pf.clone());
self.save_config(&config);
self.send(Data::AddPortForward(pf));
}
fn on_error(&self, err: &str) {
self.msgbox("error", "Error", err);
}
}
impl FileManager for Session {}
@ -734,7 +766,7 @@ impl Connection {
if !is_file_transfer && !is_port_forward {
stop_clipboard = Self::start_clipboard(sender.clone(), session.lc.clone());
}
*session.sender.write().unwrap() = Some(sender);
*session.sender.write().unwrap() = Some(sender.clone());
let conn_type = if is_file_transfer {
session.lc.write().unwrap().is_file_transfer = true;
ConnType::FILE_TRANSFER
@ -743,6 +775,99 @@ impl Connection {
} else {
ConnType::DEFAULT_CONN
};
let key = Config::get_option("key");
let token = Config::get_option("access_token");
// TODO rdp & cli args
let is_rdp = false;
let args: Vec<i32> = Vec::new();
if is_port_forward {
if is_rdp {
// let port = handler
// .get_option("rdp_port".to_owned())
// .parse::<i32>()
// .unwrap_or(3389);
// std::env::set_var(
// "rdp_username",
// handler.get_option("rdp_username".to_owned()),
// );
// std::env::set_var(
// "rdp_password",
// handler.get_option("rdp_password".to_owned()),
// );
// log::info!("Remote rdp port: {}", port);
// start_one_port_forward(handler, 0, "".to_owned(), port, receiver, &key, &token).await;
} else if args.len() == 0 {
let pfs = session.lc.read().unwrap().port_forwards.clone();
let mut queues = HashMap::<i32, mpsc::UnboundedSender<Data>>::new();
for d in pfs {
sender.send(Data::AddPortForward(d)).ok();
}
loop {
match receiver.recv().await {
Some(Data::AddPortForward((port, remote_host, remote_port))) => {
if port <= 0 || remote_port <= 0 {
continue;
}
let (sender, receiver) = mpsc::unbounded_channel::<Data>();
queues.insert(port, sender);
let handler = session.clone();
let key = key.clone();
let token = token.clone();
tokio::spawn(async move {
start_one_port_forward(
handler,
port,
remote_host,
remote_port,
receiver,
&key,
&token,
)
.await;
});
}
Some(Data::RemovePortForward(port)) => {
if let Some(s) = queues.remove(&port) {
s.send(Data::Close).ok();
}
}
Some(Data::Close) => {
break;
}
Some(d) => {
for (_, s) in queues.iter() {
s.send(d.clone()).ok();
}
}
_ => {}
}
}
} else {
// let port = handler.args[0].parse::<i32>().unwrap_or(0);
// if handler.args.len() != 3
// || handler.args[2].parse::<i32>().unwrap_or(0) <= 0
// || port <= 0
// {
// handler.on_error("Invalid arguments, usage:<br><br> rustdesk --port-forward remote-id listen-port remote-host remote-port");
// }
// let remote_host = handler.args[1].clone();
// let remote_port = handler.args[2].parse::<i32>().unwrap_or(0);
// start_one_port_forward(
// handler,
// port,
// remote_host,
// remote_port,
// receiver,
// &key,
// &token,
// )
// .await;
}
return;
}
let latency_controller = LatencyController::new();
let latency_controller_cl = latency_controller.clone();
@ -759,8 +884,6 @@ impl Connection {
frame_count: Arc::new(AtomicUsize::new(0)),
video_format: CodecFormat::Unknown,
};
let key = Config::get_option("key");
let token = Config::get_option("access_token");
match Client::start(&session.id, &key, &token, conn_type, session.clone()).await {
Ok((mut peer, direct)) => {
@ -2288,3 +2411,31 @@ pub fn get_session_id(id: String) -> String {
id
};
}
async fn start_one_port_forward(
handler: Session,
port: i32,
remote_host: String,
remote_port: i32,
receiver: mpsc::UnboundedReceiver<Data>,
key: &str,
token: &str,
) {
if let Err(err) = crate::port_forward::listen(
handler.id.clone(),
String::new(), // TODO
port,
handler.clone(),
receiver,
key,
token,
handler.lc.clone(),
remote_host,
remote_port,
)
.await
{
handler.on_error(&format!("Failed to listen on {}: {}", port, err));
}
log::info!("port forward (:{}) exit", port);
}

View File

@ -602,30 +602,16 @@ pub fn main_load_lan_peers() {
};
}
pub fn main_add_port_forward(id: String, local_port: i32, remote_host: String, remote_port: i32) {
let mut config = get_peer(id.clone());
if config
.port_forwards
.iter()
.filter(|x| x.0 == local_port)
.next()
.is_some()
{
return;
pub fn session_add_port_forward(id: String, local_port: i32, remote_host: String, remote_port: i32) {
if let Some(session) = SESSIONS.write().unwrap().get_mut(&id) {
session.add_port_forward(local_port, remote_host, remote_port);
}
let pf = (local_port, remote_host, remote_port);
config.port_forwards.push(pf);
config.store(&id);
}
pub fn main_remove_port_forward(id: String, local_port: i32) {
let mut config = get_peer(id.clone());
config.port_forwards = config
.port_forwards
.drain(..)
.filter(|x| x.0 != local_port)
.collect();
config.store(&id);
pub fn session_remove_port_forward(id: String, local_port: i32) {
if let Some(session) = SESSIONS.write().unwrap().get_mut(&id) {
session.remove_port_forward(local_port);
}
}
pub fn main_get_last_remote_id() -> String {

View File

@ -1,3 +1,5 @@
use std::sync::{Arc, RwLock};
use crate::client::*;
use hbb_common::{
allow_err, bail,
@ -48,6 +50,9 @@ pub async fn listen(
ui_receiver: mpsc::UnboundedReceiver<Data>,
key: &str,
token: &str,
lc: Arc<RwLock<LoginConfigHandler>>,
remote_host: String,
remote_port: i32,
) -> ResultType<()> {
let listener = tcp::new_listener(format!("0.0.0.0:{}", port), true).await?;
let addr = listener.local_addr()?;
@ -61,6 +66,7 @@ pub async fn listen(
tokio::select! {
Ok((forward, addr)) = listener.accept() => {
log::info!("new connection from {:?}", addr);
lc.write().unwrap().port_forward = (remote_host.clone(), remote_port);
let id = id.clone();
let password = password.clone();
let mut forward = Framed::new(forward, BytesCodec::new());

View File

@ -949,6 +949,7 @@ impl Connection {
addr
))
.await;
return false;
}
}
}

View File

@ -1253,7 +1253,6 @@ async fn start_one_port_forward(
key: &str,
token: &str,
) {
handler.lc.write().unwrap().port_forward = (remote_host, remote_port);
if let Err(err) = crate::port_forward::listen(
handler.id.clone(),
handler.password.clone(),
@ -1262,6 +1261,9 @@ async fn start_one_port_forward(
receiver,
key,
token,
handler.lc.clone(),
remote_host,
remote_port,
)
.await
{