From 0a28d09ff84355fcfd5837255c39cb85192e8642 Mon Sep 17 00:00:00 2001 From: zyl Date: Wed, 13 Nov 2024 15:35:23 +0800 Subject: [PATCH] fix: windows, improve audio buffer (#9770) (#9893) * fix: windows, improve audio buffer (#9770) * . * fix statics does not record and avoid channel changing when drio audio when audio is stero * add some commence --------- Co-authored-by: zylthinking --- src/client.rs | 153 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 131 insertions(+), 22 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0b5293d22..f7453b88d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -118,7 +118,7 @@ pub const SCRAP_X11_REQUIRED: &str = "x11 expected"; pub const SCRAP_X11_REF_URL: &str = "https://rustdesk.com/docs/en/manual/linux/#x11-required"; #[cfg(not(any(target_os = "android", target_os = "linux")))] -pub const AUDIO_BUFFER_MS: usize = 150; +pub const AUDIO_BUFFER_MS: usize = 3000; #[cfg(feature = "flutter")] #[cfg(not(any(target_os = "android", target_os = "ios")))] @@ -904,36 +904,123 @@ pub struct AudioHandler { } #[cfg(not(any(target_os = "android", target_os = "linux")))] -struct AudioBuffer(pub Arc>>); +struct AudioBuffer( + pub Arc>>, + usize, + [usize; 30], +); #[cfg(not(any(target_os = "android", target_os = "linux")))] impl Default for AudioBuffer { fn default() -> Self { - Self(Arc::new(std::sync::Mutex::new( - ringbuf::HeapRb::::new(48000 * 2 * AUDIO_BUFFER_MS / 1000), // 48000hz, 2 channel - ))) + Self( + Arc::new(std::sync::Mutex::new( + ringbuf::HeapRb::::new(48000 * 2 * AUDIO_BUFFER_MS / 1000), // 48000hz, 2 channel + )), + 48000 * 2, + [0; 30], + ) } } #[cfg(not(any(target_os = "android", target_os = "linux")))] impl AudioBuffer { - pub fn resize(&self, sample_rate: usize, channels: usize) { + pub fn resize(&mut self, sample_rate: usize, channels: usize) { let capacity = sample_rate * channels * AUDIO_BUFFER_MS / 1000; let old_capacity = self.0.lock().unwrap().capacity(); if capacity != old_capacity { *self.0.lock().unwrap() = ringbuf::HeapRb::::new(capacity); + self.1 = sample_rate * channels; log::info!("Audio buffer resized from {old_capacity} to {capacity}"); } } - // clear when full to avoid long time noise - #[inline] - pub fn clear_if_full(&self) { - let full = self.0.lock().unwrap().is_full(); - if full { - self.0.lock().unwrap().clear(); - log::trace!("Audio buffer cleared"); + fn try_shrink(&mut self, having: usize) { + extern crate chrono; + use chrono::prelude::*; + + let mut i = (having * 10) / self.1; + if i > 29 { + i = 29; } + self.2[i] += 1; + + static mut tms: i64 = 0; + let dt = Local::now().timestamp_millis(); + unsafe { + if tms == 0 { + tms = dt; + return; + } else if dt < tms + 12000 { + return; + } + tms = dt; + } + + // the safer water mark to drop + let mut zero = 0; + // the water mark taking most of time + let mut max = 0; + for i in 0..30 { + if self.2[i] == 0 && zero == i { + zero += 1; + } + + if self.2[i] > self.2[max] { + self.2[max] = 0; + max = i; + } else { + self.2[i] = 0; + } + } + zero = zero * 2 / 3; + + // how many data can be dropped: + // 1. will not drop if buffered data is less than 600ms + // 2. choose based on min(zero, max) + const N: usize = 4; + self.2[max] = 0; + if max < 6 { + return; + } else if max > zero * N { + max = zero * N; + } + + let mut lock = self.0.lock().unwrap(); + let cap = lock.capacity(); + let having = lock.occupied_len(); + let skip = (cap * max / (30 * N) + 1) & (!1); + if (having > skip * 3) && (skip > 0) { + lock.skip(skip); + log::info!("skip {skip}, based {max} {zero}"); + } + } + + /// append pcm to audio buffer, if buffered data + /// exceeds AUDIO_BUFFER_MS, only AUDIO_BUFFER_MS + /// will be kept. + fn append_pcm2(&self, buffer: &[f32]) -> usize { + let mut lock = self.0.lock().unwrap(); + let cap = lock.capacity(); + if buffer.len() > cap { + lock.push_slice_overwrite(buffer); + return cap; + } + + let having = lock.occupied_len() + buffer.len(); + if having > cap { + lock.skip(having - cap); + } + lock.push_slice_overwrite(buffer); + lock.occupied_len() + } + + /// append pcm to audio buffer, trying to drop data + /// when data is too much (per 12 seconds) based + /// statistics. + pub fn append_pcm(&mut self, buffer: &[f32]) { + let having = self.append_pcm2(buffer); + self.try_shrink(having); } } @@ -993,7 +1080,9 @@ impl AudioHandler { let sample_format = config.sample_format(); log::info!("Default output format: {:?}", config); log::info!("Remote input format: {:?}", format0); - let config: StreamConfig = config.into(); + let mut config: StreamConfig = config.into(); + config.buffer_size = cpal::BufferSize::Fixed(64); + self.sample_rate = (format0.sample_rate, config.sample_rate.0); let mut build_output_stream = |config: StreamConfig| match sample_format { cpal::SampleFormat::I8 => self.build_output_stream::(&config, &device), @@ -1062,7 +1151,6 @@ impl AudioHandler { { let sample_rate0 = self.sample_rate.0; let sample_rate = self.sample_rate.1; - let audio_buffer = self.audio_buffer.0.clone(); let mut buffer = buffer[0..n].to_owned(); if sample_rate != sample_rate0 { buffer = crate::audio_resample( @@ -1081,8 +1169,7 @@ impl AudioHandler { self.device_channel, ); } - self.audio_buffer.clear_if_full(); - audio_buffer.lock().unwrap().push_slice_overwrite(&buffer); + self.audio_buffer.append_pcm(&buffer); } #[cfg(target_os = "android")] { @@ -1117,18 +1204,40 @@ impl AudioHandler { let timeout = None; let stream = device.build_output_stream( config, - move |data: &mut [T], _: &_| { + move |data: &mut [T], info: &cpal::OutputCallbackInfo| { if !*ready.lock().unwrap() { *ready.lock().unwrap() = true; } - let mut lock = audio_buffer.lock().unwrap(); + let mut n = data.len(); - if lock.occupied_len() < n { - n = lock.occupied_len(); + let mut lock = audio_buffer.lock().unwrap(); + let mut having = lock.occupied_len(); + if having < n { + let tms = info.timestamp(); + let how_long = tms + .playback + .duration_since(&tms.callback) + .unwrap_or(Duration::from_millis(0)); + + // must long enough to fight back scheuler delay + if how_long > Duration::from_millis(6) { + drop(lock); + std::thread::sleep(how_long.div_f32(1.2)); + lock = audio_buffer.lock().unwrap(); + having = lock.occupied_len(); + } + + if having < n { + n = having; + } } + let mut elems = vec![0.0f32; n]; - lock.pop_slice(&mut elems); + if n > 0 { + lock.pop_slice(&mut elems); + } drop(lock); + let mut input = elems.into_iter(); for sample in data.iter_mut() { *sample = match input.next() {