use crate::{bail, ResultType}; use anyhow::anyhow; use bytes::{Bytes, BytesMut}; use futures::{SinkExt, StreamExt}; use protobuf::Message; use socket2::{Domain, Socket, Type}; use std::net::SocketAddr; use tokio::net::{ToSocketAddrs, UdpSocket}; use tokio_socks::{udp::Socks5UdpFramed, IntoTargetAddr, TargetAddr, ToProxyAddrs}; use tokio_util::{codec::BytesCodec, udp::UdpFramed}; pub enum FramedSocket { Direct(UdpFramed), ProxySocks(Socks5UdpFramed), } fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result { let socket = match addr { SocketAddr::V4(..) => Socket::new(Domain::ipv4(), Type::dgram(), None), SocketAddr::V6(..) => Socket::new(Domain::ipv6(), Type::dgram(), None), }?; if reuse { // windows has no reuse_port, but it's reuse_address // almost equals to unix's reuse_port + reuse_address, // though may introduce nondeterministic behavior #[cfg(unix)] socket.set_reuse_port(true)?; socket.set_reuse_address(true)?; } // only nonblocking work with tokio, https://stackoverflow.com/questions/64649405/receiver-on-tokiompscchannel-only-receives-messages-when-buffer-is-full socket.set_nonblocking(true)?; if buf_size > 0 { socket.set_recv_buffer_size(buf_size).ok(); } log::info!( "Receive buf size of udp {}: {:?}", addr, socket.recv_buffer_size() ); socket.bind(&addr.into())?; Ok(socket) } impl FramedSocket { pub async fn new(addr: T) -> ResultType { let socket = UdpSocket::bind(addr).await?; Ok(Self::Direct(UdpFramed::new(socket, BytesCodec::new()))) } #[allow(clippy::never_loop)] pub async fn new_reuse(addr: T) -> ResultType { for addr in addr.to_socket_addrs()? { let socket = new_socket(addr, true, 0)?.into_udp_socket(); return Ok(Self::Direct(UdpFramed::new( UdpSocket::from_std(socket)?, BytesCodec::new(), ))); } bail!("could not resolve to any address"); } pub async fn new_with_buf_size( addr: T, buf_size: usize, ) -> ResultType { for addr in addr.to_socket_addrs()? { return Ok(Self::Direct(UdpFramed::new( UdpSocket::from_std(new_socket(addr, false, buf_size)?.into_udp_socket())?, BytesCodec::new(), ))); } bail!("could not resolve to any address"); } pub async fn new_proxy<'a, 't, P: ToProxyAddrs, T: ToSocketAddrs>( proxy: P, local: T, username: &'a str, password: &'a str, ms_timeout: u64, ) -> ResultType { let framed = if username.trim().is_empty() { super::timeout(ms_timeout, Socks5UdpFramed::connect(proxy, Some(local))).await?? } else { super::timeout( ms_timeout, Socks5UdpFramed::connect_with_password(proxy, Some(local), username, password), ) .await?? }; log::trace!( "Socks5 udp connected, local addr: {:?}, target addr: {}", framed.local_addr(), framed.socks_addr() ); Ok(Self::ProxySocks(framed)) } #[inline] pub async fn send( &mut self, msg: &impl Message, addr: impl IntoTargetAddr<'_>, ) -> ResultType<()> { let addr = addr.into_target_addr()?.to_owned(); let send_data = Bytes::from(msg.write_to_bytes()?); let _ = match self { Self::Direct(f) => match addr { TargetAddr::Ip(addr) => f.send((send_data, addr)).await?, _ => {} }, Self::ProxySocks(f) => f.send((send_data, addr)).await?, }; Ok(()) } // https://stackoverflow.com/a/68733302/1926020 #[inline] pub async fn send_raw( &mut self, msg: &'static [u8], addr: impl IntoTargetAddr<'static>, ) -> ResultType<()> { let addr = addr.into_target_addr()?.to_owned(); let _ = match self { Self::Direct(f) => match addr { TargetAddr::Ip(addr) => f.send((Bytes::from(msg), addr)).await?, _ => {} }, Self::ProxySocks(f) => f.send((Bytes::from(msg), addr)).await?, }; Ok(()) } #[inline] pub async fn next(&mut self) -> Option)>> { match self { Self::Direct(f) => match f.next().await { Some(Ok((data, addr))) => { Some(Ok((data, addr.into_target_addr().ok()?.to_owned()))) } Some(Err(e)) => Some(Err(anyhow!(e))), None => None, }, Self::ProxySocks(f) => match f.next().await { Some(Ok((data, _))) => Some(Ok((data.data, data.dst_addr))), Some(Err(e)) => Some(Err(anyhow!(e))), None => None, }, } } #[inline] pub async fn next_timeout( &mut self, ms: u64, ) -> Option)>> { if let Ok(res) = tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await { res } else { None } } }