From 67a252ee8a0cbcf0f33cb7c94de21d4791ef1f39 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 4 Sep 2024 20:16:44 +0500 Subject: [PATCH] [master] refactor func ShouldGrowVolumes (#5884) --- weed/server/master_grpc_server_volume.go | 47 ++++++++---- weed/stats/metrics.go | 19 +++-- weed/topology/topology.go | 11 ++- weed/topology/topology_info.go | 11 ++- weed/topology/volume_growth.go | 1 + weed/topology/volume_layout.go | 94 ++++++++++++++---------- 6 files changed, 121 insertions(+), 62 deletions(-) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 8fc06bdb6..eb0d43705 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -27,7 +27,7 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) { newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) if err != nil { - glog.Warningf("automatic volume grow %s: %+v", req.Option, err) + glog.V(1).Infof("automatic volume grow failed: %+v", err) return } for _, newVidLocation := range newVidLocations { @@ -38,19 +38,37 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) { func (ms *MasterServer) ProcessGrowRequest() { go func() { for { - time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second) if !ms.Topo.IsLeader() { continue } - for _, vl := range ms.Topo.ListVolumeLayouts() { - if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(&topology.VolumeGrowOption{}) { + dcs := ms.Topo.ListDataCenters() + for _, vlc := range ms.Topo.ListVolumeLayoutCollections() { + vl := vlc.VolumeLayout + if vl.HasGrowRequest() { + continue + } + if vl.ShouldGrowVolumes(vlc.Collection) { vl.AddGrowRequest() ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ - Option: vl.ToGrowOption(), + Option: vlc.ToGrowOption(), Count: vl.GetLastGrowCount(), } + } else { + for _, dc := range dcs { + if vl.ShouldGrowVolumesByDataNode("DataCenter", dc) { + vl.AddGrowRequest() + volumeGrowOption := vlc.ToGrowOption() + volumeGrowOption.DataCenter = dc + ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ + Option: volumeGrowOption, + Count: vl.GetLastGrowCount(), + Force: true, + } + } + } } } + time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second) } }() go func() { @@ -81,19 +99,20 @@ func (ms *MasterServer) ProcessGrowRequest() { }) // not atomic but it's okay - if !found && vl.ShouldGrowVolumes(option) { - filter.Store(req, nil) - // we have lock called inside vg - go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) { - ms.DoAutomaticVolumeGrow(req) - vl.DoneGrowRequest() - filter.Delete(req) - }(req, vl) - } else { + if found || (!req.Force && !vl.ShouldGrowVolumes(req.Option.Collection)) { glog.V(4).Infoln("discard volume grow request") time.Sleep(time.Millisecond * 211) vl.DoneGrowRequest() + continue } + + filter.Store(req, nil) + // we have lock called inside vg + go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) { + ms.DoAutomaticVolumeGrow(req) + vl.DoneGrowRequest() + filter.Delete(req) + }(req, vl) } }() } diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 134485946..956bf4009 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -70,13 +70,21 @@ var ( Help: "replica placement mismatch", }, []string{"collection", "id"}) - MasterVolumeLayout = prometheus.NewGaugeVec( + MasterVolumeLayoutWritable = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: "master", - Name: "volume_layout_total", - Help: "Number of volumes in volume layouts", - }, []string{"collection", "dataCenter", "type"}) + Name: "volume_layout_writable", + Help: "Number of writable volumes in volume layouts", + }, []string{"collection", "disk", "rp", "ttl"}) + + MasterVolumeLayoutCrowded = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "master", + Name: "volume_layout_crowded", + Help: "Number of crowded volumes in volume layouts", + }, []string{"collection", "disk", "rp", "ttl"}) MasterPickForWriteErrorCounter = prometheus.NewCounter( prometheus.CounterOpts{ @@ -281,7 +289,8 @@ func init() { Gather.MustRegister(MasterReceivedHeartbeatCounter) Gather.MustRegister(MasterLeaderChangeCounter) Gather.MustRegister(MasterReplicaPlacementMismatch) - Gather.MustRegister(MasterVolumeLayout) + Gather.MustRegister(MasterVolumeLayoutWritable) + Gather.MustRegister(MasterVolumeLayoutCrowded) Gather.MustRegister(FilerRequestCounter) Gather.MustRegister(FilerHandlerCounter) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 6bfd912cb..ba3be97c4 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -329,7 +329,7 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { } func (t *Topology) DataCenterExists(dcName string) bool { - return dcName == "" || t.GetOrCreateDataCenter(dcName) != nil + return dcName == "" || t.GetDataCenter(dcName) != nil } func (t *Topology) GetDataCenter(dcName string) (dc *DataCenter) { @@ -358,6 +358,15 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { return dc } +func (t *Topology) ListDataCenters() (dcs []string) { + t.RLock() + defer t.RUnlock() + for _, c := range t.children { + dcs = append(dcs, string(c.(*DataCenter).Id())) + } + return dcs +} + func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) { // convert into in memory struct storage.VolumeInfo var volumeInfos []storage.VolumeInfo diff --git a/weed/topology/topology_info.go b/weed/topology/topology_info.go index 5fa439a0b..33a047b1e 100644 --- a/weed/topology/topology_info.go +++ b/weed/topology/topology_info.go @@ -13,6 +13,11 @@ type TopologyInfo struct { Layouts []VolumeLayoutInfo `json:"Layouts"` } +type VolumeLayoutCollection struct { + Collection string + VolumeLayout *VolumeLayout +} + func (t *Topology) ToInfo() (info TopologyInfo) { info.Max = t.diskUsages.GetMaxVolumeCount() info.Free = t.diskUsages.FreeSpace() @@ -42,10 +47,12 @@ func (t *Topology) ToInfo() (info TopologyInfo) { return } -func (t *Topology) ListVolumeLayouts() (volumeLayouts []*VolumeLayout) { +func (t *Topology) ListVolumeLayoutCollections() (volumeLayouts []*VolumeLayoutCollection) { for _, col := range t.collectionMap.Items() { for _, volumeLayout := range col.(*Collection).storageType2VolumeLayout.Items() { - volumeLayouts = append(volumeLayouts, volumeLayout.(*VolumeLayout)) + volumeLayouts = append(volumeLayouts, + &VolumeLayoutCollection{col.(*Collection).Name, volumeLayout.(*VolumeLayout)}, + ) } } return volumeLayouts diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 7f8684753..70f0d9cd4 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -28,6 +28,7 @@ This package is created to resolve these replica placement issues: type VolumeGrowRequest struct { Option *VolumeGrowOption Count uint32 + Force bool } type volumeGrowthStrategy struct { diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 6ac4d63db..baa9b91d4 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -3,7 +3,7 @@ package topology import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/stats" - "math/rand/v2" + "math/rand" "sync" "sync/atomic" "time" @@ -218,17 +218,21 @@ func (vl *VolumeLayout) EnsureCorrectWritables(v *storage.VolumeInfo) { } func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) { - if vl.enoughCopies(vid) && vl.isAllWritable(vid) { - if !vl.oversizedVolumes.IsTrue(vid) { - vl.setVolumeWritable(vid) - } + isEnoughCopies := vl.enoughCopies(vid) + isAllWritable := vl.isAllWritable(vid) + isOversizedVolume := vl.oversizedVolumes.IsTrue(vid) + if isEnoughCopies && isAllWritable && !isOversizedVolume { + vl.setVolumeWritable(vid) } else { - if !vl.enoughCopies(vid) { + if !isEnoughCopies { glog.V(0).Infof("volume %d does not have enough copies", vid) } - if !vl.isAllWritable(vid) { + if !isAllWritable { glog.V(0).Infof("volume %d are not all writable", vid) } + if isOversizedVolume { + glog.V(1).Infof("volume %d are oversized", vid) + } glog.V(0).Infof("volume %d remove from writable", vid) vl.removeFromWritable(vid) } @@ -254,6 +258,10 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { return uint64(v.Size) >= vl.volumeSizeLimit } +func (vl *VolumeLayout) isCrowdedVolume(v *storage.VolumeInfo) bool { + return float64(v.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold +} + func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { return !vl.isOversized(v) && v.Version == needle.CurrentVersion && @@ -296,7 +304,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi return 0, 0, nil, true, fmt.Errorf("%s in volume layout", noWritableVolumes) } if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" { - vid := vl.writables[rand.IntN(lenWriters)] + vid := vl.writables[rand.Intn(lenWriters)] locationList = vl.vid2location[vid] if locationList == nil || len(locationList.list) == 0 { return 0, 0, nil, false, fmt.Errorf("Strangely vid %s is on no machine!", vid.String()) @@ -351,40 +359,45 @@ func (vl *VolumeLayout) GetLastGrowCount() uint32 { return vl.lastGrowCount.Load() } -func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool { - total, active, crowded := vl.GetActiveVolumeCount(option) - stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.DataCenter, "total").Set(float64(total)) - stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.DataCenter, "active").Set(float64(active)) - stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.DataCenter, "crowded").Set(float64(crowded)) - //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high) - return active <= crowded +func (vl *VolumeLayout) ShouldGrowVolumes(collection string) bool { + writable, crowded := vl.GetWritableVolumeCount() + stats.MasterVolumeLayoutWritable.WithLabelValues(collection, vl.diskType.String(), vl.rp.String(), vl.ttl.String()).Set(float64(writable)) + stats.MasterVolumeLayoutCrowded.WithLabelValues(collection, vl.diskType.String(), vl.rp.String(), vl.ttl.String()).Set(float64(crowded)) + return writable <= crowded } -func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, active, crowded int) { +func (vl *VolumeLayout) ShouldGrowVolumesByDataNode(nodeType string, dataNode string) bool { vl.accessLock.RLock() - defer vl.accessLock.RUnlock() - if option.DataCenter == "" { - return len(vl.writables), len(vl.writables), len(vl.crowded) - } - total = len(vl.writables) - for _, v := range vl.writables { + writables := make([]needle.VolumeId, len(vl.writables)) + copy(writables, vl.writables) + vl.accessLock.RUnlock() + + dataNodeId := NodeId(dataNode) + for _, v := range writables { for _, dn := range vl.vid2location[v].list { - if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { - if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { - continue - } - if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { - continue - } - active++ - info, _ := dn.GetVolumesById(v) - if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { - crowded++ + dataNodeFound := false + switch nodeType { + case "DataCenter": + dataNodeFound = dn.GetDataCenter().Id() == dataNodeId + case "Rack": + dataNodeFound = dn.GetRack().Id() == dataNodeId + case "DataNode": + dataNodeFound = dn.Id() == dataNodeId + } + if dataNodeFound { + if info, err := dn.GetVolumesById(v); err == nil && !vl.isCrowdedVolume(&info) { + return false } } } } - return + return true +} + +func (vl *VolumeLayout) GetWritableVolumeCount() (active, crowded int) { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + return len(vl.writables), len(vl.crowded) } func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool { @@ -531,12 +544,13 @@ func (vl *VolumeLayout) ToInfo() (info VolumeLayoutInfo) { return } -func (vl *VolumeLayout) ToGrowOption() (option *VolumeGrowOption) { - option = &VolumeGrowOption{} - option.ReplicaPlacement = vl.rp - option.Ttl = vl.ttl - option.DiskType = vl.diskType - return +func (vlc *VolumeLayoutCollection) ToGrowOption() (option *VolumeGrowOption) { + return &VolumeGrowOption{ + Collection: vlc.Collection, + ReplicaPlacement: vlc.VolumeLayout.rp, + Ttl: vlc.VolumeLayout.ttl, + DiskType: vlc.VolumeLayout.diskType, + } } func (vl *VolumeLayout) Stats() *VolumeLayoutStats {