vdi: new message loop

This commit is contained in:
rustdesk 2023-03-22 16:50:59 +08:00
parent bbc6c98775
commit 3d5e8e0276
5 changed files with 88 additions and 127 deletions

1
vdi/host/Cargo.lock generated
View File

@ -1528,7 +1528,6 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"clap", "clap",
"derivative",
"hbb_common", "hbb_common",
"image", "image",
"qemu-display", "qemu-display",

View File

@ -9,6 +9,5 @@ qemu-display = { git = "https://github.com/rustdesk/qemu-display" }
hbb_common = { path = "../../libs/hbb_common" } hbb_common = { path = "../../libs/hbb_common" }
clap = { version = "4.1", features = ["derive"] } clap = { version = "4.1", features = ["derive"] }
zbus = { version = "3.0" } zbus = { version = "3.0" }
derivative = "2.2"
image = "0.24" image = "0.24"
async-trait = "0.1" async-trait = "0.1"

View File

@ -1 +1,11 @@
use hbb_common::{message_proto::*, tokio, ResultType}; use hbb_common::{message_proto::*, tokio, ResultType};
pub use tokio::sync::{mpsc, Mutex};
pub struct Connection {
pub tx: mpsc::UnboundedSender<Message>,
}
impl Connection {
pub async fn on_message(&mut self, message: Message) -> ResultType<bool> {
Ok(true)
}
}

View File

