image tearing fixed

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou 2021-12-20 22:21:15 +08:00
parent 3ffd8ac146
commit 8a6b33b841

View File

@ -30,7 +30,7 @@ use scrap::{Capturer, Config, Display, EncodeFrame, Encoder, VideoCodecId, STRID
use std::{
collections::HashSet,
io::ErrorKind::WouldBlock,
time::{self, Instant},
time::{self, Duration, Instant},
};
const WAIT_BASE: i32 = 17;
@ -55,7 +55,6 @@ pub fn notify_video_frame_feched(conn_id: i32, frame_tm: Option<Instant>) {
struct VideoFrameController {
cur: Instant,
send_conn_ids: HashSet<i32>,
fetched_conn_ids: HashSet<i32>,
rt: Runtime,
}
@ -64,14 +63,12 @@ impl VideoFrameController {
Self {
cur: Instant::now(),
send_conn_ids: HashSet::new(),
fetched_conn_ids: HashSet::new(),
rt: Runtime::new().unwrap(),
}
}
fn reset(&mut self) {
self.send_conn_ids.clear();
self.fetched_conn_ids.clear();
}
fn set_send(&mut self, tm: Instant, conn_ids: HashSet<i32>) {
@ -81,23 +78,46 @@ impl VideoFrameController {
}
}
fn blocking_wait_next(&mut self) -> bool {
match self
.rt
.block_on(async move { FRAME_FETCHED_NOTIFIER.1.lock().await.recv().await })
{
Some((id, instant)) => {
if let Some(tm) = instant {
log::trace!("channel recv latency: {}", tm.elapsed().as_secs_f32());
}
self.fetched_conn_ids.insert(id);
}
_ => {
// this branch would nerver be reached
}
fn blocking_wait_next(&mut self, timeout_millis: u128) {
if self.send_conn_ids.is_empty() {
return;
}
return self.fetched_conn_ids.is_superset(&self.send_conn_ids);
let send_conn_ids = self.send_conn_ids.clone();
self.rt.block_on(async move {
let mut fetched_conn_ids = HashSet::new();
let begin = Instant::now();
while begin.elapsed().as_millis() < timeout_millis {
let timeout_dur =
Duration::from_millis((timeout_millis - begin.elapsed().as_millis()) as u64);
match tokio::time::timeout(
timeout_dur,
FRAME_FETCHED_NOTIFIER.1.lock().await.recv(),
)
.await
{
Err(_) => {
// break if timeout
log::error!("blocking wait frame receiving timeout {}", timeout_millis);
break;
}
Ok(Some((id, instant))) => {
if let Some(tm) = instant {
log::trace!("channel recv latency: {}", tm.elapsed().as_secs_f32());
}
fetched_conn_ids.insert(id);
// break if all connections have received current frame
if fetched_conn_ids.is_superset(&send_conn_ids) {
break;
}
}
Ok(None) => {
// this branch would nerver be reached
}
}
}
});
}
}
@ -269,9 +289,7 @@ fn run(sp: GenericService) -> ResultType<()> {
}
}
while !frame_controller.blocking_wait_next() {
// just wait until all connection send the frame
}
frame_controller.blocking_wait_next(1000 * 5);
let elapsed = now.elapsed();
// may need to enable frame(timeout)