rustdesk/src/client/io_loop.rs

1825 lines
81 KiB
Rust
Raw Normal View History

use std::{
collections::HashMap,
num::NonZeroI64,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
};
2022-09-01 09:48:53 +08:00
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
use clipboard::ContextSend;
use crossbeam_queue::ArrayQueue;
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use hbb_common::sleep;
#[cfg(not(target_os = "ios"))]
2023-02-06 11:42:25 +08:00
use hbb_common::tokio::sync::mpsc::error::TryRecvError;
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
2023-02-06 11:42:25 +08:00
use hbb_common::tokio::sync::Mutex as TokioMutex;
use hbb_common::{
allow_err,
config::{PeerConfig, TransferSerde},
fs,
fs::{
can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult,
RemoveJobMeta,
},
get_time, log,
message_proto::permission_info::Permission,
message_proto::*,
protobuf::Message as _,
rendezvous_proto::ConnType,
tokio::{
self,
sync::mpsc,
time::{self, Duration, Instant, Interval},
},
ResultType, Stream,
2022-09-01 09:48:53 +08:00
};
use scrap::CodecFormat;
2022-09-01 09:48:53 +08:00
2023-02-05 23:47:06 +08:00
use crate::client::{
new_voice_call_request, Client, MediaData, MediaSender, QualityStatus, MILLI1, SEC30,
2023-02-05 23:47:06 +08:00
};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use crate::common::{self, update_clipboard};
2023-02-06 11:42:25 +08:00
use crate::common::{get_default_sound_input, set_sound_input};
2023-02-05 23:47:06 +08:00
use crate::ui_session_interface::{InvokeUiSession, Session};
2023-04-17 19:26:39 +08:00
#[cfg(not(any(target_os = "ios")))]
use crate::{audio_service, ConnInner, CLIENT_SERVER};
2023-02-06 11:42:25 +08:00
use crate::{client::Data, client::Interface};
2022-09-01 09:48:53 +08:00
pub struct Remote<T: InvokeUiSession> {
2022-09-01 09:48:53 +08:00
handler: Session<T>,
video_queue_map: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
2022-09-01 09:48:53 +08:00
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
sender: mpsc::UnboundedSender<Data>,
// Stop sending local audio to remote client.
2023-02-06 11:42:25 +08:00
stop_voice_call_sender: Option<std::sync::mpsc::Sender<()>>,
voice_call_request_timestamp: Option<NonZeroI64>,
2022-09-01 09:48:53 +08:00
read_jobs: Vec<fs::TransferJob>,
write_jobs: Vec<fs::TransferJob>,
remove_jobs: HashMap<i32, RemoveJob>,
timer: Interval,
last_update_jobs_status: (Instant, HashMap<i32, u64>),
is_connected: bool,
2022-09-01 09:48:53 +08:00
first_frame: bool,
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
client_conn_id: i32, // used for file clipboard
2022-09-01 09:48:53 +08:00
data_count: Arc<AtomicUsize>,
frame_count_map: Arc<RwLock<HashMap<usize, usize>>>,
2022-09-01 09:48:53 +08:00
video_format: CodecFormat,
elevation_requested: bool,
fps_control_map: HashMap<usize, FpsControl>,
decode_fps_map: Arc<RwLock<HashMap<usize, usize>>>,
chroma: Arc<RwLock<Option<Chroma>>>,
2022-09-01 09:48:53 +08:00
}
impl<T: InvokeUiSession> Remote<T> {
2022-09-01 09:48:53 +08:00
pub fn new(
handler: Session<T>,
video_queue: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
2022-09-01 09:48:53 +08:00
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
sender: mpsc::UnboundedSender<Data>,
frame_count_map: Arc<RwLock<HashMap<usize, usize>>>,
decode_fps: Arc<RwLock<HashMap<usize, usize>>>,
chroma: Arc<RwLock<Option<Chroma>>>,
2022-09-01 09:48:53 +08:00
) -> Self {
Self {
handler,
video_queue_map: video_queue,
2022-09-01 09:48:53 +08:00
video_sender,
audio_sender,
receiver,
sender,
read_jobs: Vec::new(),
write_jobs: Vec::new(),
remove_jobs: Default::default(),
timer: time::interval(SEC30),
last_update_jobs_status: (Instant::now(), Default::default()),
is_connected: false,
2022-09-01 09:48:53 +08:00
first_frame: false,
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
client_conn_id: 0,
2022-09-01 09:48:53 +08:00
data_count: Arc::new(AtomicUsize::new(0)),
frame_count_map,
2022-09-01 09:48:53 +08:00
video_format: CodecFormat::Unknown,
2023-02-06 11:42:25 +08:00
stop_voice_call_sender: None,
voice_call_request_timestamp: None,
elevation_requested: false,
fps_control_map: Default::default(),
decode_fps_map: decode_fps,
chroma,
2022-09-01 09:48:53 +08:00
}
}
pub async fn io_loop(&mut self, key: &str, token: &str, round: u32) {
2022-09-01 09:48:53 +08:00
let mut last_recv_time = Instant::now();
let mut received = false;
let conn_type = if self.handler.is_file_transfer() {
ConnType::FILE_TRANSFER
} else {
ConnType::default()
};
2022-11-04 12:02:17 +08:00
2022-09-01 09:48:53 +08:00
match Client::start(
&self.handler.id,
key,
token,
conn_type,
self.handler.clone(),
)
.await
{
Ok((mut peer, direct, pk)) => {
self.handler
.connection_round_state
.lock()
.unwrap()
.set_connected();
2022-09-01 09:48:53 +08:00
self.handler.set_connection_type(peer.is_secured(), direct); // flutter -> connection_ready
self.handler.update_direct(Some(direct));
if conn_type == ConnType::DEFAULT_CONN {
self.handler
.set_fingerprint(crate::common::pk_to_fingerprint(pk.unwrap_or_default()));
}
2022-09-01 09:48:53 +08:00
// just build for now
#[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
2022-09-01 09:48:53 +08:00
let (_tx_holder, mut rx_clip_client) = mpsc::unbounded_channel::<i32>();
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
let (_tx_holder, rx) = mpsc::unbounded_channel();
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
let mut rx_clip_client_lock = Arc::new(TokioMutex::new(rx));
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
{
let is_conn_not_default = self.handler.is_file_transfer()
|| self.handler.is_port_forward()
|| self.handler.is_rdp();
if !is_conn_not_default {
log::debug!("get cliprdr client for conn_id {}", self.client_conn_id);
(self.client_conn_id, rx_clip_client_lock) =
clipboard::get_rx_cliprdr_client(&self.handler.id);
};
}
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
let mut rx_clip_client = rx_clip_client_lock.lock().await;
2022-09-01 09:48:53 +08:00
let mut status_timer = time::interval(Duration::new(1, 0));
let mut fps_instant = Instant::now();
2022-09-01 09:48:53 +08:00
loop {
tokio::select! {
res = peer.next() => {
if let Some(res) = res {
match res {
Err(err) => {
self.handler.on_establish_connection_error(err.to_string());
2022-09-01 09:48:53 +08:00
break;
}
Ok(ref bytes) => {
last_recv_time = Instant::now();
if !received {
received = true;
self.handler.update_received(true);
}
2022-09-01 09:48:53 +08:00
self.data_count.fetch_add(bytes.len(), Ordering::Relaxed);
if !self.handle_msg_from_peer(bytes, &mut peer).await {
break
}
}
}
} else {
if self.handler.is_restarting_remote_device() {
log::info!("Restart remote device");
self.handler.msgbox("restarting", "Restarting Remote Device", "remote_restarting_tip", "");
2022-09-01 09:48:53 +08:00
} else {
log::info!("Reset by the peer");
self.handler.msgbox("error", "Connection Error", "Reset by the peer", "");
2022-09-01 09:48:53 +08:00
}
break;
}
}
d = self.receiver.recv() => {
if let Some(d) = d {
if !self.handle_msg_from_ui(d, &mut peer).await {
break;
}
}
}
_msg = rx_clip_client.recv() => {
#[cfg(any(target_os="windows", target_os="linux", target_os = "macos"))]
self.handle_local_clipboard_msg(&mut peer, _msg).await;
2022-09-01 09:48:53 +08:00
}
_ = self.timer.tick() => {
if last_recv_time.elapsed() >= SEC30 {
self.handler.msgbox("error", "Connection Error", "Timeout", "");
2022-09-01 09:48:53 +08:00
break;
}
if !self.read_jobs.is_empty() {
if let Err(err) = fs::handle_read_jobs(&mut self.read_jobs, &mut peer).await {
self.handler.msgbox("error", "Connection Error", &err.to_string(), "");
2022-09-01 09:48:53 +08:00
break;
}
self.update_jobs_status();
} else {
self.timer = time::interval_at(Instant::now() + SEC30, SEC30);
}
}
_ = status_timer.tick() => {
self.fps_control(direct);
let elapsed = fps_instant.elapsed().as_millis();
if elapsed < 1000 {
continue;
}
fps_instant = Instant::now();
let mut speed = self.data_count.swap(0, Ordering::Relaxed);
speed = speed * 1000 / elapsed as usize;
2022-09-01 09:48:53 +08:00
let speed = format!("{:.2}kB/s", speed as f32 / 1024 as f32);
let mut frame_count_map_write = self.frame_count_map.write().unwrap();
let frame_count_map = frame_count_map_write.clone();
frame_count_map_write.values_mut().for_each(|v| *v = 0);
drop(frame_count_map_write);
let fps = frame_count_map.iter().map(|(k, v)| {
// Correcting the inaccuracy of status_timer
(k.clone(), (*v as i32) * 1000 / elapsed as i32)
}).collect::<HashMap<usize, i32>>();
let chroma = self.chroma.read().unwrap().clone();
let chroma = match chroma {
Some(Chroma::I444) => "4:4:4",
Some(Chroma::I420) => "4:2:0",
None => "-",
};
let chroma = Some(chroma.to_string());
2022-09-01 09:48:53 +08:00
self.handler.update_quality_status(QualityStatus {
speed: Some(speed),
fps,
chroma,
2022-09-01 09:48:53 +08:00
..Default::default()
});
}
}
}
log::debug!("Exit io_loop of id={}", self.handler.id);
2023-01-29 22:23:18 +08:00
// Stop client audio server.
2023-02-06 11:42:25 +08:00
if let Some(s) = self.stop_voice_call_sender.take() {
s.send(()).ok();
2023-01-29 22:23:18 +08:00
}
2022-09-01 09:48:53 +08:00
}
Err(err) => {
self.handler.on_establish_connection_error(err.to_string());
2022-09-01 09:48:53 +08:00
}
}
// set_disconnected_ok is used to check if new connection round is started.
let _set_disconnected_ok = self
.handler
.connection_round_state
.lock()
.unwrap()
.set_disconnected(round);
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if _set_disconnected_ok {
Client::try_stop_clipboard(&self.handler.id);
}
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
if _set_disconnected_ok {
let conn_id = self.client_conn_id;
log::debug!("try empty cliprdr for conn_id {}", conn_id);
let _ = ContextSend::proc(|context| -> ResultType<()> {
context.empty_clipboard(conn_id)?;
Ok(())
});
}
2022-09-01 09:48:53 +08:00
}
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
async fn handle_local_clipboard_msg(
&self,
peer: &mut crate::client::FramedStream,
msg: Option<clipboard::ClipboardFile>,
) {
match msg {
Some(clip) => match clip {
clipboard::ClipboardFile::NotifyCallback {
r#type,
title,
text,
} => {
self.handler.msgbox(&r#type, &title, &text, "");
}
_ => {
let is_stopping_allowed = clip.is_stopping_allowed();
let server_file_transfer_enabled =
*self.handler.server_file_transfer_enabled.read().unwrap();
let file_transfer_enabled =
self.handler.lc.read().unwrap().enable_file_transfer.v;
let stop = is_stopping_allowed
&& (!self.is_connected
|| !(server_file_transfer_enabled && file_transfer_enabled));
log::debug!("Process clipboard message from system, stop: {}, is_stopping_allowed: {}, server_file_transfer_enabled: {}, file_transfer_enabled: {}", stop, is_stopping_allowed, server_file_transfer_enabled, file_transfer_enabled);
if stop {
ContextSend::set_is_stopped();
} else {
if let Err(e) = ContextSend::make_sure_enabled() {
log::error!("failed to restart clipboard context: {}", e);
};
log::debug!("Send system clipboard message to remote");
let msg = crate::clipboard_file::clip_2_msg(clip);
allow_err!(peer.send(&msg).await);
}
}
},
None => {
// unreachable!()
}
}
}
2022-09-01 09:48:53 +08:00
fn handle_job_status(&mut self, id: i32, file_num: i32, err: Option<String>) {
if let Some(job) = self.remove_jobs.get_mut(&id) {
if job.no_confirm {
let file_num = (file_num + 1) as usize;
if file_num < job.files.len() {
let path = format!("{}{}{}", job.path, job.sep, job.files[file_num].name);
self.sender
.send(Data::RemoveFile((id, path, file_num as i32, job.is_remote)))
.ok();
let elapsed = job.last_update_job_status.elapsed().as_millis() as i32;
if elapsed >= 1000 {
job.last_update_job_status = Instant::now();
} else {
return;
}
} else {
self.remove_jobs.remove(&id);
}
}
}
if let Some(err) = err {
self.handler.job_error(id, err, file_num);
} else {
self.handler.job_done(id, file_num);
}
}
2023-02-06 11:42:25 +08:00
fn stop_voice_call(&mut self) {
let voice_call_sender = std::mem::replace(&mut self.stop_voice_call_sender, None);
if let Some(stopper) = voice_call_sender {
let _ = stopper.send(());
}
}
// Start a voice call recorder, records audio and send to remote
fn start_voice_call(&mut self) -> Option<std::sync::mpsc::Sender<()>> {
2022-11-03 21:09:37 +08:00
if self.handler.is_file_transfer() || self.handler.is_port_forward() {
return None;
}
// Switch to default input device
let default_sound_device = get_default_sound_input();
if let Some(device) = default_sound_device {
set_sound_input(device);
}
2023-04-17 19:26:39 +08:00
// iOS does not have this server.
#[cfg(not(any(target_os = "ios")))]
{
// Create a channel to receive error or closed message
let (tx, rx) = std::sync::mpsc::channel();
let (tx_audio_data, mut rx_audio_data) =
hbb_common::tokio::sync::mpsc::unbounded_channel();
2023-04-17 19:26:39 +08:00
// Create a stand-alone inner, add subscribe to audio service
let conn_id = CLIENT_SERVER.write().unwrap().get_new_id();
let client_conn_inner = ConnInner::new(conn_id.clone(), Some(tx_audio_data), None);
// now we subscribe
CLIENT_SERVER.write().unwrap().subscribe(
audio_service::NAME,
client_conn_inner.clone(),
true,
);
let tx_audio = self.sender.clone();
std::thread::spawn(move || {
loop {
// check if client is closed
match rx.try_recv() {
Ok(_) | Err(std::sync::mpsc::TryRecvError::Disconnected) => {
log::debug!("Exit voice call audio service of client");
// unsubscribe
CLIENT_SERVER.write().unwrap().subscribe(
audio_service::NAME,
client_conn_inner,
false,
);
break;
2022-11-04 12:02:17 +08:00
}
_ => {}
2023-04-17 19:26:39 +08:00
}
match rx_audio_data.try_recv() {
Ok((_instant, msg)) => match &msg.union {
Some(message::Union::AudioFrame(frame)) => {
let mut msg = Message::new();
msg.set_audio_frame(frame.clone());
tx_audio.send(Data::Message(msg)).ok();
}
Some(message::Union::Misc(misc)) => {
let mut msg = Message::new();
msg.set_misc(misc.clone());
tx_audio.send(Data::Message(msg)).ok();
}
_ => {}
},
Err(err) => {
if err == TryRecvError::Empty {
// ignore
} else {
log::debug!("Failed to record local audio channel: {}", err);
}
2022-11-03 21:09:37 +08:00
}
}
}
2023-04-17 19:26:39 +08:00
});
return Some(tx);
}
#[cfg(target_os = "ios")]
{
None
}
2022-11-03 21:09:37 +08:00
}
2022-09-01 09:48:53 +08:00
async fn handle_msg_from_ui(&mut self, data: Data, peer: &mut Stream) -> bool {
match data {
Data::Close => {
let mut misc = Misc::new();
misc.set_close_reason("".to_owned());
let mut msg = Message::new();
msg.set_misc(misc);
allow_err!(peer.send(&msg).await);
return false;
}
Data::Login((os_username, os_password, password, remember)) => {
2022-09-01 09:48:53 +08:00
self.handler
.handle_login_from_ui(os_username, os_password, password, remember, peer)
2022-09-01 09:48:53 +08:00
.await;
}
#[cfg(not(feature = "flutter"))]
2022-09-01 09:48:53 +08:00
Data::ToggleClipboardFile => {
self.check_clipboard_file_context();
}
Data::Message(msg) => {
allow_err!(peer.send(&msg).await);
}
Data::SendFiles((id, path, to, file_num, include_hidden, is_remote)) => {
log::info!("send files, is remote {}", is_remote);
let od = can_enable_overwrite_detection(self.handler.lc.read().unwrap().version);
if is_remote {
log::debug!("New job {}, write to {} from remote {}", id, to, path);
self.write_jobs.push(fs::TransferJob::new_write(
id,
path.clone(),
to,
file_num,
include_hidden,
is_remote,
Vec::new(),
od,
));
allow_err!(
peer.send(&fs::new_send(id, path, file_num, include_hidden))
.await
);
} else {
match fs::TransferJob::new_read(
id,
to.clone(),
path.clone(),
file_num,
include_hidden,
is_remote,
od,
) {
Err(err) => {
self.handle_job_status(id, -1, Some(err.to_string()));
}
Ok(job) => {
log::debug!(
"New job {}, read {} to remote {}, {} files",
id,
path,
to,
job.files().len()
);
2022-09-05 20:32:21 +08:00
self.handler.update_folder_files(
job.id(),
job.files(),
path,
!is_remote,
true,
);
2022-09-01 09:48:53 +08:00
#[cfg(not(windows))]
let files = job.files().clone();
#[cfg(windows)]
let mut files = job.files().clone();
#[cfg(windows)]
if self.handler.peer_platform() != "Windows" {
// peer is not windows, need transform \ to /
fs::transform_windows_path(&mut files);
}
let total_size = job.total_size();
2022-09-01 09:48:53 +08:00
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
allow_err!(
peer.send(&fs::new_receive(id, to, file_num, files, total_size))
.await
);
2022-09-01 09:48:53 +08:00
}
}
}
}
Data::AddJob((id, path, to, file_num, include_hidden, is_remote)) => {
let od = can_enable_overwrite_detection(self.handler.lc.read().unwrap().version);
if is_remote {
log::debug!(
"new write waiting job {}, write to {} from remote {}",
id,
to,
path
);
let mut job = fs::TransferJob::new_write(
id,
path.clone(),
to,
file_num,
include_hidden,
is_remote,
Vec::new(),
od,
);
job.is_last_job = true;
self.write_jobs.push(job);
} else {
match fs::TransferJob::new_read(
id,
to.clone(),
path.clone(),
file_num,
include_hidden,
is_remote,
od,
) {
Err(err) => {
self.handle_job_status(id, -1, Some(err.to_string()));
}
Ok(mut job) => {
log::debug!(
"new read waiting job {}, read {} to remote {}, {} files",
id,
path,
to,
job.files().len()
);
2022-09-05 20:32:21 +08:00
self.handler.update_folder_files(
job.id(),
job.files(),
path,
!is_remote,
true,
);
2022-09-01 09:48:53 +08:00
job.is_last_job = true;
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
}
}
}
}
Data::ResumeJob((id, is_remote)) => {
if is_remote {
if let Some(job) = get_job(id, &mut self.write_jobs) {
job.is_last_job = false;
allow_err!(
peer.send(&fs::new_send(
id,
job.remote.clone(),
job.file_num,
job.show_hidden
))
.await
);
}
} else {
if let Some(job) = get_job(id, &mut self.read_jobs) {
job.is_last_job = false;
allow_err!(
peer.send(&fs::new_receive(
id,
job.path.to_string_lossy().to_string(),
job.file_num,
job.files.clone(),
job.total_size(),
2022-09-01 09:48:53 +08:00
))
.await
);
}
}
}
Data::SetNoConfirm(id) => {
if let Some(job) = self.remove_jobs.get_mut(&id) {
job.no_confirm = true;
}
}
Data::ConfirmDeleteFiles((id, file_num)) => {
if let Some(job) = self.remove_jobs.get_mut(&id) {
let i = file_num as usize;
if i < job.files.len() {
self.handler.ui_handler.confirm_delete_files(
id,
file_num,
job.files[i].name.clone(),
);
}
}
}
Data::SetConfirmOverrideFile((id, file_num, need_override, remember, is_upload)) => {
if is_upload {
if let Some(job) = fs::get_job(id, &mut self.read_jobs) {
if remember {
job.set_overwrite_strategy(Some(need_override));
}
job.confirm(&FileTransferSendConfirmRequest {
id,
file_num,
union: if need_override {
Some(file_transfer_send_confirm_request::Union::OffsetBlk(0))
} else {
Some(file_transfer_send_confirm_request::Union::Skip(true))
},
..Default::default()
});
}
} else {
if let Some(job) = fs::get_job(id, &mut self.write_jobs) {
if remember {
job.set_overwrite_strategy(Some(need_override));
}
let mut msg = Message::new();
let mut file_action = FileAction::new();
2022-12-05 19:40:49 +08:00
let req = FileTransferSendConfirmRequest {
2022-09-01 09:48:53 +08:00
id,
file_num,
union: if need_override {
Some(file_transfer_send_confirm_request::Union::OffsetBlk(0))
} else {
Some(file_transfer_send_confirm_request::Union::Skip(true))
},
..Default::default()
2022-12-05 19:40:49 +08:00
};
job.confirm(&req);
file_action.set_send_confirm(req);
2022-09-01 09:48:53 +08:00
msg.set_file_action(file_action);
allow_err!(peer.send(&msg).await);
}
}
}
Data::RemoveDirAll((id, path, is_remote, include_hidden)) => {
let sep = self.handler.get_path_sep(is_remote);
if is_remote {
let mut msg_out = Message::new();
let mut file_action = FileAction::new();
file_action.set_all_files(ReadAllFiles {
id,
path: path.clone(),
include_hidden,
..Default::default()
});
msg_out.set_file_action(file_action);
allow_err!(peer.send(&msg_out).await);
self.remove_jobs
.insert(id, RemoveJob::new(Vec::new(), path, sep, is_remote));
} else {
match fs::get_recursive_files(&path, include_hidden) {
Ok(entries) => {
2022-09-05 20:32:21 +08:00
self.handler.update_folder_files(
id,
&entries,
path.clone(),
!is_remote,
false,
);
2022-09-01 09:48:53 +08:00
self.remove_jobs
.insert(id, RemoveJob::new(entries, path, sep, is_remote));
}
Err(err) => {
self.handle_job_status(id, -1, Some(err.to_string()));
}
}
}
}
Data::CancelJob(id) => {
let mut msg_out = Message::new();
let mut file_action = FileAction::new();
file_action.set_cancel(FileTransferCancel {
id: id,
..Default::default()
});
msg_out.set_file_action(file_action);
allow_err!(peer.send(&msg_out).await);
if let Some(job) = fs::get_job(id, &mut self.write_jobs) {
job.remove_download_file();
fs::remove_job(id, &mut self.write_jobs);
}
fs::remove_job(id, &mut self.read_jobs);
self.remove_jobs.remove(&id);
}
Data::RemoveDir((id, path)) => {
let mut msg_out = Message::new();
let mut file_action = FileAction::new();
file_action.set_remove_dir(FileRemoveDir {
id,
path,
recursive: true,
..Default::default()
});
msg_out.set_file_action(file_action);
allow_err!(peer.send(&msg_out).await);
}
Data::RemoveFile((id, path, file_num, is_remote)) => {
if is_remote {
let mut msg_out = Message::new();
let mut file_action = FileAction::new();
file_action.set_remove_file(FileRemoveFile {
id,
path,
file_num,
..Default::default()
});
msg_out.set_file_action(file_action);
allow_err!(peer.send(&msg_out).await);
} else {
match fs::remove_file(&path) {
Err(err) => {
self.handle_job_status(id, file_num, Some(err.to_string()));
}
Ok(()) => {
self.handle_job_status(id, file_num, None);
}
}
}
}
Data::CreateDir((id, path, is_remote)) => {
if is_remote {
let mut msg_out = Message::new();
let mut file_action = FileAction::new();
file_action.set_create(FileDirCreate {
id,
path,
..Default::default()
});
msg_out.set_file_action(file_action);
allow_err!(peer.send(&msg_out).await);
} else {
match fs::create_dir(&path) {
Err(err) => {
self.handle_job_status(id, -1, Some(err.to_string()));
}
Ok(()) => {
self.handle_job_status(id, -1, None);
}
}
}
}
Data::RecordScreen(start, display, w, h, id) => {
let _ = self
.video_sender
.send(MediaData::RecordScreen(start, display, w, h, id));
}
Data::ElevateDirect => {
let mut request = ElevationRequest::new();
request.set_direct(true);
let mut misc = Misc::new();
misc.set_elevation_request(request);
let mut msg = Message::new();
msg.set_misc(misc);
allow_err!(peer.send(&msg).await);
self.elevation_requested = true;
}
Data::ElevateWithLogon(username, password) => {
let mut request = ElevationRequest::new();
request.set_logon(ElevationRequestWithLogon {
username,
password,
..Default::default()
});
let mut misc = Misc::new();
misc.set_elevation_request(request);
let mut msg = Message::new();
msg.set_misc(misc);
allow_err!(peer.send(&msg).await);
self.elevation_requested = true;
}
2023-02-05 23:47:06 +08:00
Data::NewVoiceCall => {
2023-02-06 11:42:25 +08:00
let msg = new_voice_call_request(true);
// Save the voice call request timestamp for the further validation.
self.voice_call_request_timestamp = Some(
NonZeroI64::new(msg.voice_call_request().req_timestamp)
.unwrap_or(NonZeroI64::new(get_time()).unwrap()),
);
2023-02-05 23:47:06 +08:00
allow_err!(peer.send(&msg).await);
2023-02-06 12:14:20 +08:00
self.handler.on_voice_call_waiting();
2023-02-05 23:47:06 +08:00
}
Data::CloseVoiceCall => {
2023-02-06 11:42:25 +08:00
self.stop_voice_call();
let msg = new_voice_call_request(false);
self.handler
.on_voice_call_closed("Closed manually by the peer");
2023-02-05 23:47:06 +08:00
allow_err!(peer.send(&msg).await);
}
2022-09-01 09:48:53 +08:00
_ => {}
}
true
}
#[inline]
fn update_job_status(
job: &fs::TransferJob,
elapsed: i32,
last_update_jobs_status: &mut (Instant, HashMap<i32, u64>),
handler: &Session<T>,
2022-09-01 09:48:53 +08:00
) {
if elapsed <= 0 {
return;
}
let transferred = job.transferred();
let last_transferred = {
if let Some(v) = last_update_jobs_status.1.get(&job.id()) {
v.to_owned()
} else {
0
}
};
last_update_jobs_status.1.insert(job.id(), transferred);
let speed = (transferred - last_transferred) as f64 / (elapsed as f64 / 1000.);
let file_num = job.file_num() - 1;
handler.job_progress(job.id(), file_num, speed, job.finished_size() as f64);
}
fn update_jobs_status(&mut self) {
let elapsed = self.last_update_jobs_status.0.elapsed().as_millis() as i32;
if elapsed >= 1000 {
for job in self.read_jobs.iter() {
Self::update_job_status(
job,
elapsed,
&mut self.last_update_jobs_status,
&self.handler,
2022-09-01 09:48:53 +08:00
);
}
for job in self.write_jobs.iter() {
Self::update_job_status(
job,
elapsed,
&mut self.last_update_jobs_status,
&mut self.handler,
);
}
self.last_update_jobs_status.0 = Instant::now();
}
}
pub async fn sync_jobs_status_to_local(&mut self) -> bool {
log::info!("sync transfer job status");
let mut config: PeerConfig = self.handler.load_config();
let mut transfer_metas = TransferSerde::default();
for job in self.read_jobs.iter() {
let json_str = serde_json::to_string(&job.gen_meta()).unwrap_or_default();
transfer_metas.read_jobs.push(json_str);
}
for job in self.write_jobs.iter() {
let json_str = serde_json::to_string(&job.gen_meta()).unwrap_or_default();
transfer_metas.write_jobs.push(json_str);
}
log::info!("meta: {:?}", transfer_metas);
if config.transfer != transfer_metas {
config.transfer = transfer_metas;
self.handler.save_config(config);
}
2022-09-01 09:48:53 +08:00
true
}
async fn send_opts_after_login(&self, peer: &mut Stream) {
if let Some(opts) = self
.handler
.lc
.read()
.unwrap()
.get_option_message_after_login()
{
let mut misc = Misc::new();
misc.set_option(opts);
let mut msg_out = Message::new();
msg_out.set_misc(misc);
allow_err!(peer.send(&msg_out).await);
}
}
fn contains_key_frame(vf: &VideoFrame) -> bool {
use video_frame::Union::*;
match &vf.union {
Some(vf) => match vf {
Vp8s(f) | Vp9s(f) | Av1s(f) | H264s(f) | H265s(f) => f.frames.iter().any(|e| e.key),
_ => false,
},
None => false,
}
}
#[inline]
fn fps_control(&mut self, direct: bool) {
let decode_fps_read = self.decode_fps_map.read().unwrap();
for (display, decode_fps) in decode_fps_read.iter() {
let video_queue_map_read = self.video_queue_map.read().unwrap();
let Some(video_queue) = video_queue_map_read.get(display) else {
continue;
};
if !self.fps_control_map.contains_key(display) {
self.fps_control_map.insert(*display, FpsControl::default());
}
let Some(ctl) = self.fps_control_map.get_mut(display) else {
return;
};
let len = video_queue.len();
let decode_fps = *decode_fps;
let limited_fps = if direct {
decode_fps * 9 / 10 // 30 got 27
} else {
decode_fps * 4 / 5 // 30 got 24
};
// send full speed fps
let version = self.handler.lc.read().unwrap().version;
let max_encode_speed = 144 * 10 / 9;
if version >= hbb_common::get_version_number("1.2.1")
&& (ctl.last_full_speed_fps.is_none() // First time
|| ((ctl.last_full_speed_fps.unwrap_or_default() - decode_fps as i32).abs() >= 5 // diff 5
&& !(decode_fps > max_encode_speed // already exceed max encoding speed
&& ctl.last_full_speed_fps.unwrap_or_default() > max_encode_speed as i32)))
{
let mut misc = Misc::new();
misc.set_full_speed_fps(decode_fps as _);
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_full_speed_fps = Some(decode_fps as _);
}
// decrease judgement
let debounce = if decode_fps > 10 { decode_fps / 2 } else { 5 }; // 500ms
let should_decrease = len >= debounce // exceed debounce
&& len > ctl.last_queue_size + 5 // still caching
&& !ctl.last_custom_fps.unwrap_or(i32::MAX) < limited_fps as i32; // NOT already set a smaller one
// increase judgement
if len <= 1 {
ctl.idle_counter += 1;
} else {
ctl.idle_counter = 0;
}
let mut should_increase = false;
if let Some(last_custom_fps) = ctl.last_custom_fps {
// ever set
if last_custom_fps + 5 < limited_fps as i32 && ctl.idle_counter > 3 {
// limited_fps is 5 larger than last set, and idle time is more than 3 seconds
should_increase = true;
}
}
if should_decrease || should_increase {
// limited_fps to ensure decoding is faster than encoding
let mut custom_fps = limited_fps as i32;
if custom_fps < 1 {
custom_fps = 1;
}
// send custom fps
let mut misc = Misc::new();
if version > hbb_common::get_version_number("1.2.1") {
// avoid confusion with custom image quality fps
misc.set_auto_adjust_fps(custom_fps as _);
} else {
misc.set_option(OptionMessage {
custom_fps,
..Default::default()
});
}
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_queue_size = len;
ctl.last_custom_fps = Some(custom_fps);
}
// send refresh
if ctl.refresh_times < 10 // enough
&& (len > video_queue.capacity() / 2
&& (ctl.refresh_times == 0 || ctl.last_refresh_instant.elapsed().as_secs() > 30))
{
// Refresh causes client set_display, left frames cause flickering.
while let Some(_) = video_queue.pop() {}
self.handler.refresh_video(*display as _);
ctl.refresh_times += 1;
ctl.last_refresh_instant = Instant::now();
}
}
}
2022-09-01 09:48:53 +08:00
async fn handle_msg_from_peer(&mut self, data: &[u8], peer: &mut Stream) -> bool {
if let Ok(msg_in) = Message::parse_from_bytes(&data) {
match msg_in.union {
Some(message::Union::VideoFrame(vf)) => {
if !self.first_frame {
self.first_frame = true;
self.handler.close_success();
self.handler.adapt_size();
self.send_opts_after_login(peer).await;
}
let incoming_format = CodecFormat::from(&vf);
if self.video_format != incoming_format {
self.video_format = incoming_format.clone();
2022-09-01 09:48:53 +08:00
self.handler.update_quality_status(QualityStatus {
codec_format: Some(incoming_format),
2022-09-01 09:48:53 +08:00
..Default::default()
})
};
let display = vf.display as usize;
let mut video_queue_write = self.video_queue_map.write().unwrap();
if !video_queue_write.contains_key(&display) {
video_queue_write.insert(
display,
ArrayQueue::<VideoFrame>::new(crate::client::VIDEO_QUEUE_SIZE),
);
}
if Self::contains_key_frame(&vf) {
if let Some(video_queue) = video_queue_write.get_mut(&display) {
while let Some(_) = video_queue.pop() {}
}
self.video_sender
.send(MediaData::VideoFrame(Box::new(vf)))
.ok();
} else {
if let Some(video_queue) = video_queue_write.get_mut(&display) {
video_queue.force_push(vf);
}
self.video_sender.send(MediaData::VideoQueue(display)).ok();
}
2022-09-01 09:48:53 +08:00
}
Some(message::Union::Hash(hash)) => {
self.handler
.handle_hash(&self.handler.password.clone(), hash, peer)
.await;
}
Some(message::Union::LoginResponse(lr)) => match lr.union {
Some(login_response::Union::Error(err)) => {
if !self.handler.handle_login_error(&err) {
return false;
}
}
Some(login_response::Union::PeerInfo(pi)) => {
self.handler.handle_peer_info(pi);
2022-09-05 10:27:33 +08:00
self.check_clipboard_file_context();
if !(self.handler.is_file_transfer() || self.handler.is_port_forward()) {
#[cfg(feature = "flutter")]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
Client::try_start_clipboard(None);
#[cfg(not(feature = "flutter"))]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
Client::try_start_clipboard(Some(
crate::client::ClientClipboardContext {
cfg: self.handler.get_permission_config(),
tx: self.sender.clone(),
},
));
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if let Some(msg_out) = Client::get_current_text_clipboard_msg() {
let sender = self.sender.clone();
let permission_config = self.handler.get_permission_config();
tokio::spawn(async move {
// due to clipboard service interval time
sleep(common::CLIPBOARD_INTERVAL as f32 / 1_000.).await;
if permission_config.is_text_clipboard_required() {
sender.send(Data::Message(msg_out)).ok();
}
});
}
// on connection established client
#[cfg(all(feature = "flutter", feature = "plugin_framework"))]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
crate::plugin::handle_listen_event(
crate::plugin::EVENT_ON_CONN_CLIENT.to_owned(),
self.handler.id.clone(),
)
2022-09-05 10:27:33 +08:00
}
2022-09-01 09:48:53 +08:00
2022-09-05 10:27:33 +08:00
if self.handler.is_file_transfer() {
self.handler.load_last_jobs();
2022-09-05 10:27:33 +08:00
}
self.is_connected = true;
2022-09-01 09:48:53 +08:00
}
_ => {}
},
Some(message::Union::CursorData(cd)) => {
self.handler.set_cursor_data(cd);
}
Some(message::Union::CursorId(id)) => {
self.handler.set_cursor_id(id.to_string());
}
Some(message::Union::CursorPosition(cp)) => {
self.handler.set_cursor_position(cp);
}
Some(message::Union::Clipboard(cb)) => {
if !self.handler.lc.read().unwrap().disable_clipboard.v {
2022-09-01 09:48:53 +08:00
#[cfg(not(any(target_os = "android", target_os = "ios")))]
update_clipboard(cb, Some(&crate::client::get_old_clipboard_text()));
2022-09-01 09:48:53 +08:00
#[cfg(any(target_os = "android", target_os = "ios"))]
{
let content = if cb.compress {
hbb_common::compress::decompress(&cb.content)
} else {
cb.content.into()
};
if let Ok(content) = String::from_utf8(content) {
self.handler.clipboard(content);
}
}
}
}
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
2022-09-01 09:48:53 +08:00
Some(message::Union::Cliprdr(clip)) => {
self.handle_cliprdr_msg(clip);
2022-09-01 09:48:53 +08:00
}
Some(message::Union::FileResponse(fr)) => {
match fr.union {
Some(file_response::Union::Dir(fd)) => {
#[cfg(windows)]
let entries = fd.entries.to_vec();
#[cfg(not(windows))]
let mut entries = fd.entries.to_vec();
#[cfg(not(windows))]
{
if self.handler.peer_platform() == "Windows" {
fs::transform_windows_path(&mut entries);
}
}
self.handler
.update_folder_files(fd.id, &entries, fd.path, false, false);
2022-09-01 09:48:53 +08:00
if let Some(job) = fs::get_job(fd.id, &mut self.write_jobs) {
log::info!("job set_files: {:?}", entries);
job.set_files(entries);
} else if let Some(job) = self.remove_jobs.get_mut(&fd.id) {
job.files = entries;
}
}
Some(file_response::Union::Digest(digest)) => {
if digest.is_upload {
if let Some(job) = fs::get_job(digest.id, &mut self.read_jobs) {
if let Some(file) = job.files().get(digest.file_num as usize) {
let read_path = get_string(&job.join(&file.name));
let overwrite_strategy = job.default_overwrite_strategy();
if let Some(overwrite) = overwrite_strategy {
let req = FileTransferSendConfirmRequest {
id: digest.id,
file_num: digest.file_num,
union: Some(if overwrite {
file_transfer_send_confirm_request::Union::OffsetBlk(0)
} else {
file_transfer_send_confirm_request::Union::Skip(
true,
)
}),
..Default::default()
};
job.confirm(&req);
let msg = new_send_confirm(req);
allow_err!(peer.send(&msg).await);
} else {
self.handler.override_file_confirm(
digest.id,
digest.file_num,
read_path,
true,
digest.is_identical,
2022-09-01 09:48:53 +08:00
);
}
}
}
} else {
if let Some(job) = fs::get_job(digest.id, &mut self.write_jobs) {
if let Some(file) = job.files().get(digest.file_num as usize) {
let write_path = get_string(&job.join(&file.name));
let overwrite_strategy = job.default_overwrite_strategy();
match fs::is_write_need_confirmation(&write_path, &digest) {
Ok(res) => match res {
DigestCheckResult::IsSame => {
2022-12-05 19:40:49 +08:00
let req = FileTransferSendConfirmRequest {
2022-09-01 09:48:53 +08:00
id: digest.id,
file_num: digest.file_num,
union: Some(file_transfer_send_confirm_request::Union::Skip(true)),
..Default::default()
2022-12-05 19:40:49 +08:00
};
job.confirm(&req);
let msg = new_send_confirm(req);
2022-09-01 09:48:53 +08:00
allow_err!(peer.send(&msg).await);
}
DigestCheckResult::NeedConfirm(digest) => {
if let Some(overwrite) = overwrite_strategy {
2022-12-05 19:40:49 +08:00
let req = FileTransferSendConfirmRequest {
id: digest.id,
file_num: digest.file_num,
union: Some(if overwrite {
file_transfer_send_confirm_request::Union::OffsetBlk(0)
} else {
file_transfer_send_confirm_request::Union::Skip(true)
}),
..Default::default()
};
job.confirm(&req);
let msg = new_send_confirm(req);
2022-09-01 09:48:53 +08:00
allow_err!(peer.send(&msg).await);
} else {
self.handler.override_file_confirm(
digest.id,
digest.file_num,
write_path,
false,
digest.is_identical,
2022-09-01 09:48:53 +08:00
);
}
}
DigestCheckResult::NoSuchFile => {
2022-12-05 19:40:49 +08:00
let req = FileTransferSendConfirmRequest {
2022-09-01 09:48:53 +08:00
id: digest.id,
file_num: digest.file_num,
union: Some(file_transfer_send_confirm_request::Union::OffsetBlk(0)),
..Default::default()
2022-12-05 19:40:49 +08:00
};
job.confirm(&req);
let msg = new_send_confirm(req);
2022-09-01 09:48:53 +08:00
allow_err!(peer.send(&msg).await);
}
},
Err(err) => {
println!("error receiving digest: {}", err);
2022-09-01 09:48:53 +08:00
}
}
}
}
}
}
Some(file_response::Union::Block(block)) => {
if let Some(job) = fs::get_job(block.id, &mut self.write_jobs) {
if let Err(_err) = job.write(block).await {
2022-09-01 09:48:53 +08:00
// to-do: add "skip" for writing job
}
self.update_jobs_status();
}
}
Some(file_response::Union::Done(d)) => {
2022-12-05 19:40:49 +08:00
let mut err: Option<String> = None;
2022-09-01 09:48:53 +08:00
if let Some(job) = fs::get_job(d.id, &mut self.write_jobs) {
job.modify_time();
2022-12-05 19:40:49 +08:00
err = job.job_error();
2022-09-01 09:48:53 +08:00
fs::remove_job(d.id, &mut self.write_jobs);
}
2022-12-05 19:40:49 +08:00
self.handle_job_status(d.id, d.file_num, err);
2022-09-01 09:48:53 +08:00
}
Some(file_response::Union::Error(e)) => {
if let Some(_job) = fs::get_job(e.id, &mut self.write_jobs) {
2022-12-06 15:09:57 +08:00
fs::remove_job(e.id, &mut self.write_jobs);
}
2022-09-01 09:48:53 +08:00
self.handle_job_status(e.id, e.file_num, Some(e.error));
}
_ => {}
}
}
Some(message::Union::Misc(misc)) => match misc.union {
Some(misc::Union::AudioFormat(f)) => {
self.audio_sender.send(MediaData::AudioFormat(f)).ok();
}
Some(misc::Union::ChatMessage(c)) => {
self.handler.new_message(c.text);
}
Some(misc::Union::PermissionInfo(p)) => {
log::info!("Change permission {:?} -> {}", p.permission, p.enabled);
// https://github.com/rustdesk/rustdesk/issues/3703#issuecomment-1474734754
match p.permission.enum_value() {
Ok(Permission::Keyboard) => {
#[cfg(feature = "flutter")]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
crate::flutter::update_text_clipboard_required();
*self.handler.server_keyboard_enabled.write().unwrap() = p.enabled;
2022-09-01 09:48:53 +08:00
self.handler.set_permission("keyboard", p.enabled);
}
Ok(Permission::Clipboard) => {
#[cfg(feature = "flutter")]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
crate::flutter::update_text_clipboard_required();
*self.handler.server_clipboard_enabled.write().unwrap() = p.enabled;
2022-09-01 09:48:53 +08:00
self.handler.set_permission("clipboard", p.enabled);
}
Ok(Permission::Audio) => {
2022-09-01 09:48:53 +08:00
self.handler.set_permission("audio", p.enabled);
}
Ok(Permission::File) => {
*self.handler.server_file_transfer_enabled.write().unwrap() =
p.enabled;
2022-09-01 09:48:53 +08:00
if !p.enabled && self.handler.is_file_transfer() {
return true;
}
self.handler.set_permission("file", p.enabled);
}
Ok(Permission::Restart) => {
2022-09-01 09:48:53 +08:00
self.handler.set_permission("restart", p.enabled);
}
Ok(Permission::Recording) => {
self.handler.set_permission("recording", p.enabled);
}
Ok(Permission::BlockInput) => {
self.handler.set_permission("block_input", p.enabled);
}
_ => {}
2022-09-01 09:48:53 +08:00
}
}
Some(misc::Union::SwitchDisplay(s)) => {
self.handler.handle_peer_switch_display(&s);
self.video_sender
.send(MediaData::Reset(s.display as _))
.ok();
2022-09-01 09:48:53 +08:00
if s.width > 0 && s.height > 0 {
self.handler.set_display(
s.x,
s.y,
s.width,
s.height,
s.cursor_embedded,
);
2022-09-01 09:48:53 +08:00
}
}
Some(misc::Union::CloseReason(c)) => {
self.handler.msgbox("error", "Connection Error", &c, "");
2022-09-01 09:48:53 +08:00
return false;
}
Some(misc::Union::BackNotification(notification)) => {
if !self.handle_back_notification(notification).await {
return false;
}
}
Some(misc::Union::Uac(uac)) => {
let keyboard = self.handler.server_keyboard_enabled.read().unwrap().clone();
#[cfg(feature = "flutter")]
{
if uac && keyboard {
self.handler.msgbox(
"on-uac",
"Prompt",
"Please wait for confirmation of UAC...",
"",
);
} else {
self.handler.cancel_msgbox("on-uac");
self.handler.cancel_msgbox("wait-uac");
self.handler.cancel_msgbox("elevation-error");
}
}
#[cfg(not(feature = "flutter"))]
{
let msgtype = "custom-uac-nocancel";
let title = "Prompt";
let text = "Please wait for confirmation of UAC...";
let link = "";
if uac && keyboard {
self.handler.msgbox(msgtype, title, text, link);
} else {
self.handler.cancel_msgbox(&format!(
"{}-{}-{}-{}",
msgtype, title, text, link,
));
}
}
}
Some(misc::Union::ForegroundWindowElevated(elevated)) => {
let keyboard = self.handler.server_keyboard_enabled.read().unwrap().clone();
#[cfg(feature = "flutter")]
{
if elevated && keyboard {
self.handler.msgbox(
"on-foreground-elevated",
"Prompt",
"elevated_foreground_window_tip",
"",
);
} else {
self.handler.cancel_msgbox("on-foreground-elevated");
self.handler.cancel_msgbox("wait-uac");
self.handler.cancel_msgbox("elevation-error");
}
}
#[cfg(not(feature = "flutter"))]
{
let msgtype = "custom-elevated-foreground-nocancel";
let title = "Prompt";
let text = "elevated_foreground_window_tip";
let link = "";
if elevated && keyboard {
self.handler.msgbox(msgtype, title, text, link);
} else {
self.handler.cancel_msgbox(&format!(
"{}-{}-{}-{}",
msgtype, title, text, link,
));
}
}
}
Some(misc::Union::ElevationResponse(err)) => {
if err.is_empty() {
self.handler.msgbox("wait-uac", "", "", "");
} else {
self.handler.cancel_msgbox("wait-uac");
self.handler
.msgbox("elevation-error", "Elevation Error", &err, "");
}
}
Some(misc::Union::PortableServiceRunning(b)) => {
self.handler.portable_service_running(b);
if self.elevation_requested && b {
self.handler.msgbox(
"custom-nocancel-success",
"Successful",
"Elevate successfully",
"",
);
}
}
Some(misc::Union::SwitchBack(_)) => {
#[cfg(feature = "flutter")]
self.handler.switch_back(&self.handler.id);
}
#[cfg(all(feature = "flutter", feature = "plugin_framework"))]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
Some(misc::Union::PluginRequest(p)) => {
allow_err!(crate::plugin::handle_server_event(
&p.id,
&self.handler.id,
&p.content
));
// to-do: show message box on UI when error occurs?
}
#[cfg(all(feature = "flutter", feature = "plugin_framework"))]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
Some(misc::Union::PluginFailure(p)) => {
let name = if p.name.is_empty() {
"plugin".to_string()
} else {
p.name
};
self.handler.msgbox("custom-nocancel", &name, &p.msg, "");
}
2022-09-01 09:48:53 +08:00
_ => {}
},
Some(message::Union::TestDelay(t)) => {
self.handler.handle_test_delay(t, peer).await;
}
Some(message::Union::AudioFrame(frame)) => {
if !self.handler.lc.read().unwrap().disable_audio.v {
self.audio_sender
.send(MediaData::AudioFrame(Box::new(frame)))
.ok();
2022-09-01 09:48:53 +08:00
}
}
Some(message::Union::FileAction(action)) => match action.union {
Some(file_action::Union::SendConfirm(c)) => {
if let Some(job) = fs::get_job(c.id, &mut self.read_jobs) {
job.confirm(&c);
}
}
_ => {}
},
Some(message::Union::MessageBox(msgbox)) => {
let mut link = msgbox.link;
// Links from the remote side must be verified.
if !link.starts_with("rustdesk://") {
if let Some(v) = hbb_common::config::HELPER_URL.get(&link as &str) {
link = v.to_string();
} else {
log::warn!("Message box ignore link {} for security", &link);
link = "".to_string();
}
}
self.handler
.msgbox(&msgbox.msgtype, &msgbox.title, &msgbox.text, &link);
}
2023-02-07 19:09:22 +08:00
Some(message::Union::VoiceCallRequest(request)) => {
if request.is_connect {
2023-02-07 19:40:50 +08:00
// TODO: maybe we will do a voice call from the peer in the future.
2023-02-07 19:09:22 +08:00
} else {
2023-02-07 19:40:50 +08:00
log::debug!("The remote has requested to close the voice call");
2023-02-07 19:09:22 +08:00
if let Some(sender) = self.stop_voice_call_sender.take() {
allow_err!(sender.send(()));
self.handler.on_voice_call_closed("");
}
}
2023-02-05 23:47:06 +08:00
}
Some(message::Union::VoiceCallResponse(response)) => {
2023-02-06 11:42:25 +08:00
let ts = std::mem::replace(&mut self.voice_call_request_timestamp, None);
if let Some(ts) = ts {
if response.req_timestamp != ts.get() {
log::debug!("Possible encountering a voice call attack.");
} else {
if response.accepted {
2023-02-07 17:09:36 +08:00
// The peer accepted the voice call.
2023-02-06 15:36:36 +08:00
self.handler.on_voice_call_started();
2023-02-06 11:42:25 +08:00
self.stop_voice_call_sender = self.start_voice_call();
} else {
// The peer refused the voice call.
2023-02-07 16:53:46 +08:00
self.handler.on_voice_call_closed("");
2023-02-06 11:42:25 +08:00
}
}
}
2023-02-05 23:47:06 +08:00
}
Some(message::Union::PeerInfo(pi)) => {
self.handler.set_displays(&pi.displays);
self.handler.set_platform_additions(&pi.platform_additions);
}
2022-09-01 09:48:53 +08:00
_ => {}
}
}
true
}
async fn handle_back_notification(&mut self, notification: BackNotification) -> bool {
match notification.union {
Some(back_notification::Union::BlockInputState(state)) => {
self.handle_back_msg_block_input(
state.enum_value_or(back_notification::BlockInputState::BlkStateUnknown),
notification.details,
2022-09-01 09:48:53 +08:00
)
.await;
}
Some(back_notification::Union::PrivacyModeState(state)) => {
if !self
.handle_back_msg_privacy_mode(
state.enum_value_or(back_notification::PrivacyModeState::PrvStateUnknown),
notification.details,
2022-09-01 09:48:53 +08:00
)
.await
{
return false;
}
}
_ => {}
}
true
}
#[inline(always)]
fn update_block_input_state(&mut self, on: bool) {
self.handler.update_block_input_state(on);
}
async fn handle_back_msg_block_input(
&mut self,
state: back_notification::BlockInputState,
details: String,
) {
2022-09-01 09:48:53 +08:00
match state {
back_notification::BlockInputState::BlkOnSucceeded => {
self.update_block_input_state(true);
}
back_notification::BlockInputState::BlkOnFailed => {
self.handler.msgbox(
"custom-error",
"Block user input",
if details.is_empty() {
"Failed"
} else {
&details
},
"",
);
2022-09-01 09:48:53 +08:00
self.update_block_input_state(false);
}
back_notification::BlockInputState::BlkOffSucceeded => {
self.update_block_input_state(false);
}
back_notification::BlockInputState::BlkOffFailed => {
self.handler.msgbox(
"custom-error",
"Unblock user input",
if details.is_empty() {
"Failed"
} else {
&details
},
"",
);
2022-09-01 09:48:53 +08:00
}
_ => {}
}
}
#[inline(always)]
fn update_privacy_mode(&mut self, on: bool) {
let mut config = self.handler.load_config();
config.privacy_mode.v = on;
2022-09-01 09:48:53 +08:00
self.handler.save_config(config);
self.handler.update_privacy_mode();
}
async fn handle_back_msg_privacy_mode(
&mut self,
state: back_notification::PrivacyModeState,
details: String,
2022-09-01 09:48:53 +08:00
) -> bool {
match state {
back_notification::PrivacyModeState::PrvOnByOther => {
self.handler.msgbox(
"error",
"Connecting...",
"Someone turns on privacy mode, exit",
"",
2022-09-01 09:48:53 +08:00
);
return false;
}
back_notification::PrivacyModeState::PrvNotSupported => {
self.handler
.msgbox("custom-error", "Privacy mode", "Unsupported", "");
2022-09-01 09:48:53 +08:00
self.update_privacy_mode(false);
}
back_notification::PrivacyModeState::PrvOnSucceeded => {
self.handler
.msgbox("custom-nocancel", "Privacy mode", "In privacy mode", "");
2022-09-01 09:48:53 +08:00
self.update_privacy_mode(true);
}
back_notification::PrivacyModeState::PrvOnFailedDenied => {
self.handler
.msgbox("custom-error", "Privacy mode", "Peer denied", "");
2022-09-01 09:48:53 +08:00
self.update_privacy_mode(false);
}
back_notification::PrivacyModeState::PrvOnFailedPlugin => {
self.handler
.msgbox("custom-error", "Privacy mode", "Please install plugins", "");
2022-09-01 09:48:53 +08:00
self.update_privacy_mode(false);
}
back_notification::PrivacyModeState::PrvOnFailed => {
self.handler.msgbox(
"custom-error",
"Privacy mode",
if details.is_empty() {
"Failed"
} else {
&details
},
"",
);
2022-09-01 09:48:53 +08:00
self.update_privacy_mode(false);
}
back_notification::PrivacyModeState::PrvOffSucceeded => {
self.handler
.msgbox("custom-nocancel", "Privacy mode", "Out privacy mode", "");
2022-09-01 09:48:53 +08:00
self.update_privacy_mode(false);
}
back_notification::PrivacyModeState::PrvOffByPeer => {
self.handler
.msgbox("custom-error", "Privacy mode", "Peer exit", "");
2022-09-01 09:48:53 +08:00
self.update_privacy_mode(false);
}
back_notification::PrivacyModeState::PrvOffFailed => {
self.handler.msgbox(
"custom-error",
"Privacy mode",
if details.is_empty() {
"Failed to turn off"
} else {
&details
},
"",
);
2022-09-01 09:48:53 +08:00
}
back_notification::PrivacyModeState::PrvOffUnknown => {
self.handler
.msgbox("custom-error", "Privacy mode", "Turned off", "");
2022-09-01 09:48:53 +08:00
// log::error!("Privacy mode is turned off with unknown reason");
self.update_privacy_mode(false);
}
_ => {}
}
true
}
fn check_clipboard_file_context(&self) {
#[cfg(any(
target_os = "windows",
all(
feature = "unix-file-copy-paste",
any(target_os = "linux", target_os = "macos")
)
))]
2022-09-01 09:48:53 +08:00
{
let enabled = *self.handler.server_file_transfer_enabled.read().unwrap()
&& self.handler.lc.read().unwrap().enable_file_transfer.v;
ContextSend::enable(enabled);
}
}
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
fn handle_cliprdr_msg(&self, clip: hbb_common::message_proto::Cliprdr) {
log::debug!("handling cliprdr msg from server peer");
#[cfg(feature = "flutter")]
if let Some(hbb_common::message_proto::cliprdr::Union::FormatList(_)) = &clip.union {
if self.client_conn_id
!= clipboard::get_client_conn_id(&crate::flutter::get_cur_peer_id()).unwrap_or(0)
{
return;
}
}
let Some(clip) = crate::clipboard_file::msg_2_clip(clip) else {
log::warn!("failed to decode cliprdr msg from server peer");
return;
};
let is_stopping_allowed = clip.is_stopping_allowed_from_peer();
let file_transfer_enabled = self.handler.lc.read().unwrap().enable_file_transfer.v;
let stop = is_stopping_allowed && !file_transfer_enabled;
log::debug!(
"Process clipboard message from server peer, stop: {}, is_stopping_allowed: {}, file_transfer_enabled: {}",
stop, is_stopping_allowed, file_transfer_enabled);
if !stop {
if let Err(e) = ContextSend::make_sure_enabled() {
log::error!("failed to restart clipboard context: {}", e);
};
let _ = ContextSend::proc(|context| -> ResultType<()> {
context
.server_clip_file(self.client_conn_id, clip)
.map_err(|e| e.into())
});
2022-09-01 09:48:53 +08:00
}
}
}
struct RemoveJob {
files: Vec<FileEntry>,
path: String,
sep: &'static str,
is_remote: bool,
no_confirm: bool,
last_update_job_status: Instant,
}
impl RemoveJob {
fn new(files: Vec<FileEntry>, path: String, sep: &'static str, is_remote: bool) -> Self {
Self {
files,
path,
sep,
is_remote,
no_confirm: false,
last_update_job_status: Instant::now(),
}
}
pub fn _gen_meta(&self) -> RemoveJobMeta {
RemoveJobMeta {
path: self.path.clone(),
is_remote: self.is_remote,
no_confirm: self.no_confirm,
}
}
}
struct FpsControl {
last_queue_size: usize,
refresh_times: usize,
last_refresh_instant: Instant,
last_full_speed_fps: Option<i32>,
last_custom_fps: Option<i32>,
idle_counter: usize,
}
impl Default for FpsControl {
fn default() -> Self {
Self {
last_queue_size: Default::default(),
refresh_times: Default::default(),
last_refresh_instant: Instant::now(),
last_full_speed_fps: None,
last_custom_fps: None,
idle_counter: 0,
}
}
}