diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 96cfc40db..e91656931 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -39,8 +39,6 @@ type EcRack struct {
 	freeEcSlot int
 }
 
-// TODO: We're shuffling way too many parameters between internal functions. Encapsulate in a ecBalancer{} struct.
-
 var (
 	// Overridable functions for testing.
 	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
@@ -421,10 +419,16 @@ func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string]
 	return groupMap
 }
 
-func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
-	// collect racks info
+type ecBalancer struct {
+	commandEnv       *CommandEnv
+	ecNodes          []*EcNode
+	replicaPlacement *super_block.ReplicaPlacement
+	applyBalancing   bool
+}
+
+func (ecb *ecBalancer) racks() map[RackId]*EcRack {
 	racks := make(map[RackId]*EcRack)
-	for _, ecNode := range allEcNodes {
+	for _, ecNode := range ecb.ecNodes {
 		if racks[ecNode.rack] == nil {
 			racks[ecNode.rack] = &EcRack{
 				ecNodes: make(map[EcNodeId]*EcNode),
@@ -436,39 +440,38 @@ func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
 	return racks
 }
 
-func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
 	fmt.Printf("balanceEcVolumes %s\n", collection)
 
-	if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
+	if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
 		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
 	}
 
-	if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, rp, applyBalancing); err != nil {
+	if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
 		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
 	}
 
-	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, rp, applyBalancing); err != nil {
+	if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
 		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
 	}
 
 	return nil
 }
 
-func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
+func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
 	// vid => []ecNode
-	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
+	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
 	// deduplicate ec shards
 	for vid, locations := range vidLocations {
-		if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
+		if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
-
+func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
 	// check whether this volume has ecNodes that are over average
 	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
 	for _, ecNode := range locations {
@@ -483,16 +486,16 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle
 		}
 		sortEcNodesByFreeslotsAscending(ecNodes)
 		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
-		if !applyBalancing {
+		if !ecb.applyBalancing {
 			continue
 		}
 
 		duplicatedShardIds := []uint32{uint32(shardId)}
 		for _, ecNode := range ecNodes[1:] {
-			if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
+			if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
 				return err
 			}
-			if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
+			if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
 				return err
 			}
 			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
@@ -501,12 +504,12 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle
 	return nil
 }
 
-func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
 	// collect vid => []ecNode, since previous steps can change the locations
-	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
+	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
 	// spread the ec shards evenly
 	for vid, locations := range vidLocations {
-		if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, rp, applyBalancing); err != nil {
+		if err := ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations); err != nil {
 			return err
 		}
 	}
@@ -521,7 +524,9 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
 }
 
 // TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
-func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
+	racks := ecb.racks()
+
 	// calculate average number of shards an ec rack should have for one volume
 	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
 
@@ -544,7 +549,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	}
 
 	for shardId, ecNode := range ecShardsToMove {
-		rackId, err := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack)
+		rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount, averageShardsPerEcRack)
 		if err != nil {
 			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
 			continue
@@ -554,7 +559,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 		for _, n := range racks[rackId].ecNodes {
 			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
 		}
-		err = pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, rp, applyBalancing)
+		err = ecb.pickOneEcNodeAndMoveOneShard(averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes)
 		if err != nil {
 			return err
 		}
@@ -567,7 +572,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	return nil
 }
 
-func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) (RackId, error) {
+func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) (RackId, error) {
 	targets := []RackId{}
 	targetShards := -1
 	for _, shards := range rackToShardCount {
@@ -584,8 +589,8 @@ func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCo
 			details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
 			continue
 		}
-		if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount {
-			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, replicaPlacement.DiffRackCount)
+		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.DiffRackCount {
+			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
 			continue
 		}
 		if shards >= averageShardsPerEcRack {
@@ -608,9 +613,10 @@ func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCo
 	return targets[rand.IntN(len(targets))], nil
 }
 
-func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 	// collect vid => []ecNode, since previous steps can change the locations
-	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
+	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
+	racks := ecb.racks()
 
 	// spread the ec shards evenly
 	for vid, locations := range vidLocations {
@@ -631,7 +637,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 			}
 			sourceEcNodes := rackEcNodesWithVid[rackId]
 			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
-			if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, rp, applyBalancing); err != nil {
+			if err := ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes); err != nil {
 				return err
 			}
 		}
