diff --git a/weed/pb/master_pb/master_helper.go b/weed/pb/master_pb/master_helper.go new file mode 100644 index 000000000..52006fdee --- /dev/null +++ b/weed/pb/master_pb/master_helper.go @@ -0,0 +1,5 @@ +package master_pb + +func (v *VolumeLocation) IsEmptyUrl() bool { + return v.Url == "" || v.Url == ":0" +} diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 74d347142..7caaf01b2 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -70,8 +70,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } message := &master_pb.VolumeLocation{ - Url: dn.Url(), - PublicUrl: dn.PublicUrl, + DataCenter: dn.GetDataCenterId(), + Url: dn.Url(), + PublicUrl: dn.PublicUrl, } for _, v := range dn.GetVolumes() { message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) @@ -126,6 +127,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey) if dn == nil { + // Skip delta heartbeat for volume server versions better than 3.28 https://github.com/seaweedfs/seaweedfs/pull/3630 + if heartbeat.Ip == "" { + continue + } // ToDo must be removed after update major version dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) @@ -181,8 +186,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { - dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) - ms.Topo.DataNodeRegistration(dcName, rackName, dn) + if heartbeat.Ip != "" { + dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + ms.Topo.DataNodeRegistration(dcName, rackName, dn) + } // process heartbeat.Volumes stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc() diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index e55d821a8..3849eac19 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -160,11 +160,18 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti volumeTickChan := time.Tick(sleepInterval) ecShardTickChan := time.Tick(17 * sleepInterval) - + dataCenter := vs.store.GetDataCenter() + rack := vs.store.GetRack() + ip := vs.store.Ip + port := uint32(vs.store.Port) for { select { case volumeMessage := <-vs.store.NewVolumesChan: deltaBeat := &master_pb.Heartbeat{ + Ip: ip, + Port: port, + DataCenter: dataCenter, + Rack: rack, NewVolumes: []*master_pb.VolumeShortInformationMessage{ &volumeMessage, }, @@ -176,6 +183,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti } case ecShardMessage := <-vs.store.NewEcShardsChan: deltaBeat := &master_pb.Heartbeat{ + Ip: ip, + Port: port, + DataCenter: dataCenter, + Rack: rack, NewEcShards: []*master_pb.VolumeEcShardInformationMessage{ &ecShardMessage, }, @@ -188,6 +199,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti } case volumeMessage := <-vs.store.DeletedVolumesChan: deltaBeat := &master_pb.Heartbeat{ + Ip: ip, + Port: port, + DataCenter: dataCenter, + Rack: rack, DeletedVolumes: []*master_pb.VolumeShortInformationMessage{ &volumeMessage, }, @@ -199,6 +214,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti } case ecShardMessage := <-vs.store.DeletedEcShardsChan: deltaBeat := &master_pb.Heartbeat{ + Ip: ip, + Port: port, + DataCenter: dataCenter, + Rack: rack, DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{ &ecShardMessage, }, @@ -227,12 +246,12 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti case <-vs.stopChan: var volumeMessages []*master_pb.VolumeInformationMessage emptyBeat := &master_pb.Heartbeat{ - Ip: vs.store.Ip, - Port: uint32(vs.store.Port), + Ip: ip, + Port: port, PublicUrl: vs.store.PublicUrl, MaxFileKey: uint64(0), - DataCenter: vs.store.GetDataCenter(), - Rack: vs.store.GetRack(), + DataCenter: dataCenter, + Rack: rack, Volumes: volumeMessages, HasNoVolumes: len(volumeMessages) == 0, } diff --git a/weed/topology/configuration.go b/weed/topology/configuration.go index cb635658f..d9fde4a04 100644 --- a/weed/topology/configuration.go +++ b/weed/topology/configuration.go @@ -20,9 +20,8 @@ type topology struct { DataCenters []dataCenter `xml:"DataCenter"` } type Configuration struct { - XMLName xml.Name `xml:"Configuration"` - Topo topology `xml:"Topology"` - ip2location map[string]loc // this is not used any more. leave it here for later. + XMLName xml.Name `xml:"Configuration"` + Topo topology `xml:"Topology"` } func (c *Configuration) String() string { @@ -33,12 +32,6 @@ func (c *Configuration) String() string { } func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) { - if c != nil && c.ip2location != nil { - if loc, ok := c.ip2location[ip]; ok { - return loc.dcName, loc.rackName - } - } - if dcName == "" { dcName = "DefaultDataCenter" } diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 41c5a8bc4..64ae876b0 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -262,6 +262,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL } func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { + if resp.VolumeLocation.IsEmptyUrl() { + glog.V(0).Infof("updateVidMap ignore short heartbeat: %+v", resp) + return + } // process new volume location loc := Location{ Url: resp.VolumeLocation.Url,