From 5bddf0c085c8d1eabb3d1bd4f206ce60d5589c0d Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 4 Nov 2024 00:08:45 +0500 Subject: [PATCH] [shell] volume.balance collect volume servers by dc rack node (#6191) * chore: balance by rack * fix: rm check lock * fix: selected racks * fix: selected nodes * fix: containts * fix: one collectVolumeServersByDcRackNode * fix: revert lock and add lock * fix: panic test * revert noLock --- Makefile | 2 ++ weed/shell/command_volume_balance.go | 18 +++++++++++++----- weed/shell/command_volume_balance_test.go | 2 +- weed/shell/command_volume_server_evacuate.go | 4 ++-- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index bc63bc708..17eceafd3 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +.PHONY: test + BINARY = weed SOURCE_DIR = . diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index bea6f33c3..42b4ea13c 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -32,7 +33,7 @@ func (c *commandVolumeBalance) Name() string { func (c *commandVolumeBalance) Help() string { return `balance all volumes among volume servers - volume.balance [-collection ALL_COLLECTIONS|EACH_COLLECTION|] [-force] [-dataCenter=] + volume.balance [-collection ALL_COLLECTIONS|EACH_COLLECTION|] [-force] [-dataCenter=] [-racks=rack_name_one,rack_name_two] [-nodes=192.168.0.1:8080,192.168.0.2:8080] Algorithm: @@ -73,6 +74,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) collection := balanceCommand.String("collection", "ALL_COLLECTIONS", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") + racks := balanceCommand.String("racks", "", "only apply the balancing for this racks") + nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes") applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.") if err = balanceCommand.Parse(args); err != nil { return nil @@ -84,12 +87,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer } // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second) + topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Second) if err != nil { return err } - volumeServers := collectVolumeServersByDc(topologyInfo, *dc) + volumeServers := collectVolumeServersByDcRackNode(topologyInfo, *dc, *racks, *nodes) volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) diskTypes := collectVolumeDiskTypes(topologyInfo) @@ -142,13 +145,19 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT return nil } -func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter string) (nodes []*Node) { +func collectVolumeServersByDcRackNode(t *master_pb.TopologyInfo, selectedDataCenter string, selectedRacks string, selectedNodes string) (nodes []*Node) { for _, dc := range t.DataCenterInfos { if selectedDataCenter != "" && dc.Id != selectedDataCenter { continue } for _, r := range dc.RackInfos { + if selectedRacks != "" && !strings.Contains(selectedRacks, r.Id) { + continue + } for _, dn := range r.DataNodeInfos { + if selectedNodes != "" && !strings.Contains(selectedNodes, dn.Id) { + continue + } nodes = append(nodes, &Node{ info: dn, dc: dc.Id, @@ -325,7 +334,6 @@ func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][] } func maybeMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolume *master_pb.VolumeInformationMessage, emptyNode *Node, applyChange bool) (hasMoved bool, err error) { - if !commandEnv.isLocked() { return false, fmt.Errorf("lock is lost") } diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index fb39e063f..4e60f6ff8 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -251,7 +251,7 @@ func TestIsGoodMove(t *testing.T) { func TestBalance(t *testing.T) { topologyInfo := parseOutput(topoData) - volumeServers := collectVolumeServersByDc(topologyInfo, "") + volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "") volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) diskTypes := collectVolumeDiskTypes(topologyInfo) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 3b593d9be..5c7c903de 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -110,7 +110,7 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server - volumeServers := collectVolumeServersByDc(c.topologyInfo, "") + volumeServers := collectVolumeServersByDcRackNode(c.topologyInfo, "", "", "") thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) @@ -124,7 +124,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE fmt.Fprintf(writer, "update topologyInfo %v", err) } else { _, otherNodesNew := c.nodesOtherThan( - collectVolumeServersByDc(topologyInfo, ""), volumeServer) + collectVolumeServersByDcRackNode(topologyInfo, "", "", ""), volumeServer) if len(otherNodesNew) > 0 { otherNodes = otherNodesNew c.topologyInfo = topologyInfo