@ -1,7 +1,7 @@
use hbb_common::{log, tokio, ResultType}; use hbb_common::{tokio, ResultType};
use image::GenericImage; use image::GenericImage;
use qemu_display::{Console, ConsoleListenerHandler, MouseButton}; use qemu_display::{Console, ConsoleListenerHandler, MouseButton};
use std::{collections::HashSet, sync::Arc, time}; use std::{collections::HashSet, sync::Arc};
pub use tokio::sync::{mpsc, Mutex}; pub use tokio::sync::{mpsc, Mutex};
#[derive(Debug)] #[derive(Debug)]
@ -54,91 +54,17 @@ impl ConsoleListenerHandler for ConsoleListener {
} }
fn disconnected(&mut self) { fn disconnected(&mut self) {
dbg!(); self.tx.send(Event::Disconnected).ok();
} }
} }
#[derive(derivative::Derivative)] pub async fn key_event(console: &mut Console, qnum: u32, down: bool) -> ResultType<()> {
#[derivative(Debug)] if down {
pub struct Client { console.keyboard.press(qnum).await?;
#[derivative(Debug = "ignore")] } else {
console: Arc<Mutex<Console>>, console.keyboard.release(qnum).await?;
last_update: Option<time::Instant>,
has_update: bool,
req_update: bool,
last_buttons: HashSet<MouseButton>,
dimensions: (u16, u16),
image: Arc<Mutex<BgraImage>>,
}
impl Client {
pub fn new(console: Arc<Mutex<Console>>, image: Arc<Mutex<BgraImage>>) -> Self {
Self {
console,
image,
last_update: None,
has_update: false,
req_update: false,
last_buttons: HashSet::new(),
dimensions: (0, 0),
}
}
pub fn update_pending(&self) -> bool {
self.has_update && self.req_update
}
pub async fn key_event(&self, qnum: u32, down: bool) -> ResultType<()> {
let console = self.console.lock().await;
if down {
console.keyboard.press(qnum).await?;
} else {
console.keyboard.release(qnum).await?;
}
Ok(())
}
pub async fn desktop_resize(&mut self) -> ResultType<()> {
let image = self.image.lock().await;
let (width, height) = (image.width() as _, image.height() as _);
if (width, height) == self.dimensions {
return Ok(());
}
self.dimensions = (width, height);
Ok(())
}
pub async fn send_framebuffer_update(&mut self) -> ResultType<()> {
self.desktop_resize().await?;
if self.has_update && self.req_update {
if let Some(last_update) = self.last_update {
if last_update.elapsed().as_millis() < 10 {
log::info!("TODO: <10ms, could delay update..")
}
}
// self.server.send_framebuffer_update(&self.vnc_server)?;
self.last_update = Some(time::Instant::now());
self.has_update = false;
self.req_update = false;
}
Ok(())
}
pub async fn handle_event(&mut self, event: Option<Event>) -> ResultType<bool> {
match event {
Some(Event::ConsoleUpdate(_)) => {
self.has_update = true;
}
Some(Event::Disconnected) => {
return Ok(false);
}
None => {
self.send_framebuffer_update().await?;
}
}
Ok(true)
} }
Ok(())
} }
fn image_from_vec(format: u32, width: u32, height: u32, stride: u32, data: Vec<u8>) -> BgraImage { fn image_from_vec(format: u32, width: u32, height: u32, stride: u32, data: Vec<u8>) -> BgraImage {

View File

@ -1,13 +1,18 @@
use clap::Parser; use clap::Parser;
use hbb_common::{anyhow::Context, log, tokio, ResultType}; use hbb_common::{
use qemu_display::{Console, VMProxy}; allow_err,
use std::{ anyhow::{bail, Context},
borrow::Borrow, log,
net::{TcpListener, TcpStream}, message_proto::*,
sync::Arc, protobuf::Message as _,
thread, tokio,
tokio::net::TcpListener,
ResultType, Stream,
}; };
use qemu_display::{Console, VMProxy};
use std::{borrow::Borrow, sync::Arc};
use crate::connection::*;
use crate::console::*; use crate::console::*;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -37,8 +42,10 @@ struct Cli {
#[derive(Debug)] #[derive(Debug)]
struct Server { struct Server {
vm_name: String, vm_name: String,
rx: mpsc::UnboundedReceiver<Event>, rx_console: mpsc::UnboundedReceiver<Event>,
tx: mpsc::UnboundedSender<Event>, tx_console: mpsc::UnboundedSender<Event>,
rx_conn: mpsc::UnboundedReceiver<Message>,
tx_conn: mpsc::UnboundedSender<Message>,
image: Arc<Mutex<BgraImage>>, image: Arc<Mutex<BgraImage>>,
console: Arc<Mutex<Console>>, console: Arc<Mutex<Console>>,
} }
@ -48,12 +55,15 @@ impl Server {
let width = console.width().await?; let width = console.width().await?;
let height = console.height().await?; let height = console.height().await?;
let image = BgraImage::new(width as _, height as _); let image = BgraImage::new(width as _, height as _);
let (tx, rx) = mpsc::unbounded_channel(); let (tx_console, rx_console) = mpsc::unbounded_channel();
let (tx_conn, rx_conn) = mpsc::unbounded_channel();
Ok(Self { Ok(Self {
vm_name, vm_name,
rx, rx_console,
tx_console,
rx_conn,
tx_conn,
image: Arc::new(Mutex::new(image)), image: Arc::new(Mutex::new(image)),
tx,
console: Arc::new(Mutex::new(console)), console: Arc::new(Mutex::new(console)),
}) })
} }
@ -69,7 +79,7 @@ impl Server {
.await .await
.register_listener(ConsoleListener { .register_listener(ConsoleListener {
image: self.image.clone(), image: self.image.clone(),
tx: self.tx.clone(), tx: self.tx_console.clone(),
}) })
.await?; .await?;
Ok(()) Ok(())
@ -80,33 +90,47 @@ impl Server {
(image.width() as u16, image.height() as u16) (image.width() as u16, image.height() as u16)
} }
async fn handle_connection(&mut self, stream: TcpStream) -> ResultType<()> { async fn handle_connection(&mut self, stream: Stream) -> ResultType<()> {
let (width, height) = self.dimensions().await; let mut stream = stream;
let tx = self.tx.clone();
let _client_thread = thread::spawn(move || loop {});
let mut client = Client::new(self.console.clone(), self.image.clone());
self.run_console().await?; self.run_console().await?;
let mut conn = Connection {
tx: self.tx_conn.clone(),
};
loop { loop {
let ev = if client.update_pending() { tokio::select! {
match self.rx.try_recv() { Some(evt) = self.rx_console.recv() => {
Ok(e) => Some(e), match evt {
Err(mpsc::error::TryRecvError::Empty) => None, _ => {}
Err(e) => { }
return Err(e.into()); }
Some(msg) = self.rx_conn.recv() => {
allow_err!(stream.send(&msg).await);
}
res = stream.next() => {
if let Some(res) = res {
match res {
Err(err) => {
bail!(err);
}
Ok(bytes) => {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match conn.on_message(msg_in).await {
Ok(false) => {
break;
}
Err(err) => {
log::error!("{err}");
}
_ => {}
}
}
}
}
} else {
bail!("Reset by the peer");
} }
} }
} else {
Some(
self.rx
.recv()
.await
.context("Channel closed unexpectedly")?,
)
};
if !client.handle_event(ev).await? {
break;
} }
} }
@ -119,7 +143,9 @@ impl Server {
pub async fn run() -> ResultType<()> { pub async fn run() -> ResultType<()> {
let args = Cli::parse(); let args = Cli::parse();
let listener = TcpListener::bind::<std::net::SocketAddr>(args.address.into()).unwrap(); let listener = TcpListener::bind::<std::net::SocketAddr>(args.address.into())
.await
.unwrap();
let dbus = if let Some(addr) = args.dbus_address { let dbus = if let Some(addr) = args.dbus_address {
zbus::ConnectionBuilder::address(addr.borrow())? zbus::ConnectionBuilder::address(addr.borrow())?
.build() .build()
@ -134,12 +160,13 @@ pub async fn run() -> ResultType<()> {
.await .await
.context("Failed to get the console")?; .context("Failed to get the console")?;
let mut server = Server::new(format!("qemu-rustdesk ({})", vm_name), console).await?; let mut server = Server::new(format!("qemu-rustdesk ({})", vm_name), console).await?;
for stream in listener.incoming() { loop {
let stream = stream?; let (stream, addr) = listener.accept().await?;
stream.set_nodelay(true).ok();
let laddr = stream.local_addr()?;
let stream = Stream::from(stream, laddr);
if let Err(err) = server.handle_connection(stream).await { if let Err(err) = server.handle_connection(stream).await {
log::error!("Connection closed: {err}"); log::error!("Connection from {addr} closed: {err}");
} }
} }
Ok(())
} }