all: some optimization

This commit is contained in:
fatedier 2016-03-31 18:03:44 +08:00
parent 45c21b2705
commit 52f99bbc00
9 changed files with 253 additions and 230 deletions

View File

@ -3,7 +3,7 @@ language: go
go:
- 1.4.2
- 1.5.1
- 1.5.3
install:
- make

View File

@ -28,64 +28,98 @@ import (
"frp/utils/log"
)
var connection *conn.Conn = nil
var heartBeatTimer *time.Timer = nil
func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
defer wait.Done()
msgSendChan := make(chan interface{}, 1024)
c, err := loginToServer(cli)
if err != nil {
log.Error("ProxyName [%s], connect to server failed!", cli.Name)
return
}
connection = c
defer connection.Close()
defer c.Close()
go heartbeatSender(c, msgSendChan)
go msgSender(cli, c, msgSendChan)
msgReader(cli, c, msgSendChan)
close(msgSendChan)
}
// loop for reading messages from frpc after control connection is established
func msgReader(cli *client.ProxyClient, c *conn.Conn, msgSendChan chan interface{}) error {
// for heartbeat
var heartbeatTimeout bool = false
timer := time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, func() {
heartbeatTimeout = true
c.Close()
log.Error("ProxyName [%s], heartbeatRes from frps timeout", cli.Name)
})
defer timer.Stop()
for {
// ignore response content now
content, err := connection.ReadLine()
if err == io.EOF || nil == connection || connection.IsClosed() {
log.Debug("ProxyName [%s], server close this control conn", cli.Name)
var sleepTime time.Duration = 1
buf, err := c.ReadLine()
if err == io.EOF || c == nil || c.IsClosed() {
c.Close()
log.Warn("ProxyName [%s], frps close this control conn!", cli.Name)
var delayTime time.Duration = 1
// loop until connect to server
// loop until reconnect to frps
for {
log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort)
tmpConn, err := loginToServer(cli)
log.Info("ProxyName [%s], try to reconnect to frps [%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort)
c, err = loginToServer(cli)
if err == nil {
connection.Close()
connection = tmpConn
go heartbeatSender(c, msgSendChan)
break
}
if sleepTime < 60 {
sleepTime = sleepTime * 2
if delayTime < 60 {
delayTime = delayTime * 2
}
time.Sleep(sleepTime * time.Second)
time.Sleep(delayTime * time.Second)
}
continue
} else if err != nil {
log.Warn("ProxyName [%s], read from server error, %v", cli.Name, err)
log.Warn("ProxyName [%s], read from frps error: %v", cli.Name, err)
continue
}
clientCtlRes := &msg.ClientCtlRes{}
if err := json.Unmarshal([]byte(content), clientCtlRes); err != nil {
log.Warn("Parse err: %v : %s", err, content)
continue
}
if consts.SCHeartBeatRes == clientCtlRes.GeneralRes.Code {
if heartBeatTimer != nil {
log.Debug("Client rcv heartbeat response")
heartBeatTimer.Reset(time.Duration(client.HeartBeatTimeout) * time.Second)
} else {
log.Error("heartBeatTimer is nil")
}
ctlRes := &msg.ControlRes{}
if err := json.Unmarshal([]byte(buf), &ctlRes); err != nil {
log.Warn("ProxyName [%s], parse msg from frps error: %v : %s", cli.Name, err, buf)
continue
}
cli.StartTunnel(client.ServerAddr, client.ServerPort)
switch ctlRes.Type {
case consts.HeartbeatRes:
log.Debug("ProxyName [%s], receive heartbeat response", cli.Name)
timer.Reset(time.Duration(client.HeartBeatTimeout) * time.Second)
case consts.NoticeUserConn:
log.Debug("ProxyName [%s], new user connection", cli.Name)
cli.StartTunnel(client.ServerAddr, client.ServerPort)
default:
log.Warn("ProxyName [%s}, unsupport msgType [%d]", cli.Name, ctlRes.Type)
}
}
return nil
}
// loop for sending messages from channel to frps
func msgSender(cli *client.ProxyClient, c *conn.Conn, msgSendChan chan interface{}) {
for {
msg, ok := <-msgSendChan
if !ok {
break
}
buf, _ := json.Marshal(msg)
err := c.Write(string(buf) + "\n")
if err != nil {
log.Warn("ProxyName [%s], write to client error, proxy exit", cli.Name)
c.Close()
break
}
}
}
@ -96,8 +130,8 @@ func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) {
return
}
req := &msg.ClientCtlReq{
Type: consts.CtlConn,
req := &msg.ControlReq{
Type: consts.NewCtlConn,
ProxyName: cli.Name,
Passwd: cli.Passwd,
}
@ -115,53 +149,31 @@ func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) {
}
log.Debug("ProxyName [%s], read [%s]", cli.Name, res)
clientCtlRes := &msg.ClientCtlRes{}
if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil {
ctlRes := &msg.ControlRes{}
if err = json.Unmarshal([]byte(res), &ctlRes); err != nil {
log.Error("ProxyName [%s], format server response error, %v", cli.Name, err)
return
}
if clientCtlRes.Code != 0 {
log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg)
return c, fmt.Errorf("%s", clientCtlRes.Msg)
if ctlRes.Code != 0 {
log.Error("ProxyName [%s], start proxy error, %s", cli.Name, ctlRes.Msg)
return c, fmt.Errorf("%s", ctlRes.Msg)
}
go startHeartBeat(c)
log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort)
log.Debug("ProxyName [%s], connect to server [%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort)
return
}
func startHeartBeat(c *conn.Conn) {
f := func() {
log.Error("HeartBeat timeout!")
if c != nil {
c.Close()
}
func heartbeatSender(c *conn.Conn, msgSendChan chan interface{}) {
heartbeatReq := &msg.ControlReq{
Type: consts.HeartbeatReq,
}
heartBeatTimer = time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, f)
defer heartBeatTimer.Stop()
clientCtlReq := &msg.ClientCtlReq{
Type: consts.CSHeartBeatReq,
ProxyName: "",
Passwd: "",
}
request, err := json.Marshal(clientCtlReq)
if err != nil {
log.Warn("Serialize clientCtlReq err! Err: %v", err)
}
log.Debug("Start to send heartbeat")
log.Info("Start to send heartbeat to frps")
for {
time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second)
if c != nil && !c.IsClosed() {
log.Debug("Send heartbeat to server")
err = c.Write(string(request) + "\n")
if err != nil {
log.Error("Send hearbeat to server failed! Err:%v", err)
continue
}
msgSendChan <- heartbeatReq
} else {
break
}

View File

@ -33,87 +33,162 @@ func ProcessControlConn(l *conn.Listener) {
if err != nil {
return
}
log.Debug("Get one new conn, %v", c.GetRemoteAddr())
log.Debug("Get new connection, %v", c.GetRemoteAddr())
go controlWorker(c)
}
}
// connection from every client and server
func controlWorker(c *conn.Conn) {
// the first message is from client to server
// if error, close connection
res, err := c.ReadLine()
// if login message type is NewWorkConn, don't close this connection
var closeFlag bool = true
var s *server.ProxyServer
defer func() {
if closeFlag {
c.Close()
if s != nil {
s.Close()
}
}
}()
// get login message
buf, err := c.ReadLine()
if err != nil {
log.Warn("Read error, %v", err)
return
}
log.Debug("get: %s", res)
log.Debug("Get msg from frpc: %s", buf)
clientCtlReq := &msg.ClientCtlReq{}
clientCtlRes := &msg.ClientCtlRes{}
if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil {
log.Warn("Parse err: %v : %s", err, res)
cliReq := &msg.ControlReq{}
if err := json.Unmarshal([]byte(buf), &cliReq); err != nil {
log.Warn("Parse msg from frpc error: %v : %s", err, buf)
return
}
// check
succ, info, needRes := checkProxy(clientCtlReq, c)
if !succ {
clientCtlRes.Code = 1
clientCtlRes.Msg = info
// do login when type is NewCtlConn or NewWorkConn
ret, info := doLogin(cliReq, c)
s, ok := server.ProxyServers[cliReq.ProxyName]
if !ok {
log.Warn("ProxyName [%s] is not exist", cliReq.ProxyName)
return
}
if needRes {
defer c.Close()
buf, _ := json.Marshal(clientCtlRes)
err = c.Write(string(buf) + "\n")
// if login type is NewWorkConn, nothing will be send to frpc
if cliReq.Type != consts.NewWorkConn {
cliRes := &msg.ControlRes{
Type: consts.NewCtlConnRes,
Code: ret,
Msg: info,
}
byteBuf, _ := json.Marshal(cliRes)
err = c.Write(string(byteBuf) + "\n")
if err != nil {
log.Warn("Write error, %v", err)
log.Warn("ProxyName [%s], write to client error, proxy exit", s.Name)
time.Sleep(1 * time.Second)
return
}
} else {
// work conn, just return
closeFlag = false
return
}
// other messages is from server to client
s, ok := server.ProxyServers[clientCtlReq.ProxyName]
if !ok {
log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
return
}
// create a channel for sending messages
msgSendChan := make(chan interface{}, 1024)
go msgSender(s, c, msgSendChan)
go noticeUserConn(s, msgSendChan)
// read control msg from client
go readControlMsgFromClient(s, c)
serverCtlReq := &msg.ClientCtlReq{}
serverCtlReq.Type = consts.WorkConn
for {
closeFlag := s.WaitUserConn()
if closeFlag {
log.Debug("ProxyName [%s], goroutine for dealing user conn is closed", s.Name)
break
}
buf, _ := json.Marshal(serverCtlReq)
err = c.Write(string(buf) + "\n")
if err != nil {
log.Warn("ProxyName [%s], write to client error, proxy exit", s.Name)
s.Close()
return
}
log.Debug("ProxyName [%s], write to client to add work conn success", s.Name)
}
// loop for reading control messages from frpc and deal with different types
msgReader(s, c, msgSendChan)
close(msgSendChan)
log.Info("ProxyName [%s], I'm dead!", s.Name)
return
}
func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, needRes bool) {
succ = false
needRes = true
// when frps get one new user connection, send NoticeUserConn message to frpc and accept one new WorkConn later
func noticeUserConn(s *server.ProxyServer, msgSendChan chan interface{}) {
for {
closeFlag := s.WaitUserConn()
if closeFlag {
log.Debug("ProxyName [%s], goroutine for noticing user conn is closed", s.Name)
break
}
notice := &msg.ControlRes{
Type: consts.NoticeUserConn,
}
msgSendChan <- notice
log.Debug("ProxyName [%s], notice client to add work conn", s.Name)
}
}
// loop for reading messages from frpc after control connection is established
func msgReader(s *server.ProxyServer, c *conn.Conn, msgSendChan chan interface{}) error {
// for heartbeat
var heartbeatTimeout bool = false
timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, func() {
heartbeatTimeout = true
s.Close()
c.Close()
log.Error("ProxyName [%s], client heartbeat timeout", s.Name)
})
defer timer.Stop()
for {
buf, err := c.ReadLine()
if err != nil {
if err == io.EOF {
log.Warn("ProxyName [%s], client is dead!", s.Name)
return err
} else if c == nil || c.IsClosed() {
log.Warn("ProxyName [%s], client connection is closed", s.Name)
return err
}
log.Warn("ProxyName [%s], read error: %v", s.Name, err)
continue
}
cliReq := &msg.ControlReq{}
if err := json.Unmarshal([]byte(buf), &cliReq); err != nil {
log.Warn("ProxyName [%s], parse msg from frpc error: %v : %s", s.Name, err, buf)
continue
}
switch cliReq.Type {
case consts.HeartbeatReq:
log.Debug("ProxyName [%s], get heartbeat", s.Name)
timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
heartbeatRes := msg.ControlRes{
Type: consts.HeartbeatRes,
}
msgSendChan <- heartbeatRes
default:
log.Warn("ProxyName [%s}, unsupport msgType [%d]", s.Name, cliReq.Type)
}
}
return nil
}
// loop for sending messages from channel to frpc
func msgSender(s *server.ProxyServer, c *conn.Conn, msgSendChan chan interface{}) {
for {
msg, ok := <-msgSendChan
if !ok {
break
}
buf, _ := json.Marshal(msg)
err := c.Write(string(buf) + "\n")
if err != nil {
log.Warn("ProxyName [%s], write to client error, proxy exit", s.Name)
s.Close()
break
}
}
}
// if success, ret equals 0, otherwise greater than 0
func doLogin(req *msg.ControlReq, c *conn.Conn) (ret int64, info string) {
ret = 1
// check if proxy name exist
s, ok := server.ProxyServers[req.ProxyName]
if !ok {
@ -130,89 +205,35 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne
}
// control conn
if req.Type == consts.CtlConn {
if req.Type == consts.NewCtlConn {
if s.Status != consts.Idle {
info = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName)
log.Warn(info)
return
}
// start proxy and listen for user conn, no block
// start proxy and listen for user connections, no block
err := s.Start()
if err != nil {
info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error())
info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err)
log.Warn(info)
return
}
log.Info("ProxyName [%s], start proxy success", req.ProxyName)
} else if req.Type == consts.WorkConn {
} else if req.Type == consts.NewWorkConn {
// work conn
needRes = false
if s.Status != consts.Working {
log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName)
log.Warn("ProxyName [%s], is not working when it gets one new work connnection", req.ProxyName)
return
}
s.GetNewCliConn(c)
// the connection will close after join over
s.RecvNewWorkConn(c)
} else {
info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type)
log.Warn(info)
info = fmt.Sprintf("Unsupport login message type [%d]", req.Type)
log.Warn("Unsupport login message type [%d]", req.Type)
return
}
succ = true
ret = 0
return
}
func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
isContinueRead := true
f := func() {
isContinueRead = false
s.Close()
log.Error("ProxyName [%s], client heartbeat timeout", s.Name)
}
timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f)
defer timer.Stop()
for isContinueRead {
content, err := c.ReadLine()
if err != nil {
if err == io.EOF {
log.Warn("ProxyName [%s], client is dead!", s.Name)
s.Close()
break
} else if nil == c || c.IsClosed() {
log.Warn("ProxyName [%s], client connection is closed", s.Name)
break
}
log.Error("ProxyName [%s], read error: %v", s.Name, err)
continue
}
clientCtlReq := &msg.ClientCtlReq{}
if err := json.Unmarshal([]byte(content), clientCtlReq); err != nil {
log.Warn("Parse err: %v : %s", err, content)
continue
}
if consts.CSHeartBeatReq == clientCtlReq.Type {
log.Debug("ProxyName [%s], get heartbeat", s.Name)
timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
clientCtlRes := &msg.ClientCtlRes{}
clientCtlRes.GeneralRes.Code = consts.SCHeartBeatRes
response, err := json.Marshal(clientCtlRes)
if err != nil {
log.Warn("Serialize ClientCtlRes err! err: %v", err)
continue
}
err = c.Write(string(response) + "\n")
if err != nil {
log.Error("Send heartbeat response to client failed! Err:%v", err)
continue
}
}
}
}