@@ -639,8 +645,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 	return nil
 }
 
-func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
-
+func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
 	for _, ecNode := range existingLocations {
 
 		shardBits := findEcVolumeShards(ecNode, vid)
@@ -654,7 +659,7 @@ func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNo
 
 			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
 
-			err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, rp, applyBalancing)
+			err := ecb.pickOneEcNodeAndMoveOneShard(averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes)
 			if err != nil {
 				return err
 			}
@@ -666,19 +671,17 @@ func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNo
 	return nil
 }
 
-func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
-
+func (ecb *ecBalancer) balanceEcRacks() error {
 	// balance one rack for all ec shards
-	for _, ecRack := range racks {
-		if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
+	for _, ecRack := range ecb.racks() {
+		if err := ecb.doBalanceEcRack(ecRack); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
-
+func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 	if len(ecRack.ecNodes) <= 1 {
 		return nil
 	}
@@ -729,7 +732,7 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
 
 					fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
 
-					err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+					err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
 					if err != nil {
 						return err
 					}
@@ -749,7 +752,7 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
 	return nil
 }
 
-func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcNode int) (*EcNode, error) {
+func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, averageShardsPerEcNode int) (*EcNode, error) {
 	if existingLocation == nil {
 		return nil, fmt.Errorf("INTERNAL: missing source nodes")
 	}
@@ -781,8 +784,8 @@ func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode
 		}
 
 		shards := nodeShards[node]
-		if replicaPlacement != nil && shards >= replicaPlacement.SameRackCount {
-			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, replicaPlacement.SameRackCount)
+		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.SameRackCount {
+			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
 			continue
 		}
 		if shards >= averageShardsPerEcNode {
@@ -808,15 +811,15 @@ func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode
 }
 
 // TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
-func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
-	destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, rp, averageShardsPerEcNode)
+func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
+	destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, averageShardsPerEcNode)
 	if err != nil {
 		fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
 		return nil
 	}
 
 	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
-	return moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destNode, applyBalancing)
+	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
 }
 
 func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
@@ -859,9 +862,9 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
 	return picked
 }
 
