From 67e2ea72be5b0f5bb1f69967165a279cc3067938 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 18 Jan 2019 14:14:47 -0800 Subject: [PATCH] master add separate grpc port due to https://github.com/soheilhy/cmux/issues/64 fix https://github.com/chrislusf/seaweedfs/issues/820 fix https://github.com/chrislusf/seaweedfs/issues/840 fix https://github.com/chrislusf/seaweedfs/issues/841 --- weed/command/master.go | 39 ++++++++++++--------- weed/command/server.go | 39 ++++++++++++--------- weed/operation/grpc_client.go | 7 +++- weed/operation/upload_content.go | 8 ++--- weed/server/volume_grpc_client_to_master.go | 11 ++++-- weed/util/grpc_client_server.go | 21 +++++++++++ weed/wdclient/masterclient.go | 16 +++++++-- 7 files changed, 95 insertions(+), 46 deletions(-) diff --git a/weed/command/master.go b/weed/command/master.go index 29c8a6833..11f634ed4 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -9,11 +9,10 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/gorilla/mux" - "github.com/soheilhy/cmux" "google.golang.org/grpc/reflection" ) @@ -32,6 +31,7 @@ var cmdMaster = &Command{ var ( mport = cmdMaster.Flag.Int("port", 9333, "http listen port") + mGrpcPort = cmdMaster.Flag.Int("port.grpc", 0, "grpc server listen port, default to http port + 10000") masterIp = cmdMaster.Flag.String("ip", "localhost", "master | address") masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") @@ -79,7 +79,7 @@ func runMaster(cmd *Command, args []string) bool { glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) - listener, e := util.NewListener(listeningAddress, 0) + masterListener, e := util.NewListener(listeningAddress, 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } @@ -91,23 +91,28 @@ func runMaster(cmd *Command, args []string) bool { ms.SetRaftServer(raftServer) }() - // start grpc and http server - m := cmux.New(listener) + go func() { + // starting grpc server + grpcPort := *mGrpcPort + if grpcPort == 0 { + grpcPort = *mport + 10000 + } + grpcL, err := util.NewListener(*masterBindIp + ":"+strconv.Itoa(grpcPort), 0) + if err != nil { + glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) + } + // Create your protocol servers. + grpcS := util.NewGrpcServer() + master_pb.RegisterSeaweedServer(grpcS, ms) + reflection.Register(grpcS) - grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) - httpL := m.Match(cmux.Any()) - - // Create your protocol servers. - grpcS := util.NewGrpcServer() - master_pb.RegisterSeaweedServer(grpcS, ms) - reflection.Register(grpcS) + glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterBindIp, grpcPort) + grpcS.Serve(grpcL) + }() + // start http server httpS := &http.Server{Handler: r} - - go grpcS.Serve(grpcL) - go httpS.Serve(httpL) - - if err := m.Serve(); err != nil { + if err := httpS.Serve(masterListener); err != nil { glog.Fatalf("master server failed to serve: %v", err) } diff --git a/weed/command/server.go b/weed/command/server.go index 8f760942c..ba5305a97 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -15,7 +15,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" - "github.com/soheilhy/cmux" "google.golang.org/grpc/reflection" ) @@ -62,6 +61,7 @@ var ( serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") serverGarbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") + masterGrpcPort = cmdServer.Flag.Int("master.port.grpc", 0, "master grpc server listen port, default to http port + 10000") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") @@ -179,6 +179,25 @@ func runServer(cmd *Command, args []string) bool { glog.Fatalf("Master startup error: %v", e) } + go func() { + // starting grpc server + grpcPort := *masterGrpcPort + if grpcPort == 0 { + grpcPort = *masterPort + 10000 + } + grpcL, err := util.NewListener(*serverIp+":"+strconv.Itoa(grpcPort), 0) + if err != nil { + glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) + } + // Create your protocol servers. + grpcS := util.NewGrpcServer() + master_pb.RegisterSeaweedServer(grpcS, ms) + reflection.Register(grpcS) + + glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *serverIp, grpcPort) + grpcS.Serve(grpcL) + }() + go func() { raftWaitForMaster.Wait() time.Sleep(100 * time.Millisecond) @@ -190,23 +209,9 @@ func runServer(cmd *Command, args []string) bool { raftWaitForMaster.Done() - // start grpc and http server - m := cmux.New(masterListener) - - grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) - httpL := m.Match(cmux.Any()) - - // Create your protocol servers. - grpcS := util.NewGrpcServer() - master_pb.RegisterSeaweedServer(grpcS, ms) - reflection.Register(grpcS) - + // start http server httpS := &http.Server{Handler: r} - - go grpcS.Serve(grpcL) - go httpS.Serve(httpL) - - if err := m.Serve(); err != nil { + if err := httpS.Serve(masterListener); err != nil { glog.Fatalf("master server failed to serve: %v", err) } diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 300f78b58..d0931a8d3 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -44,9 +44,14 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error { + masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0) + if parseErr != nil { + return fmt.Errorf("failed to parse master grpc %v", masterServer) + } + return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterServer) + }, masterGrpcAddress) } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 8cf31e382..030bf5889 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -13,7 +13,6 @@ import ( "net/textproto" "path/filepath" "strings" - "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" @@ -31,10 +30,9 @@ var ( ) func init() { - client = &http.Client{ - Transport: &http.Transport{MaxIdleConnsPerHost: 1024}, - Timeout: 5 * time.Second, - } + client = &http.Client{Transport: &http.Transport{ + MaxIdleConnsPerHost: 1024, + }} } var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index fcbcfd4d4..bd3ffd7b3 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -27,7 +27,12 @@ func (vs *VolumeServer) heartbeat() { if newLeader != "" { master = newLeader } - newLeader, err = vs.doHeartbeat(master, time.Duration(vs.pulseSeconds)*time.Second) + masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0) + if parseErr != nil { + glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress) + continue + } + newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second) if err != nil { glog.V(0).Infof("heartbeat error: %v", err) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) @@ -36,9 +41,9 @@ func (vs *VolumeServer) heartbeat() { } } -func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) { +func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := util.GrpcDial(masterNode) + grpcConection, err := util.GrpcDial(masterGrpcAddress) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go index f80d71f29..d029d21ae 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/util/grpc_client_server.go @@ -2,6 +2,8 @@ package util import ( "fmt" + "strconv" + "strings" "sync" "time" @@ -64,3 +66,22 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts return err } + +func ParseServerToGrpcAddress(server string, optionalGrpcPort int) (serverGrpcAddress string, err error) { + hostnameAndPort := strings.Split(server, ":") + if len(hostnameAndPort) != 2 { + return "", fmt.Errorf("The server should have hostname:port format: %v", hostnameAndPort) + } + + filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + if parseErr != nil { + return "", fmt.Errorf("The server port parse error: %v", parseErr) + } + + filerGrpcPort := int(filerPort) + 10000 + if optionalGrpcPort != 0 { + filerGrpcPort = optionalGrpcPort + } + + return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil +} diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index df0adbd18..f58c28504 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -49,8 +49,9 @@ func (mc *MasterClient) KeepConnectedToMaster() { func (mc *MasterClient) tryAllMasters() { for _, master := range mc.masters { - glog.V(0).Infof("Connecting to %v", master) - withMasterClient(master, func(client master_pb.SeaweedClient) error { + glog.V(0).Infof("Connecting to master %v", master) + gprcErr := withMasterClient(master, func(client master_pb.SeaweedClient) error { + stream, err := client.KeepConnected(context.Background()) if err != nil { glog.V(0).Infof("failed to keep connected to %s: %v", master, err) @@ -88,13 +89,22 @@ func (mc *MasterClient) tryAllMasters() { }) + if gprcErr != nil { + glog.V(0).Infof("%s failed to connect with master %v: %v", mc.name, master, gprcErr) + } + mc.currentMaster = "" } } func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { - grpcConnection, err := util.GrpcDial(master) + masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0) + if parseErr != nil { + return fmt.Errorf("failed to parse master grpc %v", master) + } + + grpcConnection, err := util.GrpcDial(masterGrpcAddress) if err != nil { return fmt.Errorf("fail to dial %s: %v", master, err) }