separate video decoding thread for each display (#9968)
Some checks are pending
CI / ${{ matrix.job.target }} (${{ matrix.job.os }}) (map[os:ubuntu-20.04 target:x86_64-unknown-linux-gnu]) (push) Waiting to run
Full Flutter CI / run-ci (push) Waiting to run

* separate video decoding thread for each display

1. Separate Video Decoding Thread for Each Display
2. Fix Decode Errors When Clearing the Queue
Previously, in-flight frames that arrived after the queue was cleared could not be decoded successfully. This issue is resolved by setting a discard_queue flag when sending a refresh message; the flag is reset upon receiving a keyframe.

Signed-off-by: 21pages <sunboeasy@gmail.com>

* update video format along with fps to flutter

Signed-off-by: 21pages <sunboeasy@gmail.com>

* Fix keyframe interval when auto record outgoing sessions

Signed-off-by: 21pages <sunboeasy@gmail.com>

---------

Signed-off-by: 21pages <sunboeasy@gmail.com>
This commit is contained in:
21pages 2024-11-22 00:02:25 +08:00 committed by GitHub
parent 1c99eb5500
commit 64654ee7cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 253 additions and 299 deletions

View File

@ -908,6 +908,7 @@ impl AudioBuffer {
} }
self.2[i] += 1; self.2[i] += 1;
#[allow(non_upper_case_globals)]
static mut tms: i64 = 0; static mut tms: i64 = 0;
let dt = Local::now().timestamp_millis(); let dt = Local::now().timestamp_millis();
unsafe { unsafe {
@ -2274,74 +2275,60 @@ impl LoginConfigHandler {
/// Media data. /// Media data.
pub enum MediaData { pub enum MediaData {
VideoQueue(usize), VideoQueue,
VideoFrame(Box<VideoFrame>), VideoFrame(Box<VideoFrame>),
AudioFrame(Box<AudioFrame>), AudioFrame(Box<AudioFrame>),
AudioFormat(AudioFormat), AudioFormat(AudioFormat),
Reset(Option<usize>), Reset,
RecordScreen(bool), RecordScreen(bool),
} }
pub type MediaSender = mpsc::Sender<MediaData>; pub type MediaSender = mpsc::Sender<MediaData>;
struct VideoHandlerController { /// Start video thread.
handler: VideoHandler,
skip_beginning: u32,
}
/// Start video and audio thread.
/// Return two [`MediaSender`], they should be given to the media producer.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `video_callback` - The callback for video frame. Being called when a video frame is ready. /// * `video_callback` - The callback for video frame. Being called when a video frame is ready.
pub fn start_video_audio_threads<F, T>( pub fn start_video_thread<F, T>(
session: Session<T>, session: Session<T>,
display: usize,
video_receiver: mpsc::Receiver<MediaData>,
video_queue: Arc<RwLock<ArrayQueue<VideoFrame>>>,
fps: Arc<RwLock<Option<usize>>>,
chroma: Arc<RwLock<Option<Chroma>>>,
discard_queue: Arc<RwLock<bool>>,
video_callback: F, video_callback: F,
) -> ( ) where
MediaSender,
MediaSender,
Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
Arc<RwLock<Option<usize>>>,
Arc<RwLock<Option<Chroma>>>,
)
where
F: 'static + FnMut(usize, &mut scrap::ImageRgb, *mut c_void, bool) + Send, F: 'static + FnMut(usize, &mut scrap::ImageRgb, *mut c_void, bool) + Send,
T: InvokeUiSession, T: InvokeUiSession,
{ {
let (video_sender, video_receiver) = mpsc::channel::<MediaData>();
let video_queue_map: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>> = Default::default();
let video_queue_map_cloned = video_queue_map.clone();
let mut video_callback = video_callback; let mut video_callback = video_callback;
let fps = Arc::new(RwLock::new(None));
let decode_fps_map = fps.clone();
let chroma = Arc::new(RwLock::new(None));
let chroma_cloned = chroma.clone();
let mut last_chroma = None; let mut last_chroma = None;
std::thread::spawn(move || { std::thread::spawn(move || {
#[cfg(windows)] #[cfg(windows)]
sync_cpu_usage(); sync_cpu_usage();
get_hwcodec_config(); get_hwcodec_config();
let mut handler_controller_map = HashMap::new(); let mut video_handler = None;
let mut count = 0; let mut count = 0;
let mut duration = std::time::Duration::ZERO; let mut duration = std::time::Duration::ZERO;
let mut skip_beginning = 0;
loop { loop {
if let Ok(data) = video_receiver.recv() { if let Ok(data) = video_receiver.recv() {
match data { match data {
MediaData::VideoFrame(_) | MediaData::VideoQueue(_) => { MediaData::VideoFrame(_) | MediaData::VideoQueue => {
let vf = match data { let vf = match data {
MediaData::VideoFrame(vf) => *vf, MediaData::VideoFrame(vf) => {
MediaData::VideoQueue(display) => { *discard_queue.write().unwrap() = false;
if let Some(video_queue) = *vf
video_queue_map.read().unwrap().get(&display) }
{ MediaData::VideoQueue => {
if let Some(vf) = video_queue.pop() { if let Some(vf) = video_queue.read().unwrap().pop() {
vf if discard_queue.read().unwrap().clone() {
} else {
continue; continue;
} }
vf
} else { } else {
continue; continue;
} }
@ -2354,36 +2341,25 @@ where
let display = vf.display as usize; let display = vf.display as usize;
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let format = CodecFormat::from(&vf); let format = CodecFormat::from(&vf);
if !handler_controller_map.contains_key(&display) { if video_handler.is_none() {
let mut handler = VideoHandler::new(format, display); let mut handler = VideoHandler::new(format, display);
let record = session.lc.read().unwrap().record; let record = session.lc.read().unwrap().record;
let id = session.lc.read().unwrap().id.clone(); let id = session.lc.read().unwrap().id.clone();
if record { if record {
handler.record_screen(record, id, display); handler.record_screen(record, id, display);
} }
handler_controller_map.insert( video_handler = Some(handler);
display,
VideoHandlerController {
handler,
skip_beginning: 0,
},
);
} }
if let Some(handler_controller) = handler_controller_map.get_mut(&display) { if let Some(handler) = video_handler.as_mut() {
let mut pixelbuffer = true; let mut pixelbuffer = true;
let mut tmp_chroma = None; let mut tmp_chroma = None;
let format_changed = let format_changed = handler.decoder.format() != format;
handler_controller.handler.decoder.format() != format; match handler.handle_frame(vf, &mut pixelbuffer, &mut tmp_chroma) {
match handler_controller.handler.handle_frame(
vf,
&mut pixelbuffer,
&mut tmp_chroma,
) {
Ok(true) => { Ok(true) => {
video_callback( video_callback(
display, display,
&mut handler_controller.handler.rgb, &mut handler.rgb,
handler_controller.handler.texture.texture, handler.texture.texture,
pixelbuffer, pixelbuffer,
); );
@ -2395,7 +2371,7 @@ where
// fps calculation // fps calculation
fps_calculate( fps_calculate(
handler_controller, &mut skip_beginning,
&fps, &fps,
format_changed, format_changed,
start.elapsed(), start.elapsed(),
@ -2417,24 +2393,6 @@ where
// to-do: fix the error // to-do: fix the error
log::error!("handle video frame error, {}", e); log::error!("handle video frame error, {}", e);
session.refresh_video(display as _); session.refresh_video(display as _);
#[cfg(feature = "hwcodec")]
if format == CodecFormat::H265 {
if let Some(&scrap::hwcodec::ERR_HEVC_POC) =
e.downcast_ref::<i32>()
{
for (i, handler_controler) in
handler_controller_map.iter_mut()
{
if *i != display
&& handler_controler.handler.decoder.format()
== CodecFormat::H265
{
log::info!("refresh video {} due to hevc poc not found", i);
session.refresh_video(*i as _);
}
}
}
}
} }
_ => {} _ => {}
} }
@ -2442,53 +2400,36 @@ where
// check invalid decoders // check invalid decoders
let mut should_update_supported = false; let mut should_update_supported = false;
handler_controller_map if let Some(handler) = video_handler.as_mut() {
.iter() if !handler.decoder.valid()
.map(|(_, h)| { || handler.fail_counter >= MAX_DECODE_FAIL_COUNTER
if !h.handler.decoder.valid() || h.handler.fail_counter >= MAX_DECODE_FAIL_COUNTER { {
let mut lc = session.lc.write().unwrap(); let mut lc = session.lc.write().unwrap();
let format = h.handler.decoder.format(); let format = handler.decoder.format();
if !lc.mark_unsupported.contains(&format) { if !lc.mark_unsupported.contains(&format) {
lc.mark_unsupported.push(format); lc.mark_unsupported.push(format);
should_update_supported = true; should_update_supported = true;
log::info!("mark {format:?} decoder as unsupported, valid:{}, fail_counter:{}, all unsupported:{:?}", h.handler.decoder.valid(), h.handler.fail_counter, lc.mark_unsupported); log::info!("mark {format:?} decoder as unsupported, valid:{}, fail_counter:{}, all unsupported:{:?}", handler.decoder.valid(), handler.fail_counter, lc.mark_unsupported);
}
} }
}) }
.count(); }
if should_update_supported { if should_update_supported {
session.send(Data::Message( session.send(Data::Message(
session.lc.read().unwrap().update_supported_decodings(), session.lc.read().unwrap().update_supported_decodings(),
)); ));
} }
} }
MediaData::Reset(display) => { MediaData::Reset => {
if let Some(display) = display { if let Some(handler) = video_handler.as_mut() {
if let Some(handler_controler) = handler.reset(None);
handler_controller_map.get_mut(&display)
{
handler_controler.handler.reset(None);
}
} else {
for (_, handler_controler) in handler_controller_map.iter_mut() {
handler_controler.handler.reset(None);
}
} }
} }
MediaData::RecordScreen(start) => { MediaData::RecordScreen(start) => {
log::info!("record screen command: start: {start}"); log::info!("record screen command: start: {start}");
let record = session.lc.read().unwrap().record;
session.update_record_status(start); session.update_record_status(start);
if record != start { let id = session.lc.read().unwrap().id.clone();
session.lc.write().unwrap().record = start; if let Some(handler) = video_handler.as_mut() {
let id = session.lc.read().unwrap().id.clone(); handler.record_screen(start, id, display);
for (display, handler_controler) in handler_controller_map.iter_mut() {
handler_controler.handler.record_screen(
start,
id.clone(),
*display,
);
}
} }
} }
_ => {} _ => {}
@ -2499,14 +2440,6 @@ where
} }
log::info!("Video decoder loop exits"); log::info!("Video decoder loop exits");
}); });
let audio_sender = start_audio_thread();
return (
video_sender,
audio_sender,
video_queue_map_cloned,
decode_fps_map,
chroma_cloned,
);
} }
/// Start an audio thread /// Start an audio thread
@ -2538,7 +2471,7 @@ pub fn start_audio_thread() -> MediaSender {
#[inline] #[inline]
fn fps_calculate( fn fps_calculate(
handler_controller: &mut VideoHandlerController, skip_beginning: &mut usize,
fps: &Arc<RwLock<Option<usize>>>, fps: &Arc<RwLock<Option<usize>>>,
format_changed: bool, format_changed: bool,
elapsed: std::time::Duration, elapsed: std::time::Duration,
@ -2548,11 +2481,11 @@ fn fps_calculate(
if format_changed { if format_changed {
*count = 0; *count = 0;
*duration = std::time::Duration::ZERO; *duration = std::time::Duration::ZERO;
handler_controller.skip_beginning = 0; *skip_beginning = 0;
} }
// // The first frame will be very slow // // The first frame will be very slow
if handler_controller.skip_beginning < 3 { if *skip_beginning < 3 {
handler_controller.skip_beginning += 1; *skip_beginning += 1;
return; return;
} }
*duration += elapsed; *duration += elapsed;

View File

@ -1,5 +1,6 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
ffi::c_void,
num::NonZeroI64, num::NonZeroI64,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@ -49,8 +50,6 @@ use scrap::CodecFormat;
pub struct Remote<T: InvokeUiSession> { pub struct Remote<T: InvokeUiSession> {
handler: Session<T>, handler: Session<T>,
video_queue_map: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
video_sender: MediaSender,
audio_sender: MediaSender, audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>, receiver: mpsc::UnboundedReceiver<Data>,
sender: mpsc::UnboundedSender<Data>, sender: mpsc::UnboundedSender<Data>,
@ -67,13 +66,11 @@ pub struct Remote<T: InvokeUiSession> {
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))] #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
client_conn_id: i32, // used for file clipboard client_conn_id: i32, // used for file clipboard
data_count: Arc<AtomicUsize>, data_count: Arc<AtomicUsize>,
frame_count_map: Arc<RwLock<HashMap<usize, usize>>>,
video_format: CodecFormat, video_format: CodecFormat,
elevation_requested: bool, elevation_requested: bool,
fps_control: FpsControl,
decode_fps: Arc<RwLock<Option<usize>>>,
chroma: Arc<RwLock<Option<Chroma>>>,
peer_info: ParsedPeerInfo, peer_info: ParsedPeerInfo,
video_threads: HashMap<usize, VideoThread>,
chroma: Arc<RwLock<Option<Chroma>>>,
} }
#[derive(Default)] #[derive(Default)]
@ -94,20 +91,12 @@ impl ParsedPeerInfo {
impl<T: InvokeUiSession> Remote<T> { impl<T: InvokeUiSession> Remote<T> {
pub fn new( pub fn new(
handler: Session<T>, handler: Session<T>,
video_queue: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>, receiver: mpsc::UnboundedReceiver<Data>,
sender: mpsc::UnboundedSender<Data>, sender: mpsc::UnboundedSender<Data>,
frame_count_map: Arc<RwLock<HashMap<usize, usize>>>,
decode_fps: Arc<RwLock<Option<usize>>>,
chroma: Arc<RwLock<Option<Chroma>>>,
) -> Self { ) -> Self {
Self { Self {
handler, handler,
video_queue_map: video_queue, audio_sender: crate::client::start_audio_thread(),
video_sender,
audio_sender,
receiver, receiver,
sender, sender,
read_jobs: Vec::new(), read_jobs: Vec::new(),
@ -120,15 +109,13 @@ impl<T: InvokeUiSession> Remote<T> {
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))] #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
client_conn_id: 0, client_conn_id: 0,
data_count: Arc::new(AtomicUsize::new(0)), data_count: Arc::new(AtomicUsize::new(0)),
frame_count_map,
video_format: CodecFormat::Unknown, video_format: CodecFormat::Unknown,
stop_voice_call_sender: None, stop_voice_call_sender: None,
voice_call_request_timestamp: None, voice_call_request_timestamp: None,
elevation_requested: false, elevation_requested: false,
fps_control: Default::default(),
decode_fps,
chroma,
peer_info: Default::default(), peer_info: Default::default(),
video_threads: Default::default(),
chroma: Default::default(),
} }
} }
@ -250,7 +237,6 @@ impl<T: InvokeUiSession> Remote<T> {
} }
} }
_ = status_timer.tick() => { _ = status_timer.tick() => {
self.fps_control(direct);
let elapsed = fps_instant.elapsed().as_millis(); let elapsed = fps_instant.elapsed().as_millis();
if elapsed < 1000 { if elapsed < 1000 {
continue; continue;
@ -260,14 +246,14 @@ impl<T: InvokeUiSession> Remote<T> {
speed = speed * 1000 / elapsed as usize; speed = speed * 1000 / elapsed as usize;
let speed = format!("{:.2}kB/s", speed as f32 / 1024 as f32); let speed = format!("{:.2}kB/s", speed as f32 / 1024 as f32);
let mut frame_count_map_write = self.frame_count_map.write().unwrap(); let fps = self.video_threads.iter().map(|(k, v)| {
let frame_count_map = frame_count_map_write.clone();
frame_count_map_write.values_mut().for_each(|v| *v = 0);
drop(frame_count_map_write);
let fps = frame_count_map.iter().map(|(k, v)| {
// Correcting the inaccuracy of status_timer // Correcting the inaccuracy of status_timer
(k.clone(), (*v as i32) * 1000 / elapsed as i32) (k.clone(), (*v.frame_count.read().unwrap() as i32) * 1000 / elapsed as i32)
}).collect::<HashMap<usize, i32>>(); }).collect::<HashMap<usize, i32>>();
self.video_threads.iter().for_each(|(_, v)| {
*v.frame_count.write().unwrap() = 0;
});
self.fps_control(direct, fps.clone());
let chroma = self.chroma.read().unwrap().clone(); let chroma = self.chroma.read().unwrap().clone();
let chroma = match chroma { let chroma = match chroma {
Some(Chroma::I444) => "4:4:4", Some(Chroma::I444) => "4:4:4",
@ -275,10 +261,16 @@ impl<T: InvokeUiSession> Remote<T> {
None => "-", None => "-",
}; };
let chroma = Some(chroma.to_string()); let chroma = Some(chroma.to_string());
let codec_format = if self.video_format == CodecFormat::Unknown {
None
} else {
Some(self.video_format.clone())
};
self.handler.update_quality_status(QualityStatus { self.handler.update_quality_status(QualityStatus {
speed: Some(speed), speed: Some(speed),
fps, fps,
chroma, chroma,
codec_format,
..Default::default() ..Default::default()
}); });
} }
@ -498,6 +490,22 @@ impl<T: InvokeUiSession> Remote<T> {
self.check_clipboard_file_context(); self.check_clipboard_file_context();
} }
Data::Message(msg) => { Data::Message(msg) => {
match &msg.union {
Some(message::Union::Misc(misc)) => match misc.union {
Some(misc::Union::RefreshVideo(_)) => {
self.video_threads.iter().for_each(|(_, v)| {
*v.discard_queue.write().unwrap() = true;
});
}
Some(misc::Union::RefreshVideoDisplay(display)) => {
if let Some(v) = self.video_threads.get_mut(&(display as usize)) {
*v.discard_queue.write().unwrap() = true;
}
}
_ => {}
},
_ => {}
}
allow_err!(peer.send(&msg).await); allow_err!(peer.send(&msg).await);
} }
Data::SendFiles((id, path, to, file_num, include_hidden, is_remote)) => { Data::SendFiles((id, path, to, file_num, include_hidden, is_remote)) => {
@ -838,7 +846,10 @@ impl<T: InvokeUiSession> Remote<T> {
} }
} }
Data::RecordScreen(start) => { Data::RecordScreen(start) => {
let _ = self.video_sender.send(MediaData::RecordScreen(start)); self.handler.lc.write().unwrap().record = start;
for (_, v) in self.video_threads.iter_mut() {
v.video_sender.send(MediaData::RecordScreen(start)).ok();
}
} }
Data::ElevateDirect => { Data::ElevateDirect => {
let mut request = ElevationRequest::new(); let mut request = ElevationRequest::new();
@ -881,9 +892,18 @@ impl<T: InvokeUiSession> Remote<T> {
.on_voice_call_closed("Closed manually by the peer"); .on_voice_call_closed("Closed manually by the peer");
allow_err!(peer.send(&msg).await); allow_err!(peer.send(&msg).await);
} }
Data::ResetDecoder(display) => { Data::ResetDecoder(display) => match display {
self.video_sender.send(MediaData::Reset(display)).ok(); Some(display) => {
} if let Some(v) = self.video_threads.get_mut(&display) {
v.video_sender.send(MediaData::Reset).ok();
}
}
None => {
for (_, v) in self.video_threads.iter_mut() {
v.video_sender.send(MediaData::Reset).ok();
}
}
},
_ => {} _ => {}
} }
true true
@ -1011,100 +1031,115 @@ impl<T: InvokeUiSession> Remote<T> {
} }
} }
// Currently, this function only considers decoding speed and queue length, not network delay.
// The controlled end can consider auto fps as the maximum decoding fps.
#[inline] #[inline]
fn fps_control(&mut self, direct: bool) { fn fps_control(&mut self, direct: bool, real_fps_map: HashMap<usize, i32>) {
self.video_threads.iter_mut().for_each(|(k, v)| {
let real_fps = real_fps_map.get(k).cloned().unwrap_or_default();
if real_fps == 0 {
v.fps_control.inactive_counter += 1;
} else {
v.fps_control.inactive_counter = 0;
}
});
let custom_fps = self.handler.lc.read().unwrap().custom_fps.clone(); let custom_fps = self.handler.lc.read().unwrap().custom_fps.clone();
let custom_fps = custom_fps.lock().unwrap().clone(); let custom_fps = custom_fps.lock().unwrap().clone();
let mut custom_fps = custom_fps.unwrap_or(30); let mut custom_fps = custom_fps.unwrap_or(30);
if custom_fps < 5 || custom_fps > 120 { if custom_fps < 5 || custom_fps > 120 {
custom_fps = 30; custom_fps = 30;
} }
let ctl = &mut self.fps_control; let inactive_threshold = 15;
let len = self let max_queue_len = self
.video_queue_map .video_threads
.read()
.unwrap()
.iter() .iter()
.map(|v| v.1.len()) .map(|v| v.1.video_queue.read().unwrap().len())
.max() .max()
.unwrap_or_default(); .unwrap_or_default();
let decode_fps = self.decode_fps.read().unwrap().clone(); let min_decode_fps = self
let Some(mut decode_fps) = decode_fps else { .video_threads
.iter()
.filter(|v| v.1.fps_control.inactive_counter < inactive_threshold)
.map(|v| *v.1.decode_fps.read().unwrap())
.min()
.flatten();
let Some(min_decode_fps) = min_decode_fps else {
return; return;
}; };
if cfg!(feature = "flutter") {
let active_displays = ctl
.last_active_time
.iter()
.filter(|t| t.1.elapsed().as_secs() < 5)
.count();
if active_displays > 1 {
decode_fps = decode_fps / active_displays;
}
}
let mut limited_fps = if direct { let mut limited_fps = if direct {
decode_fps * 9 / 10 // 30 got 27 min_decode_fps * 9 / 10 // 30 got 27
} else { } else {
decode_fps * 4 / 5 // 30 got 24 min_decode_fps * 4 / 5 // 30 got 24
}; };
if limited_fps > custom_fps { if limited_fps > custom_fps {
limited_fps = custom_fps; limited_fps = custom_fps;
} }
let last_auto_fps = self.handler.lc.read().unwrap().last_auto_fps.clone(); let last_auto_fps = self.handler.lc.read().unwrap().last_auto_fps.clone();
let should_decrease = (len > 1 let displays = self.video_threads.keys().cloned().collect::<Vec<_>>();
&& last_auto_fps.clone().unwrap_or(custom_fps as _) > limited_fps) let mut fps_trending = |display: usize| {
|| len > std::cmp::max(1, limited_fps / 2); let thread = self.video_threads.get_mut(&display)?;
let ctl = &mut thread.fps_control;
// increase judgement let len = thread.video_queue.read().unwrap().len();
if len <= 1 { let decode_fps = thread.decode_fps.read().unwrap().clone()?;
if ctl.idle_counter < usize::MAX { let last_auto_fps = last_auto_fps.clone().unwrap_or(custom_fps as _);
if ctl.inactive_counter > inactive_threshold {
return None;
}
if len > 1 && last_auto_fps > limited_fps || len > std::cmp::max(1, decode_fps / 2) {
ctl.idle_counter = 0;
return Some(false);
}
if len <= 1 {
ctl.idle_counter += 1; ctl.idle_counter += 1;
if ctl.idle_counter > 3 && last_auto_fps + 3 <= limited_fps {
return Some(true);
}
} }
} else { if len > 1 {
ctl.idle_counter = 0; ctl.idle_counter = 0;
}
let mut should_increase = false;
if let Some(last_auto_fps) = last_auto_fps.clone() {
// ever set
if last_auto_fps + 3 <= limited_fps && ctl.idle_counter > 3 {
// limited_fps is 3 larger than last set, and idle time is more than 3 seconds
should_increase = true;
} }
} None
};
let trendings: Vec<_> = displays.iter().map(|k| fps_trending(*k)).collect();
let should_decrease = trendings.iter().any(|v| *v == Some(false));
let should_increase = !should_decrease && trendings.iter().any(|v| *v == Some(true));
if last_auto_fps.is_none() || should_decrease || should_increase { if last_auto_fps.is_none() || should_decrease || should_increase {
// limited_fps to ensure decoding is faster than encoding // limited_fps to ensure decoding is faster than encoding
let mut auto_fps = limited_fps; let mut auto_fps = limited_fps;
if should_decrease && limited_fps < len { if should_decrease && limited_fps < max_queue_len {
auto_fps = limited_fps / 2; auto_fps = limited_fps / 2;
} }
if auto_fps < 1 { if auto_fps < 1 {
auto_fps = 1; auto_fps = 1;
} }
let mut misc = Misc::new(); if Some(auto_fps) != last_auto_fps {
misc.set_option(OptionMessage { let mut misc = Misc::new();
custom_fps: auto_fps as _, misc.set_option(OptionMessage {
..Default::default() custom_fps: auto_fps as _,
}); ..Default::default()
let mut msg = Message::new(); });
msg.set_misc(misc); let mut msg = Message::new();
self.sender.send(Data::Message(msg)).ok(); msg.set_misc(misc);
log::info!("Set fps to {}", auto_fps); self.sender.send(Data::Message(msg)).ok();
ctl.last_queue_size = len; log::info!("Set fps to {}", auto_fps);
self.handler.lc.write().unwrap().last_auto_fps = Some(auto_fps); self.handler.lc.write().unwrap().last_auto_fps = Some(auto_fps);
}
} }
// send refresh // send refresh
for (display, video_queue) in self.video_queue_map.read().unwrap().iter() { for (display, thread) in self.video_threads.iter_mut() {
let tolerable = std::cmp::min(decode_fps, video_queue.capacity() / 2); let ctl = &mut thread.fps_control;
let video_queue = thread.video_queue.read().unwrap();
let tolerable = std::cmp::min(min_decode_fps, video_queue.capacity() / 2);
if ctl.refresh_times < 20 // enough if ctl.refresh_times < 20 // enough
&& (video_queue.len() > tolerable && (video_queue.len() > tolerable
&& (ctl.refresh_times == 0 || ctl.last_refresh_instant.elapsed().as_secs() > 10)) && (ctl.refresh_times == 0 || ctl.last_refresh_instant.map(|t|t.elapsed().as_secs() > 10).unwrap_or(false)))
{ {
// Refresh causes client set_display, left frames cause flickering. // Refresh causes client set_display, left frames cause flickering.
while let Some(_) = video_queue.pop() {} drop(video_queue);
self.handler.refresh_video(*display as _); self.handler.refresh_video(*display as _);
log::info!("Refresh display {} to reduce delay", display); log::info!("Refresh display {} to reduce delay", display);
ctl.refresh_times += 1; ctl.refresh_times += 1;
ctl.last_refresh_instant = Instant::now(); ctl.last_refresh_instant = Some(Instant::now());
} }
} }
} }
@ -1120,43 +1155,29 @@ impl<T: InvokeUiSession> Remote<T> {
self.send_toggle_virtual_display_msg(peer).await; self.send_toggle_virtual_display_msg(peer).await;
self.send_toggle_privacy_mode_msg(peer).await; self.send_toggle_privacy_mode_msg(peer).await;
} }
let incoming_format = CodecFormat::from(&vf); self.video_format = CodecFormat::from(&vf);
if self.video_format != incoming_format {
self.video_format = incoming_format.clone();
self.handler.update_quality_status(QualityStatus {
codec_format: Some(incoming_format),
..Default::default()
})
};
let display = vf.display as usize; let display = vf.display as usize;
let mut video_queue_write = self.video_queue_map.write().unwrap(); if !self.video_threads.contains_key(&display) {
if !video_queue_write.contains_key(&display) { self.new_video_thread(display);
video_queue_write.insert(
display,
ArrayQueue::<VideoFrame>::new(crate::client::VIDEO_QUEUE_SIZE),
);
} }
let Some(thread) = self.video_threads.get_mut(&display) else {
return true;
};
if Self::contains_key_frame(&vf) { if Self::contains_key_frame(&vf) {
if let Some(video_queue) = video_queue_write.get_mut(&display) { thread
while let Some(_) = video_queue.pop() {} .video_sender
}
self.video_sender
.send(MediaData::VideoFrame(Box::new(vf))) .send(MediaData::VideoFrame(Box::new(vf)))
.ok(); .ok();
} else { } else {
if let Some(video_queue) = video_queue_write.get_mut(&display) { let video_queue = thread.video_queue.read().unwrap();
if video_queue.force_push(vf).is_some() { if video_queue.force_push(vf).is_some() {
while let Some(_) = video_queue.pop() {} drop(video_queue);
self.handler.refresh_video(display as _); self.handler.refresh_video(display as _);
} else { } else {
self.video_sender.send(MediaData::VideoQueue(display)).ok(); thread.video_sender.send(MediaData::VideoQueue).ok();
}
} }
} }
self.fps_control
.last_active_time
.insert(display, Instant::now());
} }
Some(message::Union::Hash(hash)) => { Some(message::Union::Hash(hash)) => {
self.handler self.handler
@ -1470,9 +1491,10 @@ impl<T: InvokeUiSession> Remote<T> {
} }
Some(misc::Union::SwitchDisplay(s)) => { Some(misc::Union::SwitchDisplay(s)) => {
self.handler.handle_peer_switch_display(&s); self.handler.handle_peer_switch_display(&s);
self.video_sender if let Some(thread) = self.video_threads.get_mut(&(s.display as usize)) {
.send(MediaData::Reset(Some(s.display as _))) thread.video_sender.send(MediaData::Reset).ok();
.ok(); }
if s.width > 0 && s.height > 0 { if s.width > 0 && s.height > 0 {
self.handler.set_display( self.handler.set_display(
s.x, s.x,
@ -1920,6 +1942,53 @@ impl<T: InvokeUiSession> Remote<T> {
}); });
} }
} }
fn new_video_thread(&mut self, display: usize) {
let video_queue = Arc::new(RwLock::new(ArrayQueue::new(client::VIDEO_QUEUE_SIZE)));
let (video_sender, video_receiver) = std::sync::mpsc::channel::<MediaData>();
let decode_fps = Arc::new(RwLock::new(None));
let frame_count = Arc::new(RwLock::new(0));
let discard_queue = Arc::new(RwLock::new(false));
let video_thread = VideoThread {
video_queue: video_queue.clone(),
video_sender,
decode_fps: decode_fps.clone(),
frame_count: frame_count.clone(),
fps_control: Default::default(),
discard_queue: discard_queue.clone(),
};
let handler = self.handler.ui_handler.clone();
crate::client::start_video_thread(
self.handler.clone(),
display,
video_receiver,
video_queue,
decode_fps,
self.chroma.clone(),
discard_queue,
move |display: usize,
data: &mut scrap::ImageRgb,
_texture: *mut c_void,
pixelbuffer: bool| {
*frame_count.write().unwrap() += 1;
if pixelbuffer {
handler.on_rgba(display, data);
} else {
#[cfg(all(feature = "vram", feature = "flutter"))]
handler.on_texture(display, _texture);
}
},
);
self.video_threads.insert(display, video_thread);
let auto_record = self.handler.lc.read().unwrap().record;
if auto_record && self.video_threads.len() == 1 {
let mut misc = Misc::new();
misc.set_client_record_status(true);
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
}
}
} }
struct RemoveJob { struct RemoveJob {
@ -1952,22 +2021,19 @@ impl RemoveJob {
} }
} }
#[derive(Debug, Default)]
struct FpsControl { struct FpsControl {
last_queue_size: usize,
refresh_times: usize, refresh_times: usize,
last_refresh_instant: Instant, last_refresh_instant: Option<Instant>,
idle_counter: usize, idle_counter: usize,
last_active_time: HashMap<usize, Instant>, inactive_counter: usize,
} }
impl Default for FpsControl { struct VideoThread {
fn default() -> Self { video_queue: Arc<RwLock<ArrayQueue<VideoFrame>>>,
Self { video_sender: MediaSender,
last_queue_size: Default::default(), decode_fps: Arc<RwLock<Option<usize>>>,
refresh_times: Default::default(), frame_count: Arc<RwLock<usize>>,
last_refresh_instant: Instant::now(), discard_queue: Arc<RwLock<bool>>,
idle_counter: 0, fps_control: FpsControl,
last_active_time: Default::default(),
}
}
} }

View File

@ -728,13 +728,7 @@ fn setup_encoder(
); );
Encoder::set_fallback(&encoder_cfg); Encoder::set_fallback(&encoder_cfg);
let codec_format = Encoder::negotiated_codec(); let codec_format = Encoder::negotiated_codec();
let recorder = get_recorder( let recorder = get_recorder(record_incoming, display_idx);
c.width,
c.height,
&codec_format,
record_incoming,
display_idx,
);
let use_i444 = Encoder::use_i444(&encoder_cfg); let use_i444 = Encoder::use_i444(&encoder_cfg);
let encoder = Encoder::new(encoder_cfg.clone(), use_i444)?; let encoder = Encoder::new(encoder_cfg.clone(), use_i444)?;
Ok((encoder, encoder_cfg, codec_format, use_i444, recorder)) Ok((encoder, encoder_cfg, codec_format, use_i444, recorder))
@ -816,13 +810,7 @@ fn get_encoder_config(
} }
} }
fn get_recorder( fn get_recorder(record_incoming: bool, display: usize) -> Arc<Mutex<Option<Recorder>>> {
width: usize,
height: usize,
codec_format: &CodecFormat,
record_incoming: bool,
display: usize,
) -> Arc<Mutex<Option<Recorder>>> {
#[cfg(windows)] #[cfg(windows)]
let root = crate::platform::is_root(); let root = crate::platform::is_root();
#[cfg(not(windows))] #[cfg(not(windows))]

View File

@ -35,8 +35,8 @@ use hbb_common::{
use crate::client::io_loop::Remote; use crate::client::io_loop::Remote;
use crate::client::{ use crate::client::{
check_if_retry, handle_hash, handle_login_error, handle_login_from_ui, handle_test_delay, check_if_retry, handle_hash, handle_login_error, handle_login_from_ui, handle_test_delay,
input_os_password, send_mouse, send_pointer_device_event, start_video_audio_threads, input_os_password, send_mouse, send_pointer_device_event, FileManager, Key, LoginConfigHandler,
FileManager, Key, LoginConfigHandler, QualityStatus, KEY_MAP, QualityStatus, KEY_MAP,
}; };
#[cfg(not(any(target_os = "android", target_os = "ios")))] #[cfg(not(any(target_os = "android", target_os = "ios")))]
use crate::common::GrabState; use crate::common::GrabState;
@ -1848,40 +1848,7 @@ pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>, round: u32) {
} }
return; return;
} }
let frame_count_map: Arc<RwLock<HashMap<usize, usize>>> = Default::default(); let mut remote = Remote::new(handler, receiver, sender);
let frame_count_map_cl = frame_count_map.clone();
let ui_handler = handler.ui_handler.clone();
let (video_sender, audio_sender, video_queue_map, decode_fps, chroma) =
start_video_audio_threads(
handler.clone(),
move |display: usize,
data: &mut scrap::ImageRgb,
_texture: *mut c_void,
pixelbuffer: bool| {
let mut write_lock = frame_count_map_cl.write().unwrap();
let count = write_lock.get(&display).unwrap_or(&0) + 1;
write_lock.insert(display, count);
drop(write_lock);
if pixelbuffer {
ui_handler.on_rgba(display, data);
} else {
#[cfg(all(feature = "vram", feature = "flutter"))]
ui_handler.on_texture(display, _texture);
}
},
);
let mut remote = Remote::new(
handler,
video_queue_map,
video_sender,
audio_sender,
receiver,
sender,
frame_count_map,
decode_fps,
chroma,
);
remote.io_loop(&key, &token, round).await; remote.io_loop(&key, &token, round).await;
remote.sync_jobs_status_to_local().await; remote.sync_jobs_status_to_local().await;
} }