-func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needle.VolumeId][]*EcNode {
+func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
 	vidLocations := make(map[needle.VolumeId][]*EcNode)
-	for _, ecNode := range allEcNodes {
+	for _, ecNode := range ecb.ecNodes {
 		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
 		if !found {
 			continue
@@ -876,9 +879,9 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
 	return vidLocations
 }
 
-// TODO: EC volumes have no topology replica placement info :( We need a better solution to resolve topology, and balancing, for those.
-func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nodes []*EcNode, ecReplicaPlacement *super_block.ReplicaPlacement) (*super_block.ReplicaPlacement, error) {
-	for _, ecNode := range nodes {
+// TODO: Unused, delete me.
+func (ecb *ecBalancer) volumeIdToReplicaPlacement(vid needle.VolumeId) (*super_block.ReplicaPlacement, error) {
+	for _, ecNode := range ecb.ecNodes {
 		for _, diskInfo := range ecNode.info.DiskInfos {
 			for _, volumeInfo := range diskInfo.VolumeInfos {
 				if needle.VolumeId(volumeInfo.Id) == vid {
@@ -887,7 +890,7 @@ func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nod
 			}
 			for _, ecShardInfo := range diskInfo.EcShardInfos {
 				if needle.VolumeId(ecShardInfo.Id) == vid {
-					return ecReplicaPlacement, nil
+					return ecb.replicaPlacement, nil
 				}
 			}
 		}
@@ -910,14 +913,19 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
 	}
 
-	racks := collectRacks(allEcNodes)
+	ecb := &ecBalancer{
+		commandEnv:       commandEnv,
+		ecNodes:          allEcNodes,
+		replicaPlacement: ecReplicaPlacement,
+		applyBalancing:   applyBalancing,
+	}
+
 	for _, c := range collections {
-		if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, ecReplicaPlacement, applyBalancing); err != nil {
+		if err = ecb.balanceEcVolumes(c); err != nil {
 			return err
 		}
 	}
-
-	if err := balanceEcRacks(commandEnv, racks, applyBalancing); err != nil {
+	if err := ecb.balanceEcRacks(); err != nil {
 		return fmt.Errorf("balance ec racks: %v", err)
 	}
diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go
index d4fde9e55..b5ea2efa8 100644
--- a/weed/shell/command_ec_common_test.go
+++ b/weed/shell/command_ec_common_test.go
@@ -114,10 +114,15 @@ func TestVolumeIdToReplicaPlacement(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		commandEnv := &CommandEnv{}
 		vid, _ := needle.NewVolumeId(tc.vid)
 		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
-		got, gotErr := volumeIdToReplicaPlacement(commandEnv, vid, ecNodes, ecReplicaPlacement)
+
+		ecb := ecBalancer{
+			ecNodes:          ecNodes,
+			replicaPlacement: ecReplicaPlacement,
+		}
+
+		got, gotErr := ecb.volumeIdToReplicaPlacement(vid)
 
 		if err := errorCheck(gotErr, tc.wantErr); err != nil {
 			t.Errorf("volume %q: %s", tc.vid, err.Error())
@@ -163,14 +168,18 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
 		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
-		racks := collectRacks(ecNodes)
 		rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)
 
-		locations := ecNodes
-		rackToShardCount := countShardsByRack(vid, locations)
+		ecb := &ecBalancer{
+			ecNodes:          ecNodes,
+			replicaPlacement: rp,
+		}
+
+		racks := ecb.racks()
+		rackToShardCount := countShardsByRack(vid, ecNodes)
 		averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
 
-		got, gotErr := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack)
+		got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount, averageShardsPerEcRack)
 		if err := errorCheck(gotErr, tc.wantErr); err != nil {
 			t.Errorf("volume %q: %s", tc.vid, err.Error())
 			continue
@@ -193,27 +202,25 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
 }
 
 func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 	testCases := []struct {
-		topology         *master_pb.TopologyInfo
-		nodeId           string
-		vid              string
-		replicaPlacement string
-		wantOneOf        []string
-		wantErr          string
+		topology  *master_pb.TopologyInfo
+		nodeId    string
+		vid       string
+		wantOneOf []string
+		wantErr   string
 	}{
-		{topologyEc, "", "", "", nil, "INTERNAL: missing source nodes"},
-		{topologyEc, "idontexist", "12737", "", nil, "INTERNAL: missing source nodes"},
+		{topologyEc, "", "", nil, "INTERNAL: missing source nodes"},
+		{topologyEc, "idontexist", "12737", nil, "INTERNAL: missing source nodes"},
 		// Non-EC nodes. We don't care about these, but the function should return all available target nodes as a safeguard.
 		{
-			topologyEc, "172.19.0.10:8702", "6225", "123",
-			[]string{
+			topologyEc, "172.19.0.10:8702", "6225", []string{
 				"172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704", "172.19.0.17:8703",
 				"172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710", "172.19.0.3:8708",
 				"172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713", "172.19.0.8:8709",
 				"172.19.0.9:8712"},
 			"",
-		}, {
-			topologyEc, "172.19.0.8:8709", "6226", "123",
-			[]string{
+		},
+		{
+			topologyEc, "172.19.0.8:8709", "6226", []string{
 				"172.19.0.10:8702", "172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704",
 				"172.19.0.17:8703", "172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710",
 				"172.19.0.3:8708", "172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713",
@@ -221,45 +228,27 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 			"",
 		},
 		// EC volumes.
-		{
-			topologyEc, "172.19.0.10:8702", "14322", "",
-			nil, "Skipped 172.19.0.13:8701 because shards 1 >= replica placement limit for the rack (0)",
-		}, {
-			topologyEc, "172.19.0.10:8702", "14322", "210",
-			nil, "Skipped 172.19.0.5:8705 because shards 0 >= replica placement limit for the rack (0)",
-		}, {
-			topologyEc, "172.19.0.10:8702", "9577", "110",
-			nil, "Skipped 172.19.0.4:8707 because shards 1 >= replica placement limit for the rack (0)",
-		}, {
-			topologyEc, "172.19.0.10:8702", "9577", "111",
-			nil, "Skipped 172.19.0.4:8707 because shards 1 >= replica placement limit for the rack (1)",
-		}, {
-			topologyEc, "172.19.0.10:8702", "9577", "113",
-			[]string{
-				"172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704", "172.19.0.17:8703",
-				"172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710", "172.19.0.3:8708",
-				"172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713", "172.19.0.8:8709",
-				"172.19.0.9:8712"},
-			"",
-		}, {
-			topologyEc, "172.19.0.10:8702", "14322", "222",
-			[]string{"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"}, "",
-		}, {
-			topologyEc, "172.19.0.13:8701", "10457", "222",
-			[]string{"172.19.0.10:8702", "172.19.0.6:8713"}, "",
-		}, {
-			topologyEc, "172.19.0.17:8703", "12737", "222",
-			[]string{"172.19.0.13:8701"}, "",
-		}, {
-			topologyEc, "172.19.0.20:8706", "14322", "222",
-			[]string{"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"}, "",
-		},
+		{topologyEc, "172.19.0.10:8702", "14322", []string{
+			"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"},
+			""},
+		{topologyEc, "172.19.0.13:8701", "10457", []string{
+			"172.19.0.10:8702", "172.19.0.6:8713"},
+			""},
+		{topologyEc, "172.19.0.17:8703", "12737", []string{
+			"172.19.0.13:8701"},
+			""},
+		{topologyEc, "172.19.0.20:8706", "14322", []string{
+			"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"},
+			""},
 	}
 
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
 		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
-		rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)
+
+		ecb := &ecBalancer{
+			ecNodes: allEcNodes,
+		}
 
 		// Resolve target node by name
 		var ecNode *EcNode
@@ -271,7 +260,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 		}
 		averageShardsPerEcNode := 5
 
-		got, gotErr := pickEcNodeToBalanceShardsInto(vid, ecNode, allEcNodes, rp, averageShardsPerEcNode)
+		got, gotErr := ecb.pickEcNodeToBalanceShardsInto(vid, ecNode, allEcNodes, averageShardsPerEcNode)
 		if err := errorCheck(gotErr, tc.wantErr); err != nil {
 			t.Errorf("node %q, volume %q: %s", tc.nodeId, tc.vid, err.Error())
 			continue
diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go
index 50e06ba6c..33befce8f 100644
--- a/weed/shell/command_ec_test.go
+++ b/weed/shell/command_ec_test.go
@@ -21,106 +21,111 @@ func TestCommandEcDistribution(t *testing.T) {
 }
 
 func TestCommandEcBalanceSmall(t *testing.T) {
-
-	allEcNodes := []*EcNode{
-		newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
-		newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
+	ecb := &ecBalancer{
+		ecNodes: []*EcNode{
+			newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
+			newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
+		},
+		applyBalancing: false,
 	}
 
-	racks := collectRacks(allEcNodes)
-	balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false)
+	ecb.balanceEcVolumes("c1")
 }
 
 func TestCommandEcBalanceNothingToMove(t *testing.T) {
-
-	allEcNodes := []*EcNode{
-		newEcNode("dc1", "rack1", "dn1", 100).
-			addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
-			addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
-		newEcNode("dc1", "rack1", "dn2", 100).
-			addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
-			addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
+	ecb := &ecBalancer{
+		ecNodes: []*EcNode{
+			newEcNode("dc1", "rack1", "dn1", 100).
+				addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
+				addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
+			newEcNode("dc1", "rack1", "dn2", 100).
+				addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
+				addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
+		},
+		applyBalancing: false,
 	}
 
-	racks := collectRacks(allEcNodes)
-	balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false)
+	ecb.balanceEcVolumes("c1")
 }
 
 func TestCommandEcBalanceAddNewServers(t *testing.T) {
-
-	allEcNodes := []*EcNode{
-		newEcNode("dc1", "rack1", "dn1", 100).
-			addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
-			addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
-		newEcNode("dc1", "rack1", "dn2", 100).
-			addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
-			addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
-		newEcNode("dc1", "rack1", "dn3", 100),
-		newEcNode("dc1", "rack1", "dn4", 100),
+	ecb := &ecBalancer{
+		ecNodes: []*EcNode{
+			newEcNode("dc1", "rack1", "dn1", 100).
+				addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
+				addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
+			newEcNode("dc1", "rack1", "dn2", 100).
+				addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
+				addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
+			newEcNode("dc1", "rack1", "dn3", 100),
+			newEcNode("dc1", "rack1", "dn4", 100),
+		},
+		applyBalancing: false,
 	}
 
-	racks := collectRacks(allEcNodes)
-	balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false)
+	ecb.balanceEcVolumes("c1")
 }
 
 func TestCommandEcBalanceAddNewRacks(t *testing.T) {
-
-	allEcNodes := []*EcNode{
-		newEcNode("dc1", "rack1", "dn1", 100).
-			addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
-			addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
-		newEcNode("dc1", "rack1", "dn2", 100).
-			addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
-			addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
-		newEcNode("dc1", "rack2", "dn3", 100),
-		newEcNode("dc1", "rack2", "dn4", 100),
+	ecb := &ecBalancer{
+		ecNodes: []*EcNode{
+			newEcNode("dc1", "rack1", "dn1", 100).
+				addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
+				addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
+			newEcNode("dc1", "rack1", "dn2", 100).
+				addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
+				addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
+			newEcNode("dc1", "rack2", "dn3", 100),
+			newEcNode("dc1", "rack2", "dn4", 100),
+		},
+		applyBalancing: false,
 	}
 
-	racks := collectRacks(allEcNodes)
-	balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false)
+	ecb.balanceEcVolumes("c1")
 }
 
 func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
+	ecb := ecBalancer{
+		ecNodes: []*EcNode{
+			newEcNode("dc1", "rack1", "dn_shared", 100).
+				addEcVolumeAndShardsForTest(1, "c1", []uint32{0}).
+				addEcVolumeAndShardsForTest(2, "c1", []uint32{0}),
 
-	allEcNodes := []*EcNode{
-		newEcNode("dc1", "rack1", "dn_shared", 100).
-			addEcVolumeAndShardsForTest(1, "c1", []uint32{0}).
-			addEcVolumeAndShardsForTest(2, "c1", []uint32{0}),
+			newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{1}),
+			newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{2}),
+			newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{3}),
+			newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{4}),
+			newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{5}),
+			newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{6}),
+			newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{7}),
+			newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{8}),
+			newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{9}),
+			newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{10}),
+			newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{11}),
+			newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{12}),
+			newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{13}),
 
