diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 6004ad7e4..24dea3e2d 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -17,10 +17,11 @@ func (c *commandEcBalance) Name() string { return "ec.balance" } +// TODO: Update help string and move to command_ec_common.go once shard replica placement logic is enabled. func (c *commandEcBalance) Help() string { return `balance all ec shards among all racks and volume servers - ec.balance [-c EACH_COLLECTION|] [-force] [-dataCenter ] + ec.balance [-c EACH_COLLECTION|] [-force] [-dataCenter ] [-shardReplicaPlacement ] Algorithm: @@ -100,6 +101,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") + shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty (currently unused)") applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan") if err = balanceCommand.Parse(args); err != nil { return nil @@ -121,5 +123,10 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W } fmt.Printf("balanceEcVolumes collections %+v\n", len(collections)) - return EcBalance(commandEnv, collections, *dc, *applyBalancing) + rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement) + if err != nil { + return err + } + + return EcBalance(commandEnv, collections, *dc, rp, *applyBalancing) } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 3b4a0ff25..906e2e0dc 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -39,6 +39,42 @@ type EcRack struct { freeEcSlot int } +var ( + // Overridable functions for testing. + getDefaultReplicaPlacement = _getDefaultReplicaPlacement +) + +func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { + var resp *master_pb.GetMasterConfigurationResponse + var err error + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + return err + }) + if err != nil { + return nil, err + } + + return super_block.NewReplicaPlacementFromString(resp.DefaultReplication) +} +func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) { + if replicaStr != "" { + rp, err := super_block.NewReplicaPlacementFromString(replicaStr) + if err == nil { + fmt.Printf("using replica placement %q for EC volumes\n", rp.String()) + } + return rp, err + } + + // No replica placement argument provided, resolve from master default settings. + rp, err := getDefaultReplicaPlacement(commandEnv) + if err == nil { + fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String()) + } + return rp, err +} + func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { if !commandEnv.isLocked() { @@ -840,15 +876,19 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl return vidLocations } -// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default? -func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) { +// TODO: EC volumes have no topology replica placement info :( We need a better solution to resolve topology, and balancing, for those. +func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nodes []*EcNode, ecReplicaPlacement *super_block.ReplicaPlacement) (*super_block.ReplicaPlacement, error) { for _, ecNode := range nodes { for _, diskInfo := range ecNode.info.DiskInfos { for _, volumeInfo := range diskInfo.VolumeInfos { - if needle.VolumeId(volumeInfo.Id) != vid { - continue + if needle.VolumeId(volumeInfo.Id) == vid { + return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + } + } + for _, ecShardInfo := range diskInfo.EcShardInfos { + if needle.VolumeId(ecShardInfo.Id) == vid { + return ecReplicaPlacement, nil } - return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) } } } @@ -856,22 +896,7 @@ func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_bl return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid) } -func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { - var resp *master_pb.GetMasterConfigurationResponse - var err error - - err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) - return err - }) - if err != nil { - return nil, err - } - - return super_block.NewReplicaPlacementFromString(resp.DefaultReplication) -} - -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") } diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index 08e4a41c7..76609c89d 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -32,6 +32,40 @@ func errorCheck(got error, want string) error { } return nil } +func TestParseReplicaPlacementArg(t *testing.T) { + getDefaultReplicaPlacementOrig := getDefaultReplicaPlacement + getDefaultReplicaPlacement = func(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { + return super_block.NewReplicaPlacementFromString("123") + } + defer func() { + getDefaultReplicaPlacement = getDefaultReplicaPlacementOrig + }() + + testCases := []struct { + argument string + want string + wantErr string + }{ + {"lalala", "lal", "unexpected replication type"}, + {"", "123", ""}, + {"021", "021", ""}, + } + + for _, tc := range testCases { + commandEnv := &CommandEnv{} + got, gotErr := parseReplicaPlacementArg(commandEnv, tc.argument) + + if err := errorCheck(gotErr, tc.wantErr); err != nil { + t.Errorf("argument %q: %s", tc.argument, err.Error()) + continue + } + + want, _ := super_block.NewReplicaPlacementFromString(tc.want) + if !got.Equals(want) { + t.Errorf("got replica placement %q, want %q", got.String(), want.String()) + } + } +} func TestEcDistribution(t *testing.T) { @@ -55,26 +89,35 @@ func TestEcDistribution(t *testing.T) { } func TestVolumeIdToReplicaPlacement(t *testing.T) { + ecReplicaPlacement, _ := super_block.NewReplicaPlacementFromString("123") + testCases := []struct { topology *master_pb.TopologyInfo vid string want string wantErr string }{ - {topology1, "", "", "failed to resolve replica placement for volume ID 0"}, - {topology1, "0", "", "failed to resolve replica placement for volume ID 0"}, + {topology1, "", "", "failed to resolve replica placement"}, + {topology1, "0", "", "failed to resolve replica placement"}, {topology1, "1", "100", ""}, {topology1, "296", "100", ""}, - {topology2, "", "", "failed to resolve replica placement for volume ID 0"}, - {topology2, "19012", "", "failed to resolve replica placement for volume ID 19012"}, + {topology2, "", "", "failed to resolve replica placement"}, + {topology2, "19012", "", "failed to resolve replica placement"}, {topology2, "6271", "002", ""}, {topology2, "17932", "002", ""}, + {topologyEc, "", "", "failed to resolve replica placement"}, + {topologyEc, "0", "", "failed to resolve replica placement"}, + {topologyEc, "6225", "002", ""}, + {topologyEc, "6241", "002", ""}, + {topologyEc, "9577", "123", ""}, // EC volume + {topologyEc, "12737", "123", ""}, // EC volume } for _, tc := range testCases { + commandEnv := &CommandEnv{} vid, _ := needle.NewVolumeId(tc.vid) ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") - got, gotErr := volumeIdToReplicaPlacement(vid, ecNodes) + got, gotErr := volumeIdToReplicaPlacement(commandEnv, vid, ecNodes, ecReplicaPlacement) if err := errorCheck(gotErr, tc.wantErr); err != nil { t.Errorf("volume %q: %s", tc.vid, err.Error())