From 753138a3f7828b85d2fbaeb265d912dfc49fc2bd Mon Sep 17 00:00:00 2001 From: stlpmo Date: Sun, 17 Nov 2019 11:40:36 +0800 Subject: [PATCH] test passed --- weed/command/volume.go | 144 +++++++++--- weed/util/httpdown/http_down.go | 395 ++++++++++++++++++++++++++++++++ 2 files changed, 506 insertions(+), 33 deletions(-) create mode 100644 weed/util/httpdown/http_down.go diff --git a/weed/command/volume.go b/weed/command/volume.go index 3c1aa2b50..f016a89ae 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -1,6 +1,7 @@ package command import ( + "fmt" "net/http" "os" "runtime" @@ -10,7 +11,9 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util/httpdown" "github.com/spf13/viper" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -94,7 +97,7 @@ func runVolume(cmd *Command, args []string) bool { func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) { - //Set multiple folders and each folder's max volume count limit' + // Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(volumeFolders, ",") maxCountStrings := strings.Split(maxVolumeCounts, ",") for _, maxString := range maxCountStrings { @@ -113,7 +116,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } } - //security related white list configuration + // security related white list configuration if volumeWhiteListOption != "" { v.whiteList = strings.Split(volumeWhiteListOption, ",") } @@ -128,11 +131,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if *v.publicUrl == "" { *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) } - isSeperatedPublicPort := *v.publicPort != *v.port volumeMux := http.NewServeMux() publicVolumeMux := volumeMux - if isSeperatedPublicPort { + if v.isSeparatedPublicPort() { publicVolumeMux = http.NewServeMux() } @@ -158,51 +160,127 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, ) - listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) - glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) - listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) - } - if isSeperatedPublicPort { - publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) - glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) - publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) + // starting grpc server + grpcS := v.startGrpcService(volumeServer) + + // starting public http server + var publicHttpDown httpdown.Server + if v.isSeparatedPublicPort() { + publicHttpDown = v.startPublicHttpService(publicVolumeMux) + if nil == publicHttpDown { + glog.Fatalf("start public http service failed") } - go func() { - if e := http.Serve(publicListener, publicVolumeMux); e != nil { - glog.Fatalf("Volume server fail to serve public: %v", e) - } - }() } + // starting the cluster http server + clusterHttpServer := v.startClusterHttpService(volumeMux) + + stopChain := make(chan struct{}) util.OnInterrupt(func() { + fmt.Println("volume server has be killed") + var startTime time.Time + + // firstly, stop the public http service to prevent from receiving new user request + if nil != publicHttpDown { + startTime = time.Now() + if err := publicHttpDown.Stop(); err != nil { + glog.Warningf("stop the public http server failed, %v", err) + } + glog.V(0).Infof("graceful stop public http server, elapsed [%d]", time.Now().Sub(startTime).Milliseconds()) + } + + startTime = time.Now() + if err := clusterHttpServer.Stop(); err != nil { + glog.Warningf("stop the cluster http server failed, %v", err) + } + glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", time.Now().Sub(startTime).Milliseconds()) + + startTime = time.Now() + grpcS.GracefulStop() + glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", time.Now().Sub(startTime).Milliseconds()) + + startTime = time.Now() volumeServer.Shutdown() + glog.V(0).Infof("stop volume server, elapsed [%d]", time.Now().Sub(startTime).Milliseconds()) + pprof.StopCPUProfile() + + close(stopChain) // notify exit }) - // starting grpc server + select { + case <-stopChain: + } + glog.Warningf("the volume server exit.") +} + +// check whether configure the public port +func (v VolumeServerOptions) isSeparatedPublicPort() bool { + return *v.publicPort != *v.port +} + +func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server { grpcPort := *v.port + 10000 grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume")) - volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer) + volume_server_pb.RegisterVolumeServerServer(grpcS, vs) reflection.Register(grpcS) - go grpcS.Serve(grpcL) + go func() { + if err := grpcS.Serve(grpcL); err != nil { + glog.Fatalf("start gRPC service failed, %s", err) + } + }() + return grpcS +} - if viper.GetString("https.volume.key") != "" { - if e := http.ServeTLS(listener, volumeMux, - viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) - } - } else { - if e := http.Serve(listener, volumeMux); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) - } +func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server { + publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) + glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) + publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) } + pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute} + publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener) + go func() { + if err := publicHttpDown.Wait(); err != nil { + glog.Errorf("public http down wait failed, %v", err) + } + }() + + return publicHttpDown +} + +func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server { + var ( + certFile, keyFile string + ) + if viper.GetString("https.volume.key") != "" { + certFile = viper.GetString("https.volume.cert") + keyFile = viper.GetString("https.volume.key") + } + + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) + glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) + listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) + } + + httpDown := httpdown.HTTP{ + KillTimeout: 5 * time.Minute, + StopTimeout: 5 * time.Minute, + CertFile: certFile, + KeyFile: keyFile} + clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener) + go func() { + if e := clusterHttpServer.Wait(); e != nil { + glog.Fatalf("Volume server fail to serve: %v", e) + } + }() + return clusterHttpServer } diff --git a/weed/util/httpdown/http_down.go b/weed/util/httpdown/http_down.go new file mode 100644 index 000000000..5cbd9611c --- /dev/null +++ b/weed/util/httpdown/http_down.go @@ -0,0 +1,395 @@ +// Package httpdown provides http.ConnState enabled graceful termination of +// http.Server. +// based on github.com/facebookarchive/httpdown, who's licence is MIT-licence, +// we add a feature of supporting for http TLS +package httpdown + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/facebookgo/clock" + "github.com/facebookgo/stats" +) + +const ( + defaultStopTimeout = time.Minute + defaultKillTimeout = time.Minute +) + +// A Server allows encapsulates the process of accepting new connections and +// serving them, and gracefully shutting down the listener without dropping +// active connections. +type Server interface { + // Wait waits for the serving loop to finish. This will happen when Stop is + // called, at which point it returns no error, or if there is an error in the + // serving loop. You must call Wait after calling Serve or ListenAndServe. + Wait() error + + // Stop stops the listener. It will block until all connections have been + // closed. + Stop() error +} + +// HTTP defines the configuration for serving a http.Server. Multiple calls to +// Serve or ListenAndServe can be made on the same HTTP instance. The default +// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop() +// returns. +type HTTP struct { + // StopTimeout is the duration before we begin force closing connections. + // Defaults to 1 minute. + StopTimeout time.Duration + + // KillTimeout is the duration before which we completely give up and abort + // even though we still have connected clients. This is useful when a large + // number of client connections exist and closing them can take a long time. + // Note, this is in addition to the StopTimeout. Defaults to 1 minute. + KillTimeout time.Duration + + // Stats is optional. If provided, it will be used to record various metrics. + Stats stats.Client + + // Clock allows for testing timing related functionality. Do not specify this + // in production code. + Clock clock.Clock + + // when set CertFile and KeyFile, the httpDown will start a http with TLS. + // Files containing a certificate and matching private key for the + // server must be provided if neither the Server's + // TLSConfig.Certificates nor TLSConfig.GetCertificate are populated. + // If the certificate is signed by a certificate authority, the + // certFile should be the concatenation of the server's certificate, + // any intermediates, and the CA's certificate. + CertFile, KeyFile string +} + +// Serve provides the low-level API which is useful if you're creating your own +// net.Listener. +func (h HTTP) Serve(s *http.Server, l net.Listener) Server { + stopTimeout := h.StopTimeout + if stopTimeout == 0 { + stopTimeout = defaultStopTimeout + } + killTimeout := h.KillTimeout + if killTimeout == 0 { + killTimeout = defaultKillTimeout + } + klock := h.Clock + if klock == nil { + klock = clock.New() + } + + ss := &server{ + stopTimeout: stopTimeout, + killTimeout: killTimeout, + stats: h.Stats, + clock: klock, + oldConnState: s.ConnState, + listener: l, + server: s, + serveDone: make(chan struct{}), + serveErr: make(chan error, 1), + new: make(chan net.Conn), + active: make(chan net.Conn), + idle: make(chan net.Conn), + closed: make(chan net.Conn), + stop: make(chan chan struct{}), + kill: make(chan chan struct{}), + certFile: h.CertFile, + keyFile: h.KeyFile, + } + s.ConnState = ss.connState + go ss.manage() + go ss.serve() + return ss +} + +// ListenAndServe returns a Server for the given http.Server. It is equivalent +// to ListenAndServe from the standard library, but returns immediately. +// Requests will be accepted in a background goroutine. If the http.Server has +// a non-nil TLSConfig, a TLS enabled listener will be setup. +func (h HTTP) ListenAndServe(s *http.Server) (Server, error) { + addr := s.Addr + if addr == "" { + if s.TLSConfig == nil { + addr = ":http" + } else { + addr = ":https" + } + } + l, err := net.Listen("tcp", addr) + if err != nil { + stats.BumpSum(h.Stats, "listen.error", 1) + return nil, err + } + if s.TLSConfig != nil { + l = tls.NewListener(l, s.TLSConfig) + } + return h.Serve(s, l), nil +} + +// server manages the serving process and allows for gracefully stopping it. +type server struct { + stopTimeout time.Duration + killTimeout time.Duration + stats stats.Client + clock clock.Clock + + oldConnState func(net.Conn, http.ConnState) + server *http.Server + serveDone chan struct{} + serveErr chan error + listener net.Listener + + new chan net.Conn + active chan net.Conn + idle chan net.Conn + closed chan net.Conn + stop chan chan struct{} + kill chan chan struct{} + + stopOnce sync.Once + stopErr error + + certFile, keyFile string +} + +func (s *server) connState(c net.Conn, cs http.ConnState) { + if s.oldConnState != nil { + s.oldConnState(c, cs) + } + + switch cs { + case http.StateNew: + s.new <- c + case http.StateActive: + s.active <- c + case http.StateIdle: + s.idle <- c + case http.StateHijacked, http.StateClosed: + s.closed <- c + } +} + +func (s *server) manage() { + defer func() { + close(s.new) + close(s.active) + close(s.idle) + close(s.closed) + close(s.stop) + close(s.kill) + }() + + var stopDone chan struct{} + + conns := map[net.Conn]http.ConnState{} + var countNew, countActive, countIdle float64 + + // decConn decrements the count associated with the current state of the + // given connection. + decConn := func(c net.Conn) { + switch conns[c] { + default: + panic(fmt.Errorf("unknown existing connection: %s", c)) + case http.StateNew: + countNew-- + case http.StateActive: + countActive-- + case http.StateIdle: + countIdle-- + } + } + + // setup a ticker to report various values every minute. if we don't have a + // Stats implementation provided, we Stop it so it never ticks. + statsTicker := s.clock.Ticker(time.Minute) + if s.stats == nil { + statsTicker.Stop() + } + + for { + select { + case <-statsTicker.C: + // we'll only get here when s.stats is not nil + s.stats.BumpAvg("http-state.new", countNew) + s.stats.BumpAvg("http-state.active", countActive) + s.stats.BumpAvg("http-state.idle", countIdle) + s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle) + case c := <-s.new: + conns[c] = http.StateNew + countNew++ + case c := <-s.active: + decConn(c) + countActive++ + + conns[c] = http.StateActive + case c := <-s.idle: + decConn(c) + countIdle++ + + conns[c] = http.StateIdle + + // if we're already stopping, close it + if stopDone != nil { + c.Close() + } + case c := <-s.closed: + stats.BumpSum(s.stats, "conn.closed", 1) + decConn(c) + delete(conns, c) + + // if we're waiting to stop and are all empty, we just closed the last + // connection and we're done. + if stopDone != nil && len(conns) == 0 { + close(stopDone) + return + } + case stopDone = <-s.stop: + // if we're already all empty, we're already done + if len(conns) == 0 { + close(stopDone) + return + } + + // close current idle connections right away + for c, cs := range conns { + if cs == http.StateIdle { + c.Close() + } + } + + // continue the loop and wait for all the ConnState updates which will + // eventually close(stopDone) and return from this goroutine. + + case killDone := <-s.kill: + // force close all connections + stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns))) + for c := range conns { + c.Close() + } + + // don't block the kill. + close(killDone) + + // continue the loop and we wait for all the ConnState updates and will + // return from this goroutine when we're all done. otherwise we'll try to + // send those ConnState updates on closed channels. + + } + } +} + +func (s *server) serve() { + stats.BumpSum(s.stats, "serve", 1) + if s.certFile == "" && s.keyFile == "" { + s.serveErr <- s.server.Serve(s.listener) + } else { + s.serveErr <- s.server.ServeTLS(s.listener, s.certFile, s.keyFile) + } + close(s.serveDone) + close(s.serveErr) +} + +func (s *server) Wait() error { + if err := <-s.serveErr; !isUseOfClosedError(err) { + return err + } + return nil +} + +func (s *server) Stop() error { + s.stopOnce.Do(func() { + defer stats.BumpTime(s.stats, "stop.time").End() + stats.BumpSum(s.stats, "stop", 1) + + // first disable keep-alive for new connections + s.server.SetKeepAlivesEnabled(false) + + // then close the listener so new connections can't connect come thru + closeErr := s.listener.Close() + <-s.serveDone + + // then trigger the background goroutine to stop and wait for it + stopDone := make(chan struct{}) + s.stop <- stopDone + + // wait for stop + select { + case <-stopDone: + case <-s.clock.After(s.stopTimeout): + defer stats.BumpTime(s.stats, "kill.time").End() + stats.BumpSum(s.stats, "kill", 1) + + // stop timed out, wait for kill + killDone := make(chan struct{}) + s.kill <- killDone + select { + case <-killDone: + case <-s.clock.After(s.killTimeout): + // kill timed out, give up + stats.BumpSum(s.stats, "kill.timeout", 1) + } + } + + if closeErr != nil && !isUseOfClosedError(closeErr) { + stats.BumpSum(s.stats, "listener.close.error", 1) + s.stopErr = closeErr + } + }) + return s.stopErr +} + +func isUseOfClosedError(err error) bool { + if err == nil { + return false + } + if opErr, ok := err.(*net.OpError); ok { + err = opErr.Err + } + return err.Error() == "use of closed network connection" +} + +// ListenAndServe is a convenience function to serve and wait for a SIGTERM +// or SIGINT before shutting down. +func ListenAndServe(s *http.Server, hd *HTTP) error { + if hd == nil { + hd = &HTTP{} + } + hs, err := hd.ListenAndServe(s) + if err != nil { + return err + } + + waiterr := make(chan error, 1) + go func() { + defer close(waiterr) + waiterr <- hs.Wait() + }() + + signals := make(chan os.Signal, 10) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) + + select { + case err := <-waiterr: + if err != nil { + return err + } + case <-signals: + signal.Stop(signals) + if err := hs.Stop(); err != nil { + return err + } + if err := <-waiterr; err != nil { + return err + } + } + return nil +}