mirror of
https://github.com/rustdesk/rustdesk.git
synced 2024-12-03 03:19:27 +08:00
Merge pull request #335 from fufesou/fix_video_image_tearing
Fix video image tearing
This commit is contained in:
commit
b9d8de1ec1
@ -26,6 +26,7 @@ lazy_static::lazy_static! {
|
||||
pub struct ConnInner {
|
||||
id: i32,
|
||||
tx: Option<Sender>,
|
||||
tx_video: Option<Sender>,
|
||||
}
|
||||
|
||||
pub struct Connection {
|
||||
@ -65,6 +66,12 @@ impl Subscriber for ConnInner {
|
||||
allow_err!(tx.send((Instant::now(), msg)));
|
||||
});
|
||||
}
|
||||
|
||||
fn send_video_frame(&mut self, tm: std::time::Instant, msg: Arc<Message>) {
|
||||
self.tx_video.as_mut().map(|tx| {
|
||||
allow_err!(tx.send((tm.into(), msg)));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const TEST_DELAY_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
@ -87,8 +94,13 @@ impl Connection {
|
||||
let (tx_from_cm, mut rx_from_cm) = mpsc::unbounded_channel::<ipc::Data>();
|
||||
let (tx_to_cm, rx_to_cm) = mpsc::unbounded_channel::<ipc::Data>();
|
||||
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
||||
let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
||||
let mut conn = Self {
|
||||
inner: ConnInner { id, tx: Some(tx) },
|
||||
inner: ConnInner {
|
||||
id,
|
||||
tx: Some(tx),
|
||||
tx_video: Some(tx_video),
|
||||
},
|
||||
stream,
|
||||
server,
|
||||
hash,
|
||||
@ -131,8 +143,11 @@ impl Connection {
|
||||
let mut test_delay_timer =
|
||||
time::interval_at(Instant::now() + TEST_DELAY_TIMEOUT, TEST_DELAY_TIMEOUT);
|
||||
let mut last_recv_time = Instant::now();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
Some(data) = rx_from_cm.recv() => {
|
||||
match data {
|
||||
ipc::Data::Authorize => {
|
||||
@ -193,26 +208,6 @@ impl Connection {
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Some((instant, value)) = rx.recv() => {
|
||||
let latency = instant.elapsed().as_millis() as i64;
|
||||
super::video_service::update_internal_latency(id, latency);
|
||||
let msg: &Message = &value;
|
||||
if latency > 1000 {
|
||||
match &msg.union {
|
||||
Some(message::Union::video_frame(_)) => {
|
||||
continue;
|
||||
}
|
||||
Some(message::Union::audio_frame(_)) => {
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if let Err(err) = conn.stream.send(msg).await {
|
||||
conn.on_close(&err.to_string(), false);
|
||||
break;
|
||||
}
|
||||
},
|
||||
res = conn.stream.next() => {
|
||||
if let Some(res) = res {
|
||||
@ -244,7 +239,32 @@ impl Connection {
|
||||
} else {
|
||||
conn.timer = time::interval_at(Instant::now() + SEC30, SEC30);
|
||||
}
|
||||
}
|
||||
},
|
||||
Some((instant, value)) = rx_video.recv() => {
|
||||
video_service::notify_video_frame_feched(id, Some(instant.into()));
|
||||
if let Err(err) = conn.stream.send(&value as &Message).await {
|
||||
conn.on_close(&err.to_string(), false);
|
||||
break;
|
||||
}
|
||||
},
|
||||
Some((instant, value)) = rx.recv() => {
|
||||
let latency = instant.elapsed().as_millis() as i64;
|
||||
let msg: &Message = &value;
|
||||
|
||||
if latency > 1000 {
|
||||
match &msg.union {
|
||||
Some(message::Union::audio_frame(_)) => {
|
||||
log::info!("audio frame latency {}", instant.elapsed().as_secs_f32());
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if let Err(err) = conn.stream.send(msg).await {
|
||||
conn.on_close(&err.to_string(), false);
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = test_delay_timer.tick() => {
|
||||
if last_recv_time.elapsed() >= SEC30 {
|
||||
conn.on_close("Timeout", true);
|
||||
@ -263,7 +283,8 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
}
|
||||
super::video_service::update_internal_latency(id, 0);
|
||||
video_service::notify_video_frame_feched(id, None);
|
||||
|
||||
super::video_service::update_test_latency(id, 0);
|
||||
super::video_service::update_image_quality(id, None);
|
||||
if let Some(forward) = conn.port_forward_socket.as_mut() {
|
||||
|
@ -79,6 +79,8 @@ impl Subscriber for MouseCursorSub {
|
||||
self.inner.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
fn send_video_frame(&mut self, _: std::time::Instant, _: Arc<Message>) {}
|
||||
}
|
||||
|
||||
pub const NAME_CURSOR: &'static str = "mouse_cursor";
|
||||
|
@ -1,5 +1,6 @@
|
||||
use super::*;
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
thread::{self, JoinHandle},
|
||||
time,
|
||||
};
|
||||
@ -15,6 +16,7 @@ pub trait Service: Send + Sync {
|
||||
pub trait Subscriber: Default + Send + Sync + 'static {
|
||||
fn id(&self) -> i32;
|
||||
fn send(&mut self, msg: Arc<Message>);
|
||||
fn send_video_frame(&mut self, tm: time::Instant, msg: Arc<Message>);
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@ -143,6 +145,20 @@ impl<T: Subscriber + From<ConnInner>> ServiceTmpl<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_video_frame(&self, tm: time::Instant, msg: Message) -> HashSet<i32> {
|
||||
self.send_video_frame_shared(tm, Arc::new(msg))
|
||||
}
|
||||
|
||||
pub fn send_video_frame_shared(&self, tm: time::Instant, msg: Arc<Message>) -> HashSet<i32> {
|
||||
let mut conn_ids = HashSet::new();
|
||||
let mut lock = self.0.write().unwrap();
|
||||
for s in lock.subscribes.values_mut() {
|
||||
s.send_video_frame(tm, msg.clone());
|
||||
conn_ids.insert(s.id());
|
||||
}
|
||||
conn_ids
|
||||
}
|
||||
|
||||
pub fn send_without(&self, msg: Message, sub: i32) {
|
||||
let mut lock = self.0.write().unwrap();
|
||||
let msg = Arc::new(msg);
|
||||
|
@ -7,7 +7,7 @@
|
||||
// how to capture with mouse cursor:
|
||||
// https://docs.microsoft.com/zh-cn/windows/win32/direct3ddxgi/desktop-dup-api?redirectedfrom=MSDN
|
||||
|
||||
// 实现了硬件编解码和音频抓取,还绘制了鼠标
|
||||
// RECORD: The following Project has implemented audio capture, hardware codec and mouse cursor drawn.
|
||||
// https://github.com/PHZ76/DesktopSharing
|
||||
|
||||
// dxgi memory leak issue
|
||||
@ -19,10 +19,18 @@
|
||||
// https://slhck.info/video/2017/03/01/rate-control.html
|
||||
|
||||
use super::*;
|
||||
use hbb_common::tokio::{
|
||||
runtime::Runtime,
|
||||
sync::{
|
||||
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||
Mutex as TokioMutex,
|
||||
},
|
||||
};
|
||||
use scrap::{Capturer, Config, Display, EncodeFrame, Encoder, VideoCodecId, STRIDE_ALIGN};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
io::ErrorKind::WouldBlock,
|
||||
time::{self, Instant},
|
||||
time::{self, Duration, Instant},
|
||||
};
|
||||
|
||||
const WAIT_BASE: i32 = 17;
|
||||
@ -32,9 +40,85 @@ lazy_static::lazy_static! {
|
||||
static ref CURRENT_DISPLAY: Arc<Mutex<usize>> = Arc::new(Mutex::new(usize::MAX));
|
||||
static ref LAST_ACTIVE: Arc<Mutex<Instant>> = Arc::new(Mutex::new(Instant::now()));
|
||||
static ref SWITCH: Arc<Mutex<bool>> = Default::default();
|
||||
static ref INTERNAL_LATENCIES: Arc<Mutex<HashMap<i32, i64>>> = Default::default();
|
||||
static ref TEST_LATENCIES: Arc<Mutex<HashMap<i32, i64>>> = Default::default();
|
||||
static ref IMAGE_QUALITIES: Arc<Mutex<HashMap<i32, i32>>> = Default::default();
|
||||
static ref FRAME_FETCHED_NOTIFIER: (UnboundedSender<(i32, Option<Instant>)>, Arc<TokioMutex<UnboundedReceiver<(i32, Option<Instant>)>>>) = {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
(tx, Arc::new(TokioMutex::new(rx)))
|
||||
};
|
||||
}
|
||||
|
||||
pub fn notify_video_frame_feched(conn_id: i32, frame_tm: Option<Instant>) {
|
||||
FRAME_FETCHED_NOTIFIER.0.send((conn_id, frame_tm)).unwrap()
|
||||
}
|
||||
|
||||
struct VideoFrameController {
|
||||
cur: Instant,
|
||||
send_conn_ids: HashSet<i32>,
|
||||
rt: Runtime,
|
||||
}
|
||||
|
||||
impl VideoFrameController {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
cur: Instant::now(),
|
||||
send_conn_ids: HashSet::new(),
|
||||
rt: Runtime::new().unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.send_conn_ids.clear();
|
||||
}
|
||||
|
||||
fn set_send(&mut self, tm: Instant, conn_ids: HashSet<i32>) {
|
||||
if !conn_ids.is_empty() {
|
||||
self.cur = tm;
|
||||
self.send_conn_ids = conn_ids;
|
||||
}
|
||||
}
|
||||
|
||||
fn blocking_wait_next(&mut self, timeout_millis: u128) {
|
||||
if self.send_conn_ids.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let send_conn_ids = self.send_conn_ids.clone();
|
||||
self.rt.block_on(async move {
|
||||
let mut fetched_conn_ids = HashSet::new();
|
||||
let begin = Instant::now();
|
||||
while begin.elapsed().as_millis() < timeout_millis {
|
||||
let timeout_dur =
|
||||
Duration::from_millis((timeout_millis - begin.elapsed().as_millis()) as u64);
|
||||
match tokio::time::timeout(
|
||||
timeout_dur,
|
||||
FRAME_FETCHED_NOTIFIER.1.lock().await.recv(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(_) => {
|
||||
// break if timeout
|
||||
log::error!("blocking wait frame receiving timeout {}", timeout_millis);
|
||||
break;
|
||||
}
|
||||
Ok(Some((id, instant))) => {
|
||||
if let Some(tm) = instant {
|
||||
log::trace!("channel recv latency: {}", tm.elapsed().as_secs_f32());
|
||||
}
|
||||
fetched_conn_ids.insert(id);
|
||||
|
||||
// break if all connections have received current frame
|
||||
if fetched_conn_ids.is_superset(&send_conn_ids) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// this branch would nerver be reached
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new() -> GenericService {
|
||||
@ -129,9 +213,11 @@ fn run(sp: GenericService) -> ResultType<()> {
|
||||
*SWITCH.lock().unwrap() = false;
|
||||
sp.send(msg_out);
|
||||
}
|
||||
|
||||
let mut frame_controller = VideoFrameController::new();
|
||||
|
||||
let mut crc = (0, 0);
|
||||
let start = time::Instant::now();
|
||||
let mut last_sent = time::Instant::now();
|
||||
let mut last_check_displays = time::Instant::now();
|
||||
#[cfg(windows)]
|
||||
let mut try_gdi = true;
|
||||
@ -164,43 +250,47 @@ fn run(sp: GenericService) -> ResultType<()> {
|
||||
}
|
||||
}
|
||||
*LAST_ACTIVE.lock().unwrap() = now;
|
||||
if get_latency() < 1000 || last_sent.elapsed().as_millis() > 1000 {
|
||||
match c.frame(wait as _) {
|
||||
Ok(frame) => {
|
||||
let time = now - start;
|
||||
let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64;
|
||||
handle_one_frame(&sp, &frame, ms, &mut crc, &mut vpx)?;
|
||||
last_sent = now;
|
||||
#[cfg(windows)]
|
||||
{
|
||||
try_gdi = false;
|
||||
}
|
||||
}
|
||||
Err(ref e) if e.kind() == WouldBlock => {
|
||||
// https://github.com/NVIDIA/video-sdk-samples/tree/master/nvEncDXGIOutputDuplicationSample
|
||||
wait = WAIT_BASE - now.elapsed().as_millis() as i32;
|
||||
if wait < 0 {
|
||||
wait = 0
|
||||
}
|
||||
#[cfg(windows)]
|
||||
if try_gdi && !c.is_gdi() {
|
||||
c.set_gdi();
|
||||
try_gdi = false;
|
||||
log::info!("No image, fall back to gdi");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
if check_display_changed(ndisplay, current, width, height) {
|
||||
log::info!("Displays changed");
|
||||
*SWITCH.lock().unwrap() = true;
|
||||
bail!("SWITCH");
|
||||
}
|
||||
|
||||
return Err(err.into());
|
||||
frame_controller.reset();
|
||||
|
||||
match c.frame(wait as _) {
|
||||
Ok(frame) => {
|
||||
let time = now - start;
|
||||
let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64;
|
||||
let send_conn_ids = handle_one_frame(&sp, now, &frame, ms, &mut crc, &mut vpx)?;
|
||||
frame_controller.set_send(now, send_conn_ids);
|
||||
#[cfg(windows)]
|
||||
{
|
||||
try_gdi = false;
|
||||
}
|
||||
}
|
||||
Err(ref e) if e.kind() == WouldBlock => {
|
||||
// https://github.com/NVIDIA/video-sdk-samples/tree/master/nvEncDXGIOutputDuplicationSample
|
||||
wait = WAIT_BASE - now.elapsed().as_millis() as i32;
|
||||
if wait < 0 {
|
||||
wait = 0
|
||||
}
|
||||
#[cfg(windows)]
|
||||
if try_gdi && !c.is_gdi() {
|
||||
c.set_gdi();
|
||||
try_gdi = false;
|
||||
log::info!("No image, fall back to gdi");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
if check_display_changed(ndisplay, current, width, height) {
|
||||
log::info!("Displays changed");
|
||||
*SWITCH.lock().unwrap() = true;
|
||||
bail!("SWITCH");
|
||||
}
|
||||
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
|
||||
frame_controller.blocking_wait_next(1000 * 5);
|
||||
|
||||
let elapsed = now.elapsed();
|
||||
// may need to enable frame(timeout)
|
||||
log::trace!("{:?} {:?}", time::Instant::now(), elapsed);
|
||||
@ -236,11 +326,12 @@ fn create_frame(frame: &EncodeFrame) -> VP9 {
|
||||
#[inline]
|
||||
fn handle_one_frame(
|
||||
sp: &GenericService,
|
||||
now: Instant,
|
||||
frame: &[u8],
|
||||
ms: i64,
|
||||
crc: &mut (u32, u32),
|
||||
vpx: &mut Encoder,
|
||||
) -> ResultType<()> {
|
||||
) -> ResultType<HashSet<i32>> {
|
||||
sp.snapshot(|sps| {
|
||||
// so that new sub and old sub share the same encoder after switch
|
||||
if sps.has_subscribes() {
|
||||
@ -257,6 +348,8 @@ fn handle_one_frame(
|
||||
} else {
|
||||
crc.1 += 1;
|
||||
}
|
||||
|
||||
let mut send_conn_ids: HashSet<i32> = Default::default();
|
||||
if crc.1 <= 180 && crc.1 % 5 == 0 {
|
||||
let mut frames = Vec::new();
|
||||
for ref frame in vpx
|
||||
@ -268,12 +361,13 @@ fn handle_one_frame(
|
||||
for ref frame in vpx.flush().with_context(|| "Failed to flush")? {
|
||||
frames.push(create_frame(frame));
|
||||
}
|
||||
|
||||
// to-do: flush periodically, e.g. 1 second
|
||||
if frames.len() > 0 {
|
||||
sp.send(create_msg(frames));
|
||||
send_conn_ids = sp.send_video_frame(now, create_msg(frames));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(send_conn_ids)
|
||||
}
|
||||
|
||||
fn get_display_num() -> usize {
|
||||
@ -373,20 +467,6 @@ pub fn update_test_latency(id: i32, latency: i64) {
|
||||
update_latency(id, latency, &mut *TEST_LATENCIES.lock().unwrap());
|
||||
}
|
||||
|
||||
pub fn update_internal_latency(id: i32, latency: i64) {
|
||||
update_latency(id, latency, &mut *INTERNAL_LATENCIES.lock().unwrap());
|
||||
}
|
||||
|
||||
pub fn get_latency() -> i64 {
|
||||
INTERNAL_LATENCIES
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.max()
|
||||
.unwrap_or(&0)
|
||||
.clone()
|
||||
}
|
||||
|
||||
fn convert_quality(q: i32) -> i32 {
|
||||
let q = {
|
||||
if q == ImageQuality::Balanced.value() {
|
||||
|
Loading…
Reference in New Issue
Block a user