diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 15d47eee7..043c70366 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -69,7 +69,7 @@ func (c *commandEcBalance) Help() string {
 		volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
 		ecShardsToMove = select overflown ec shards from volumeServersOverAverage
 		for each ecShardsToMove {
-			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount)
+			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount, ecShardReplicaPlacement)
 			pickOneEcNodeAndMoveOneShard(destVolumeServers)
 		}
 	}
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 7510a5277..96cfc40db 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -448,7 +448,7 @@ func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*E
 		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
 	}
 
-	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, rp, applyBalancing); err != nil {
 		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
 	}
 
@@ -544,7 +544,6 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	}
 
 	for shardId, ecNode := range ecShardsToMove {
-		// TODO: consider volume replica info when balancing racks
 		rackId, err := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack)
 		if err != nil {
 			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
@@ -555,7 +554,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 		for _, n := range racks[rackId].ecNodes {
 			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
 		}
-		err = pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+		err = pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, rp, applyBalancing)
 		if err != nil {
 			return err
 		}
@@ -609,7 +608,7 @@ func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCo
 	return targets[rand.IntN(len(targets))], nil
 }
 
-func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
 	// collect vid => []ecNode, since previous steps can change the locations
 	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
 
@@ -632,7 +631,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 		}
 		sourceEcNodes := rackEcNodesWithVid[rackId]
 		averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
-		if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
+		if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, rp, applyBalancing); err != nil {
 			return err
 		}
 	}
@@ -640,7 +639,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 	return nil
 }
 
-func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
 
 	for _, ecNode := range existingLocations {
 
@@ -655,7 +654,7 @@ func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNo
 
 			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
 
-			err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+			err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, rp, applyBalancing)
 			if err != nil {
 				return err
 			}
@@ -809,9 +808,8 @@ func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode
 }
 
 // TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
-func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
-	// TODO: consider volume replica info when balancing nodes
-	destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, nil, averageShardsPerEcNode)
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+	destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, rp, averageShardsPerEcNode)
 	if err != nil {
 		fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
 		return nil
diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go
index 29d7c2d4b..d4fde9e55 100644
--- a/weed/shell/command_ec_common_test.go
+++ b/weed/shell/command_ec_common_test.go
@@ -193,25 +193,27 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
 }
 
 func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 	testCases := []struct {
-		topology  *master_pb.TopologyInfo
-		nodeId    string
-		vid       string
-		wantOneOf []string
-		wantErr   string
+		topology         *master_pb.TopologyInfo
+		nodeId           string
+		vid              string
+		replicaPlacement string
+		wantOneOf        []string
+		wantErr          string
 	}{
-		{topologyEc, "", "", nil, "INTERNAL: missing source nodes"},
-		{topologyEc, "idontexist", "12737", nil, "INTERNAL: missing source nodes"},
+		{topologyEc, "", "", "", nil, "INTERNAL: missing source nodes"},
+		{topologyEc, "idontexist", "12737", "", nil, "INTERNAL: missing source nodes"},
 		// Non-EC nodes. We don't care about these, but the function should return all available target nodes as a safeguard.
 		{
-			topologyEc, "172.19.0.10:8702", "6225", []string{
+			topologyEc, "172.19.0.10:8702", "6225", "123",
+			[]string{
 				"172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704", "172.19.0.17:8703",
 				"172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710", "172.19.0.3:8708",
 				"172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713", "172.19.0.8:8709",
 				"172.19.0.9:8712"},
 			"",
-		},
-		{
-			topologyEc, "172.19.0.8:8709", "6226", []string{
+		}, {
+			topologyEc, "172.19.0.8:8709", "6226", "123",
+			[]string{
 				"172.19.0.10:8702", "172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704",
 				"172.19.0.17:8703", "172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710",
 				"172.19.0.3:8708", "172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713",
@@ -219,23 +221,45 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 			"",
 		},
 		// EC volumes.
-		{topologyEc, "172.19.0.10:8702", "14322", []string{
-			"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"},
-			""},
-		{topologyEc, "172.19.0.13:8701", "10457", []string{
-			"172.19.0.10:8702", "172.19.0.6:8713"},
-			""},
-		{topologyEc, "172.19.0.17:8703", "12737", []string{
-			"172.19.0.13:8701"},
-			""},
-		{topologyEc, "172.19.0.20:8706", "14322", []string{
-			"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"},
-			""},
+		{
+			topologyEc, "172.19.0.10:8702", "14322", "",
+			nil, "Skipped 172.19.0.13:8701 because shards 1 >= replica placement limit for the rack (0)",
+		}, {
+			topologyEc, "172.19.0.10:8702", "14322", "210",
+			nil, "Skipped 172.19.0.5:8705 because shards 0 >= replica placement limit for the rack (0)",
+		}, {
+			topologyEc, "172.19.0.10:8702", "9577", "110",
+			nil, "Skipped 172.19.0.4:8707 because shards 1 >= replica placement limit for the rack (0)",
+		}, {
+			topologyEc, "172.19.0.10:8702", "9577", "111",
+			nil, "Skipped 172.19.0.4:8707 because shards 1 >= replica placement limit for the rack (1)",
+		}, {
+			topologyEc, "172.19.0.10:8702", "9577", "113",
+			[]string{
+				"172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704", "172.19.0.17:8703",
+				"172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710", "172.19.0.3:8708",
+				"172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713", "172.19.0.8:8709",
+				"172.19.0.9:8712"},
+			"",
+		}, {
+			topologyEc, "172.19.0.10:8702", "14322", "222",
+			[]string{"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"}, "",
+		}, {
+			topologyEc, "172.19.0.13:8701", "10457", "222",
+			[]string{"172.19.0.10:8702", "172.19.0.6:8713"}, "",
+		}, {
+			topologyEc, "172.19.0.17:8703", "12737", "222",
+			[]string{"172.19.0.13:8701"}, "",
+		}, {
+			topologyEc, "172.19.0.20:8706", "14322", "222",
+			[]string{"172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"}, "",
+		},
 	}
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
 		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+		rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)
 
 		// Resolve target node by name
 		var ecNode *EcNode
@@ -247,7 +271,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 		}
 
 		averageShardsPerEcNode := 5
-		got, gotErr := pickEcNodeToBalanceShardsInto(vid, ecNode, allEcNodes, nil, averageShardsPerEcNode)
+		got, gotErr := pickEcNodeToBalanceShardsInto(vid, ecNode, allEcNodes, rp, averageShardsPerEcNode)
 		if err := errorCheck(gotErr, tc.wantErr); err != nil {
 			t.Errorf("node %q, volume %q: %s", tc.nodeId, tc.vid, err.Error())
 			continue
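Note on the replicaPlacement strings used in the new test cases ("110", "210", "222", ...): they are parsed by super_block.NewReplicaPlacementFromString, the same helper the updated test calls. As a point of reference (not part of this patch), each digit of the three-character code counts extra copies at one topology level, in the documented SeaweedFS order [data center][rack][server]. A minimal standalone sketch of how such a code decodes:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)

func main() {
	// "210": 2 copies on other data centers, 1 copy on another rack
	// within the same data center, 0 copies on other servers in the same rack.
	rp, err := super_block.NewReplicaPlacementFromString("210")
	if err != nil {
		panic(err)
	}
	fmt.Println(rp.DiffDataCenterCount, rp.DiffRackCount, rp.SameRackCount) // 2 1 0
}

Threading rp from balanceEcVolumes down through pickOneEcNodeAndMoveOneShard into pickEcNodeToBalanceShardsInto is what replaces the former nil placeholder and produces the "replica placement limit for the rack" skips asserted above.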