From e53119a01a0a4be043e56f9970db834a3837d45b Mon Sep 17 00:00:00 2001 From: csf Date: Sat, 30 Jul 2022 21:12:08 +0800 Subject: [PATCH] add mobile quality monitor --- flutter/lib/models/model.dart | 41 +++++ flutter/lib/pages/remote_page.dart | 45 +++++ src/client/helper.rs | 14 +- src/mobile.rs | 279 ++++++++++++++++++----------- src/ui/remote.rs | 9 - 5 files changed, 272 insertions(+), 116 deletions(-) diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index 2f6007400..7255c6baa 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -162,6 +162,8 @@ class FfiModel with ChangeNotifier { FFI.serverModel.onClientAuthorized(evt); } else if (name == 'on_client_remove') { FFI.serverModel.onClientRemove(evt); + } else if (name == 'update_quality_status') { + FFI.qualityMonitorModel.updateQualityStatus(evt); } }; PlatformFFI.setEventCallback(cb); @@ -655,6 +657,44 @@ class CursorModel with ChangeNotifier { } } +class QualityMonitorData { + String? speed; + String? fps; + String? delay; + String? targetBitrate; + String? codecFormat; +} + +class QualityMonitorModel with ChangeNotifier { + var _show = FFI.getByName('toggle_option', 'show-quality-monitor') == 'true'; + final _data = QualityMonitorData(); + + bool get show => _show; + QualityMonitorData get data => _data; + + checkShowQualityMonitor() { + final show = + FFI.getByName('toggle_option', 'show-quality-monitor') == 'true'; + if (_show != show) { + _show = show; + notifyListeners(); + } + } + + updateQualityStatus(Map evt) { + try { + if ((evt["speed"] as String).isNotEmpty) _data.speed = evt["speed"]; + if ((evt["fps"] as String).isNotEmpty) _data.fps = evt["fps"]; + if ((evt["delay"] as String).isNotEmpty) _data.delay = evt["delay"]; + if ((evt["target_bitrate"] as String).isNotEmpty) + _data.targetBitrate = evt["target_bitrate"]; + if ((evt["codec_format"] as String).isNotEmpty) + _data.codecFormat = evt["codec_format"]; + notifyListeners(); + } catch (e) {} + } +} + enum MouseButtons { left, right, wheel } extension ToString on MouseButtons { @@ -684,6 +724,7 @@ class FFI { static final serverModel = ServerModel(); static final chatModel = ChatModel(); static final fileModel = FileModel(); + static final qualityMonitorModel = QualityMonitorModel(); static String getId() { return getByName('remote_id'); diff --git a/flutter/lib/pages/remote_page.dart b/flutter/lib/pages/remote_page.dart index 265222837..bb196f0cf 100644 --- a/flutter/lib/pages/remote_page.dart +++ b/flutter/lib/pages/remote_page.dart @@ -592,6 +592,7 @@ class _RemotePageState extends State { child: Stack(children: [ ImagePaint(), CursorPaint(), + QualityMonitor(), getHelpTools(), SizedBox( width: 0, @@ -948,6 +949,47 @@ class ImagePainter extends CustomPainter { } } +class QualityMonitor extends StatelessWidget { + @override + Widget build(BuildContext context) => ChangeNotifierProvider.value( + value: FFI.qualityMonitorModel, + child: Consumer( + builder: (context, qualityMonitorModel, child) => Positioned( + top: 10, + right: 10, + child: qualityMonitorModel.show + ? Container( + padding: EdgeInsets.all(8), + color: MyTheme.canvasColor.withAlpha(120), + child: Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + Text( + "Speed: ${qualityMonitorModel.data.speed}", + style: TextStyle(color: MyTheme.grayBg), + ), + Text( + "FPS: ${qualityMonitorModel.data.fps}", + style: TextStyle(color: MyTheme.grayBg), + ), + Text( + "Delay: ${qualityMonitorModel.data.delay} ms", + style: TextStyle(color: MyTheme.grayBg), + ), + Text( + "Target Bitrate: ${qualityMonitorModel.data.targetBitrate}kb", + style: TextStyle(color: MyTheme.grayBg), + ), + Text( + "Codec: ${qualityMonitorModel.data.codecFormat}", + style: TextStyle(color: MyTheme.grayBg), + ), + ], + ), + ) + : SizedBox.shrink()))); +} + CheckboxListTile getToggle( void Function(void Function()) setState, option, name) { return CheckboxListTile( @@ -956,6 +998,9 @@ CheckboxListTile getToggle( setState(() { FFI.setByName('toggle_option', option); }); + if (option == "show-quality-monitor") { + FFI.qualityMonitorModel.checkShowQualityMonitor(); + } }, dense: true, title: Text(translate(name))); diff --git a/src/client/helper.rs b/src/client/helper.rs index 26dc37ba4..5274a7c55 100644 --- a/src/client/helper.rs +++ b/src/client/helper.rs @@ -3,7 +3,10 @@ use std::{ time::Instant, }; -use hbb_common::{log, message_proto::{VideoFrame, video_frame}}; +use hbb_common::{ + log, + message_proto::{video_frame, VideoFrame}, +}; const MAX_LATENCY: i64 = 500; const MIN_LATENCY: i64 = 100; @@ -87,3 +90,12 @@ impl ToString for CodecFormat { } } } + +#[derive(Debug, Default)] +pub struct QualityStatus { + pub speed: Option, + pub fps: Option, + pub delay: Option, + pub target_bitrate: Option, + pub codec_format: Option, +} diff --git a/src/mobile.rs b/src/mobile.rs index b21618d46..a8777cf39 100644 --- a/src/mobile.rs +++ b/src/mobile.rs @@ -1,12 +1,16 @@ use crate::client::*; -use crate::common::{make_fd_to_json}; +use crate::common::make_fd_to_json; use flutter_rust_bridge::{StreamSink, ZeroCopyBuffer}; use hbb_common::{ allow_err, compress::decompress, config::{Config, LocalConfig}, - fs, log, - fs::{can_enable_overwrite_detection, new_send_confirm, DigestCheckResult, get_string, transform_windows_path}, + fs, + fs::{ + can_enable_overwrite_detection, get_string, new_send_confirm, transform_windows_path, + DigestCheckResult, + }, + log, message_proto::*, protobuf::Message as _, rendezvous_proto::ConnType, @@ -17,6 +21,7 @@ use hbb_common::{ }, Stream, }; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::{ collections::{HashMap, VecDeque}, sync::{Arc, Mutex, RwLock}, @@ -397,6 +402,26 @@ impl Session { log::debug!("{:?}", msg_out); self.send_msg(msg_out); } + + fn update_quality_status(&self, status: QualityStatus) { + const NULL: String = String::new(); + self.push_event( + "update_quality_status", + vec![ + ("speed", &status.speed.map_or(NULL, |it| it)), + ("fps", &status.fps.map_or(NULL, |it| it.to_string())), + ("delay", &status.delay.map_or(NULL, |it| it.to_string())), + ( + "target_bitrate", + &status.target_bitrate.map_or(NULL, |it| it.to_string()), + ), + ( + "codec_format", + &status.codec_format.map_or(NULL, |it| it.to_string()), + ), + ], + ); + } } impl FileManager for Session {} @@ -438,7 +463,11 @@ impl Interface for Session { if lc.is_file_transfer { if pi.username.is_empty() { - self.msgbox("error", "Error", "No active console user logged on, please connect and logon first."); + self.msgbox( + "error", + "Error", + "No active console user logged on, please connect and logon first.", + ); return; } } else { @@ -487,7 +516,14 @@ impl Interface for Session { } async fn handle_test_delay(&mut self, t: TestDelay, peer: &mut Stream) { - handle_test_delay(t, peer).await; + if !t.from_client { + self.update_quality_status(QualityStatus { + delay: Some(t.last_delay as _), + target_bitrate: Some(t.target_bitrate as _), + ..Default::default() + }); + handle_test_delay(t, peer).await; + } } } @@ -502,6 +538,9 @@ struct Connection { write_jobs: Vec, timer: Interval, last_update_jobs_status: (Instant, HashMap), + data_count: Arc, + frame_count: Arc, + video_format: CodecFormat, } impl Connection { @@ -528,6 +567,9 @@ impl Connection { write_jobs: Vec::new(), timer: time::interval(SEC30), last_update_jobs_status: (Instant::now(), Default::default()), + data_count: Arc::new(AtomicUsize::new(0)), + frame_count: Arc::new(AtomicUsize::new(0)), + video_format: CodecFormat::Unknown, }; let key = Config::get_option("key"); let token = Config::get_option("access_token"); @@ -541,6 +583,9 @@ impl Connection { ("direct", &direct.to_string()), ], ); + + let mut status_timer = time::interval(Duration::new(1, 0)); + loop { tokio::select! { res = peer.next() => { @@ -553,6 +598,7 @@ impl Connection { } Ok(ref bytes) => { last_recv_time = Instant::now(); + conn.data_count.fetch_add(bytes.len(), Ordering::Relaxed); if !conn.handle_msg_from_peer(bytes, &mut peer).await { break } @@ -586,6 +632,16 @@ impl Connection { conn.timer = time::interval_at(Instant::now() + SEC30, SEC30); } } + _ = status_timer.tick() => { + let speed = conn.data_count.swap(0, Ordering::Relaxed); + let speed = format!("{:.2}kB/s", speed as f32 / 1024 as f32); + let fps = conn.frame_count.swap(0, Ordering::Relaxed) as _; + conn.session.update_quality_status(QualityStatus { + speed:Some(speed), + fps:Some(fps), + ..Default::default() + }); + } } } log::debug!("Exit io_loop of id={}", session.id); @@ -603,10 +659,19 @@ impl Connection { if !self.first_frame { self.first_frame = true; } + let incomming_format = CodecFormat::from(&vf); + if self.video_format != incomming_format { + self.video_format = incomming_format.clone(); + self.session.update_quality_status(QualityStatus { + codec_format: Some(incomming_format), + ..Default::default() + }) + }; if let (Ok(true), Some(s)) = ( self.video_handler.handle_frame(vf), RGBA_STREAM.read().unwrap().as_ref(), ) { + self.frame_count.fetch_add(1, Ordering::Relaxed); s.add(ZeroCopyBuffer(self.video_handler.rgb.clone())); } } @@ -664,113 +729,114 @@ impl Connection { vec![("x", &cp.x.to_string()), ("y", &cp.y.to_string())], ); } - Some(message::Union::FileResponse(fr)) => match fr.union { - Some(file_response::Union::Dir(fd)) => { - let mut entries = fd.entries.to_vec(); - if self.session.peer_platform() == "Windows" { - fs::transform_windows_path(&mut entries); - } - let id = fd.id; - self.session.push_event( - "file_dir", - vec![("value", &make_fd_to_json(fd)), ("is_local", "false")], - ); - if let Some(job) = fs::get_job(id, &mut self.write_jobs) { - job.set_files(entries); - } - } - Some(file_response::Union::Block(block)) => { - if let Some(job) = fs::get_job(block.id, &mut self.write_jobs) { - if let Err(_err) = job.write(block, None).await { - // to-do: add "skip" for writing job + Some(message::Union::FileResponse(fr)) => { + match fr.union { + Some(file_response::Union::Dir(fd)) => { + let mut entries = fd.entries.to_vec(); + if self.session.peer_platform() == "Windows" { + fs::transform_windows_path(&mut entries); + } + let id = fd.id; + self.session.push_event( + "file_dir", + vec![("value", &make_fd_to_json(fd)), ("is_local", "false")], + ); + if let Some(job) = fs::get_job(id, &mut self.write_jobs) { + job.set_files(entries); } - self.update_jobs_status(); } - } - Some(file_response::Union::Done(d)) => { - if let Some(job) = fs::get_job(d.id, &mut self.write_jobs) { - job.modify_time(); - fs::remove_job(d.id, &mut self.write_jobs); + Some(file_response::Union::Block(block)) => { + if let Some(job) = fs::get_job(block.id, &mut self.write_jobs) { + if let Err(_err) = job.write(block, None).await { + // to-do: add "skip" for writing job + } + self.update_jobs_status(); + } } - self.handle_job_status(d.id, d.file_num, None); - } - Some(file_response::Union::Error(e)) => { - self.handle_job_status(e.id, e.file_num, Some(e.error)); - } - Some(file_response::Union::Digest(digest)) => { - if digest.is_upload { - if let Some(job) = fs::get_job(digest.id, &mut self.read_jobs) { - if let Some(file) = job.files().get(digest.file_num as usize) { - let read_path = get_string(&job.join(&file.name)); - let overwrite_strategy = job.default_overwrite_strategy(); - if let Some(overwrite) = overwrite_strategy { - let req = FileTransferSendConfirmRequest { - id: digest.id, - file_num: digest.file_num, - union: Some(if overwrite { - file_transfer_send_confirm_request::Union::OffsetBlk(0) - } else { - file_transfer_send_confirm_request::Union::Skip( - true, - ) - }), - ..Default::default() - }; - job.confirm(&req); - let msg = new_send_confirm(req); - allow_err!(peer.send(&msg).await); - } else { - self.handle_override_file_confirm( - digest.id, - digest.file_num, - read_path, - true, - ); + Some(file_response::Union::Done(d)) => { + if let Some(job) = fs::get_job(d.id, &mut self.write_jobs) { + job.modify_time(); + fs::remove_job(d.id, &mut self.write_jobs); + } + self.handle_job_status(d.id, d.file_num, None); + } + Some(file_response::Union::Error(e)) => { + self.handle_job_status(e.id, e.file_num, Some(e.error)); + } + Some(file_response::Union::Digest(digest)) => { + if digest.is_upload { + if let Some(job) = fs::get_job(digest.id, &mut self.read_jobs) { + if let Some(file) = job.files().get(digest.file_num as usize) { + let read_path = get_string(&job.join(&file.name)); + let overwrite_strategy = job.default_overwrite_strategy(); + if let Some(overwrite) = overwrite_strategy { + let req = FileTransferSendConfirmRequest { + id: digest.id, + file_num: digest.file_num, + union: Some(if overwrite { + file_transfer_send_confirm_request::Union::OffsetBlk(0) + } else { + file_transfer_send_confirm_request::Union::Skip( + true, + ) + }), + ..Default::default() + }; + job.confirm(&req); + let msg = new_send_confirm(req); + allow_err!(peer.send(&msg).await); + } else { + self.handle_override_file_confirm( + digest.id, + digest.file_num, + read_path, + true, + ); + } } } - } - } else { - if let Some(job) = fs::get_job(digest.id, &mut self.write_jobs) { - if let Some(file) = job.files().get(digest.file_num as usize) { - let write_path = get_string(&job.join(&file.name)); - let overwrite_strategy = job.default_overwrite_strategy(); - match fs::is_write_need_confirmation(&write_path, &digest) { - Ok(res) => match res { - DigestCheckResult::IsSame => { - let msg= new_send_confirm(FileTransferSendConfirmRequest { + } else { + if let Some(job) = fs::get_job(digest.id, &mut self.write_jobs) { + if let Some(file) = job.files().get(digest.file_num as usize) { + let write_path = get_string(&job.join(&file.name)); + let overwrite_strategy = job.default_overwrite_strategy(); + match fs::is_write_need_confirmation(&write_path, &digest) { + Ok(res) => match res { + DigestCheckResult::IsSame => { + let msg= new_send_confirm(FileTransferSendConfirmRequest { id: digest.id, file_num: digest.file_num, union: Some(file_transfer_send_confirm_request::Union::Skip(true)), ..Default::default() }); - self.session.send_msg(msg); - } - DigestCheckResult::NeedConfirm(digest) => { - if let Some(overwrite) = overwrite_strategy { - let msg = new_send_confirm( - FileTransferSendConfirmRequest { - id: digest.id, - file_num: digest.file_num, - union: Some(if overwrite { - file_transfer_send_confirm_request::Union::OffsetBlk(0) - } else { - file_transfer_send_confirm_request::Union::Skip(true) - }), - ..Default::default() - }, - ); self.session.send_msg(msg); - } else { - self.handle_override_file_confirm( - digest.id, - digest.file_num, - write_path.to_string(), - false, - ); } - } - DigestCheckResult::NoSuchFile => { - let msg = new_send_confirm( + DigestCheckResult::NeedConfirm(digest) => { + if let Some(overwrite) = overwrite_strategy { + let msg = new_send_confirm( + FileTransferSendConfirmRequest { + id: digest.id, + file_num: digest.file_num, + union: Some(if overwrite { + file_transfer_send_confirm_request::Union::OffsetBlk(0) + } else { + file_transfer_send_confirm_request::Union::Skip(true) + }), + ..Default::default() + }, + ); + self.session.send_msg(msg); + } else { + self.handle_override_file_confirm( + digest.id, + digest.file_num, + write_path.to_string(), + false, + ); + } + } + DigestCheckResult::NoSuchFile => { + let msg = new_send_confirm( FileTransferSendConfirmRequest { id: digest.id, file_num: digest.file_num, @@ -778,19 +844,20 @@ impl Connection { ..Default::default() }, ); - self.session.send_msg(msg); + self.session.send_msg(msg); + } + }, + Err(err) => { + println!("error recving digest: {}", err); } - }, - Err(err) => { - println!("error recving digest: {}", err); } } } } } + _ => {} } - _ => {} - }, + } Some(message::Union::Misc(misc)) => match misc.union { Some(misc::Union::AudioFormat(f)) => { self.audio_handler.handle_format(f); // diff --git a/src/ui/remote.rs b/src/ui/remote.rs index ad6b3cbd5..f1b2df46b 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -239,15 +239,6 @@ impl sciter::EventHandler for Handler { } } -#[derive(Debug, Default)] -struct QualityStatus { - speed: Option, - fps: Option, - delay: Option, - target_bitrate: Option, - codec_format: Option, -} - impl Handler { pub fn new(cmd: String, id: String, password: String, args: Vec) -> Self { let me = Self {