// both soundio and cpal use wasapi on windows and coreaudio on mac, they do not support loopback. // libpulseaudio support loopback because pulseaudio is a standalone audio service with some // configuration, but need to install the library and start the service on OS, not a good choice. // windows: https://docs.microsoft.com/en-us/windows/win32/coreaudio/loopback-recording // mac: https://github.com/mattingalls/Soundflower // https://docs.microsoft.com/en-us/windows/win32/api/audioclient/nn-audioclient-iaudioclient // https://github.com/ExistentialAudio/BlackHole // if pactl not work, please run // sudo apt-get --purge --reinstall install pulseaudio // https://askubuntu.com/questions/403416/how-to-listen-live-sounds-from-input-from-external-sound-card // https://wiki.debian.org/audio-loopback // https://github.com/krruzic/pulsectl use super::*; #[cfg(not(any(target_os = "linux", target_os = "android")))] use hbb_common::anyhow::anyhow; use magnum_opus::{Application::*, Channels::*, Encoder}; use std::sync::atomic::{AtomicBool, Ordering}; pub const NAME: &'static str = "audio"; pub const AUDIO_DATA_SIZE_U8: usize = 960 * 4; // 10ms in 48000 stereo static RESTARTING: AtomicBool = AtomicBool::new(false); #[cfg(not(any(target_os = "linux", target_os = "android")))] pub fn new() -> GenericService { let svc = EmptyExtraFieldService::new(NAME.to_owned(), true); GenericService::repeat::(&svc.clone(), 33, cpal_impl::run); svc.sp } #[cfg(any(target_os = "linux", target_os = "android"))] pub fn new() -> GenericService { let svc = EmptyExtraFieldService::new(NAME.to_owned(), true); GenericService::run(&svc.clone(), pa_impl::run); svc.sp } pub fn restart() { log::info!("restart the audio service, freezing now..."); if RESTARTING.load(Ordering::SeqCst) { return; } RESTARTING.store(true, Ordering::SeqCst); } #[cfg(any(target_os = "linux", target_os = "android"))] mod pa_impl { use super::*; #[tokio::main(flavor = "current_thread")] pub async fn run(sp: EmptyExtraFieldService) -> ResultType<()> { hbb_common::sleep(0.1).await; // one moment to wait for _pa ipc RESTARTING.store(false, Ordering::SeqCst); #[cfg(target_os = "linux")] let mut stream = crate::ipc::connect(1000, "_pa").await?; unsafe { AUDIO_ZERO_COUNT = 0; } let mut encoder = Encoder::new(crate::platform::PA_SAMPLE_RATE, Stereo, LowDelay)?; #[cfg(target_os = "linux")] allow_err!( stream .send(&crate::ipc::Data::Config(( "audio-input".to_owned(), Some(Config::get_option("audio-input")) ))) .await ); #[cfg(target_os = "linux")] let zero_audio_frame: Vec = vec![0.; AUDIO_DATA_SIZE_U8 / 4]; while sp.ok() && !RESTARTING.load(Ordering::SeqCst) { sp.snapshot(|sps| { sps.send(create_format_msg(crate::platform::PA_SAMPLE_RATE, 2)); Ok(()) })?; #[cfg(target_os = "linux")] if let Ok(data) = stream.next_raw().await { if data.len() == 0 { send_f32(&zero_audio_frame, &mut encoder, &sp); continue; } if data.len() != AUDIO_DATA_SIZE_U8 { continue; } let data = unsafe { std::slice::from_raw_parts::(data.as_ptr() as _, data.len() / 4) }; send_f32(data, &mut encoder, &sp); } #[cfg(target_os = "android")] if let Some(data) = scrap::android::ffi::get_audio_raw() { let data = unsafe { std::slice::from_raw_parts::(data.as_ptr() as _, data.len() / 4) }; send_f32(data, &mut encoder, &sp); } else { hbb_common::sleep(0.1).await; } } Ok(()) } } #[cfg(not(any(target_os = "linux", target_os = "android")))] mod cpal_impl { use super::*; use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, BufferSize, Device, Host, InputCallbackInfo, StreamConfig, SupportedStreamConfig, }; lazy_static::lazy_static! { static ref HOST: Host = cpal::default_host(); static ref INPUT_BUFFER: Arc>> = Default::default(); } #[derive(Default)] pub struct State { stream: Option<(Box, Arc)>, } impl super::service::Reset for State { fn reset(&mut self) { self.stream.take(); } } pub fn run(sp: EmptyExtraFieldService, state: &mut State) -> ResultType<()> { sp.snapshot(|sps| { match &state.stream { None => { state.stream = Some(play(&sp)?); } _ => {} } if let Some((_, format)) = &state.stream { sps.send_shared(format.clone()); } Ok(()) })?; Ok(()) } fn send( data: Vec, sample_rate0: u32, sample_rate: u32, device_channel: u16, encode_channel: u16, encoder: &mut Encoder, sp: &GenericService, ) { let mut data = data; if sample_rate0 != sample_rate { data = crate::common::audio_resample(&data, sample_rate0, sample_rate, device_channel); } if device_channel != encode_channel { data = crate::common::audio_rechannel( data, sample_rate, sample_rate, device_channel, encode_channel, ) } send_f32(&data, encoder, sp); } #[cfg(windows)] fn get_device() -> ResultType<(Device, SupportedStreamConfig)> { let audio_input = Config::get_option("audio-input"); if !audio_input.is_empty() { return get_audio_input(&audio_input); } let device = HOST .default_output_device() .with_context(|| "Failed to get default output device for loopback")?; log::info!( "Default output device: {}", device.name().unwrap_or("".to_owned()) ); let format = device .default_output_config() .map_err(|e| anyhow!(e)) .with_context(|| "Failed to get default output format")?; log::info!("Default output format: {:?}", format); Ok((device, format)) } #[cfg(not(windows))] fn get_device() -> ResultType<(Device, SupportedStreamConfig)> { let audio_input = Config::get_option("audio-input"); get_audio_input(&audio_input) } fn get_audio_input(audio_input: &str) -> ResultType<(Device, SupportedStreamConfig)> { let mut device = None; if !audio_input.is_empty() { for d in HOST .devices() .with_context(|| "Failed to get audio devices")? { if d.name().unwrap_or("".to_owned()) == audio_input { device = Some(d); break; } } } let device = device.unwrap_or( HOST.default_input_device() .with_context(|| "Failed to get default input device for loopback")?, ); log::info!("Input device: {}", device.name().unwrap_or("".to_owned())); let format = device .default_input_config() .map_err(|e| anyhow!(e)) .with_context(|| "Failed to get default input format")?; log::info!("Default input format: {:?}", format); Ok((device, format)) } fn play(sp: &GenericService) -> ResultType<(Box, Arc)> { use cpal::SampleFormat::*; let (device, config) = get_device()?; let sp = sp.clone(); // Sample rate must be one of 8000, 12000, 16000, 24000, or 48000. let sample_rate_0 = config.sample_rate().0; let sample_rate = if sample_rate_0 < 12000 { 8000 } else if sample_rate_0 < 16000 { 12000 } else if sample_rate_0 < 24000 { 16000 } else if sample_rate_0 < 48000 { 24000 } else { 48000 }; let ch = if config.channels() > 1 { Stereo } else { Mono }; let stream = match config.sample_format() { I8 => build_input_stream::(device, &config, sp, sample_rate, ch)?, I16 => build_input_stream::(device, &config, sp, sample_rate, ch)?, I32 => build_input_stream::(device, &config, sp, sample_rate, ch)?, I64 => build_input_stream::(device, &config, sp, sample_rate, ch)?, U8 => build_input_stream::(device, &config, sp, sample_rate, ch)?, U16 => build_input_stream::(device, &config, sp, sample_rate, ch)?, U32 => build_input_stream::(device, &config, sp, sample_rate, ch)?, U64 => build_input_stream::(device, &config, sp, sample_rate, ch)?, F32 => build_input_stream::(device, &config, sp, sample_rate, ch)?, F64 => build_input_stream::(device, &config, sp, sample_rate, ch)?, f => bail!("unsupported audio format: {:?}", f), }; stream.play()?; Ok(( Box::new(stream), Arc::new(create_format_msg(sample_rate, ch as _)), )) } fn build_input_stream( device: cpal::Device, config: &cpal::SupportedStreamConfig, sp: GenericService, sample_rate: u32, encode_channel: magnum_opus::Channels, ) -> ResultType where T: cpal::SizedSample + dasp::sample::ToSample, { let err_fn = move |err| { // too many UnknownErrno, will improve later log::trace!("an error occurred on stream: {}", err); }; let sample_rate_0 = config.sample_rate().0; log::debug!("Audio sample rate : {}", sample_rate); unsafe { AUDIO_ZERO_COUNT = 0; } let device_channel = config.channels(); let mut encoder = Encoder::new(sample_rate, encode_channel, LowDelay)?; // https://www.opus-codec.org/docs/html_api/group__opusencoder.html#gace941e4ef26ed844879fde342ffbe546 // https://chromium.googlesource.com/chromium/deps/opus/+/1.1.1/include/opus.h let frame_size = sample_rate as usize / 100; // 10 ms let encode_len = frame_size * encode_channel as usize; let rechannel_len = encode_len * device_channel as usize / encode_channel as usize; INPUT_BUFFER.lock().unwrap().clear(); let timeout = None; let stream_config = StreamConfig { channels: device_channel, sample_rate: config.sample_rate(), buffer_size: BufferSize::Default, }; let stream = device.build_input_stream( &stream_config, move |data: &[T], _: &InputCallbackInfo| { let buffer: Vec = data.iter().map(|s| T::to_sample(*s)).collect(); let mut lock = INPUT_BUFFER.lock().unwrap(); lock.extend(buffer); while lock.len() >= rechannel_len { let frame: Vec = lock.drain(0..rechannel_len).collect(); send( frame, sample_rate_0, sample_rate, device_channel, encode_channel as _, &mut encoder, &sp, ); } }, err_fn, timeout, )?; Ok(stream) } } fn create_format_msg(sample_rate: u32, channels: u16) -> Message { let format = AudioFormat { sample_rate, channels: channels as _, ..Default::default() }; let mut misc = Misc::new(); misc.set_audio_format(format); let mut msg = Message::new(); msg.set_misc(misc); msg } // use AUDIO_ZERO_COUNT for the Noise(Zero) Gate Attack Time // every audio data length is set to 480 // MAX_AUDIO_ZERO_COUNT=800 is similar as Gate Attack Time 3~5s(Linux) || 6~8s(Windows) const MAX_AUDIO_ZERO_COUNT: u16 = 800; static mut AUDIO_ZERO_COUNT: u16 = 0; fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) { if data.iter().filter(|x| **x != 0.).next().is_some() { unsafe { AUDIO_ZERO_COUNT = 0; } } else { unsafe { if AUDIO_ZERO_COUNT > MAX_AUDIO_ZERO_COUNT { if AUDIO_ZERO_COUNT == MAX_AUDIO_ZERO_COUNT + 1 { log::debug!("Audio Zero Gate Attack"); AUDIO_ZERO_COUNT += 1; } return; } AUDIO_ZERO_COUNT += 1; } } #[cfg(target_os = "android")] { // the permitted opus data size are 120, 240, 480, 960, 1920, and 2880 // if data size is bigger than BATCH_SIZE, AND is an integer multiple of BATCH_SIZE // then upload in batches const BATCH_SIZE: usize = 960; let input_size = data.len(); if input_size > BATCH_SIZE && input_size % BATCH_SIZE == 0 { let n = input_size / BATCH_SIZE; for i in 0..n { match encoder .encode_vec_float(&data[i * BATCH_SIZE..(i + 1) * BATCH_SIZE], BATCH_SIZE) { Ok(data) => { let mut msg_out = Message::new(); msg_out.set_audio_frame(AudioFrame { data: data.into(), ..Default::default() }); sp.send(msg_out); } Err(_) => {} } } } else { log::debug!("invalid audio data size:{} ", input_size); return; } } #[cfg(not(target_os = "android"))] match encoder.encode_vec_float(data, data.len() * 6) { Ok(data) => { let mut msg_out = Message::new(); msg_out.set_audio_frame(AudioFrame { data: data.into(), ..Default::default() }); sp.send(msg_out); } Err(_) => {} } }