View File

@ -51,8 +51,8 @@ func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err
return
}
req := &msg.ClientCtlReq{
Type: consts.WorkConn,
req := &msg.ControlReq{
Type: consts.NewWorkConn,
ProxyName: p.Name,
Passwd: p.Passwd,
}
@ -79,7 +79,7 @@ func (p *ProxyClient) StartTunnel(serverAddr string, serverPort int64) (err erro
}
// l means local, r means remote
log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", localConn.GetLocalAddr(), localConn.GetRemoteAddr(),
log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", localConn.GetLocalAddr(), localConn.GetRemoteAddr(),
remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr())
// go conn.Join(localConn, remoteConn)
go conn.JoinMore(localConn, remoteConn, p.Passwd)

View File

@ -20,18 +20,12 @@ const (
Working
)
// connection type
// msg type
const (
CtlConn = iota
WorkConn
)
// msg from client to server
const (
CSHeartBeatReq = 1
)
// msg from server to client
const (
SCHeartBeatRes = 100
NewCtlConn = iota
NewWorkConn
NoticeUserConn
NewCtlConnRes
HeartbeatReq
HeartbeatRes
)

View File

@ -19,16 +19,15 @@ type GeneralRes struct {
Msg string `json:"msg"`
}
type ClientCtlReq struct {
// messages between control connection of frpc and frps
type ControlReq struct {
Type int64 `json:"type"`
ProxyName string `json:"proxy_name"`
Passwd string `json:"passwd"`
ProxyName string `json:"proxy_name,omitempty"`
Passwd string `json:"passwd, omitempty"`
}
type ClientCtlRes struct {
GeneralRes
}
type ServerCtlReq struct {
Type int64 `json:"type"`
type ControlRes struct {
Type int64 `json:"type"`
Code int64 `json:"code"`
Msg string `json:"msg"`
}

View File

@ -33,14 +33,14 @@ type ProxyServer struct {
listener *conn.Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
cliConnChan chan *conn.Conn // get client conns from control goroutine
workConnChan chan *conn.Conn // get new work conns from control goroutine
userConnList *list.List // store user conns
mutex sync.Mutex
}
func (p *ProxyServer) Init() {
p.Status = consts.Idle
p.cliConnChan = make(chan *conn.Conn)
p.workConnChan = make(chan *conn.Conn)
p.ctlMsgChan = make(chan int64)
p.userConnList = list.New()
}
@ -109,7 +109,7 @@ func (p *ProxyServer) Start() (err error) {
// start another goroutine for join two conns from client and user
go func() {
for {
cliConn, ok := <-p.cliConnChan
workConn, ok := <-p.workConnChan
if !ok {
return
}
@ -122,7 +122,7 @@ func (p *ProxyServer) Start() (err error) {
userConn = element.Value.(*conn.Conn)
p.userConnList.Remove(element)
} else {
cliConn.Close()
workConn.Close()
p.Unlock()
continue
}
@ -130,10 +130,10 @@ func (p *ProxyServer) Start() (err error) {
// msg will transfer to another without modifying
// l means local, r means remote
log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(),
log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(),
userConn.GetLocalAddr(), userConn.GetRemoteAddr())
// go conn.Join(cliConn, userConn)
go conn.JoinMore(userConn, cliConn, p.Passwd)
// go conn.Join(workConn, userConn)
go conn.JoinMore(userConn, workConn, p.Passwd)
}
}()
@ -147,7 +147,7 @@ func (p *ProxyServer) Close() {
p.listener.Close()
}
close(p.ctlMsgChan)
close(p.cliConnChan)
close(p.workConnChan)
p.userConnList = list.New()
p.Unlock()
}
@ -162,6 +162,6 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
return
}
func (p *ProxyServer) GetNewCliConn(c *conn.Conn) {
p.cliConnChan <- c
func (p *ProxyServer) RecvNewWorkConn(c *conn.Conn) {
p.workConnChan <- c
}

View File

@ -153,7 +153,7 @@ func Join(c1 *Conn, c2 *Conn) {
var err error
_, err = io.Copy(to.TcpConn, from.TcpConn)
if err != nil {
log.Warn("join conns error, %v", err)
log.Warn("join connections error, %v", err)
}
}
@ -171,10 +171,8 @@ func JoinMore(local *Conn, remote *Conn, cryptoKey string) {
defer to.Close()
defer wait.Done()
err := PipeEncryptoWriter(from.TcpConn, to.TcpConn, key)
if err != nil {
log.Warn("join conns error, %v", err)
}
// we don't care about errors here
PipeEncryptoWriter(from.TcpConn, to.TcpConn, key)
}
decryptoPipe := func(to *Conn, from *Conn, key string) {
@ -182,16 +180,15 @@ func JoinMore(local *Conn, remote *Conn, cryptoKey string) {
defer to.Close()
defer wait.Done()
err := PipeDecryptoReader(to.TcpConn, from.TcpConn, key)
if err != nil {
log.Warn("join conns error, %v", err)
}
// we don't care about errors here
PipeDecryptoReader(to.TcpConn, from.TcpConn, key)
}
wait.Add(2)
go encrypPipe(local, remote, cryptoKey)
go decryptoPipe(remote, local, cryptoKey)
wait.Wait()
log.Debug("One tunnel stopped")
return
}

View File

@ -19,7 +19,7 @@ import (
"strings"
)
var version string = "0.2.0"
var version string = "0.3.0"
func Full() string {
return version