From 42e062b3b5047f108c1b3533d3bc025bae99be58 Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 25 Feb 2016 10:28:34 +0800 Subject: [PATCH 1/3] all: improvement in utils/conn --- cmd/frpc/control.go | 7 +++--- cmd/frps/control.go | 18 +++++++++----- conf/frpc.ini | 2 +- models/server/config.go | 1 + models/server/server.go | 52 ++++++++++++++++++++++++++++------------- utils/conn/conn.go | 12 +++++----- 6 files changed, 59 insertions(+), 33 deletions(-) diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index c3285554..eeea1840 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -14,8 +14,6 @@ import ( "github.com/fatedier/frp/utils/log" ) -var isHeartBeatContinue bool = true - func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() @@ -30,9 +28,10 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { // ignore response content now _, err := c.ReadLine() if err == io.EOF { - isHeartBeatContinue = false log.Debug("ProxyName [%s], server close this control conn", cli.Name) var sleepTime time.Duration = 1 + + // loop until connect to server for { log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort) tmpConn, err := loginToServer(cli) @@ -114,5 +113,5 @@ func startHeartBeat(c *conn.Conn) { break } } - log.Info("heartbeat exit") + log.Debug("heartbeat exit") } diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 02ff86e7..4a78c62a 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -15,7 +15,10 @@ import ( func ProcessControlConn(l *conn.Listener) { for { - c := l.GetConn() + c, err := l.GetConn() + if err != nil { + return + } log.Debug("Get one new conn, %v", c.GetRemoteAddr()) go controlWorker(c) } @@ -47,7 +50,6 @@ func controlWorker(c *conn.Conn) { } if needRes { - // control conn defer c.Close() buf, _ := json.Marshal(clientCtlRes) @@ -62,7 +64,7 @@ func controlWorker(c *conn.Conn) { return } - // others is from server to client + // other messages is from server to client s, ok := server.ProxyServers[clientCtlReq.ProxyName] if !ok { log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName) @@ -138,7 +140,7 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne return } - s.CliConnChan <- c + s.GetNewCliConn(c) } else { info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type) log.Warn(info) @@ -153,8 +155,8 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { isContinueRead := true f := func() { isContinueRead = false - c.Close() s.Close() + log.Error("ProxyName [%s], client heartbeat timeout", s.Name) } timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f) defer timer.Stop() @@ -164,13 +166,17 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { if err != nil { if err == io.EOF { log.Warn("ProxyName [%s], client is dead!", s.Name) - c.Close() s.Close() break + } else if c.IsClosed() { + log.Warn("ProxyName [%s], client connection is closed", s.Name) + break } + log.Error("ProxyName [%s], read error: %v", s.Name, err) continue } + log.Debug("ProxyName [%s], get heartbeat", s.Name) timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) } diff --git a/conf/frpc.ini b/conf/frpc.ini index f6df4b6c..447cdc86 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -1,7 +1,7 @@ # common是必须的section [common] server_addr = 127.0.0.1 -bind_port = 7000 +server_port = 7000 log_file = ./frpc.log # debug, info, warn, error log_level = debug diff --git a/models/server/config.go b/models/server/config.go index f9e974ed..ec70071e 100644 --- a/models/server/config.go +++ b/models/server/config.go @@ -15,6 +15,7 @@ var ( LogLevel string = "warn" LogWay string = "file" HeartBeatTimeout int64 = 30 + UserConnTimeout int64 = 10 ) var ProxyServers map[string]*ProxyServer = make(map[string]*ProxyServer) diff --git a/models/server/server.go b/models/server/server.go index 889b2d7f..fa692bec 100644 --- a/models/server/server.go +++ b/models/server/server.go @@ -3,6 +3,7 @@ package server import ( "container/list" "sync" + "time" "github.com/fatedier/frp/models/consts" "github.com/fatedier/frp/utils/conn" @@ -10,22 +11,22 @@ import ( ) type ProxyServer struct { - Name string - Passwd string - BindAddr string - ListenPort int64 - Status int64 - CliConnChan chan *conn.Conn // get client conns from control goroutine + Name string + Passwd string + BindAddr string + ListenPort int64 + Status int64 - listener *conn.Listener // accept new connection from remote users - ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel - userConnList *list.List // store user conns + listener *conn.Listener // accept new connection from remote users + ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel + cliConnChan chan *conn.Conn // get client conns from control goroutine + userConnList *list.List // store user conns mutex sync.Mutex } func (p *ProxyServer) Init() { p.Status = consts.Idle - p.CliConnChan = make(chan *conn.Conn) + p.cliConnChan = make(chan *conn.Conn) p.ctlMsgChan = make(chan int64) p.userConnList = list.New() } @@ -48,13 +49,13 @@ func (p *ProxyServer) Start() (err error) { p.Status = consts.Working - // start a goroutine for listener + // start a goroutine for listener to accept user connection go func() { for { // block - // if listener is closed, get nil - c := p.listener.GetConn() - if c == nil { + // if listener is closed, err returned + c, err := p.listener.GetConn() + if err != nil { log.Info("ProxyName [%s], listener is closed", p.Name) return } @@ -73,13 +74,28 @@ func (p *ProxyServer) Start() (err error) { // put msg to control conn p.ctlMsgChan <- 1 + + // set timeout + time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() { + p.Lock() + defer p.Unlock() + element := p.userConnList.Front() + if element == nil { + return + } + + userConn := element.Value.(*conn.Conn) + if userConn == c { + log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr()) + } + }) } }() // start another goroutine for join two conns from client and user go func() { for { - cliConn, ok := <-p.CliConnChan + cliConn, ok := <-p.cliConnChan if !ok { return } @@ -114,7 +130,7 @@ func (p *ProxyServer) Close() { p.Status = consts.Idle p.listener.Close() close(p.ctlMsgChan) - close(p.CliConnChan) + close(p.cliConnChan) p.userConnList = list.New() p.Unlock() } @@ -128,3 +144,7 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) { } return } + +func (p *ProxyServer) GetNewCliConn(c *conn.Conn) { + p.cliConnChan <- c +} diff --git a/utils/conn/conn.go b/utils/conn/conn.go index 4cf67621..29f2ce42 100644 --- a/utils/conn/conn.go +++ b/utils/conn/conn.go @@ -52,15 +52,15 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { return l, err } -// wait util get one new connection or close -// if listener is closed, return nil -func (l *Listener) GetConn() (conn *Conn) { +// wait util get one new connection or listener is closed +// if listener is closed, err returned +func (l *Listener) GetConn() (conn *Conn, err error) { var ok bool conn, ok = <-l.conns if !ok { - return nil + return conn, fmt.Errorf("channel close") } - return conn + return conn, nil } func (l *Listener) Close() { @@ -116,7 +116,7 @@ func (c *Conn) Write(content string) (err error) { } func (c *Conn) Close() { - if c.TcpConn != nil { + if c.TcpConn != nil && c.closeFlag == false { c.closeFlag = true c.TcpConn.Close() } From f32cc7a8401c53f0473a2161f6711eb14c84757e Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 25 Feb 2016 15:45:27 +0800 Subject: [PATCH 2/3] update go version to 1.5.1 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 68fda64f..8fdbf99d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ language: go go: - 1.4.2 - - 1.5.2 + - 1.5.1 install: - make From 09127a3b5573e1083be148e924f0dc6fb911918b Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 25 Feb 2016 17:38:34 +0800 Subject: [PATCH 3/3] all: improve the method of import for internal packages 1. Change directory structure and Makefile to let GOPATH=`pwd`, so wherever the project directory is, just use make to build. --- Makefile | 9 +++++---- {cmd => src/frp/cmd}/frpc/control.go | 10 +++++----- {cmd => src/frp/cmd}/frpc/main.go | 4 ++-- {cmd => src/frp/cmd}/frps/control.go | 10 +++++----- {cmd => src/frp/cmd}/frps/main.go | 6 +++--- {models => src/frp/models}/client/client.go | 8 ++++---- {models => src/frp/models}/client/config.go | 0 {models => src/frp/models}/consts/consts.go | 0 {models => src/frp/models}/msg/msg.go | 0 {models => src/frp/models}/server/config.go | 0 {models => src/frp/models}/server/server.go | 6 +++--- {utils => src/frp/utils}/broadcast/broadcast.go | 0 {utils => src/frp/utils}/broadcast/broadcast_test.go | 0 {utils => src/frp/utils}/conn/conn.go | 2 +- {utils => src/frp/utils}/log/log.go | 0 {utils => src/frp/utils}/pcrypto/pcrypto.go | 0 {utils => src/frp/utils}/pcrypto/pcrypto_test.go | 0 17 files changed, 28 insertions(+), 27 deletions(-) rename {cmd => src/frp/cmd}/frpc/control.go (93%) rename {cmd => src/frp/cmd}/frpc/main.go (85%) rename {cmd => src/frp/cmd}/frps/control.go (95%) rename {cmd => src/frp/cmd}/frps/main.go (76%) rename {models => src/frp/models}/client/client.go (90%) rename {models => src/frp/models}/client/config.go (100%) rename {models => src/frp/models}/consts/consts.go (100%) rename {models => src/frp/models}/msg/msg.go (100%) rename {models => src/frp/models}/server/config.go (100%) rename {models => src/frp/models}/server/server.go (96%) rename {utils => src/frp/utils}/broadcast/broadcast.go (100%) rename {utils => src/frp/utils}/broadcast/broadcast_test.go (100%) rename {utils => src/frp/utils}/conn/conn.go (98%) rename {utils => src/frp/utils}/log/log.go (100%) rename {utils => src/frp/utils}/pcrypto/pcrypto.go (100%) rename {utils => src/frp/utils}/pcrypto/pcrypto_test.go (100%) diff --git a/Makefile b/Makefile index 01515429..797e11f5 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ export PATH := $(GOPATH)/bin:$(PATH) +export NEW_GOPATH := $(shell pwd) all: build @@ -9,13 +10,13 @@ godep: godep restore fmt: - @godep go fmt ./... + @GOPATH=$(NEW_GOPATH) godep go fmt ./... frps: - godep go build -o bin/frps ./cmd/frps + GOPATH=$(NEW_GOPATH) godep go build -o bin/frps ./src/frp/cmd/frps frpc: - godep go build -o bin/frpc ./cmd/frpc + GOPATH=$(NEW_GOPATH) godep go build -o bin/frpc ./src/frp/cmd/frpc test: - @godep go test ./... + @GOPATH=$(NEW_GOPATH) godep go test ./... diff --git a/cmd/frpc/control.go b/src/frp/cmd/frpc/control.go similarity index 93% rename from cmd/frpc/control.go rename to src/frp/cmd/frpc/control.go index eeea1840..4cc3ecaf 100644 --- a/cmd/frpc/control.go +++ b/src/frp/cmd/frpc/control.go @@ -7,11 +7,11 @@ import ( "sync" "time" - "github.com/fatedier/frp/models/client" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/client" + "frp/models/consts" + "frp/models/msg" + "frp/utils/conn" + "frp/utils/log" ) func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { diff --git a/cmd/frpc/main.go b/src/frp/cmd/frpc/main.go similarity index 85% rename from cmd/frpc/main.go rename to src/frp/cmd/frpc/main.go index c17f3e76..01772343 100644 --- a/cmd/frpc/main.go +++ b/src/frp/cmd/frpc/main.go @@ -4,8 +4,8 @@ import ( "os" "sync" - "github.com/fatedier/frp/models/client" - "github.com/fatedier/frp/utils/log" + "frp/models/client" + "frp/utils/log" ) func main() { diff --git a/cmd/frps/control.go b/src/frp/cmd/frps/control.go similarity index 95% rename from cmd/frps/control.go rename to src/frp/cmd/frps/control.go index 4a78c62a..61540544 100644 --- a/cmd/frps/control.go +++ b/src/frp/cmd/frps/control.go @@ -6,11 +6,11 @@ import ( "io" "time" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/models/server" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/consts" + "frp/models/msg" + "frp/models/server" + "frp/utils/conn" + "frp/utils/log" ) func ProcessControlConn(l *conn.Listener) { diff --git a/cmd/frps/main.go b/src/frp/cmd/frps/main.go similarity index 76% rename from cmd/frps/main.go rename to src/frp/cmd/frps/main.go index e21f927e..c4ce4d78 100644 --- a/cmd/frps/main.go +++ b/src/frp/cmd/frps/main.go @@ -3,9 +3,9 @@ package main import ( "os" - "github.com/fatedier/frp/models/server" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/server" + "frp/utils/conn" + "frp/utils/log" ) func main() { diff --git a/models/client/client.go b/src/frp/models/client/client.go similarity index 90% rename from models/client/client.go rename to src/frp/models/client/client.go index 81a04480..69d256d6 100644 --- a/models/client/client.go +++ b/src/frp/models/client/client.go @@ -3,10 +3,10 @@ package client import ( "encoding/json" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/consts" + "frp/models/msg" + "frp/utils/conn" + "frp/utils/log" ) type ProxyClient struct { diff --git a/models/client/config.go b/src/frp/models/client/config.go similarity index 100% rename from models/client/config.go rename to src/frp/models/client/config.go diff --git a/models/consts/consts.go b/src/frp/models/consts/consts.go similarity index 100% rename from models/consts/consts.go rename to src/frp/models/consts/consts.go diff --git a/models/msg/msg.go b/src/frp/models/msg/msg.go similarity index 100% rename from models/msg/msg.go rename to src/frp/models/msg/msg.go diff --git a/models/server/config.go b/src/frp/models/server/config.go similarity index 100% rename from models/server/config.go rename to src/frp/models/server/config.go diff --git a/models/server/server.go b/src/frp/models/server/server.go similarity index 96% rename from models/server/server.go rename to src/frp/models/server/server.go index fa692bec..e8d6d810 100644 --- a/models/server/server.go +++ b/src/frp/models/server/server.go @@ -5,9 +5,9 @@ import ( "sync" "time" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/consts" + "frp/utils/conn" + "frp/utils/log" ) type ProxyServer struct { diff --git a/utils/broadcast/broadcast.go b/src/frp/utils/broadcast/broadcast.go similarity index 100% rename from utils/broadcast/broadcast.go rename to src/frp/utils/broadcast/broadcast.go diff --git a/utils/broadcast/broadcast_test.go b/src/frp/utils/broadcast/broadcast_test.go similarity index 100% rename from utils/broadcast/broadcast_test.go rename to src/frp/utils/broadcast/broadcast_test.go diff --git a/utils/conn/conn.go b/src/frp/utils/conn/conn.go similarity index 98% rename from utils/conn/conn.go rename to src/frp/utils/conn/conn.go index 29f2ce42..dceabd9f 100644 --- a/utils/conn/conn.go +++ b/src/frp/utils/conn/conn.go @@ -7,7 +7,7 @@ import ( "net" "sync" - "github.com/fatedier/frp/utils/log" + "frp/utils/log" ) type Listener struct { diff --git a/utils/log/log.go b/src/frp/utils/log/log.go similarity index 100% rename from utils/log/log.go rename to src/frp/utils/log/log.go diff --git a/utils/pcrypto/pcrypto.go b/src/frp/utils/pcrypto/pcrypto.go similarity index 100% rename from utils/pcrypto/pcrypto.go rename to src/frp/utils/pcrypto/pcrypto.go diff --git a/utils/pcrypto/pcrypto_test.go b/src/frp/utils/pcrypto/pcrypto_test.go similarity index 100% rename from utils/pcrypto/pcrypto_test.go rename to src/frp/utils/pcrypto/pcrypto_test.go