From 0d5393641eab18ec356c7e0ebaf51b1164094d8c Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Tue, 19 Nov 2024 15:33:18 +0100 Subject: [PATCH] Unify usage of shell.EcNode.dc as DataCenterId. (#6258) --- weed/shell/command_ec_common.go | 11 +++++------ weed/shell/command_ec_decode.go | 9 +++++---- weed/shell/command_ec_encode.go | 4 ++-- weed/shell/command_fs_verify.go | 13 +++++++------ weed/shell/command_volume_balance_test.go | 5 +++-- weed/shell/command_volume_configure_replication.go | 5 +++-- weed/shell/command_volume_delete_empty.go | 9 +++++---- weed/shell/command_volume_fix_replication.go | 4 ++-- weed/shell/command_volume_fsck.go | 2 +- weed/shell/command_volume_tier_download.go | 2 +- weed/shell/command_volume_tier_move.go | 2 +- 11 files changed, 35 insertions(+), 31 deletions(-) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index b98921fd7..39c3f75c5 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -128,12 +128,11 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, return } -// TODO: Make dc a DataCenterId instead of string. -func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) { +func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) { for _, dc := range topo.DataCenterInfos { for _, rack := range dc.RackInfos { for _, dn := range rack.DataNodeInfos { - fn(dc.Id, RackId(rack.Id), dn) + fn(DataCenterId(dc.Id), RackId(rack.Id), dn) } } } @@ -223,15 +222,15 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes } func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) { - eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - if selectedDataCenter != "" && selectedDataCenter != dc { + eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { + if selectedDataCenter != "" && selectedDataCenter != string(dc) { return } freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) ecNodes = append(ecNodes, &EcNode{ info: dn, - dc: DataCenterId(dc), + dc: dc, rack: rack, freeEcSlot: int(freeEcSlots), }) diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 02d0f316d..d3e985a2f 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -4,11 +4,12 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/storage/types" "io" "time" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -262,7 +263,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { vidMap := make(map[uint32]bool) - eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Collection == selectedCollection { @@ -282,7 +283,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits { nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits) - eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Id == uint32(vid) { diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 6edd57f45..0b60694fc 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -85,7 +85,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr if !*forceChanges { var nodeCount int - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { nodeCount++ }) if nodeCount < erasure_coding.ParityShardsCount { @@ -309,7 +309,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri fmt.Printf("collect volumes quiet for: %d seconds and %.1f%% full\n", quietSeconds, fullPercentage) vidMap := make(map[uint32]bool) - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { // ignore remote volumes diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index ea9f86c3c..2cd7f06dc 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -5,6 +5,12 @@ import ( "context" "flag" "fmt" + "io" + "math" + "strings" + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -15,11 +21,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "go.uber.org/atomic" "golang.org/x/exp/slices" - "io" - "math" - "strings" - "sync" - "time" ) func init() { @@ -114,7 +115,7 @@ func (c *commandFsVerify) collectVolumeIds() error { if err != nil { return err } - eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, nodeInfo *master_pb.DataNodeInfo) { for _, diskInfo := range nodeInfo.DiskInfos { for _, vi := range diskInfo.VolumeInfos { volumeServer := pb.NewServerAddressFromDataNode(nodeInfo) diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index 4e60f6ff8..f1a0c4450 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -2,9 +2,10 @@ package shell import ( "fmt" + "testing" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/stretchr/testify/assert" - "testing" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" @@ -275,7 +276,7 @@ func TestVolumeSelection(t *testing.T) { func TestDeleteEmptySelection(t *testing.T) { topologyInfo := parseOutput(topoData) - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 { diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 29d7ebac4..bc1ca5304 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -5,10 +5,11 @@ import ( "errors" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" "io" "path/filepath" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -72,7 +73,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern) // find all data nodes with volumes that needs replication change - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { var targetVolumeIds []uint32 for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go index fc1d50e64..a81ebe6dd 100644 --- a/weed/shell/command_volume_delete_empty.go +++ b/weed/shell/command_volume_delete_empty.go @@ -2,13 +2,14 @@ package shell import ( "flag" + "io" + "log" + "time" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" - "io" - "log" - "time" ) func init() { @@ -59,7 +60,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri quietSeconds := int64(*quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 2c35b3bcd..ea2c0791e 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -179,8 +179,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - loc := newLocation(dc, string(rack), dn) + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { + loc := newLocation(string(dc), string(rack), dn) for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 30d3ecd11..dbd814309 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -651,7 +651,7 @@ func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[string]map[ return } - eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, t *master_pb.DataNodeInfo) { var volumeCount, ecShardCount int dataNodeId := t.GetId() for _, diskInfo := range t.DiskInfos { diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 2878d16fe..9cea40eb2 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -90,7 +90,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { vidMap := make(map[uint32]bool) - eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" { diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index d5087f0ec..46da39eef 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -292,7 +292,7 @@ func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeS fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds) vidMap := make(map[uint32]bool) - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { // check collection name pattern