rustdesk/src/ipc.rs

620 lines
18 KiB
Rust
Raw Normal View History

2022-02-07 18:39:49 +08:00
use crate::rendezvous_mediator::RendezvousMediator;
2022-05-12 17:35:25 +08:00
#[cfg(not(any(target_os = "android", target_os = "ios")))]
pub use clipboard::ClipbaordFile;
use hbb_common::{
allow_err, bail, bytes,
bytes_codec::BytesCodec,
2022-04-26 11:19:45 +08:00
config::{self, Config, Config2},
futures::StreamExt as _,
futures_util::sink::SinkExt,
log, timeout, tokio,
tokio::io::{AsyncRead, AsyncWrite},
tokio_util::codec::Framed,
ResultType,
};
2021-06-26 01:14:22 +08:00
use parity_tokio_ipc::{
Connection as Conn, ConnectionClient as ConnClient, Endpoint, Incoming, SecurityAttributes,
};
2021-03-29 15:59:14 +08:00
use serde_derive::{Deserialize, Serialize};
2022-05-12 17:35:25 +08:00
use std::{collections::HashMap, sync::atomic::Ordering};
2021-03-29 15:59:14 +08:00
#[cfg(not(windows))]
use std::{fs::File, io::prelude::*};
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "t", content = "c")]
pub enum FS {
ReadDir {
dir: String,
include_hidden: bool,
},
RemoveDir {
path: String,
id: i32,
recursive: bool,
},
RemoveFile {
path: String,
id: i32,
file_num: i32,
},
CreateDir {
path: String,
id: i32,
},
NewWrite {
path: String,
id: i32,
files: Vec<(String, u64)>,
},
CancelWrite {
id: i32,
},
WriteBlock {
id: i32,
file_num: i32,
data: Vec<u8>,
compressed: bool,
},
WriteDone {
id: i32,
file_num: i32,
},
2022-04-27 10:45:20 +08:00
CheckDigest {
id: i32,
file_num: i32,
file_size: u64,
modified_time: u64,
},
2021-03-29 15:59:14 +08:00
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "t", content = "c")]
pub enum Data {
Login {
id: i32,
is_file_transfer: bool,
peer_id: String,
name: String,
authorized: bool,
port_forward: String,
keyboard: bool,
clipboard: bool,
audio: bool,
file: bool,
file_transfer_enabled: bool,
2021-03-29 15:59:14 +08:00
},
ChatMessage {
text: String,
},
SwitchPermission {
name: String,
enabled: bool,
},
SystemInfo(Option<String>),
2022-05-12 17:35:25 +08:00
ClickTime(i64),
MouseMoveTime(i64),
2021-03-29 15:59:14 +08:00
Authorize,
Close,
SAS,
OnlineStatus(Option<(i64, bool)>),
Config((String, Option<String>)),
Options(Option<HashMap<String, String>>),
NatType(Option<i32>),
ConfirmedKey(Option<(Vec<u8>, Vec<u8>)>),
RawMessage(Vec<u8>),
2022-01-05 18:34:30 +08:00
Socks(Option<config::Socks5Server>),
2021-03-29 15:59:14 +08:00
FS(FS),
Test,
2022-04-26 11:19:45 +08:00
SyncConfig(Option<(Config, Config2)>),
#[cfg(not(any(target_os = "android", target_os = "ios")))]
ClipbaordFile(ClipbaordFile),
ClipboardFileEnabled(bool),
2021-03-29 15:59:14 +08:00
}
2021-06-25 19:42:51 +08:00
#[tokio::main(flavor = "current_thread")]
2021-03-29 15:59:14 +08:00
pub async fn start(postfix: &str) -> ResultType<()> {
let mut incoming = new_listener(postfix).await?;
loop {
if let Some(result) = incoming.next().await {
match result {
Ok(stream) => {
let mut stream = Connection::new(stream);
let postfix = postfix.to_owned();
tokio::spawn(async move {
loop {
match stream.next().await {
Err(err) => {
log::trace!("ipc{} connection closed: {}", postfix, err);
break;
}
Ok(Some(data)) => {
handle(data, &mut stream).await;
}
_ => {}
}
}
});
}
Err(err) => {
log::error!("Couldn't get client: {:?}", err);
}
}
}
}
}
pub async fn new_listener(postfix: &str) -> ResultType<Incoming> {
let path = Config::ipc_path(postfix);
#[cfg(not(windows))]
check_pid(postfix).await;
2021-03-29 15:59:14 +08:00
let mut endpoint = Endpoint::new(path.clone());
match SecurityAttributes::allow_everyone_create() {
Ok(attr) => endpoint.set_security_attributes(attr),
Err(err) => log::error!("Failed to set ipc{} security: {}", postfix, err),
};
match endpoint.incoming() {
Ok(incoming) => {
log::info!("Started ipc{} server at path: {}", postfix, &path);
#[cfg(not(windows))]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o0777)).ok();
write_pid(postfix);
}
2021-03-29 15:59:14 +08:00
Ok(incoming)
}
Err(err) => {
log::error!(
2021-11-14 23:22:05 +08:00
"Failed to start ipc{} server at path {}: {}",
2021-03-29 15:59:14 +08:00
postfix,
path,
err
);
Err(err.into())
}
}
}
2022-04-26 11:19:45 +08:00
pub struct CheckIfRestart(String, Vec<String>, String);
impl CheckIfRestart {
pub fn new() -> CheckIfRestart {
CheckIfRestart(
Config::get_option("stop-service"),
Config::get_rendezvous_servers(),
Config::get_option("audio-input"),
)
}
}
impl Drop for CheckIfRestart {
fn drop(&mut self) {
if self.0 != Config::get_option("stop-service")
|| self.1 != Config::get_rendezvous_servers()
{
RendezvousMediator::restart();
}
if self.2 != Config::get_option("audio-input") {
crate::audio_service::restart();
}
}
}
2021-03-29 15:59:14 +08:00
async fn handle(data: Data, stream: &mut Connection) {
match data {
Data::SystemInfo(_) => {
let info = format!(
"log_path: {}, config: {}, username: {}",
Config::log_path().to_str().unwrap_or(""),
Config::file().to_str().unwrap_or(""),
crate::username(),
);
allow_err!(stream.send(&Data::SystemInfo(Some(info))).await);
}
2022-05-12 17:35:25 +08:00
Data::ClickTime(_) => {
let t = crate::server::CLICK_TIME.load(Ordering::SeqCst);
allow_err!(stream.send(&Data::ClickTime(t)).await);
}
Data::MouseMoveTime(_) => {
let t = crate::server::MOUSE_MOVE_TIME.load(Ordering::SeqCst);
allow_err!(stream.send(&Data::MouseMoveTime(t)).await);
}
2021-03-29 15:59:14 +08:00
Data::Close => {
log::info!("Receive close message");
2022-05-12 17:35:25 +08:00
#[cfg(not(target_os = "android"))]
2021-08-09 04:21:42 +08:00
crate::server::input_service::fix_key_down_timeout_at_exit();
2021-03-29 15:59:14 +08:00
std::process::exit(0);
}
Data::OnlineStatus(_) => {
let x = config::ONLINE
.lock()
.unwrap()
.values()
.max()
.unwrap_or(&0)
.clone();
let confirmed = Config::get_key_confirmed();
allow_err!(stream.send(&Data::OnlineStatus(Some((x, confirmed)))).await);
}
Data::ConfirmedKey(None) => {
let out = if Config::get_key_confirmed() {
Some(Config::get_key_pair())
} else {
None
};
allow_err!(stream.send(&Data::ConfirmedKey(out)).await);
}
2022-01-05 18:34:30 +08:00
Data::Socks(s) => match s {
None => {
allow_err!(stream.send(&Data::Socks(Config::get_socks())).await);
}
Some(data) => {
if data.proxy.is_empty() {
Config::set_socks(None);
} else {
Config::set_socks(Some(data));
}
2022-01-24 03:15:01 +08:00
crate::common::test_nat_type();
2022-02-07 18:39:49 +08:00
RendezvousMediator::restart();
2022-01-05 18:34:30 +08:00
log::info!("socks updated");
}
},
2021-03-29 15:59:14 +08:00
Data::Config((name, value)) => match value {
None => {
let value;
if name == "id" {
value = Some(Config::get_id());
} else if name == "password" {
value = Some(Config::get_password());
} else if name == "salt" {
value = Some(Config::get_salt());
} else if name == "rendezvous_server" {
2022-01-05 13:21:14 +08:00
value = Some(Config::get_rendezvous_server());
} else if name == "rendezvous_servers" {
value = Some(Config::get_rendezvous_servers().join(","));
2021-03-29 15:59:14 +08:00
} else {
value = None;
}
allow_err!(stream.send(&Data::Config((name, value))).await);
}
Some(value) => {
if name == "id" {
Config::set_key_confirmed(false);
2021-03-29 15:59:14 +08:00
Config::set_id(&value);
} else if name == "password" {
Config::set_password(&value);
} else if name == "salt" {
Config::set_salt(&value);
} else {
return;
}
log::info!("{} updated", name);
}
},
Data::Options(value) => match value {
None => {
let v = Config::get_options();
allow_err!(stream.send(&Data::Options(Some(v))).await);
}
Some(value) => {
2022-05-04 20:39:07 +08:00
let _chk = CheckIfRestart::new();
2021-03-29 15:59:14 +08:00
Config::set_options(value);
2022-02-08 18:09:45 +08:00
allow_err!(stream.send(&Data::Options(None)).await);
2021-03-29 15:59:14 +08:00
}
},
Data::NatType(_) => {
let t = Config::get_nat_type();
allow_err!(stream.send(&Data::NatType(Some(t))).await);
}
2022-04-26 11:19:45 +08:00
Data::SyncConfig(Some((config, config2))) => {
2022-05-04 20:39:07 +08:00
let _chk = CheckIfRestart::new();
2022-04-26 11:19:45 +08:00
Config::set(config);
Config2::set(config2);
allow_err!(stream.send(&Data::SyncConfig(None)).await);
2022-01-13 15:26:57 +08:00
}
2022-04-26 11:19:45 +08:00
Data::SyncConfig(None) => {
2022-01-13 15:26:57 +08:00
allow_err!(
stream
2022-04-26 11:19:45 +08:00
.send(&Data::SyncConfig(Some((Config::get(), Config2::get()))))
2022-01-13 15:26:57 +08:00
.await
);
}
2021-03-29 15:59:14 +08:00
_ => {}
}
}
2021-06-26 01:14:22 +08:00
pub async fn connect(ms_timeout: u64, postfix: &str) -> ResultType<ConnectionTmpl<ConnClient>> {
2021-03-29 15:59:14 +08:00
let path = Config::ipc_path(postfix);
let client = timeout(ms_timeout, Endpoint::connect(&path)).await??;
2021-06-26 01:14:22 +08:00
Ok(ConnectionTmpl::new(client))
2021-03-29 15:59:14 +08:00
}
#[inline]
#[cfg(not(windows))]
fn get_pid_file(postfix: &str) -> String {
let path = Config::ipc_path(postfix);
format!("{}.pid", path)
}
#[cfg(not(windows))]
async fn check_pid(postfix: &str) {
let pid_file = get_pid_file(postfix);
if let Ok(mut file) = File::open(&pid_file) {
let mut content = String::new();
file.read_to_string(&mut content).ok();
let pid = content.parse::<i32>().unwrap_or(0);
if pid > 0 {
2022-01-15 14:08:24 +08:00
use sysinfo::{ProcessExt, System, SystemExt};
let mut sys = System::new();
sys.refresh_processes();
if let Some(p) = sys.process(pid.into()) {
if let Some(current) = sys.process((std::process::id() as i32).into()) {
2022-01-15 13:47:57 +08:00
if current.name() == p.name() {
2021-03-29 15:59:14 +08:00
// double check with connect
if connect(1000, postfix).await.is_ok() {
return;
}
}
}
}
}
}
hbb_common::allow_err!(std::fs::remove_file(&Config::ipc_path(postfix)));
}
#[inline]
#[cfg(not(windows))]
fn write_pid(postfix: &str) {
let path = get_pid_file(postfix);
if let Ok(mut file) = File::create(&path) {
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o0777)).ok();
file.write_all(&std::process::id().to_string().into_bytes())
.ok();
}
}
2021-06-26 01:14:22 +08:00
pub struct ConnectionTmpl<T> {
inner: Framed<T, BytesCodec>,
2021-03-29 15:59:14 +08:00
}
2021-06-26 01:14:22 +08:00
pub type Connection = ConnectionTmpl<Conn>;
impl<T> ConnectionTmpl<T>
where
T: AsyncRead + AsyncWrite + std::marker::Unpin,
2021-06-26 01:14:22 +08:00
{
pub fn new(conn: T) -> Self {
2021-03-29 15:59:14 +08:00
Self {
inner: Framed::new(conn, BytesCodec::new()),
}
}
pub async fn send(&mut self, data: &Data) -> ResultType<()> {
let v = serde_json::to_vec(data)?;
self.inner.send(bytes::Bytes::from(v)).await?;
Ok(())
}
async fn send_config(&mut self, name: &str, value: String) -> ResultType<()> {
self.send(&Data::Config((name.to_owned(), Some(value))))
.await
}
pub async fn next_timeout(&mut self, ms_timeout: u64) -> ResultType<Option<Data>> {
Ok(timeout(ms_timeout, self.next()).await??)
}
pub async fn next_timeout2(&mut self, ms_timeout: u64) -> Option<ResultType<Option<Data>>> {
if let Ok(x) = timeout(ms_timeout, self.next()).await {
Some(x)
} else {
None
}
}
pub async fn next(&mut self) -> ResultType<Option<Data>> {
match self.inner.next().await {
Some(res) => {
let bytes = res?;
if let Ok(s) = std::str::from_utf8(&bytes) {
if let Ok(data) = serde_json::from_str::<Data>(s) {
return Ok(Some(data));
}
}
return Ok(None);
}
_ => {
bail!("reset by the peer");
}
}
}
pub async fn send_raw(&mut self, data: Vec<u8>) -> ResultType<()> {
self.inner.send(bytes::Bytes::from(data)).await?;
Ok(())
}
pub async fn next_raw(&mut self) -> ResultType<bytes::BytesMut> {
match self.inner.next().await {
Some(Ok(res)) => Ok(res),
_ => {
bail!("reset by the peer");
}
}
}
2021-03-29 15:59:14 +08:00
}
2021-06-25 19:42:51 +08:00
#[tokio::main(flavor = "current_thread")]
2021-03-29 15:59:14 +08:00
async fn get_config(name: &str) -> ResultType<Option<String>> {
get_config_async(name, 1_000).await
}
async fn get_config_async(name: &str, ms_timeout: u64) -> ResultType<Option<String>> {
let mut c = connect(ms_timeout, "").await?;
c.send(&Data::Config((name.to_owned(), None))).await?;
if let Some(Data::Config((name2, value))) = c.next_timeout(ms_timeout).await? {
if name == name2 {
return Ok(value);
}
}
return Ok(None);
}
2022-05-12 17:35:25 +08:00
pub async fn set_config_async(name: &str, value: String) -> ResultType<()> {
2021-03-29 15:59:14 +08:00
let mut c = connect(1000, "").await?;
c.send_config(name, value).await?;
Ok(())
}
2022-05-12 17:35:25 +08:00
#[tokio::main(flavor = "current_thread")]
pub async fn set_config(name: &str, value: String) -> ResultType<()> {
set_config_async(name, value).await
}
2021-03-29 15:59:14 +08:00
pub fn set_password(v: String) -> ResultType<()> {
Config::set_password(&v);
set_config("password", v)
}
pub fn get_id() -> String {
if let Ok(Some(v)) = get_config("id") {
2021-11-14 23:22:05 +08:00
// update salt also, so that next time reinstallation not causing first-time auto-login failure
2021-03-29 15:59:14 +08:00
if let Ok(Some(v2)) = get_config("salt") {
Config::set_salt(&v2);
}
if v != Config::get_id() {
Config::set_key_confirmed(false);
Config::set_id(&v);
}
v
} else {
Config::get_id()
}
}
2021-03-29 15:59:14 +08:00
pub fn get_password() -> String {
if let Ok(Some(v)) = get_config("password") {
Config::set_password(&v);
v
} else {
Config::get_password()
}
}
2022-01-05 13:21:14 +08:00
pub async fn get_rendezvous_server(ms_timeout: u64) -> String {
2021-03-29 15:59:14 +08:00
if let Ok(Some(v)) = get_config_async("rendezvous_server", ms_timeout).await {
2022-01-05 13:21:14 +08:00
v
} else {
Config::get_rendezvous_server()
2021-03-29 15:59:14 +08:00
}
}
async fn get_options_(ms_timeout: u64) -> ResultType<HashMap<String, String>> {
let mut c = connect(ms_timeout, "").await?;
c.send(&Data::Options(None)).await?;
if let Some(Data::Options(Some(value))) = c.next_timeout(ms_timeout).await? {
Config::set_options(value.clone());
Ok(value)
} else {
Ok(Config::get_options())
}
}
2022-05-12 17:35:25 +08:00
pub async fn get_options_async() -> HashMap<String, String> {
get_options_(1000).await.unwrap_or(Config::get_options())
}
2021-06-25 19:42:51 +08:00
#[tokio::main(flavor = "current_thread")]
2021-03-29 15:59:14 +08:00
pub async fn get_options() -> HashMap<String, String> {
2022-05-12 17:35:25 +08:00
get_options_async().await
2021-03-29 15:59:14 +08:00
}
2022-05-12 17:35:25 +08:00
pub async fn get_option_async(key: &str) -> String {
if let Some(v) = get_options_async().await.get(key) {
2021-03-29 15:59:14 +08:00
v.clone()
} else {
"".to_owned()
}
}
pub fn set_option(key: &str, value: &str) {
let mut options = get_options();
if value.is_empty() {
options.remove(key);
} else {
options.insert(key.to_owned(), value.to_owned());
}
set_options(options).ok();
}
2021-06-25 19:42:51 +08:00
#[tokio::main(flavor = "current_thread")]
2021-03-29 15:59:14 +08:00
pub async fn set_options(value: HashMap<String, String>) -> ResultType<()> {
2022-03-21 00:53:35 +08:00
if let Ok(mut c) = connect(1000, "").await {
c.send(&Data::Options(Some(value.clone()))).await?;
// do not put below before connect, because we need to check should_exit
c.next_timeout(1000).await.ok();
}
2022-02-08 18:09:45 +08:00
Config::set_options(value);
2021-03-29 15:59:14 +08:00
Ok(())
}
#[inline]
async fn get_nat_type_(ms_timeout: u64) -> ResultType<i32> {
let mut c = connect(ms_timeout, "").await?;
c.send(&Data::NatType(None)).await?;
if let Some(Data::NatType(Some(value))) = c.next_timeout(ms_timeout).await? {
Config::set_nat_type(value);
Ok(value)
} else {
Ok(Config::get_nat_type())
}
}
pub async fn get_nat_type(ms_timeout: u64) -> i32 {
get_nat_type_(ms_timeout)
.await
.unwrap_or(Config::get_nat_type())
}
2022-05-12 17:35:25 +08:00
pub async fn get_rendezvous_servers(ms_timeout: u64) -> Vec<String> {
if let Ok(Some(v)) = get_config_async("rendezvous_servers", ms_timeout).await {
return v.split(',').map(|x| x.to_owned()).collect();
}
return Config::get_rendezvous_servers();
}
2022-01-05 18:34:30 +08:00
#[inline]
async fn get_socks_(ms_timeout: u64) -> ResultType<Option<config::Socks5Server>> {
let mut c = connect(ms_timeout, "").await?;
c.send(&Data::Socks(None)).await?;
if let Some(Data::Socks(value)) = c.next_timeout(ms_timeout).await? {
Config::set_socks(value.clone());
Ok(value)
} else {
Ok(Config::get_socks())
}
}
pub async fn get_socks_async(ms_timeout: u64) -> Option<config::Socks5Server> {
get_socks_(ms_timeout).await.unwrap_or(Config::get_socks())
}
#[tokio::main(flavor = "current_thread")]
pub async fn get_socks() -> Option<config::Socks5Server> {
get_socks_async(1_000).await
}
#[tokio::main(flavor = "current_thread")]
pub async fn set_socks(value: config::Socks5Server) -> ResultType<()> {
Config::set_socks(if value.proxy.is_empty() {
None
} else {
Some(value.clone())
});
connect(1_000, "")
.await?
.send(&Data::Socks(Some(value)))
.await?;
Ok(())
}