2023-01-27 11:42:08 +08:00
use crate ::ResultType ;
use anyhow ::{ anyhow , Context } ;
2022-01-02 22:55:33 +08:00
use bytes ::{ Bytes , BytesMut } ;
2021-06-25 19:42:51 +08:00
use futures ::{ SinkExt , StreamExt } ;
2021-03-29 15:59:14 +08:00
use protobuf ::Message ;
2022-01-14 23:02:08 +08:00
use socket2 ::{ Domain , Socket , Type } ;
use std ::net ::SocketAddr ;
2023-01-27 11:42:08 +08:00
use tokio ::net ::{ lookup_host , ToSocketAddrs , UdpSocket } ;
2022-01-04 00:44:50 +08:00
use tokio_socks ::{ udp ::Socks5UdpFramed , IntoTargetAddr , TargetAddr , ToProxyAddrs } ;
2021-03-29 15:59:14 +08:00
use tokio_util ::{ codec ::BytesCodec , udp ::UdpFramed } ;
2022-01-04 00:44:50 +08:00
pub enum FramedSocket {
Direct ( UdpFramed < BytesCodec > ) ,
ProxySocks ( Socks5UdpFramed ) ,
2022-01-02 22:55:33 +08:00
}
2022-05-12 17:35:25 +08:00
fn new_socket ( addr : SocketAddr , reuse : bool , buf_size : usize ) -> Result < Socket , std ::io ::Error > {
2021-06-25 19:42:51 +08:00
let socket = match addr {
SocketAddr ::V4 ( .. ) = > Socket ::new ( Domain ::ipv4 ( ) , Type ::dgram ( ) , None ) ,
SocketAddr ::V6 ( .. ) = > Socket ::new ( Domain ::ipv6 ( ) , Type ::dgram ( ) , None ) ,
} ? ;
if reuse {
// windows has no reuse_port, but it's reuse_address
// almost equals to unix's reuse_port + reuse_address,
2021-11-14 23:22:05 +08:00
// though may introduce nondeterministic behavior
2021-06-25 19:42:51 +08:00
#[ cfg(unix) ]
2023-06-27 15:20:32 +08:00
socket . set_reuse_port ( true ) . ok ( ) ;
socket . set_reuse_address ( true ) . ok ( ) ;
2021-06-25 19:42:51 +08:00
}
2022-07-12 23:22:27 +08:00
// only nonblocking work with tokio, https://stackoverflow.com/questions/64649405/receiver-on-tokiompscchannel-only-receives-messages-when-buffer-is-full
socket . set_nonblocking ( true ) ? ;
2022-05-12 17:35:25 +08:00
if buf_size > 0 {
socket . set_recv_buffer_size ( buf_size ) . ok ( ) ;
}
2023-04-10 18:29:33 +08:00
log ::debug! (
2022-05-12 17:35:25 +08:00
" Receive buf size of udp {}: {:?} " ,
addr ,
socket . recv_buffer_size ( )
) ;
2023-01-27 11:42:08 +08:00
if addr . is_ipv6 ( ) & & addr . ip ( ) . is_unspecified ( ) & & addr . port ( ) > 0 {
socket . set_only_v6 ( false ) . ok ( ) ;
}
2021-06-25 19:42:51 +08:00
socket . bind ( & addr . into ( ) ) ? ;
Ok ( socket )
}
2022-01-04 00:44:50 +08:00
impl FramedSocket {
2021-03-29 15:59:14 +08:00
pub async fn new < T : ToSocketAddrs > ( addr : T ) -> ResultType < Self > {
2023-01-27 11:42:08 +08:00
Self ::new_reuse ( addr , false , 0 ) . await
2021-03-29 15:59:14 +08:00
}
2022-01-02 22:55:33 +08:00
2023-01-27 11:42:08 +08:00
pub async fn new_reuse < T : ToSocketAddrs > (
2022-05-12 17:35:25 +08:00
addr : T ,
2023-01-27 11:42:08 +08:00
reuse : bool ,
2022-05-12 17:35:25 +08:00
buf_size : usize ,
) -> ResultType < Self > {
2023-01-27 11:42:08 +08:00
let addr = lookup_host ( & addr )
. await ?
. next ( )
. context ( " could not resolve to any address " ) ? ;
Ok ( Self ::Direct ( UdpFramed ::new (
UdpSocket ::from_std ( new_socket ( addr , reuse , buf_size ) ? . into_udp_socket ( ) ) ? ,
BytesCodec ::new ( ) ,
) ) )
2022-05-12 17:35:25 +08:00
}
2022-01-05 13:21:14 +08:00
pub async fn new_proxy < ' a , ' t , P : ToProxyAddrs , T : ToSocketAddrs > (
2022-01-02 22:55:33 +08:00
proxy : P ,
2022-01-05 13:21:14 +08:00
local : T ,
2022-01-02 22:55:33 +08:00
username : & ' a str ,
password : & ' a str ,
ms_timeout : u64 ,
2022-01-05 13:21:14 +08:00
) -> ResultType < Self > {
2022-01-02 22:55:33 +08:00
let framed = if username . trim ( ) . is_empty ( ) {
2022-01-05 13:21:14 +08:00
super ::timeout ( ms_timeout , Socks5UdpFramed ::connect ( proxy , Some ( local ) ) ) . await ? ?
2022-01-02 22:55:33 +08:00
} else {
super ::timeout (
ms_timeout ,
2022-01-05 13:21:14 +08:00
Socks5UdpFramed ::connect_with_password ( proxy , Some ( local ) , username , password ) ,
2022-01-02 22:55:33 +08:00
)
. await ? ?
} ;
log ::trace! (
2022-01-05 15:09:36 +08:00
" Socks5 udp connected, local addr: {:?}, target addr: {} " ,
framed . local_addr ( ) ,
2022-01-05 13:21:14 +08:00
framed . socks_addr ( )
2022-01-02 22:55:33 +08:00
) ;
2022-01-05 13:21:14 +08:00
Ok ( Self ::ProxySocks ( framed ) )
2022-01-02 22:55:33 +08:00
}
2021-03-29 15:59:14 +08:00
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn send (
& mut self ,
msg : & impl Message ,
addr : impl IntoTargetAddr < '_ > ,
) -> ResultType < ( ) > {
let addr = addr . into_target_addr ( ) ? . to_owned ( ) ;
2022-01-05 15:09:36 +08:00
let send_data = Bytes ::from ( msg . write_to_bytes ( ) ? ) ;
2023-01-27 11:42:08 +08:00
match self {
Self ::Direct ( f ) = > {
if let TargetAddr ::Ip ( addr ) = addr {
f . send ( ( send_data , addr ) ) . await ?
}
}
2022-01-05 13:21:14 +08:00
Self ::ProxySocks ( f ) = > f . send ( ( send_data , addr ) ) . await ? ,
2022-01-04 00:44:50 +08:00
} ;
2021-03-29 15:59:14 +08:00
Ok ( ( ) )
}
2022-01-05 13:21:14 +08:00
// https://stackoverflow.com/a/68733302/1926020
2021-03-29 15:59:14 +08:00
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn send_raw (
& mut self ,
msg : & 'static [ u8 ] ,
addr : impl IntoTargetAddr < 'static > ,
) -> ResultType < ( ) > {
let addr = addr . into_target_addr ( ) ? . to_owned ( ) ;
2023-01-27 11:42:08 +08:00
match self {
Self ::Direct ( f ) = > {
if let TargetAddr ::Ip ( addr ) = addr {
f . send ( ( Bytes ::from ( msg ) , addr ) ) . await ?
}
}
2022-01-04 00:44:50 +08:00
Self ::ProxySocks ( f ) = > f . send ( ( Bytes ::from ( msg ) , addr ) ) . await ? ,
} ;
2021-03-29 15:59:14 +08:00
Ok ( ( ) )
}
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn next ( & mut self ) -> Option < ResultType < ( BytesMut , TargetAddr < 'static > ) > > {
2022-01-04 00:44:50 +08:00
match self {
Self ::Direct ( f ) = > match f . next ( ) . await {
2022-01-05 13:21:14 +08:00
Some ( Ok ( ( data , addr ) ) ) = > {
2022-01-05 15:09:36 +08:00
Some ( Ok ( ( data , addr . into_target_addr ( ) . ok ( ) ? . to_owned ( ) ) ) )
2022-01-05 13:21:14 +08:00
}
2022-01-04 00:44:50 +08:00
Some ( Err ( e ) ) = > Some ( Err ( anyhow! ( e ) ) ) ,
None = > None ,
} ,
Self ::ProxySocks ( f ) = > match f . next ( ) . await {
2022-01-05 13:21:14 +08:00
Some ( Ok ( ( data , _ ) ) ) = > Some ( Ok ( ( data . data , data . dst_addr ) ) ) ,
2022-01-04 00:44:50 +08:00
Some ( Err ( e ) ) = > Some ( Err ( anyhow! ( e ) ) ) ,
None = > None ,
} ,
}
2021-03-29 15:59:14 +08:00
}
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn next_timeout (
& mut self ,
ms : u64 ,
) -> Option < ResultType < ( BytesMut , TargetAddr < 'static > ) > > {
2021-03-29 15:59:14 +08:00
if let Ok ( res ) =
2022-01-04 00:44:50 +08:00
tokio ::time ::timeout ( std ::time ::Duration ::from_millis ( ms ) , self . next ( ) ) . await
2021-03-29 15:59:14 +08:00
{
res
} else {
None
}
}
2022-12-29 20:34:52 +08:00
2023-01-27 11:42:08 +08:00
pub fn local_addr ( & self ) -> Option < SocketAddr > {
2022-12-29 20:34:52 +08:00
if let FramedSocket ::Direct ( x ) = self {
if let Ok ( v ) = x . get_ref ( ) . local_addr ( ) {
2023-01-27 11:42:08 +08:00
return Some ( v ) ;
2022-12-29 20:34:52 +08:00
}
}
2023-01-27 11:42:08 +08:00
None
2022-12-29 20:34:52 +08:00
}
2021-03-29 15:59:14 +08:00
}