2021-03-29 15:59:14 +08:00
use crate ::{ bail , ResultType } ;
2022-01-02 22:55:33 +08:00
use anyhow ::anyhow ;
use bytes ::{ Bytes , BytesMut } ;
2021-06-25 19:42:51 +08:00
use futures ::{ SinkExt , StreamExt } ;
2021-03-29 15:59:14 +08:00
use protobuf ::Message ;
2022-01-14 23:02:08 +08:00
use socket2 ::{ Domain , Socket , Type } ;
use std ::net ::SocketAddr ;
2022-01-02 22:55:33 +08:00
use tokio ::net ::{ ToSocketAddrs , UdpSocket } ;
2022-01-04 00:44:50 +08:00
use tokio_socks ::{ udp ::Socks5UdpFramed , IntoTargetAddr , TargetAddr , ToProxyAddrs } ;
2021-03-29 15:59:14 +08:00
use tokio_util ::{ codec ::BytesCodec , udp ::UdpFramed } ;
2022-01-04 00:44:50 +08:00
pub enum FramedSocket {
Direct ( UdpFramed < BytesCodec > ) ,
ProxySocks ( Socks5UdpFramed ) ,
2022-01-02 22:55:33 +08:00
}
2022-05-12 17:35:25 +08:00
fn new_socket ( addr : SocketAddr , reuse : bool , buf_size : usize ) -> Result < Socket , std ::io ::Error > {
2021-06-25 19:42:51 +08:00
let socket = match addr {
SocketAddr ::V4 ( .. ) = > Socket ::new ( Domain ::ipv4 ( ) , Type ::dgram ( ) , None ) ,
SocketAddr ::V6 ( .. ) = > Socket ::new ( Domain ::ipv6 ( ) , Type ::dgram ( ) , None ) ,
} ? ;
if reuse {
// windows has no reuse_port, but it's reuse_address
// almost equals to unix's reuse_port + reuse_address,
2021-11-14 23:22:05 +08:00
// though may introduce nondeterministic behavior
2021-06-25 19:42:51 +08:00
#[ cfg(unix) ]
socket . set_reuse_port ( true ) ? ;
socket . set_reuse_address ( true ) ? ;
}
2022-07-12 23:22:27 +08:00
// only nonblocking work with tokio, https://stackoverflow.com/questions/64649405/receiver-on-tokiompscchannel-only-receives-messages-when-buffer-is-full
socket . set_nonblocking ( true ) ? ;
2022-05-12 17:35:25 +08:00
if buf_size > 0 {
socket . set_recv_buffer_size ( buf_size ) . ok ( ) ;
}
log ::info! (
" Receive buf size of udp {}: {:?} " ,
addr ,
socket . recv_buffer_size ( )
) ;
2021-06-25 19:42:51 +08:00
socket . bind ( & addr . into ( ) ) ? ;
Ok ( socket )
}
2022-01-04 00:44:50 +08:00
impl FramedSocket {
2021-03-29 15:59:14 +08:00
pub async fn new < T : ToSocketAddrs > ( addr : T ) -> ResultType < Self > {
let socket = UdpSocket ::bind ( addr ) . await ? ;
2022-01-04 00:44:50 +08:00
Ok ( Self ::Direct ( UdpFramed ::new ( socket , BytesCodec ::new ( ) ) ) )
2021-03-29 15:59:14 +08:00
}
#[ allow(clippy::never_loop) ]
2021-06-25 19:42:51 +08:00
pub async fn new_reuse < T : std ::net ::ToSocketAddrs > ( addr : T ) -> ResultType < Self > {
2022-06-12 15:39:58 +08:00
for addr in addr . to_socket_addrs ( ) ? . filter ( | x | x . is_ipv4 ( ) ) {
2022-05-12 17:35:25 +08:00
let socket = new_socket ( addr , true , 0 ) ? . into_udp_socket ( ) ;
2022-01-04 00:44:50 +08:00
return Ok ( Self ::Direct ( UdpFramed ::new (
2022-01-02 22:55:33 +08:00
UdpSocket ::from_std ( socket ) ? ,
2021-03-29 15:59:14 +08:00
BytesCodec ::new ( ) ,
2022-01-04 00:44:50 +08:00
) ) ) ;
2021-03-29 15:59:14 +08:00
}
bail! ( " could not resolve to any address " ) ;
}
2022-01-02 22:55:33 +08:00
2022-05-12 17:35:25 +08:00
pub async fn new_with_buf_size < T : std ::net ::ToSocketAddrs > (
addr : T ,
buf_size : usize ,
) -> ResultType < Self > {
2022-06-12 15:39:58 +08:00
for addr in addr . to_socket_addrs ( ) ? . filter ( | x | x . is_ipv4 ( ) ) {
2022-05-12 17:35:25 +08:00
return Ok ( Self ::Direct ( UdpFramed ::new (
UdpSocket ::from_std ( new_socket ( addr , false , buf_size ) ? . into_udp_socket ( ) ) ? ,
BytesCodec ::new ( ) ,
) ) ) ;
}
bail! ( " could not resolve to any address " ) ;
}
2022-01-05 13:21:14 +08:00
pub async fn new_proxy < ' a , ' t , P : ToProxyAddrs , T : ToSocketAddrs > (
2022-01-02 22:55:33 +08:00
proxy : P ,
2022-01-05 13:21:14 +08:00
local : T ,
2022-01-02 22:55:33 +08:00
username : & ' a str ,
password : & ' a str ,
ms_timeout : u64 ,
2022-01-05 13:21:14 +08:00
) -> ResultType < Self > {
2022-01-02 22:55:33 +08:00
let framed = if username . trim ( ) . is_empty ( ) {
2022-01-05 13:21:14 +08:00
super ::timeout ( ms_timeout , Socks5UdpFramed ::connect ( proxy , Some ( local ) ) ) . await ? ?
2022-01-02 22:55:33 +08:00
} else {
super ::timeout (
ms_timeout ,
2022-01-05 13:21:14 +08:00
Socks5UdpFramed ::connect_with_password ( proxy , Some ( local ) , username , password ) ,
2022-01-02 22:55:33 +08:00
)
. await ? ?
} ;
log ::trace! (
2022-01-05 15:09:36 +08:00
" Socks5 udp connected, local addr: {:?}, target addr: {} " ,
framed . local_addr ( ) ,
2022-01-05 13:21:14 +08:00
framed . socks_addr ( )
2022-01-02 22:55:33 +08:00
) ;
2022-01-05 13:21:14 +08:00
Ok ( Self ::ProxySocks ( framed ) )
2022-01-02 22:55:33 +08:00
}
2021-03-29 15:59:14 +08:00
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn send (
& mut self ,
msg : & impl Message ,
addr : impl IntoTargetAddr < '_ > ,
) -> ResultType < ( ) > {
let addr = addr . into_target_addr ( ) ? . to_owned ( ) ;
2022-01-05 15:09:36 +08:00
let send_data = Bytes ::from ( msg . write_to_bytes ( ) ? ) ;
2022-01-04 00:44:50 +08:00
let _ = match self {
2022-01-05 13:21:14 +08:00
Self ::Direct ( f ) = > match addr {
TargetAddr ::Ip ( addr ) = > f . send ( ( send_data , addr ) ) . await ? ,
2022-02-15 14:46:08 +08:00
_ = > { }
2022-01-05 13:21:14 +08:00
} ,
Self ::ProxySocks ( f ) = > f . send ( ( send_data , addr ) ) . await ? ,
2022-01-04 00:44:50 +08:00
} ;
2021-03-29 15:59:14 +08:00
Ok ( ( ) )
}
2022-01-05 13:21:14 +08:00
// https://stackoverflow.com/a/68733302/1926020
2021-03-29 15:59:14 +08:00
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn send_raw (
& mut self ,
msg : & 'static [ u8 ] ,
addr : impl IntoTargetAddr < 'static > ,
) -> ResultType < ( ) > {
let addr = addr . into_target_addr ( ) ? . to_owned ( ) ;
2022-01-04 00:44:50 +08:00
let _ = match self {
2022-01-05 13:21:14 +08:00
Self ::Direct ( f ) = > match addr {
TargetAddr ::Ip ( addr ) = > f . send ( ( Bytes ::from ( msg ) , addr ) ) . await ? ,
2022-02-15 14:46:08 +08:00
_ = > { }
2022-01-05 13:21:14 +08:00
} ,
2022-01-04 00:44:50 +08:00
Self ::ProxySocks ( f ) = > f . send ( ( Bytes ::from ( msg ) , addr ) ) . await ? ,
} ;
2021-03-29 15:59:14 +08:00
Ok ( ( ) )
}
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn next ( & mut self ) -> Option < ResultType < ( BytesMut , TargetAddr < 'static > ) > > {
2022-01-04 00:44:50 +08:00
match self {
Self ::Direct ( f ) = > match f . next ( ) . await {
2022-01-05 13:21:14 +08:00
Some ( Ok ( ( data , addr ) ) ) = > {
2022-01-05 15:09:36 +08:00
Some ( Ok ( ( data , addr . into_target_addr ( ) . ok ( ) ? . to_owned ( ) ) ) )
2022-01-05 13:21:14 +08:00
}
2022-01-04 00:44:50 +08:00
Some ( Err ( e ) ) = > Some ( Err ( anyhow! ( e ) ) ) ,
None = > None ,
} ,
Self ::ProxySocks ( f ) = > match f . next ( ) . await {
2022-01-05 13:21:14 +08:00
Some ( Ok ( ( data , _ ) ) ) = > Some ( Ok ( ( data . data , data . dst_addr ) ) ) ,
2022-01-04 00:44:50 +08:00
Some ( Err ( e ) ) = > Some ( Err ( anyhow! ( e ) ) ) ,
None = > None ,
} ,
}
2021-03-29 15:59:14 +08:00
}
#[ inline ]
2022-01-05 13:21:14 +08:00
pub async fn next_timeout (
& mut self ,
ms : u64 ,
) -> Option < ResultType < ( BytesMut , TargetAddr < 'static > ) > > {
2021-03-29 15:59:14 +08:00
if let Ok ( res ) =
2022-01-04 00:44:50 +08:00
tokio ::time ::timeout ( std ::time ::Duration ::from_millis ( ms ) , self . next ( ) ) . await
2021-03-29 15:59:14 +08:00
{
res
} else {
None
}
}
}