-		newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{1}),
-		newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{2}),
-		newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{3}),
-		newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{4}),
-		newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{5}),
-		newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{6}),
-		newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{7}),
-		newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{8}),
-		newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{9}),
-		newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{10}),
-		newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{11}),
-		newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{12}),
-		newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{13}),
+			newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{1}),
+			newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{2}),
+			newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{3}),
+			newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{4}),
+			newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{5}),
+			newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{6}),
+			newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{7}),
+			newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{8}),
+			newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{9}),
+			newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{10}),
+			newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{11}),
+			newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{12}),
+			newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{13}),
 
-		newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{1}),
-		newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{2}),
-		newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{3}),
-		newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{4}),
-		newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{5}),
-		newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{6}),
-		newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{7}),
-		newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{8}),
-		newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{9}),
-		newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{10}),
-		newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{11}),
-		newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{12}),
-		newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{13}),
-
-		newEcNode("dc1", "rack1", "dn3", 100),
+			newEcNode("dc1", "rack1", "dn3", 100),
+		},
+		applyBalancing: false,
 	}
 
-	racks := collectRacks(allEcNodes)
-	balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false)
-	balanceEcRacks(nil, racks, false)
+	ecb.balanceEcVolumes("c1")
+	ecb.balanceEcRacks()
 }
 
 func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode {