diff --git a/src/server/connection.rs b/src/server/connection.rs index e8eae4fe1..5ae0b7aca 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -26,6 +26,7 @@ lazy_static::lazy_static! { pub struct ConnInner { id: i32, tx: Option, + tx_video: Option, } pub struct Connection { @@ -65,6 +66,12 @@ impl Subscriber for ConnInner { allow_err!(tx.send((Instant::now(), msg))); }); } + + fn send_video_frame(&mut self, tm: std::time::Instant, msg: Arc) { + self.tx_video.as_mut().map(|tx| { + allow_err!(tx.send((tm.into(), msg))); + }); + } } const TEST_DELAY_TIMEOUT: Duration = Duration::from_secs(3); @@ -87,8 +94,13 @@ impl Connection { let (tx_from_cm, mut rx_from_cm) = mpsc::unbounded_channel::(); let (tx_to_cm, rx_to_cm) = mpsc::unbounded_channel::(); let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc)>(); + let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc)>(); let mut conn = Self { - inner: ConnInner { id, tx: Some(tx) }, + inner: ConnInner { + id, + tx: Some(tx), + tx_video: Some(tx_video), + }, stream, server, hash, @@ -131,8 +143,11 @@ impl Connection { let mut test_delay_timer = time::interval_at(Instant::now() + TEST_DELAY_TIMEOUT, TEST_DELAY_TIMEOUT); let mut last_recv_time = Instant::now(); + loop { tokio::select! { + biased; + Some(data) = rx_from_cm.recv() => { match data { ipc::Data::Authorize => { @@ -193,26 +208,6 @@ impl Connection { } _ => {} } - } - Some((instant, value)) = rx.recv() => { - let latency = instant.elapsed().as_millis() as i64; - super::video_service::update_internal_latency(id, latency); - let msg: &Message = &value; - if latency > 1000 { - match &msg.union { - Some(message::Union::video_frame(_)) => { - continue; - } - Some(message::Union::audio_frame(_)) => { - continue; - } - _ => {} - } - } - if let Err(err) = conn.stream.send(msg).await { - conn.on_close(&err.to_string(), false); - break; - } }, res = conn.stream.next() => { if let Some(res) = res { @@ -244,7 +239,32 @@ impl Connection { } else { conn.timer = time::interval_at(Instant::now() + SEC30, SEC30); } - } + }, + Some((instant, value)) = rx_video.recv() => { + video_service::notify_video_frame_feched(id, Some(instant.into())); + if let Err(err) = conn.stream.send(&value as &Message).await { + conn.on_close(&err.to_string(), false); + break; + } + }, + Some((instant, value)) = rx.recv() => { + let latency = instant.elapsed().as_millis() as i64; + let msg: &Message = &value; + + if latency > 1000 { + match &msg.union { + Some(message::Union::audio_frame(_)) => { + log::info!("audio frame latency {}", instant.elapsed().as_secs_f32()); + continue; + } + _ => {} + } + } + if let Err(err) = conn.stream.send(msg).await { + conn.on_close(&err.to_string(), false); + break; + } + }, _ = test_delay_timer.tick() => { if last_recv_time.elapsed() >= SEC30 { conn.on_close("Timeout", true); @@ -263,7 +283,8 @@ impl Connection { } } } - super::video_service::update_internal_latency(id, 0); + video_service::notify_video_frame_feched(id, None); + super::video_service::update_test_latency(id, 0); super::video_service::update_image_quality(id, None); if let Some(forward) = conn.port_forward_socket.as_mut() { diff --git a/src/server/input_service.rs b/src/server/input_service.rs index 056267c5b..aee13304b 100644 --- a/src/server/input_service.rs +++ b/src/server/input_service.rs @@ -79,6 +79,8 @@ impl Subscriber for MouseCursorSub { self.inner.send(msg); } } + + fn send_video_frame(&mut self, _: std::time::Instant, _: Arc) {} } pub const NAME_CURSOR: &'static str = "mouse_cursor"; diff --git a/src/server/service.rs b/src/server/service.rs index 150cccb75..7f280130f 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -1,5 +1,6 @@ use super::*; use std::{ + collections::HashSet, thread::{self, JoinHandle}, time, }; @@ -15,6 +16,7 @@ pub trait Service: Send + Sync { pub trait Subscriber: Default + Send + Sync + 'static { fn id(&self) -> i32; fn send(&mut self, msg: Arc); + fn send_video_frame(&mut self, tm: time::Instant, msg: Arc); } #[derive(Default)] @@ -143,6 +145,20 @@ impl> ServiceTmpl { } } + pub fn send_video_frame(&self, tm: time::Instant, msg: Message) -> HashSet { + self.send_video_frame_shared(tm, Arc::new(msg)) + } + + pub fn send_video_frame_shared(&self, tm: time::Instant, msg: Arc) -> HashSet { + let mut conn_ids = HashSet::new(); + let mut lock = self.0.write().unwrap(); + for s in lock.subscribes.values_mut() { + s.send_video_frame(tm, msg.clone()); + conn_ids.insert(s.id()); + } + conn_ids + } + pub fn send_without(&self, msg: Message, sub: i32) { let mut lock = self.0.write().unwrap(); let msg = Arc::new(msg); diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 1d698b9df..f3b8327fd 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -7,7 +7,7 @@ // how to capture with mouse cursor: // https://docs.microsoft.com/zh-cn/windows/win32/direct3ddxgi/desktop-dup-api?redirectedfrom=MSDN -// 实现了硬件编解码和音频抓取,还绘制了鼠标 +// RECORD: The following Project has implemented audio capture, hardware codec and mouse cursor drawn. // https://github.com/PHZ76/DesktopSharing // dxgi memory leak issue @@ -19,10 +19,18 @@ // https://slhck.info/video/2017/03/01/rate-control.html use super::*; +use hbb_common::tokio::{ + runtime::Runtime, + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Mutex as TokioMutex, + }, +}; use scrap::{Capturer, Config, Display, EncodeFrame, Encoder, VideoCodecId, STRIDE_ALIGN}; use std::{ + collections::HashSet, io::ErrorKind::WouldBlock, - time::{self, Instant}, + time::{self, Duration, Instant}, }; const WAIT_BASE: i32 = 17; @@ -32,9 +40,85 @@ lazy_static::lazy_static! { static ref CURRENT_DISPLAY: Arc> = Arc::new(Mutex::new(usize::MAX)); static ref LAST_ACTIVE: Arc> = Arc::new(Mutex::new(Instant::now())); static ref SWITCH: Arc> = Default::default(); - static ref INTERNAL_LATENCIES: Arc>> = Default::default(); static ref TEST_LATENCIES: Arc>> = Default::default(); static ref IMAGE_QUALITIES: Arc>> = Default::default(); + static ref FRAME_FETCHED_NOTIFIER: (UnboundedSender<(i32, Option)>, Arc)>>>) = { + let (tx, rx) = unbounded_channel(); + (tx, Arc::new(TokioMutex::new(rx))) + }; +} + +pub fn notify_video_frame_feched(conn_id: i32, frame_tm: Option) { + FRAME_FETCHED_NOTIFIER.0.send((conn_id, frame_tm)).unwrap() +} + +struct VideoFrameController { + cur: Instant, + send_conn_ids: HashSet, + rt: Runtime, +} + +impl VideoFrameController { + fn new() -> Self { + Self { + cur: Instant::now(), + send_conn_ids: HashSet::new(), + rt: Runtime::new().unwrap(), + } + } + + fn reset(&mut self) { + self.send_conn_ids.clear(); + } + + fn set_send(&mut self, tm: Instant, conn_ids: HashSet) { + if !conn_ids.is_empty() { + self.cur = tm; + self.send_conn_ids = conn_ids; + } + } + + fn blocking_wait_next(&mut self, timeout_millis: u128) { + if self.send_conn_ids.is_empty() { + return; + } + + let send_conn_ids = self.send_conn_ids.clone(); + self.rt.block_on(async move { + let mut fetched_conn_ids = HashSet::new(); + let begin = Instant::now(); + while begin.elapsed().as_millis() < timeout_millis { + let timeout_dur = + Duration::from_millis((timeout_millis - begin.elapsed().as_millis()) as u64); + match tokio::time::timeout( + timeout_dur, + FRAME_FETCHED_NOTIFIER.1.lock().await.recv(), + ) + .await + { + Err(_) => { + // break if timeout + log::error!("blocking wait frame receiving timeout {}", timeout_millis); + break; + } + Ok(Some((id, instant))) => { + if let Some(tm) = instant { + log::trace!("channel recv latency: {}", tm.elapsed().as_secs_f32()); + } + fetched_conn_ids.insert(id); + + // break if all connections have received current frame + if fetched_conn_ids.is_superset(&send_conn_ids) { + break; + } + } + Ok(None) => { + // this branch would nerver be reached + } + } + } + }); + } } pub fn new() -> GenericService { @@ -129,9 +213,11 @@ fn run(sp: GenericService) -> ResultType<()> { *SWITCH.lock().unwrap() = false; sp.send(msg_out); } + + let mut frame_controller = VideoFrameController::new(); + let mut crc = (0, 0); let start = time::Instant::now(); - let mut last_sent = time::Instant::now(); let mut last_check_displays = time::Instant::now(); #[cfg(windows)] let mut try_gdi = true; @@ -164,43 +250,47 @@ fn run(sp: GenericService) -> ResultType<()> { } } *LAST_ACTIVE.lock().unwrap() = now; - if get_latency() < 1000 || last_sent.elapsed().as_millis() > 1000 { - match c.frame(wait as _) { - Ok(frame) => { - let time = now - start; - let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64; - handle_one_frame(&sp, &frame, ms, &mut crc, &mut vpx)?; - last_sent = now; - #[cfg(windows)] - { - try_gdi = false; - } - } - Err(ref e) if e.kind() == WouldBlock => { - // https://github.com/NVIDIA/video-sdk-samples/tree/master/nvEncDXGIOutputDuplicationSample - wait = WAIT_BASE - now.elapsed().as_millis() as i32; - if wait < 0 { - wait = 0 - } - #[cfg(windows)] - if try_gdi && !c.is_gdi() { - c.set_gdi(); - try_gdi = false; - log::info!("No image, fall back to gdi"); - } - continue; - } - Err(err) => { - if check_display_changed(ndisplay, current, width, height) { - log::info!("Displays changed"); - *SWITCH.lock().unwrap() = true; - bail!("SWITCH"); - } - return Err(err.into()); + frame_controller.reset(); + + match c.frame(wait as _) { + Ok(frame) => { + let time = now - start; + let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64; + let send_conn_ids = handle_one_frame(&sp, now, &frame, ms, &mut crc, &mut vpx)?; + frame_controller.set_send(now, send_conn_ids); + #[cfg(windows)] + { + try_gdi = false; } } + Err(ref e) if e.kind() == WouldBlock => { + // https://github.com/NVIDIA/video-sdk-samples/tree/master/nvEncDXGIOutputDuplicationSample + wait = WAIT_BASE - now.elapsed().as_millis() as i32; + if wait < 0 { + wait = 0 + } + #[cfg(windows)] + if try_gdi && !c.is_gdi() { + c.set_gdi(); + try_gdi = false; + log::info!("No image, fall back to gdi"); + } + continue; + } + Err(err) => { + if check_display_changed(ndisplay, current, width, height) { + log::info!("Displays changed"); + *SWITCH.lock().unwrap() = true; + bail!("SWITCH"); + } + + return Err(err.into()); + } } + + frame_controller.blocking_wait_next(1000 * 5); + let elapsed = now.elapsed(); // may need to enable frame(timeout) log::trace!("{:?} {:?}", time::Instant::now(), elapsed); @@ -236,11 +326,12 @@ fn create_frame(frame: &EncodeFrame) -> VP9 { #[inline] fn handle_one_frame( sp: &GenericService, + now: Instant, frame: &[u8], ms: i64, crc: &mut (u32, u32), vpx: &mut Encoder, -) -> ResultType<()> { +) -> ResultType> { sp.snapshot(|sps| { // so that new sub and old sub share the same encoder after switch if sps.has_subscribes() { @@ -257,6 +348,8 @@ fn handle_one_frame( } else { crc.1 += 1; } + + let mut send_conn_ids: HashSet = Default::default(); if crc.1 <= 180 && crc.1 % 5 == 0 { let mut frames = Vec::new(); for ref frame in vpx @@ -268,12 +361,13 @@ fn handle_one_frame( for ref frame in vpx.flush().with_context(|| "Failed to flush")? { frames.push(create_frame(frame)); } + // to-do: flush periodically, e.g. 1 second if frames.len() > 0 { - sp.send(create_msg(frames)); + send_conn_ids = sp.send_video_frame(now, create_msg(frames)); } } - Ok(()) + Ok(send_conn_ids) } fn get_display_num() -> usize { @@ -373,20 +467,6 @@ pub fn update_test_latency(id: i32, latency: i64) { update_latency(id, latency, &mut *TEST_LATENCIES.lock().unwrap()); } -pub fn update_internal_latency(id: i32, latency: i64) { - update_latency(id, latency, &mut *INTERNAL_LATENCIES.lock().unwrap()); -} - -pub fn get_latency() -> i64 { - INTERNAL_LATENCIES - .lock() - .unwrap() - .values() - .max() - .unwrap_or(&0) - .clone() -} - fn convert_quality(q: i32) -> i32 { let q = { if q == ImageQuality::Balanced.value() {