diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index fd7a1acdc..b98921fd7 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -3,7 +3,6 @@ package shell import ( "context" "fmt" - "math" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -12,11 +11,32 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "golang.org/x/exp/slices" "google.golang.org/grpc" ) +type DataCenterId string +type EcNodeId string +type RackId string + +type EcNode struct { + info *master_pb.DataNodeInfo + dc DataCenterId + rack RackId + freeEcSlot int +} +type CandidateEcNode struct { + ecNode *EcNode + shardCount int +} + +type EcRack struct { + ecNodes map[EcNodeId]*EcNode + freeEcSlot int +} + func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { if !commandEnv.isLocked() { @@ -68,7 +88,6 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if targetAddress != existingLocation { - fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), @@ -109,6 +128,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, return } +// TODO: Make dc a DataCenterId instead of string. func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) { for _, dc := range topo.DataCenterInfos { for _, rack := range dc.RackInfos { @@ -131,11 +151,6 @@ func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) { }) } -type CandidateEcNode struct { - ecNode *EcNode - shardCount int -} - // if the index node changed the freeEcSlot, need to keep every EcNode still sorted func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) { for i := index - 1; i >= 0; i-- { @@ -179,16 +194,6 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (c return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos) } -type RackId string -type EcNodeId string - -type EcNode struct { - info *master_pb.DataNodeInfo - dc string - rack RackId - freeEcSlot int -} - func (ecNode *EcNode) localShardIdCount(vid uint32) int { for _, diskInfo := range ecNode.info.DiskInfos { for _, ecShardInfo := range diskInfo.EcShardInfos { @@ -201,13 +206,7 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int { return 0 } -type EcRack struct { - ecNodes map[EcNodeId]*EcNode - freeEcSlot int -} - func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - // list all possible locations // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) @@ -232,7 +231,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) ecNodes = append(ecNodes, &EcNode{ info: dn, - dc: dc, + dc: DataCenterId(dc), rack: rack, freeEcSlot: int(freeEcSlots), }) @@ -283,8 +282,12 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n }) } -func ceilDivide(total, n int) int { - return int(math.Ceil(float64(total) / float64(n))) +func ceilDivide(a, b int) int { + var r int + if (a % b) != 0 { + r = 1 + } + return (a / b) + r } func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { @@ -772,6 +775,21 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl return vidLocations } +func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) { + for _, ecNode := range nodes { + for _, diskInfo := range ecNode.info.DiskInfos { + for _, volumeInfo := range diskInfo.VolumeInfos { + if needle.VolumeId(volumeInfo.Id) != vid { + continue + } + return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + } + } + } + + return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid) +} + func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go new file mode 100644 index 000000000..412599115 --- /dev/null +++ b/weed/shell/command_ec_common_test.go @@ -0,0 +1,87 @@ +package shell + +import ( + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" +) + +func TestEcDistribution(t *testing.T) { + + topologyInfo := parseOutput(topoData) + + // find out all volume servers with one slot left. + ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topologyInfo, "") + + sortEcNodesByFreeslotsDescending(ecNodes) + + if totalFreeEcSlots < erasure_coding.TotalShardsCount { + t.Errorf("not enough free ec shard slots: %d", totalFreeEcSlots) + } + allocatedDataNodes := ecNodes + if len(allocatedDataNodes) > erasure_coding.TotalShardsCount { + allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount] + } + + for _, dn := range allocatedDataNodes { + // fmt.Printf("info %+v %+v\n", dn.info, dn) + fmt.Printf("=> %+v %+v\n", dn.info.Id, dn.freeEcSlot) + } +} + +func TestVolumeIdToReplicaPlacement(t *testing.T) { + topo1 := parseOutput(topoData) + topo2 := parseOutput(topoData2) + + testCases := []struct { + topology *master_pb.TopologyInfo + vid string + want string + wantErr string + }{ + {topo1, "", "", "failed to resolve replica placement for volume ID 0"}, + {topo1, "0", "", "failed to resolve replica placement for volume ID 0"}, + {topo1, "1", "100", ""}, + {topo1, "296", "100", ""}, + {topo2, "", "", "failed to resolve replica placement for volume ID 0"}, + {topo2, "19012", "", "failed to resolve replica placement for volume ID 19012"}, + {topo2, "6271", "002", ""}, + {topo2, "17932", "002", ""}, + } + + for _, tc := range testCases { + vid, _ := needle.NewVolumeId(tc.vid) + ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + got, gotErr := volumeIdToReplicaPlacement(vid, ecNodes) + + if tc.wantErr == "" && gotErr != nil { + t.Errorf("expected no error for volume '%s', got '%s'", tc.vid, gotErr.Error()) + continue + } + if tc.wantErr != "" { + if gotErr == nil { + t.Errorf("got no error for volume '%s', expected '%s'", tc.vid, tc.wantErr) + continue + } + if gotErr.Error() != tc.wantErr { + t.Errorf("expected error '%s' for volume '%s', got '%s'", tc.wantErr, tc.vid, gotErr.Error()) + continue + } + } + + if got == nil { + if tc.want != "" { + t.Errorf("expected replica placement '%s' for volume '%s', got nil", tc.want, tc.vid) + } + continue + } + want, _ := super_block.NewReplicaPlacementFromString(tc.want) + if !got.Equals(want) { + t.Errorf("got replica placement '%s' for volune '%s', want '%s'", got.String(), tc.vid, want.String()) + } + } +} diff --git a/weed/shell/command_ec_encode_test.go b/weed/shell/command_ec_encode_test.go deleted file mode 100644 index 346e2af14..000000000 --- a/weed/shell/command_ec_encode_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package shell - -import ( - "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" - "testing" -) - -func TestEcDistribution(t *testing.T) { - - topologyInfo := parseOutput(topoData) - - // find out all volume servers with one slot left. - ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topologyInfo, "") - - sortEcNodesByFreeslotsDescending(ecNodes) - - if totalFreeEcSlots < erasure_coding.TotalShardsCount { - println("not enough free ec shard slots", totalFreeEcSlots) - } - allocatedDataNodes := ecNodes - if len(allocatedDataNodes) > erasure_coding.TotalShardsCount { - allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount] - } - - for _, dn := range allocatedDataNodes { - // fmt.Printf("info %+v %+v\n", dn.info, dn) - fmt.Printf("=> %+v %+v\n", dn.info.Id, dn.freeEcSlot) - } - -} diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index 25380ddca..ef9461ef0 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -129,7 +129,7 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod Id: dataNodeId, DiskInfos: make(map[string]*master_pb.DiskInfo), }, - dc: dc, + dc: DataCenterId(dc), rack: RackId(rack), freeEcSlot: freeEcSlot, } diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go index b2bf21fcb..f6d14e25b 100644 --- a/weed/storage/super_block/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -45,6 +45,15 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) { return NewReplicaPlacementFromString(fmt.Sprintf("%03d", b)) } +func (a *ReplicaPlacement) Equals(b *ReplicaPlacement) bool { + if a == nil || b == nil { + return false + } + return (a.SameRackCount == b.SameRackCount && + a.DiffRackCount == b.DiffRackCount && + a.DiffDataCenterCount == b.DiffDataCenterCount) +} + func (rp *ReplicaPlacement) Byte() byte { if rp == nil { return 0