Merge pull request #2481 from 21pages/upload_record

upload record
This commit is contained in:
RustDesk 2022-12-08 13:37:50 +08:00 committed by GitHub
commit 160a226977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 287 additions and 26 deletions

View File

@ -12,11 +12,10 @@ use hwcodec::mux::{MuxContext, Muxer};
use std::{ use std::{
fs::{File, OpenOptions}, fs::{File, OpenOptions},
io, io,
time::Instant,
};
use std::{
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
path::PathBuf, path::PathBuf,
sync::mpsc::Sender,
time::Instant,
}; };
use webm::mux::{self, Segment, Track, VideoTrack, Writer}; use webm::mux::{self, Segment, Track, VideoTrack, Writer};
@ -31,12 +30,14 @@ pub enum RecordCodecID {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RecorderContext { pub struct RecorderContext {
pub server: bool,
pub id: String, pub id: String,
pub default_dir: String, pub default_dir: String,
pub filename: String, pub filename: String,
pub width: usize, pub width: usize,
pub height: usize, pub height: usize,
pub codec_id: RecordCodecID, pub codec_id: RecordCodecID,
pub tx: Option<Sender<RecordState>>,
} }
impl RecorderContext { impl RecorderContext {
@ -52,7 +53,8 @@ impl RecorderContext {
std::fs::create_dir_all(&dir)?; std::fs::create_dir_all(&dir)?;
} }
} }
let file = self.id.clone() let file = if self.server { "s" } else { "c" }.to_string()
+ &self.id.clone()
+ &chrono::Local::now().format("_%Y%m%d%H%M%S").to_string() + &chrono::Local::now().format("_%Y%m%d%H%M%S").to_string()
+ if self.codec_id == RecordCodecID::VP9 { + if self.codec_id == RecordCodecID::VP9 {
".webm" ".webm"
@ -60,7 +62,7 @@ impl RecorderContext {
".mp4" ".mp4"
}; };
self.filename = PathBuf::from(&dir).join(file).to_string_lossy().to_string(); self.filename = PathBuf::from(&dir).join(file).to_string_lossy().to_string();
log::info!("video save to:{}", self.filename); log::info!("video will save to:{}", self.filename);
Ok(()) Ok(())
} }
} }
@ -75,6 +77,14 @@ pub trait RecorderApi {
fn write_video(&mut self, frame: &EncodedVideoFrame) -> bool; fn write_video(&mut self, frame: &EncodedVideoFrame) -> bool;
} }
#[derive(Debug)]
pub enum RecordState {
NewFile(String),
NewFrame,
WriteTail,
RemoveFile,
}
pub struct Recorder { pub struct Recorder {
pub inner: Box<dyn RecorderApi>, pub inner: Box<dyn RecorderApi>,
ctx: RecorderContext, ctx: RecorderContext,
@ -110,6 +120,7 @@ impl Recorder {
#[cfg(not(feature = "hwcodec"))] #[cfg(not(feature = "hwcodec"))]
_ => bail!("unsupported codec type"), _ => bail!("unsupported codec type"),
}; };
recorder.send_state(RecordState::NewFile(recorder.ctx.filename.clone()));
Ok(recorder) Ok(recorder)
} }
@ -123,6 +134,7 @@ impl Recorder {
_ => bail!("unsupported codec type"), _ => bail!("unsupported codec type"),
}; };
self.ctx = ctx; self.ctx = ctx;
self.send_state(RecordState::NewFile(self.ctx.filename.clone()));
Ok(()) Ok(())
} }
@ -171,8 +183,13 @@ impl Recorder {
} }
_ => bail!("unsupported frame type"), _ => bail!("unsupported frame type"),
} }
self.send_state(RecordState::NewFrame);
Ok(()) Ok(())
} }
fn send_state(&self, state: RecordState) {
self.ctx.tx.as_ref().map(|tx| tx.send(state));
}
} }
struct WebmRecorder { struct WebmRecorder {
@ -237,9 +254,12 @@ impl RecorderApi for WebmRecorder {
impl Drop for WebmRecorder { impl Drop for WebmRecorder {
fn drop(&mut self) { fn drop(&mut self) {
std::mem::replace(&mut self.webm, None).map_or(false, |webm| webm.finalize(None)); std::mem::replace(&mut self.webm, None).map_or(false, |webm| webm.finalize(None));
let mut state = RecordState::WriteTail;
if !self.written || self.start.elapsed().as_secs() < MIN_SECS { if !self.written || self.start.elapsed().as_secs() < MIN_SECS {
std::fs::remove_file(&self.ctx.filename).ok(); std::fs::remove_file(&self.ctx.filename).ok();
state = RecordState::RemoveFile;
} }
self.ctx.tx.as_ref().map(|tx| tx.send(state));
} }
} }
@ -292,8 +312,11 @@ impl RecorderApi for HwRecorder {
impl Drop for HwRecorder { impl Drop for HwRecorder {
fn drop(&mut self) { fn drop(&mut self) {
self.muxer.write_tail().ok(); self.muxer.write_tail().ok();
let mut state = RecordState::WriteTail;
if !self.written || self.start.elapsed().as_secs() < MIN_SECS { if !self.written || self.start.elapsed().as_secs() < MIN_SECS {
std::fs::remove_file(&self.ctx.filename).ok(); std::fs::remove_file(&self.ctx.filename).ok();
state = RecordState::RemoveFile;
} }
self.ctx.tx.as_ref().map(|tx| tx.send(state));
} }
} }

View File

@ -863,12 +863,14 @@ impl VideoHandler {
self.record = false; self.record = false;
if start { if start {
self.recorder = Recorder::new(RecorderContext { self.recorder = Recorder::new(RecorderContext {
server: false,
id, id,
default_dir: crate::ui_interface::default_video_save_directory(), default_dir: crate::ui_interface::default_video_save_directory(),
filename: "".to_owned(), filename: "".to_owned(),
width: w as _, width: w as _,
height: h as _, height: h as _,
codec_id: scrap::record::RecordCodecID::VP9, codec_id: scrap::record::RecordCodecID::VP9,
tx: None,
}) })
.map_or(Default::default(), |r| Arc::new(Mutex::new(Some(r)))); .map_or(Default::default(), |r| Arc::new(Mutex::new(Some(r))));
} else { } else {

View File

@ -216,8 +216,6 @@ pub fn core_main() -> Option<Vec<String>> {
if args.len() == 2 { if args.len() == 2 {
if crate::platform::is_root() { if crate::platform::is_root() {
crate::ipc::set_permanent_password(args[1].to_owned()).unwrap(); crate::ipc::set_permanent_password(args[1].to_owned()).unwrap();
} else {
log::info!("Permission denied!");
} }
} }
return None; return None;

View File

@ -4,6 +4,7 @@ use serde_json::{Map, Value};
#[cfg(feature = "flutter")] #[cfg(feature = "flutter")]
pub mod account; pub mod account;
pub mod record_upload;
#[derive(Debug)] #[derive(Debug)]
pub enum HbbHttpResponse<T> { pub enum HbbHttpResponse<T> {

View File

@ -0,0 +1,204 @@
use bytes::Bytes;
use hbb_common::{bail, config::Config, lazy_static, log, ResultType};
use reqwest::blocking::{Body, Client};
use scrap::record::RecordState;
use serde::Serialize;
use serde_json::Map;
use std::{
fs::File,
io::{prelude::*, SeekFrom},
sync::{mpsc::Receiver, Arc, Mutex},
time::{Duration, Instant},
};
const MAX_HEADER_LEN: usize = 1024;
const SHOULD_SEND_TIME: Duration = Duration::from_secs(1);
const SHOULD_SEND_SIZE: u64 = 1024 * 1024;
lazy_static::lazy_static! {
static ref ENABLE: Arc<Mutex<bool>> = Default::default();
}
pub fn is_enable() -> bool {
ENABLE.lock().unwrap().clone()
}
pub fn run(rx: Receiver<RecordState>) {
let mut uploader = RecordUploader {
client: Client::new(),
api_server: crate::get_api_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
),
filepath: Default::default(),
filename: Default::default(),
upload_size: Default::default(),
running: Default::default(),
last_send: Instant::now(),
};
std::thread::spawn(move || loop {
if let Err(e) = match rx.recv() {
Ok(state) => match state {
RecordState::NewFile(filepath) => uploader.handle_new_file(filepath),
RecordState::NewFrame => {
if uploader.running {
uploader.handle_frame(false)
} else {
Ok(())
}
}
RecordState::WriteTail => {
if uploader.running {
uploader.handle_tail()
} else {
Ok(())
}
}
RecordState::RemoveFile => {
if uploader.running {
uploader.handle_remove()
} else {
Ok(())
}
}
},
Err(e) => {
log::trace!("upload thread stop:{}", e);
break;
}
} {
uploader.running = false;
log::error!("upload stop:{}", e);
}
});
}
struct RecordUploader {
client: Client,
api_server: String,
filepath: String,
filename: String,
upload_size: u64,
running: bool,
last_send: Instant,
}
impl RecordUploader {
fn send<Q, B>(&self, query: &Q, body: B) -> ResultType<()>
where
Q: Serialize + ?Sized,
B: Into<Body>,
{
match self
.client
.post(format!("{}/api/record", self.api_server))
.query(query)
.body(body)
.send()
{
Ok(resp) => {
if let Ok(m) = resp.json::<Map<String, serde_json::Value>>() {
if let Some(e) = m.get("error") {
bail!(e.to_string());
}
}
Ok(())
}
Err(e) => bail!(e.to_string()),
}
}
fn handle_new_file(&mut self, filepath: String) -> ResultType<()> {
match std::path::PathBuf::from(&filepath).file_name() {
Some(filename) => match filename.to_owned().into_string() {
Ok(filename) => {
self.filename = filename.clone();
self.filepath = filepath.clone();
self.upload_size = 0;
self.running = true;
self.last_send = Instant::now();
self.send(&[("type", "new"), ("file", &filename)], Bytes::new())?;
Ok(())
}
Err(_) => bail!("can't parse filename:{:?}", filename),
},
None => bail!("can't parse filepath:{}", filepath),
}
}
fn handle_frame(&mut self, flush: bool) -> ResultType<()> {
if !flush && self.last_send.elapsed() < SHOULD_SEND_TIME {
return Ok(());
}
match File::open(&self.filepath) {
Ok(mut file) => match file.metadata() {
Ok(m) => {
let len = m.len();
if len <= self.upload_size {
return Ok(());
}
if !flush && len - self.upload_size < SHOULD_SEND_SIZE {
return Ok(());
}
let mut buf = Vec::new();
match file.seek(SeekFrom::Start(self.upload_size)) {
Ok(_) => match file.read_to_end(&mut buf) {
Ok(length) => {
self.send(
&[
("type", "part"),
("file", &self.filename),
("offset", &self.upload_size.to_string()),
("length", &length.to_string()),
],
buf,
)?;
self.upload_size = len;
self.last_send = Instant::now();
Ok(())
}
Err(e) => bail!(e.to_string()),
},
Err(e) => bail!(e.to_string()),
}
}
Err(e) => bail!(e.to_string()),
},
Err(e) => bail!(e.to_string()),
}
}
fn handle_tail(&mut self) -> ResultType<()> {
self.handle_frame(true)?;
match File::open(&self.filepath) {
Ok(mut file) => {
let mut buf = vec![0u8; MAX_HEADER_LEN];
match file.read(&mut buf) {
Ok(length) => {
buf.truncate(length);
self.send(
&[
("type", "tail"),
("file", &self.filename),
("offset", "0"),
("length", &length.to_string()),
],
buf,
)?;
log::info!("upload success, file:{}", self.filename);
Ok(())
}
Err(e) => bail!(e.to_string()),
}
}
Err(e) => bail!(e.to_string()),
}
}
fn handle_remove(&mut self) -> ResultType<()> {
self.send(
&[("type", "remove"), ("file", &self.filename)],
Bytes::new(),
)?;
Ok(())
}
}

View File

@ -44,7 +44,7 @@ const ADDR_CAPTURE_FRAME_COUNTER: usize = ADDR_CAPTURE_WOULDBLOCK + size_of::<i3
const ADDR_CAPTURE_FRAME: usize = const ADDR_CAPTURE_FRAME: usize =
(ADDR_CAPTURE_FRAME_COUNTER + SIZE_COUNTER + FRAME_ALIGN - 1) / FRAME_ALIGN * FRAME_ALIGN; (ADDR_CAPTURE_FRAME_COUNTER + SIZE_COUNTER + FRAME_ALIGN - 1) / FRAME_ALIGN * FRAME_ALIGN;
const IPC_PROFIX: &str = "_portable_service"; const IPC_SUFFIX: &str = "_portable_service";
pub const SHMEM_NAME: &str = "_portable_service"; pub const SHMEM_NAME: &str = "_portable_service";
const MAX_NACK: usize = 3; const MAX_NACK: usize = 3;
const MAX_DXGI_FAIL_TIME: usize = 5; const MAX_DXGI_FAIL_TIME: usize = 5;
@ -376,7 +376,7 @@ pub mod server {
async fn run_ipc_client() { async fn run_ipc_client() {
use DataPortableService::*; use DataPortableService::*;
let postfix = IPC_PROFIX; let postfix = IPC_SUFFIX;
match ipc::connect(1000, postfix).await { match ipc::connect(1000, postfix).await {
Ok(mut stream) => { Ok(mut stream) => {
@ -622,7 +622,7 @@ pub mod client {
async fn start_ipc_server_async(rx: mpsc::UnboundedReceiver<Data>) { async fn start_ipc_server_async(rx: mpsc::UnboundedReceiver<Data>) {
use DataPortableService::*; use DataPortableService::*;
let rx = Arc::new(tokio::sync::Mutex::new(rx)); let rx = Arc::new(tokio::sync::Mutex::new(rx));
let postfix = IPC_PROFIX; let postfix = IPC_SUFFIX;
#[cfg(feature = "flutter")] #[cfg(feature = "flutter")]
let quick_support = { let quick_support = {
let args: Vec<_> = std::env::args().collect(); let args: Vec<_> = std::env::args().collect();

View File

@ -481,22 +481,7 @@ fn run(sp: GenericService) -> ResultType<()> {
#[cfg(windows)] #[cfg(windows)]
log::info!("gdi: {}", c.is_gdi()); log::info!("gdi: {}", c.is_gdi());
let codec_name = Encoder::current_hw_encoder_name(); let codec_name = Encoder::current_hw_encoder_name();
#[cfg(not(target_os = "ios"))] let recorder = get_recorder(c.width, c.height, &codec_name);
let recorder = if !Config::get_option("allow-auto-record-incoming").is_empty() {
Recorder::new(RecorderContext {
id: "local".to_owned(),
default_dir: crate::ui_interface::default_video_save_directory(),
filename: "".to_owned(),
width: c.width,
height: c.height,
codec_id: scrap::record::RecordCodecID::VP9,
})
.map_or(Default::default(), |r| Arc::new(Mutex::new(Some(r))))
} else {
Default::default()
};
#[cfg(target_os = "ios")]
let recorder: Arc<Mutex<Option<Recorder>>> = Default::default();
#[cfg(windows)] #[cfg(windows)]
start_uac_elevation_check(); start_uac_elevation_check();
@ -673,6 +658,53 @@ fn run(sp: GenericService) -> ResultType<()> {
Ok(()) Ok(())
} }
fn get_recorder(
width: usize,
height: usize,
codec_name: &Option<String>,
) -> Arc<Mutex<Option<Recorder>>> {
#[cfg(not(target_os = "ios"))]
let recorder = if !Config::get_option("allow-auto-record-incoming").is_empty() {
use crate::hbbs_http::record_upload;
use scrap::record::RecordCodecID::*;
let tx = if record_upload::is_enable() {
let (tx, rx) = std::sync::mpsc::channel();
record_upload::run(rx);
Some(tx)
} else {
None
};
let codec_id = match codec_name {
Some(name) => {
if name.contains("264") {
H264
} else {
H265
}
}
None => VP9,
};
Recorder::new(RecorderContext {
server: true,
id: Config::get_id(),
default_dir: crate::ui_interface::default_video_save_directory(),
filename: "".to_owned(),
width,
height,
codec_id,
tx,
})
.map_or(Default::default(), |r| Arc::new(Mutex::new(Some(r))))
} else {
Default::default()
};
#[cfg(target_os = "ios")]
let recorder: Arc<Mutex<Option<Recorder>>> = Default::default();
recorder
}
fn check_privacy_mode_changed(sp: &GenericService, privacy_mode_id: i32) -> ResultType<()> { fn check_privacy_mode_changed(sp: &GenericService, privacy_mode_id: i32) -> ResultType<()> {
let privacy_mode_id_2 = *PRIVACY_MODE_CONN_ID.lock().unwrap(); let privacy_mode_id_2 = *PRIVACY_MODE_CONN_ID.lock().unwrap();
if privacy_mode_id != privacy_mode_id_2 { if privacy_mode_id != privacy_mode_id_2 {

View File

@ -685,6 +685,7 @@ pub fn discover() {
}); });
} }
#[cfg(feature = "flutter")]
pub fn peer_to_map(id: String, p: PeerConfig) -> HashMap<&'static str, String> { pub fn peer_to_map(id: String, p: PeerConfig) -> HashMap<&'static str, String> {
HashMap::<&str, String>::from_iter([ HashMap::<&str, String>::from_iter([
("id", id), ("id", id),