From 42e062b3b5047f108c1b3533d3bc025bae99be58 Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 25 Feb 2016 10:28:34 +0800 Subject: [PATCH] all: improvement in utils/conn --- cmd/frpc/control.go | 7 +++--- cmd/frps/control.go | 18 +++++++++----- conf/frpc.ini | 2 +- models/server/config.go | 1 + models/server/server.go | 52 ++++++++++++++++++++++++++++------------- utils/conn/conn.go | 12 +++++----- 6 files changed, 59 insertions(+), 33 deletions(-) diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index c3285554..eeea1840 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -14,8 +14,6 @@ import ( "github.com/fatedier/frp/utils/log" ) -var isHeartBeatContinue bool = true - func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() @@ -30,9 +28,10 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { // ignore response content now _, err := c.ReadLine() if err == io.EOF { - isHeartBeatContinue = false log.Debug("ProxyName [%s], server close this control conn", cli.Name) var sleepTime time.Duration = 1 + + // loop until connect to server for { log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort) tmpConn, err := loginToServer(cli) @@ -114,5 +113,5 @@ func startHeartBeat(c *conn.Conn) { break } } - log.Info("heartbeat exit") + log.Debug("heartbeat exit") } diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 02ff86e7..4a78c62a 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -15,7 +15,10 @@ import ( func ProcessControlConn(l *conn.Listener) { for { - c := l.GetConn() + c, err := l.GetConn() + if err != nil { + return + } log.Debug("Get one new conn, %v", c.GetRemoteAddr()) go controlWorker(c) } @@ -47,7 +50,6 @@ func controlWorker(c *conn.Conn) { } if needRes { - // control conn defer c.Close() buf, _ := json.Marshal(clientCtlRes) @@ -62,7 +64,7 @@ func controlWorker(c *conn.Conn) { return } - // others is from server to client + // other messages is from server to client s, ok := server.ProxyServers[clientCtlReq.ProxyName] if !ok { log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName) @@ -138,7 +140,7 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne return } - s.CliConnChan <- c + s.GetNewCliConn(c) } else { info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type) log.Warn(info) @@ -153,8 +155,8 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { isContinueRead := true f := func() { isContinueRead = false - c.Close() s.Close() + log.Error("ProxyName [%s], client heartbeat timeout", s.Name) } timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f) defer timer.Stop() @@ -164,13 +166,17 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { if err != nil { if err == io.EOF { log.Warn("ProxyName [%s], client is dead!", s.Name) - c.Close() s.Close() break + } else if c.IsClosed() { + log.Warn("ProxyName [%s], client connection is closed", s.Name) + break } + log.Error("ProxyName [%s], read error: %v", s.Name, err) continue } + log.Debug("ProxyName [%s], get heartbeat", s.Name) timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) } diff --git a/conf/frpc.ini b/conf/frpc.ini index f6df4b6c..447cdc86 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -1,7 +1,7 @@ # common是必须的section [common] server_addr = 127.0.0.1 -bind_port = 7000 +server_port = 7000 log_file = ./frpc.log # debug, info, warn, error log_level = debug diff --git a/models/server/config.go b/models/server/config.go index f9e974ed..ec70071e 100644 --- a/models/server/config.go +++ b/models/server/config.go @@ -15,6 +15,7 @@ var ( LogLevel string = "warn" LogWay string = "file" HeartBeatTimeout int64 = 30 + UserConnTimeout int64 = 10 ) var ProxyServers map[string]*ProxyServer = make(map[string]*ProxyServer) diff --git a/models/server/server.go b/models/server/server.go index 889b2d7f..fa692bec 100644 --- a/models/server/server.go +++ b/models/server/server.go @@ -3,6 +3,7 @@ package server import ( "container/list" "sync" + "time" "github.com/fatedier/frp/models/consts" "github.com/fatedier/frp/utils/conn" @@ -10,22 +11,22 @@ import ( ) type ProxyServer struct { - Name string - Passwd string - BindAddr string - ListenPort int64 - Status int64 - CliConnChan chan *conn.Conn // get client conns from control goroutine + Name string + Passwd string + BindAddr string + ListenPort int64 + Status int64 - listener *conn.Listener // accept new connection from remote users - ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel - userConnList *list.List // store user conns + listener *conn.Listener // accept new connection from remote users + ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel + cliConnChan chan *conn.Conn // get client conns from control goroutine + userConnList *list.List // store user conns mutex sync.Mutex } func (p *ProxyServer) Init() { p.Status = consts.Idle - p.CliConnChan = make(chan *conn.Conn) + p.cliConnChan = make(chan *conn.Conn) p.ctlMsgChan = make(chan int64) p.userConnList = list.New() } @@ -48,13 +49,13 @@ func (p *ProxyServer) Start() (err error) { p.Status = consts.Working - // start a goroutine for listener + // start a goroutine for listener to accept user connection go func() { for { // block - // if listener is closed, get nil - c := p.listener.GetConn() - if c == nil { + // if listener is closed, err returned + c, err := p.listener.GetConn() + if err != nil { log.Info("ProxyName [%s], listener is closed", p.Name) return } @@ -73,13 +74,28 @@ func (p *ProxyServer) Start() (err error) { // put msg to control conn p.ctlMsgChan <- 1 + + // set timeout + time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() { + p.Lock() + defer p.Unlock() + element := p.userConnList.Front() + if element == nil { + return + } + + userConn := element.Value.(*conn.Conn) + if userConn == c { + log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr()) + } + }) } }() // start another goroutine for join two conns from client and user go func() { for { - cliConn, ok := <-p.CliConnChan + cliConn, ok := <-p.cliConnChan if !ok { return } @@ -114,7 +130,7 @@ func (p *ProxyServer) Close() { p.Status = consts.Idle p.listener.Close() close(p.ctlMsgChan) - close(p.CliConnChan) + close(p.cliConnChan) p.userConnList = list.New() p.Unlock() } @@ -128,3 +144,7 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) { } return } + +func (p *ProxyServer) GetNewCliConn(c *conn.Conn) { + p.cliConnChan <- c +} diff --git a/utils/conn/conn.go b/utils/conn/conn.go index 4cf67621..29f2ce42 100644 --- a/utils/conn/conn.go +++ b/utils/conn/conn.go @@ -52,15 +52,15 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { return l, err } -// wait util get one new connection or close -// if listener is closed, return nil -func (l *Listener) GetConn() (conn *Conn) { +// wait util get one new connection or listener is closed +// if listener is closed, err returned +func (l *Listener) GetConn() (conn *Conn, err error) { var ok bool conn, ok = <-l.conns if !ok { - return nil + return conn, fmt.Errorf("channel close") } - return conn + return conn, nil } func (l *Listener) Close() { @@ -116,7 +116,7 @@ func (c *Conn) Write(content string) (err error) { } func (c *Conn) Close() { - if c.TcpConn != nil { + if c.TcpConn != nil && c.closeFlag == false { c.closeFlag = true c.TcpConn.Close() }