use std::{collections::HashMap, sync::atomic::Ordering}; #[cfg(not(windows))] use std::{fs::File, io::prelude::*}; use bytes::Bytes; use parity_tokio_ipc::{ Connection as Conn, ConnectionClient as ConnClient, Endpoint, Incoming, SecurityAttributes, }; use serde_derive::{Deserialize, Serialize}; #[cfg(not(any(target_os = "android", target_os = "ios")))] pub use clipboard::ClipbaordFile; use hbb_common::{ allow_err, bail, bytes, bytes_codec::BytesCodec, config::{self, Config, Config2}, futures::StreamExt as _, futures_util::sink::SinkExt, log, password_security as password, timeout, tokio, tokio::io::{AsyncRead, AsyncWrite}, tokio_util::codec::Framed, ResultType, }; use crate::rendezvous_mediator::RendezvousMediator; // State with timestamp, because std::time::Instant cannot be serialized #[derive(Debug, Serialize, Deserialize, Copy, Clone)] #[serde(tag = "t", content = "c")] pub enum PrivacyModeState { OffSucceeded, OffFailed, OffByPeer, OffUnknown, } #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "t", content = "c")] pub enum FS { ReadDir { dir: String, include_hidden: bool, }, RemoveDir { path: String, id: i32, recursive: bool, }, RemoveFile { path: String, id: i32, file_num: i32, }, CreateDir { path: String, id: i32, }, NewWrite { path: String, id: i32, file_num: i32, files: Vec<(String, u64)>, overwrite_detection: bool, }, CancelWrite { id: i32, }, WriteBlock { id: i32, file_num: i32, data: Bytes, compressed: bool, }, WriteDone { id: i32, file_num: i32, }, WriteOffset { id: i32, file_num: i32, offset_blk: u32, }, CheckDigest { id: i32, file_num: i32, file_size: u64, last_modified: u64, is_upload: bool, }, } #[cfg(not(any(target_os = "android", target_os = "ios")))] #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "t", content = "c")] pub enum DataKeyboard { Sequence(String), KeyDown(enigo::Key), KeyUp(enigo::Key), KeyClick(enigo::Key), GetKeyState(enigo::Key), } #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "t", content = "c")] pub enum DataKeyboardResponse { GetKeyState(bool), } #[cfg(not(any(target_os = "android", target_os = "ios", feature = "cli")))] #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "t", content = "c")] pub enum DataMouse { MoveTo(i32, i32), MoveRelative(i32, i32), Down(enigo::MouseButton), Up(enigo::MouseButton), Click(enigo::MouseButton), ScrollX(i32), ScrollY(i32), } #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "t", content = "c")] pub enum DataControl { Resolution { minx: i32, maxx: i32, miny: i32, maxy: i32, }, } #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "t", content = "c")] pub enum Data { Login { id: i32, is_file_transfer: bool, peer_id: String, name: String, authorized: bool, port_forward: String, keyboard: bool, clipboard: bool, audio: bool, file: bool, file_transfer_enabled: bool, restart: bool, recording: bool, }, ChatMessage { text: String, }, SwitchPermission { name: String, enabled: bool, }, SystemInfo(Option), ClickTime(i64), MouseMoveTime(i64), Authorize, Close, SAS, OnlineStatus(Option<(i64, bool)>), Config((String, Option)), Options(Option>), NatType(Option), ConfirmedKey(Option<(Vec, Vec)>), RawMessage(Vec), Socks(Option), FS(FS), Test, SyncConfig(Option<(Config, Config2)>), #[cfg(not(any(target_os = "android", target_os = "ios")))] ClipbaordFile(ClipbaordFile), ClipboardFileEnabled(bool), PrivacyModeState((i32, PrivacyModeState)), TestRendezvousServer, #[cfg(not(any(target_os = "android", target_os = "ios", feature = "cli")))] Keyboard(DataKeyboard), #[cfg(not(any(target_os = "android", target_os = "ios", feature = "cli")))] KeyboardResponse(DataKeyboardResponse), #[cfg(not(any(target_os = "android", target_os = "ios", feature = "cli")))] Mouse(DataMouse), Control(DataControl), Theme(String), Language(String), Empty, } #[tokio::main(flavor = "current_thread")] pub async fn start(postfix: &str) -> ResultType<()> { let mut incoming = new_listener(postfix).await?; loop { if let Some(result) = incoming.next().await { match result { Ok(stream) => { let mut stream = Connection::new(stream); let postfix = postfix.to_owned(); tokio::spawn(async move { loop { match stream.next().await { Err(err) => { log::trace!("ipc{} connection closed: {}", postfix, err); break; } Ok(Some(data)) => { handle(data, &mut stream).await; } _ => {} } } }); } Err(err) => { log::error!("Couldn't get client: {:?}", err); } } } } } pub async fn new_listener(postfix: &str) -> ResultType { let path = Config::ipc_path(postfix); #[cfg(not(windows))] check_pid(postfix).await; let mut endpoint = Endpoint::new(path.clone()); match SecurityAttributes::allow_everyone_create() { Ok(attr) => endpoint.set_security_attributes(attr), Err(err) => log::error!("Failed to set ipc{} security: {}", postfix, err), }; match endpoint.incoming() { Ok(incoming) => { log::info!("Started ipc{} server at path: {}", postfix, &path); #[cfg(not(windows))] { use std::os::unix::fs::PermissionsExt; std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o0777)).ok(); write_pid(postfix); } Ok(incoming) } Err(err) => { log::error!( "Failed to start ipc{} server at path {}: {}", postfix, path, err ); Err(err.into()) } } } pub struct CheckIfRestart(String, Vec, String); impl CheckIfRestart { pub fn new() -> CheckIfRestart { CheckIfRestart( Config::get_option("stop-service"), Config::get_rendezvous_servers(), Config::get_option("audio-input"), ) } } impl Drop for CheckIfRestart { fn drop(&mut self) { if self.0 != Config::get_option("stop-service") || self.1 != Config::get_rendezvous_servers() { RendezvousMediator::restart(); } if self.2 != Config::get_option("audio-input") { crate::audio_service::restart(); } } } async fn handle(data: Data, stream: &mut Connection) { match data { Data::SystemInfo(_) => { let info = format!( "log_path: {}, config: {}, username: {}", Config::log_path().to_str().unwrap_or(""), Config::file().to_str().unwrap_or(""), crate::username(), ); allow_err!(stream.send(&Data::SystemInfo(Some(info))).await); } Data::ClickTime(_) => { let t = crate::server::CLICK_TIME.load(Ordering::SeqCst); allow_err!(stream.send(&Data::ClickTime(t)).await); } Data::MouseMoveTime(_) => { let t = crate::server::MOUSE_MOVE_TIME.load(Ordering::SeqCst); allow_err!(stream.send(&Data::MouseMoveTime(t)).await); } Data::Close => { log::info!("Receive close message"); #[cfg(not(target_os = "android"))] crate::server::input_service::fix_key_down_timeout_at_exit(); std::process::exit(0); } Data::OnlineStatus(_) => { let x = config::ONLINE .lock() .unwrap() .values() .max() .unwrap_or(&0) .clone(); let confirmed = Config::get_key_confirmed(); allow_err!(stream.send(&Data::OnlineStatus(Some((x, confirmed)))).await); } Data::ConfirmedKey(None) => { let out = if Config::get_key_confirmed() { Some(Config::get_key_pair()) } else { None }; allow_err!(stream.send(&Data::ConfirmedKey(out)).await); } Data::Socks(s) => match s { None => { allow_err!(stream.send(&Data::Socks(Config::get_socks())).await); } Some(data) => { if data.proxy.is_empty() { Config::set_socks(None); } else { Config::set_socks(Some(data)); } crate::common::test_nat_type(); RendezvousMediator::restart(); log::info!("socks updated"); } }, Data::Config((name, value)) => match value { None => { let value; if name == "id" { value = Some(Config::get_id()); } else if name == "temporary-password" { value = Some(password::temporary_password()); } else if name == "permanent-password" { value = Some(Config::get_permanent_password()); } else if name == "salt" { value = Some(Config::get_salt()); } else if name == "rendezvous_server" { value = Some(format!( "{},{}", Config::get_rendezvous_server(), Config::get_rendezvous_servers().join(",") )); } else if name == "rendezvous_servers" { value = Some(Config::get_rendezvous_servers().join(",")); } else { value = None; } allow_err!(stream.send(&Data::Config((name, value))).await); } Some(value) => { if name == "id" { Config::set_key_confirmed(false); Config::set_id(&value); } else if name == "temporary-password" { password::update_temporary_password(); } else if name == "permanent-password" { Config::set_permanent_password(&value); } else if name == "salt" { Config::set_salt(&value); } else { return; } log::info!("{} updated", name); } }, Data::Options(value) => match value { None => { let v = Config::get_options(); allow_err!(stream.send(&Data::Options(Some(v))).await); } Some(value) => { let _chk = CheckIfRestart::new(); Config::set_options(value); allow_err!(stream.send(&Data::Options(None)).await); } }, Data::NatType(_) => { let t = Config::get_nat_type(); allow_err!(stream.send(&Data::NatType(Some(t))).await); } Data::SyncConfig(Some((config, config2))) => { let _chk = CheckIfRestart::new(); Config::set(config); Config2::set(config2); allow_err!(stream.send(&Data::SyncConfig(None)).await); } Data::SyncConfig(None) => { allow_err!( stream .send(&Data::SyncConfig(Some((Config::get(), Config2::get())))) .await ); } Data::TestRendezvousServer => { crate::test_rendezvous_server(); } _ => {} } } pub async fn connect(ms_timeout: u64, postfix: &str) -> ResultType> { let path = Config::ipc_path(postfix); let client = timeout(ms_timeout, Endpoint::connect(&path)).await??; Ok(ConnectionTmpl::new(client)) } #[cfg(target_os = "linux")] #[tokio::main(flavor = "current_thread")] pub async fn start_pa() { use crate::audio_service::AUDIO_DATA_SIZE_U8; match new_listener("_pa").await { Ok(mut incoming) => { loop { if let Some(result) = incoming.next().await { match result { Ok(stream) => { let mut stream = Connection::new(stream); let mut device: String = "".to_owned(); if let Some(Ok(Some(Data::Config((_, Some(x)))))) = stream.next_timeout2(1000).await { device = x; } if !device.is_empty() { device = crate::platform::linux::get_pa_source_name(&device); } if device.is_empty() { device = crate::platform::linux::get_pa_monitor(); } if device.is_empty() { continue; } let spec = pulse::sample::Spec { format: pulse::sample::Format::F32le, channels: 2, rate: crate::platform::PA_SAMPLE_RATE, }; log::info!("pa monitor: {:?}", device); // systemctl --user status pulseaudio.service let mut buf: Vec = vec![0; AUDIO_DATA_SIZE_U8]; match psimple::Simple::new( None, // Use the default server &crate::get_app_name(), // Our application’s name pulse::stream::Direction::Record, // We want a record stream Some(&device), // Use the default device "record", // Description of our stream &spec, // Our sample format None, // Use default channel map None, // Use default buffering attributes ) { Ok(s) => loop { if let Ok(_) = s.read(&mut buf) { let out = if buf.iter().filter(|x| **x != 0).next().is_none() { vec![] } else { buf.clone() }; if let Err(err) = stream.send_raw(out.into()).await { log::error!("Failed to send audio data:{}", err); break; } } }, Err(err) => { log::error!("Could not create simple pulse: {}", err); } } } Err(err) => { log::error!("Couldn't get pa client: {:?}", err); } } } } } Err(err) => { log::error!("Failed to start pa ipc server: {}", err); } } } #[inline] #[cfg(not(windows))] fn get_pid_file(postfix: &str) -> String { let path = Config::ipc_path(postfix); format!("{}.pid", path) } #[cfg(not(windows))] async fn check_pid(postfix: &str) { let pid_file = get_pid_file(postfix); if let Ok(mut file) = File::open(&pid_file) { let mut content = String::new(); file.read_to_string(&mut content).ok(); let pid = content.parse::().unwrap_or(0); if pid > 0 { use sysinfo::{ProcessExt, System, SystemExt}; let mut sys = System::new(); sys.refresh_processes(); if let Some(p) = sys.process(pid.into()) { if let Some(current) = sys.process((std::process::id() as i32).into()) { if current.name() == p.name() { // double check with connect if connect(1000, postfix).await.is_ok() { return; } } } } } } hbb_common::allow_err!(std::fs::remove_file(&Config::ipc_path(postfix))); } #[inline] #[cfg(not(windows))] fn write_pid(postfix: &str) { let path = get_pid_file(postfix); if let Ok(mut file) = File::create(&path) { use std::os::unix::fs::PermissionsExt; std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o0777)).ok(); file.write_all(&std::process::id().to_string().into_bytes()) .ok(); } } pub struct ConnectionTmpl { inner: Framed, } pub type Connection = ConnectionTmpl; impl ConnectionTmpl where T: AsyncRead + AsyncWrite + std::marker::Unpin, { pub fn new(conn: T) -> Self { Self { inner: Framed::new(conn, BytesCodec::new()), } } pub async fn send(&mut self, data: &Data) -> ResultType<()> { let v = serde_json::to_vec(data)?; self.inner.send(bytes::Bytes::from(v)).await?; Ok(()) } async fn send_config(&mut self, name: &str, value: String) -> ResultType<()> { self.send(&Data::Config((name.to_owned(), Some(value)))) .await } pub async fn next_timeout(&mut self, ms_timeout: u64) -> ResultType> { Ok(timeout(ms_timeout, self.next()).await??) } pub async fn next_timeout2(&mut self, ms_timeout: u64) -> Option>> { if let Ok(x) = timeout(ms_timeout, self.next()).await { Some(x) } else { None } } pub async fn next(&mut self) -> ResultType> { match self.inner.next().await { Some(res) => { let bytes = res?; if let Ok(s) = std::str::from_utf8(&bytes) { if let Ok(data) = serde_json::from_str::(s) { return Ok(Some(data)); } } return Ok(None); } _ => { bail!("reset by the peer"); } } } pub async fn send_raw(&mut self, data: Bytes) -> ResultType<()> { self.inner.send(data).await?; Ok(()) } pub async fn next_raw(&mut self) -> ResultType { match self.inner.next().await { Some(Ok(res)) => Ok(res), _ => { bail!("reset by the peer"); } } } } #[tokio::main(flavor = "current_thread")] async fn get_config(name: &str) -> ResultType> { get_config_async(name, 1_000).await } async fn get_config_async(name: &str, ms_timeout: u64) -> ResultType> { let mut c = connect(ms_timeout, "").await?; c.send(&Data::Config((name.to_owned(), None))).await?; if let Some(Data::Config((name2, value))) = c.next_timeout(ms_timeout).await? { if name == name2 { return Ok(value); } } return Ok(None); } pub async fn set_config_async(name: &str, value: String) -> ResultType<()> { let mut c = connect(1000, "").await?; c.send_config(name, value).await?; Ok(()) } #[tokio::main(flavor = "current_thread")] pub async fn set_config(name: &str, value: String) -> ResultType<()> { set_config_async(name, value).await } pub fn update_temporary_password() -> ResultType<()> { set_config("temporary-password", "".to_owned()) } pub fn get_permanent_password() -> String { if let Ok(Some(v)) = get_config("permanent-password") { Config::set_permanent_password(&v); v } else { Config::get_permanent_password() } } pub fn set_permanent_password(v: String) -> ResultType<()> { Config::set_permanent_password(&v); set_config("permanent-password", v) } pub fn get_id() -> String { if let Ok(Some(v)) = get_config("id") { // update salt also, so that next time reinstallation not causing first-time auto-login failure if let Ok(Some(v2)) = get_config("salt") { Config::set_salt(&v2); } if v != Config::get_id() { Config::set_key_confirmed(false); Config::set_id(&v); } v } else { Config::get_id() } } pub async fn get_rendezvous_server(ms_timeout: u64) -> (String, Vec) { if let Ok(Some(v)) = get_config_async("rendezvous_server", ms_timeout).await { let mut urls = v.split(","); let a = urls.next().unwrap_or_default().to_owned(); let b: Vec = urls.map(|x| x.to_owned()).collect(); (a, b) } else { ( Config::get_rendezvous_server(), Config::get_rendezvous_servers(), ) } } async fn get_options_(ms_timeout: u64) -> ResultType> { let mut c = connect(ms_timeout, "").await?; c.send(&Data::Options(None)).await?; if let Some(Data::Options(Some(value))) = c.next_timeout(ms_timeout).await? { Config::set_options(value.clone()); Ok(value) } else { Ok(Config::get_options()) } } pub async fn get_options_async() -> HashMap { get_options_(1000).await.unwrap_or(Config::get_options()) } #[tokio::main(flavor = "current_thread")] pub async fn get_options() -> HashMap { get_options_async().await } pub async fn get_option_async(key: &str) -> String { if let Some(v) = get_options_async().await.get(key) { v.clone() } else { "".to_owned() } } pub fn set_option(key: &str, value: &str) { let mut options = get_options(); if value.is_empty() { options.remove(key); } else { options.insert(key.to_owned(), value.to_owned()); } set_options(options).ok(); } #[tokio::main(flavor = "current_thread")] pub async fn set_options(value: HashMap) -> ResultType<()> { if let Ok(mut c) = connect(1000, "").await { c.send(&Data::Options(Some(value.clone()))).await?; // do not put below before connect, because we need to check should_exit c.next_timeout(1000).await.ok(); } Config::set_options(value); Ok(()) } #[inline] async fn get_nat_type_(ms_timeout: u64) -> ResultType { let mut c = connect(ms_timeout, "").await?; c.send(&Data::NatType(None)).await?; if let Some(Data::NatType(Some(value))) = c.next_timeout(ms_timeout).await? { Config::set_nat_type(value); Ok(value) } else { Ok(Config::get_nat_type()) } } pub async fn get_nat_type(ms_timeout: u64) -> i32 { get_nat_type_(ms_timeout) .await .unwrap_or(Config::get_nat_type()) } pub async fn get_rendezvous_servers(ms_timeout: u64) -> Vec { if let Ok(Some(v)) = get_config_async("rendezvous_servers", ms_timeout).await { return v.split(',').map(|x| x.to_owned()).collect(); } return Config::get_rendezvous_servers(); } #[inline] async fn get_socks_(ms_timeout: u64) -> ResultType> { let mut c = connect(ms_timeout, "").await?; c.send(&Data::Socks(None)).await?; if let Some(Data::Socks(value)) = c.next_timeout(ms_timeout).await? { Config::set_socks(value.clone()); Ok(value) } else { Ok(Config::get_socks()) } } pub async fn get_socks_async(ms_timeout: u64) -> Option { get_socks_(ms_timeout).await.unwrap_or(Config::get_socks()) } #[tokio::main(flavor = "current_thread")] pub async fn get_socks() -> Option { get_socks_async(1_000).await } #[tokio::main(flavor = "current_thread")] pub async fn set_socks(value: config::Socks5Server) -> ResultType<()> { Config::set_socks(if value.proxy.is_empty() { None } else { Some(value.clone()) }); connect(1_000, "") .await? .send(&Data::Socks(Some(value))) .await?; Ok(()) } #[tokio::main(flavor = "current_thread")] pub async fn test_rendezvous_server() -> ResultType<()> { let mut c = connect(1000, "").await?; c.send(&Data::TestRendezvousServer).await?; Ok(()) }