2019-06-03 17:26:31 +08:00
package shell
import (
"context"
2024-11-28 03:51:57 +08:00
"errors"
2019-06-03 17:26:31 +08:00
"fmt"
2024-11-22 00:54:03 +08:00
"math/rand/v2"
2024-11-05 09:56:20 +08:00
2022-07-29 15:17:28 +08:00
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
2024-11-19 10:05:06 +08:00
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
2022-07-29 15:17:28 +08:00
"github.com/seaweedfs/seaweedfs/weed/storage/types"
2022-04-18 10:35:43 +08:00
"golang.org/x/exp/slices"
2019-06-03 17:26:31 +08:00
"google.golang.org/grpc"
)
2024-11-19 10:05:06 +08:00
// DataCenterId identifies a data center; values are built from the Id of a
// data center entry in the master topology.
type DataCenterId string
// EcNodeId identifies a volume server holding EC shards; values are built from
// DataNodeInfo.Id.
type EcNodeId string
// RackId identifies a rack; values are built from the Id of a rack entry in
// the master topology.
type RackId string
// EcNode is the shell's in-memory record of one volume server, used while
// planning and applying EC shard moves.
type EcNode struct {
// info is the data node description taken from the master topology. Its
// per-disk EC shard list is edited in place by addEcVolumeShards and
// deleteEcVolumeShards.
info * master_pb . DataNodeInfo
// dc is the data center the node was found in.
dc DataCenterId
// rack is the rack the node was found in.
rack RackId
// freeEcSlot is the number of EC shards the node can still accept. It is
// initialised from countFreeShardSlots and adjusted as shards are added or
// removed.
freeEcSlot int
}
// CandidateEcNode pairs a node with a shard count so that candidates can be
// ordered by how many shards they hold.
type CandidateEcNode struct {
// ecNode is the candidate node.
ecNode * EcNode
// shardCount is the number of shards attributed to ecNode.
shardCount int
}
// EcRack aggregates the EC nodes that live in one rack.
type EcRack struct {
// ecNodes holds the rack's nodes keyed by node id.
ecNodes map [ EcNodeId ] * EcNode
// freeEcSlot is the sum of the free EC shard slots of the rack's nodes, as
// accumulated by collectRacks.
freeEcSlot int
}
2024-12-03 00:44:07 +08:00
// Package-level function variables. Production code calls through these
// variables so that tests can replace the implementation.
var (
// Overridable functions for testing.
// getDefaultReplicaPlacement defaults to _getDefaultReplicaPlacement, which
// queries the master for its configured default replication.
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
// _getDefaultReplicaPlacement asks the master for its configuration and parses
// the default replication string it reports into a ReplicaPlacement.
//
// Any error from the master call is returned unchanged; a parse failure is
// reported by NewReplicaPlacementFromString.
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
	var masterConfig *master_pb.GetMasterConfigurationResponse

	fetchConfig := func(client master_pb.SeaweedClient) error {
		var callErr error
		masterConfig, callErr = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		return callErr
	}
	if err := commandEnv.MasterClient.WithClient(false, fetchConfig); err != nil {
		return nil, err
	}

	return super_block.NewReplicaPlacementFromString(masterConfig.DefaultReplication)
}
// parseReplicaPlacementArg resolves the replica placement to use for EC volumes.
//
// A non-empty replicaStr is parsed directly. An empty replicaStr falls back to
// the master's default replica placement (via getDefaultReplicaPlacement). On
// success the chosen placement is printed to stdout; on failure the values
// produced by the underlying call are returned as-is.
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
	if replicaStr == "" {
		// No replica placement argument provided, resolve from master default settings.
		defaultPlacement, err := getDefaultReplicaPlacement(commandEnv)
		if err != nil {
			return defaultPlacement, err
		}
		fmt.Printf("using master default replica placement %q for EC volumes\n", defaultPlacement.String())
		return defaultPlacement, nil
	}

	requestedPlacement, err := super_block.NewReplicaPlacementFromString(replicaStr)
	if err != nil {
		return requestedPlacement, err
	}
	fmt.Printf("using replica placement %q for EC volumes\n", requestedPlacement.String())
	return requestedPlacement, nil
}
2020-02-26 13:50:12 +08:00
func moveMountedShardToEcNode ( commandEnv * CommandEnv , existingLocation * EcNode , collection string , vid needle . VolumeId , shardId erasure_coding . ShardId , destinationEcNode * EcNode , applyBalancing bool ) ( err error ) {
2019-06-03 17:26:31 +08:00
2022-08-23 05:12:23 +08:00
if ! commandEnv . isLocked ( ) {
return fmt . Errorf ( "lock is lost" )
}
2019-06-11 12:32:56 +08:00
copiedShardIds := [ ] uint32 { uint32 ( shardId ) }
2019-06-03 17:26:31 +08:00
2019-06-11 12:32:56 +08:00
if applyBalancing {
2019-06-03 17:26:31 +08:00
2021-09-13 13:47:52 +08:00
existingServerAddress := pb . NewServerAddressFromDataNode ( existingLocation . info )
2019-06-11 12:32:56 +08:00
// ask destination node to copy shard and the ecx file from source node, and mount it
2021-09-13 13:47:52 +08:00
copiedShardIds , err = oneServerCopyAndMountEcShardsFromSource ( commandEnv . option . GrpcDialOption , destinationEcNode , [ ] uint32 { uint32 ( shardId ) } , vid , collection , existingServerAddress )
2019-06-11 12:32:56 +08:00
if err != nil {
return err
}
2019-06-03 17:26:31 +08:00
2019-06-11 12:32:56 +08:00
// unmount the to be deleted shards
2021-09-13 13:47:52 +08:00
err = unmountEcShards ( commandEnv . option . GrpcDialOption , vid , existingServerAddress , copiedShardIds )
2019-06-11 12:32:56 +08:00
if err != nil {
return err
}
// ask source node to delete the shard, and maybe the ecx file
2021-09-13 13:47:52 +08:00
err = sourceServerDeleteEcShards ( commandEnv . option . GrpcDialOption , collection , vid , existingServerAddress , copiedShardIds )
2019-06-11 12:32:56 +08:00
if err != nil {
return err
}
fmt . Printf ( "moved ec shard %d.%d %s => %s\n" , vid , shardId , existingLocation . info . Id , destinationEcNode . info . Id )
2019-06-03 17:26:31 +08:00
2019-06-06 14:20:26 +08:00
}
2019-06-11 12:32:56 +08:00
destinationEcNode . addEcVolumeShards ( vid , collection , copiedShardIds )
existingLocation . deleteEcVolumeShards ( vid , copiedShardIds )
2019-06-06 14:20:26 +08:00
return nil
2019-06-03 17:26:31 +08:00
}
2020-02-26 13:50:12 +08:00
func oneServerCopyAndMountEcShardsFromSource ( grpcDialOption grpc . DialOption ,
2019-12-25 08:52:21 +08:00
targetServer * EcNode , shardIdsToCopy [ ] uint32 ,
2021-09-13 13:47:52 +08:00
volumeId needle . VolumeId , collection string , existingLocation pb . ServerAddress ) ( copiedShardIds [ ] uint32 , err error ) {
2019-06-03 17:26:31 +08:00
fmt . Printf ( "allocate %d.%v %s => %s\n" , volumeId , shardIdsToCopy , existingLocation , targetServer . info . Id )
2021-09-13 13:47:52 +08:00
targetAddress := pb . NewServerAddressFromDataNode ( targetServer . info )
2021-12-26 16:15:03 +08:00
err = operation . WithVolumeServerClient ( false , targetAddress , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
2019-06-03 17:26:31 +08:00
2021-09-13 13:47:52 +08:00
if targetAddress != existingLocation {
2019-06-03 17:26:31 +08:00
fmt . Printf ( "copy %d.%v %s => %s\n" , volumeId , shardIdsToCopy , existingLocation , targetServer . info . Id )
2020-02-26 13:50:12 +08:00
_ , copyErr := volumeServerClient . VolumeEcShardsCopy ( context . Background ( ) , & volume_server_pb . VolumeEcShardsCopyRequest {
2019-06-03 17:26:31 +08:00
VolumeId : uint32 ( volumeId ) ,
Collection : collection ,
ShardIds : shardIdsToCopy ,
CopyEcxFile : true ,
2019-12-24 04:48:20 +08:00
CopyEcjFile : true ,
2019-12-29 04:44:59 +08:00
CopyVifFile : true ,
2021-09-13 13:47:52 +08:00
SourceDataNode : string ( existingLocation ) ,
2019-06-03 17:26:31 +08:00
} )
if copyErr != nil {
2019-06-04 01:38:21 +08:00
return fmt . Errorf ( "copy %d.%v %s => %s : %v\n" , volumeId , shardIdsToCopy , existingLocation , targetServer . info . Id , copyErr )
2019-06-03 17:26:31 +08:00
}
}
fmt . Printf ( "mount %d.%v on %s\n" , volumeId , shardIdsToCopy , targetServer . info . Id )
2020-02-26 13:50:12 +08:00
_ , mountErr := volumeServerClient . VolumeEcShardsMount ( context . Background ( ) , & volume_server_pb . VolumeEcShardsMountRequest {
2019-06-03 17:26:31 +08:00
VolumeId : uint32 ( volumeId ) ,
Collection : collection ,
ShardIds : shardIdsToCopy ,
} )
if mountErr != nil {
2019-06-04 01:38:21 +08:00
return fmt . Errorf ( "mount %d.%v on %s : %v\n" , volumeId , shardIdsToCopy , targetServer . info . Id , mountErr )
2019-06-03 17:26:31 +08:00
}
2021-09-13 13:47:52 +08:00
if targetAddress != existingLocation {
2019-06-03 17:26:31 +08:00
copiedShardIds = shardIdsToCopy
glog . V ( 0 ) . Infof ( "%s ec volume %d deletes shards %+v" , existingLocation , volumeId , copiedShardIds )
}
return nil
} )
if err != nil {
return
}
return
}
2024-11-19 22:33:18 +08:00
func eachDataNode ( topo * master_pb . TopologyInfo , fn func ( dc DataCenterId , rack RackId , dn * master_pb . DataNodeInfo ) ) {
2019-06-03 17:26:31 +08:00
for _ , dc := range topo . DataCenterInfos {
for _ , rack := range dc . RackInfos {
for _ , dn := range rack . DataNodeInfos {
2024-11-19 22:33:18 +08:00
fn ( DataCenterId ( dc . Id ) , RackId ( rack . Id ) , dn )
2019-06-03 17:26:31 +08:00
}
}
}
}
2022-09-15 03:06:48 +08:00
func sortEcNodesByFreeslotsDescending ( ecNodes [ ] * EcNode ) {
2023-09-26 00:35:16 +08:00
slices . SortFunc ( ecNodes , func ( a , b * EcNode ) int {
return b . freeEcSlot - a . freeEcSlot
2019-06-03 17:26:31 +08:00
} )
}
2019-11-13 05:47:36 +08:00
func sortEcNodesByFreeslotsAscending ( ecNodes [ ] * EcNode ) {
2023-09-26 00:35:16 +08:00
slices . SortFunc ( ecNodes , func ( a , b * EcNode ) int {
return a . freeEcSlot - b . freeEcSlot
2019-11-13 05:47:36 +08:00
} )
}
2019-06-11 12:32:56 +08:00
// ensureSortedEcNodes restores the ordering of data after the element at index
// has changed its sort key.
//
// lessThan(i, j) reports whether the element currently at position i must come
// before the one at position j. The element is first bubbled towards the front
// for as long as it compares less than its left neighbour; then, starting at
// position index+1, elements are bubbled leftwards while they compare less
// than their left neighbour.
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	// Walk towards the front of the slice.
	for pos := index; pos > 0 && lessThan(pos, pos-1); pos-- {
		data[pos-1], data[pos] = data[pos], data[pos-1]
	}

	// Walk towards the back of the slice.
	for pos := index + 1; pos < len(data) && lessThan(pos, pos-1); pos++ {
		data[pos], data[pos-1] = data[pos-1], data[pos]
	}
}
// swap exchanges the elements of data at positions i and j.
func swap(data []*CandidateEcNode, i, j int) {
	data[i], data[j] = data[j], data[i]
}
2019-06-03 17:26:31 +08:00
// countShards returns the total number of EC shards described by ecShardInfos,
// i.e. the sum of the shard ids set in each entry's EcIndexBits.
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for i := range ecShardInfos {
		count += erasure_coding.ShardBits(ecShardInfos[i].EcIndexBits).ShardIdCount()
	}
	return count
}
2021-02-16 18:47:02 +08:00
func countFreeShardSlots ( dn * master_pb . DataNodeInfo , diskType types . DiskType ) ( count int ) {
if dn . DiskInfos == nil {
return 0
}
diskInfo := dn . DiskInfos [ string ( diskType ) ]
if diskInfo == nil {
return 0
}
2022-02-08 17:51:13 +08:00
return int ( diskInfo . MaxVolumeCount - diskInfo . VolumeCount ) * erasure_coding . DataShardsCount - countShards ( diskInfo . EcShardInfos )
2019-06-03 17:26:31 +08:00
}
2020-09-15 14:47:11 +08:00
func ( ecNode * EcNode ) localShardIdCount ( vid uint32 ) int {
2021-02-16 18:47:02 +08:00
for _ , diskInfo := range ecNode . info . DiskInfos {
for _ , ecShardInfo := range diskInfo . EcShardInfos {
if vid == ecShardInfo . Id {
shardBits := erasure_coding . ShardBits ( ecShardInfo . EcIndexBits )
return shardBits . ShardIdCount ( )
}
2020-09-15 14:47:11 +08:00
}
}
return 0
}
2020-02-26 13:50:12 +08:00
func collectEcNodes ( commandEnv * CommandEnv , selectedDataCenter string ) ( ecNodes [ ] * EcNode , totalFreeEcSlots int , err error ) {
2019-06-03 17:26:31 +08:00
// list all possible locations
2021-02-22 16:28:42 +08:00
// collect topology information
2022-02-08 16:53:55 +08:00
topologyInfo , _ , err := collectTopologyInfo ( commandEnv , 0 )
2019-06-03 17:26:31 +08:00
if err != nil {
2021-02-22 16:28:42 +08:00
return
2019-06-03 17:26:31 +08:00
}
// find out all volume servers with one slot left.
2021-02-22 16:28:42 +08:00
ecNodes , totalFreeEcSlots = collectEcVolumeServersByDc ( topologyInfo , selectedDataCenter )
2020-09-15 14:47:11 +08:00
2022-09-15 03:06:48 +08:00
sortEcNodesByFreeslotsDescending ( ecNodes )
2020-09-15 14:47:11 +08:00
return
}
func collectEcVolumeServersByDc ( topo * master_pb . TopologyInfo , selectedDataCenter string ) ( ecNodes [ ] * EcNode , totalFreeEcSlots int ) {
2024-11-19 22:33:18 +08:00
eachDataNode ( topo , func ( dc DataCenterId , rack RackId , dn * master_pb . DataNodeInfo ) {
if selectedDataCenter != "" && selectedDataCenter != string ( dc ) {
2019-06-11 12:32:56 +08:00
return
}
2019-11-13 01:33:51 +08:00
2021-02-16 18:47:02 +08:00
freeEcSlots := countFreeShardSlots ( dn , types . HardDriveType )
2019-11-13 01:33:51 +08:00
ecNodes = append ( ecNodes , & EcNode {
info : dn ,
2024-11-19 22:33:18 +08:00
dc : dc ,
2019-11-13 01:33:51 +08:00
rack : rack ,
freeEcSlot : int ( freeEcSlots ) ,
} )
totalFreeEcSlots += freeEcSlots
2019-06-03 17:26:31 +08:00
} )
return
}
2021-09-13 13:47:52 +08:00
func sourceServerDeleteEcShards ( grpcDialOption grpc . DialOption , collection string , volumeId needle . VolumeId , sourceLocation pb . ServerAddress , toBeDeletedShardIds [ ] uint32 ) error {
2019-06-03 17:26:31 +08:00
fmt . Printf ( "delete %d.%v from %s\n" , volumeId , toBeDeletedShardIds , sourceLocation )
2021-12-26 16:15:03 +08:00
return operation . WithVolumeServerClient ( false , sourceLocation , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
2020-02-26 13:50:12 +08:00
_ , deleteErr := volumeServerClient . VolumeEcShardsDelete ( context . Background ( ) , & volume_server_pb . VolumeEcShardsDeleteRequest {
2019-06-03 17:26:31 +08:00
VolumeId : uint32 ( volumeId ) ,
Collection : collection ,
ShardIds : toBeDeletedShardIds ,
} )
return deleteErr
} )
}
2021-09-13 13:47:52 +08:00
func unmountEcShards ( grpcDialOption grpc . DialOption , volumeId needle . VolumeId , sourceLocation pb . ServerAddress , toBeUnmountedhardIds [ ] uint32 ) error {
2019-06-03 17:26:31 +08:00
fmt . Printf ( "unmount %d.%v from %s\n" , volumeId , toBeUnmountedhardIds , sourceLocation )
2021-12-26 16:15:03 +08:00
return operation . WithVolumeServerClient ( false , sourceLocation , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
2020-02-26 13:50:12 +08:00
_ , deleteErr := volumeServerClient . VolumeEcShardsUnmount ( context . Background ( ) , & volume_server_pb . VolumeEcShardsUnmountRequest {
2019-06-03 17:26:31 +08:00
VolumeId : uint32 ( volumeId ) ,
ShardIds : toBeUnmountedhardIds ,
} )
return deleteErr
} )
}
2021-09-13 13:47:52 +08:00
func mountEcShards ( grpcDialOption grpc . DialOption , collection string , volumeId needle . VolumeId , sourceLocation pb . ServerAddress , toBeMountedhardIds [ ] uint32 ) error {
2019-06-03 17:26:31 +08:00
fmt . Printf ( "mount %d.%v on %s\n" , volumeId , toBeMountedhardIds , sourceLocation )
2021-12-26 16:15:03 +08:00
return operation . WithVolumeServerClient ( false , sourceLocation , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
2020-02-26 13:50:12 +08:00
_ , mountErr := volumeServerClient . VolumeEcShardsMount ( context . Background ( ) , & volume_server_pb . VolumeEcShardsMountRequest {
2019-06-03 17:26:31 +08:00
VolumeId : uint32 ( volumeId ) ,
Collection : collection ,
ShardIds : toBeMountedhardIds ,
} )
return mountErr
} )
}
2019-06-11 12:32:56 +08:00
2024-11-19 10:05:06 +08:00
// ceilDivide returns a divided by b, rounded up towards positive infinity.
//
// Go's integer division truncates towards zero, which already is the ceiling
// whenever the exact quotient is negative. The quotient is therefore only
// bumped when there is a remainder and both operands have the same sign.
// b must not be zero: as with any integer division, a zero divisor panics.
func ceilDivide(a, b int) int {
	quotient := a / b
	if a%b != 0 && (a < 0) == (b < 0) {
		quotient++
	}
	return quotient
}
func findEcVolumeShards ( ecNode * EcNode , vid needle . VolumeId ) erasure_coding . ShardBits {
2021-02-16 21:13:48 +08:00
if diskInfo , found := ecNode . info . DiskInfos [ string ( types . HardDriveType ) ] ; found {
for _ , shardInfo := range diskInfo . EcShardInfos {
if needle . VolumeId ( shardInfo . Id ) == vid {
return erasure_coding . ShardBits ( shardInfo . EcIndexBits )
}
2019-06-11 12:32:56 +08:00
}
}
return 0
}
func ( ecNode * EcNode ) addEcVolumeShards ( vid needle . VolumeId , collection string , shardIds [ ] uint32 ) * EcNode {
foundVolume := false
2021-02-16 21:33:38 +08:00
diskInfo , found := ecNode . info . DiskInfos [ string ( types . HardDriveType ) ]
if found {
2021-02-16 21:13:48 +08:00
for _ , shardInfo := range diskInfo . EcShardInfos {
if needle . VolumeId ( shardInfo . Id ) == vid {
oldShardBits := erasure_coding . ShardBits ( shardInfo . EcIndexBits )
newShardBits := oldShardBits
for _ , shardId := range shardIds {
newShardBits = newShardBits . AddShardId ( erasure_coding . ShardId ( shardId ) )
}
shardInfo . EcIndexBits = uint32 ( newShardBits )
ecNode . freeEcSlot -= newShardBits . ShardIdCount ( ) - oldShardBits . ShardIdCount ( )
foundVolume = true
break
2019-06-11 12:32:56 +08:00
}
}
2021-02-16 21:33:38 +08:00
} else {
diskInfo = & master_pb . DiskInfo {
Type : string ( types . HardDriveType ) ,
}
ecNode . info . DiskInfos [ string ( types . HardDriveType ) ] = diskInfo
2019-06-11 12:32:56 +08:00
}
if ! foundVolume {
var newShardBits erasure_coding . ShardBits
for _ , shardId := range shardIds {
newShardBits = newShardBits . AddShardId ( erasure_coding . ShardId ( shardId ) )
}
2021-02-16 18:47:02 +08:00
diskInfo . EcShardInfos = append ( diskInfo . EcShardInfos , & master_pb . VolumeEcShardInformationMessage {
2019-06-11 12:32:56 +08:00
Id : uint32 ( vid ) ,
Collection : collection ,
EcIndexBits : uint32 ( newShardBits ) ,
2021-02-16 18:47:02 +08:00
DiskType : string ( types . HardDriveType ) ,
2019-06-11 12:32:56 +08:00
} )
ecNode . freeEcSlot -= len ( shardIds )
}
return ecNode
}
func ( ecNode * EcNode ) deleteEcVolumeShards ( vid needle . VolumeId , shardIds [ ] uint32 ) * EcNode {
2021-02-16 21:13:48 +08:00
if diskInfo , found := ecNode . info . DiskInfos [ string ( types . HardDriveType ) ] ; found {
for _ , shardInfo := range diskInfo . EcShardInfos {
if needle . VolumeId ( shardInfo . Id ) == vid {
oldShardBits := erasure_coding . ShardBits ( shardInfo . EcIndexBits )
newShardBits := oldShardBits
for _ , shardId := range shardIds {
newShardBits = newShardBits . RemoveShardId ( erasure_coding . ShardId ( shardId ) )
}
shardInfo . EcIndexBits = uint32 ( newShardBits )
ecNode . freeEcSlot -= newShardBits . ShardIdCount ( ) - oldShardBits . ShardIdCount ( )
2019-06-11 12:32:56 +08:00
}
}
}
return ecNode
}
// groupByCount asks identifierFn for an (id, count) pair for every node and
// returns the sum of the counts per id. The result is never nil.
func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	totals := map[string]int{}
	for i := range data {
		key, amount := identifierFn(data[i])
		totals[key] = totals[key] + amount
	}
	return totals
}
// groupBy partitions the nodes by the id identifierFn returns for each of
// them. Nodes keep their input order inside every group, and the result is
// never nil.
func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groups := map[string][]*EcNode{}
	for i := range data {
		key := identifierFn(data[i])
		groups[key] = append(groups[key], data[i])
	}
	return groups
}
2024-11-05 09:56:20 +08:00
// collectRacks groups the given nodes by rack. Each resulting EcRack indexes
// its nodes by node id and carries the sum of their free EC slots.
func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
	racks := make(map[RackId]*EcRack)
	for _, node := range allEcNodes {
		rack := racks[node.rack]
		if rack == nil {
			// first node seen in this rack
			rack = &EcRack{
				ecNodes: make(map[EcNodeId]*EcNode),
			}
			racks[node.rack] = rack
		}
		rack.ecNodes[EcNodeId(node.info.Id)] = node
		rack.freeEcSlot += node.freeEcSlot
	}
	return racks
}
// balanceEcVolumes balances the EC shards of one collection in three steps:
// duplicated shards are removed, shards are spread across racks, and shards
// are spread across the nodes inside each rack.
//
// The first failing step stops the run. Its error is wrapped with %w together
// with the step and collection name, so callers can still match the cause with
// errors.Is / errors.As.
func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
	fmt.Printf("balanceEcVolumes %s\n", collection)

	if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
		return fmt.Errorf("delete duplicated collection %s ec shards: %w", collection, err)
	}

	if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
		return fmt.Errorf("balance across racks collection %s ec shards: %w", collection, err)
	}

	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
		return fmt.Errorf("balance within racks collection %s ec shards: %w", collection, err)
	}

	return nil
}
// deleteDuplicatedEcShards removes surplus copies of EC shards for every volume
// of the collection, stopping at the first error.
func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
	// volume id => nodes holding shards of that volume
	for vid, nodesWithVolume := range collectVolumeIdToEcNodes(allEcNodes, collection) {
		err := doDeduplicateEcShards(commandEnv, collection, vid, nodesWithVolume, applyBalancing)
		if err != nil {
			return err
		}
	}
	return nil
}
// doDeduplicateEcShards makes sure every shard of volume vid is held by a
// single node.
//
// For each shard found on more than one node, the copy on the node with the
// fewest free EC slots is kept. When applyBalancing is true every other copy
// is unmounted and deleted on its server and removed from the in-memory node
// record; otherwise the duplicates are only reported.
func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
	// shard id => nodes holding a copy of that shard
	holdersByShard := make([][]*EcNode, erasure_coding.TotalShardsCount)
	for _, node := range locations {
		for _, shardId := range findEcVolumeShards(node, vid).ShardIds() {
			holdersByShard[shardId] = append(holdersByShard[shardId], node)
		}
	}

	for shardId, holders := range holdersByShard {
		if len(holders) < 2 {
			continue
		}

		sortEcNodesByFreeslotsAscending(holders)
		keeper, extras := holders[0], holders[1:]
		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(holders), keeper.info.Id)
		if !applyBalancing {
			continue
		}

		duplicatedShardIds := []uint32{uint32(shardId)}
		for _, extra := range extras {
			address := pb.NewServerAddressFromDataNode(extra.info)
			if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, address, duplicatedShardIds); err != nil {
				return err
			}
			if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, address, duplicatedShardIds); err != nil {
				return err
			}
			extra.deleteEcVolumeShards(vid, duplicatedShardIds)
		}
	}

	return nil
}
// balanceEcShardsAcrossRacks spreads the shards of every EC volume of the
// collection evenly across the racks, stopping at the first error.
func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
	// The volume => nodes mapping is rebuilt here because earlier steps may
	// have changed where shards live.
	for vid, nodesWithVolume := range collectVolumeIdToEcNodes(allEcNodes, collection) {
		err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, nodesWithVolume, racks, applyBalancing)
		if err != nil {
			return err
		}
	}
	return nil
}
2024-11-22 00:46:24 +08:00
// countShardsByRack returns, per rack id, how many shards of volume vid are
// held by the given nodes.
func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
	shardsOnNode := func(ecNode *EcNode) (id string, count int) {
		id = string(ecNode.rack)
		count = findEcVolumeShards(ecNode, vid).ShardIdCount()
		return id, count
	}
	return groupByCount(locations, shardsOnNode)
}
2024-11-05 09:56:20 +08:00
2024-11-28 03:51:57 +08:00
// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
2024-11-22 00:46:24 +08:00
func doBalanceEcShardsAcrossRacks ( commandEnv * CommandEnv , collection string , vid needle . VolumeId , locations [ ] * EcNode , racks map [ RackId ] * EcRack , applyBalancing bool ) error {
2024-11-05 09:56:20 +08:00
// calculate average number of shards an ec rack should have for one volume
averageShardsPerEcRack := ceilDivide ( erasure_coding . TotalShardsCount , len ( racks ) )
// see the volume's shards are in how many racks, and how many in each rack
2024-11-22 00:46:24 +08:00
rackToShardCount := countShardsByRack ( vid , locations )
2024-11-05 09:56:20 +08:00
rackEcNodesWithVid := groupBy ( locations , func ( ecNode * EcNode ) string {
return string ( ecNode . rack )
} )
// ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
ecShardsToMove := make ( map [ erasure_coding . ShardId ] * EcNode )
for rackId , count := range rackToShardCount {
2024-11-22 00:46:24 +08:00
if count <= averageShardsPerEcRack {
continue
}
possibleEcNodes := rackEcNodesWithVid [ rackId ]
for shardId , ecNode := range pickNEcShardsToMoveFrom ( possibleEcNodes , vid , count - averageShardsPerEcRack ) {
ecShardsToMove [ shardId ] = ecNode
2024-11-05 09:56:20 +08:00
}
}
for shardId , ecNode := range ecShardsToMove {
2024-11-22 00:46:24 +08:00
// TODO: consider volume replica info when balancing racks
2024-11-29 00:42:41 +08:00
rackId , err := pickRackToBalanceShardsInto ( racks , rackToShardCount , nil , averageShardsPerEcRack )
if err != nil {
fmt . Printf ( "ec shard %d.%d at %s can not find a destination rack:\n%s\n" , vid , shardId , ecNode . info . Id , err . Error ( ) )
2024-11-05 09:56:20 +08:00
continue
}
2024-11-29 00:42:41 +08:00
2024-11-05 09:56:20 +08:00
var possibleDestinationEcNodes [ ] * EcNode
for _ , n := range racks [ rackId ] . ecNodes {
possibleDestinationEcNodes = append ( possibleDestinationEcNodes , n )
}
2024-11-29 00:42:41 +08:00
err = pickOneEcNodeAndMoveOneShard ( commandEnv , averageShardsPerEcRack , ecNode , collection , vid , shardId , possibleDestinationEcNodes , applyBalancing )
2024-11-05 09:56:20 +08:00
if err != nil {
return err
}
rackToShardCount [ string ( rackId ) ] += 1
rackToShardCount [ string ( ecNode . rack ) ] -= 1
racks [ rackId ] . freeEcSlot -= 1
racks [ ecNode . rack ] . freeEcSlot += 1
}
return nil
}
2024-11-29 00:42:41 +08:00
func pickRackToBalanceShardsInto ( rackToEcNodes map [ RackId ] * EcRack , rackToShardCount map [ string ] int , replicaPlacement * super_block . ReplicaPlacement , averageShardsPerEcRack int ) ( RackId , error ) {
2024-11-22 00:46:24 +08:00
targets := [ ] RackId { }
targetShards := - 1
for _ , shards := range rackToShardCount {
if shards > targetShards {
targetShards = shards
}
}
2024-11-05 09:56:20 +08:00
2024-11-29 00:42:41 +08:00
details := ""
2024-11-05 09:56:20 +08:00
for rackId , rack := range rackToEcNodes {
2024-11-22 00:46:24 +08:00
shards := rackToShardCount [ string ( rackId ) ]
2024-11-05 09:56:20 +08:00
if rack . freeEcSlot <= 0 {
2024-11-29 00:42:41 +08:00
details += fmt . Sprintf ( " Skipped %s because it has no free slots\n" , rackId )
2024-11-05 09:56:20 +08:00
continue
}
2024-11-22 00:46:24 +08:00
if replicaPlacement != nil && shards >= replicaPlacement . DiffRackCount {
2024-11-29 00:42:41 +08:00
details += fmt . Sprintf ( " Skipped %s because shards %d >= replica placement limit for other racks (%d)\n" , rackId , shards , replicaPlacement . DiffRackCount )
2024-11-22 00:46:24 +08:00
continue
}
if shards >= averageShardsPerEcRack {
2024-11-29 00:42:41 +08:00
details += fmt . Sprintf ( " Skipped %s because shards %d >= averageShards (%d)\n" , rackId , shards , averageShardsPerEcRack )
2024-11-22 00:46:24 +08:00
continue
}
if shards < targetShards {
// Favor racks with less shards, to ensure an uniform distribution.
targets = nil
targetShards = shards
}
if shards == targetShards {
targets = append ( targets , rackId )
}
2024-11-05 09:56:20 +08:00
}
2024-11-22 00:46:24 +08:00
if len ( targets ) == 0 {
2024-11-29 00:42:41 +08:00
return "" , errors . New ( details )
2024-11-22 00:46:24 +08:00
}
2024-11-29 00:42:41 +08:00
return targets [ rand . IntN ( len ( targets ) ) ] , nil
2024-11-05 09:56:20 +08:00
}
func balanceEcShardsWithinRacks ( commandEnv * CommandEnv , allEcNodes [ ] * EcNode , racks map [ RackId ] * EcRack , collection string , applyBalancing bool ) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes ( allEcNodes , collection )
// spread the ec shards evenly
for vid , locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
2024-11-28 03:51:57 +08:00
rackToShardCount := countShardsByRack ( vid , locations )
2024-11-05 09:56:20 +08:00
rackEcNodesWithVid := groupBy ( locations , func ( ecNode * EcNode ) string {
return string ( ecNode . rack )
} )
for rackId , _ := range rackToShardCount {
var possibleDestinationEcNodes [ ] * EcNode
for _ , n := range racks [ RackId ( rackId ) ] . ecNodes {
if _ , found := n . info . DiskInfos [ string ( types . HardDriveType ) ] ; found {
possibleDestinationEcNodes = append ( possibleDestinationEcNodes , n )
}
}
sourceEcNodes := rackEcNodesWithVid [ rackId ]
averageShardsPerEcNode := ceilDivide ( rackToShardCount [ rackId ] , len ( possibleDestinationEcNodes ) )
if err := doBalanceEcShardsWithinOneRack ( commandEnv , averageShardsPerEcNode , collection , vid , sourceEcNodes , possibleDestinationEcNodes , applyBalancing ) ; err != nil {
return err
}
}
}
return nil
}
// doBalanceEcShardsWithinOneRack moves shards of volume vid away from every
// node in existingLocations that holds more than averageShardsPerEcNode of
// them, one shard at a time, until the node is back at the average. The
// destination of each move is chosen among possibleDestinationEcNodes. The
// first error aborts the run.
func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
	for _, sourceNode := range existingLocations {
		shardBits := findEcVolumeShards(sourceNode, vid)
		excess := shardBits.ShardIdCount() - averageShardsPerEcNode
		shardIds := shardBits.ShardIds()

		for i := 0; i < len(shardIds) && excess > 0; i++ {
			shardId := shardIds[i]
			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", sourceNode.info.Id, excess, vid, shardId)

			if err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, sourceNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing); err != nil {
				return err
			}
			excess--
		}
	}

	return nil
}
// balanceEcRacks evens out the total number of EC shards per node inside every
// rack, one rack after the other, stopping at the first error.
func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
	for _, rack := range racks {
		err := doBalanceEcRack(commandEnv, rack, applyBalancing)
		if err != nil {
			return err
		}
	}
	return nil
}
// doBalanceEcRack evens out the total number of EC shards (across all volumes)
// held by the nodes of one rack.
//
// It repeatedly takes the node with the most free EC slots and the node with
// the fewest, and moves a single shard from the latter to the former, for as
// long as the fuller node is above the per-node average and the emptier node
// can take one more shard without exceeding it. Only shards of volumes that
// the emptier node does not hold yet are moved. The first move error is
// returned.
func doBalanceEcRack ( commandEnv * CommandEnv , ecRack * EcRack , applyBalancing bool ) error {
// Nothing to balance with zero or one node.
if len ( ecRack . ecNodes ) <= 1 {
return nil
}
var rackEcNodes [ ] * EcNode
for _ , node := range ecRack . ecNodes {
rackEcNodes = append ( rackEcNodes , node )
}
// node id => number of EC shards on the node's hard-drive disk
ecNodeIdToShardCount := groupByCount ( rackEcNodes , func ( ecNode * EcNode ) ( id string , count int ) {
diskInfo , found := ecNode . info . DiskInfos [ string ( types . HardDriveType ) ]
if ! found {
// Naked return: yields the zero values, i.e. an empty id and a count of 0.
return
}
for _ , ecShardInfo := range diskInfo . EcShardInfos {
count += erasure_coding . ShardBits ( ecShardInfo . EcIndexBits ) . ShardIdCount ( )
}
return ecNode . info . Id , count
} )
var totalShardCount int
for _ , count := range ecNodeIdToShardCount {
totalShardCount += count
}
// Target number of shards per node, rounded up.
averageShardCount := ceilDivide ( totalShardCount , len ( rackEcNodes ) )
// Keep going for as long as the previous pass moved a shard.
hasMove := true
for hasMove {
hasMove = false
// Most free slots first, so the first node is the emptiest and the last one the fullest.
slices . SortFunc ( rackEcNodes , func ( a , b * EcNode ) int {
return b . freeEcSlot - a . freeEcSlot
} )
emptyNode , fullNode := rackEcNodes [ 0 ] , rackEcNodes [ len ( rackEcNodes ) - 1 ]
emptyNodeShardCount , fullNodeShardCount := ecNodeIdToShardCount [ emptyNode . info . Id ] , ecNodeIdToShardCount [ fullNode . info . Id ]
// Move only when the full node is above average and the empty node stays within it.
if fullNodeShardCount > averageShardCount && emptyNodeShardCount + 1 <= averageShardCount {
// Ids of the volumes the empty node already holds shards of.
emptyNodeIds := make ( map [ uint32 ] bool )
if emptyDiskInfo , found := emptyNode . info . DiskInfos [ string ( types . HardDriveType ) ] ; found {
for _ , shards := range emptyDiskInfo . EcShardInfos {
emptyNodeIds [ shards . Id ] = true
}
}
if fullDiskInfo , found := fullNode . info . DiskInfos [ string ( types . HardDriveType ) ] ; found {
for _ , shards := range fullDiskInfo . EcShardInfos {
// Only consider volumes the empty node does not hold yet.
if _ , found := emptyNodeIds [ shards . Id ] ; ! found {
for _ , shardId := range erasure_coding . ShardBits ( shards . EcIndexBits ) . ShardIds ( ) {
fmt . Printf ( "%s moves ec shards %d.%d to %s\n" , fullNode . info . Id , shards . Id , shardId , emptyNode . info . Id )
err := moveMountedShardToEcNode ( commandEnv , fullNode , shards . Collection , needle . VolumeId ( shards . Id ) , shardId , emptyNode , applyBalancing )
if err != nil {
return err
}
ecNodeIdToShardCount [ emptyNode . info . Id ] ++
ecNodeIdToShardCount [ fullNode . info . Id ] --
hasMove = true
// One shard per pass: the nodes are re-sorted before the next move.
break
}
// Stop at the first volume missing from the empty node, whether or not
// one of its shards was moved.
break
}
}
}
}
}
return nil
}
2024-11-28 03:51:57 +08:00
func pickEcNodeToBalanceShardsInto ( vid needle . VolumeId , existingLocation * EcNode , possibleDestinations [ ] * EcNode , replicaPlacement * super_block . ReplicaPlacement , averageShardsPerEcNode int ) ( * EcNode , error ) {
if existingLocation == nil {
return nil , fmt . Errorf ( "INTERNAL: missing source nodes" )
}
if len ( possibleDestinations ) == 0 {
return nil , fmt . Errorf ( "INTERNAL: missing destination nodes" )
}
2024-11-05 09:56:20 +08:00
2024-11-28 03:51:57 +08:00
nodeShards := map [ * EcNode ] int { }
for _ , node := range possibleDestinations {
nodeShards [ node ] = findEcVolumeShards ( node , vid ) . ShardIdCount ( )
}
2024-11-05 09:56:20 +08:00
2024-11-28 03:51:57 +08:00
targets := [ ] * EcNode { }
targetShards := - 1
for _ , shards := range nodeShards {
if shards > targetShards {
targetShards = shards
2024-11-05 09:56:20 +08:00
}
2024-11-28 03:51:57 +08:00
}
2024-11-05 09:56:20 +08:00
2024-11-28 03:51:57 +08:00
details := ""
for _ , node := range possibleDestinations {
if node . info . Id == existingLocation . info . Id {
2024-11-05 09:56:20 +08:00
continue
}
2024-11-28 03:51:57 +08:00
if node . freeEcSlot <= 0 {
details += fmt . Sprintf ( " Skipped %s because it has no free slots\n" , node . info . Id )
2024-11-05 09:56:20 +08:00
continue
}
2024-11-28 03:51:57 +08:00
shards := nodeShards [ node ]
if replicaPlacement != nil && shards >= replicaPlacement . SameRackCount {
details += fmt . Sprintf ( " Skipped %s because shards %d >= replica placement limit for the rack (%d)\n" , node . info . Id , shards , replicaPlacement . SameRackCount )
continue
}
if shards >= averageShardsPerEcNode {
details += fmt . Sprintf ( " Skipped %s because shards %d >= averageShards (%d)\n" ,
node . info . Id , shards , averageShardsPerEcNode )
continue
}
2024-11-05 09:56:20 +08:00
2024-11-28 03:51:57 +08:00
if shards < targetShards {
// Favor nodes with less shards, to ensure an uniform distribution.
targets = nil
targetShards = shards
2024-11-05 09:56:20 +08:00
}
2024-11-28 03:51:57 +08:00
if shards == targetShards {
targets = append ( targets , node )
}
}
2024-11-05 09:56:20 +08:00
2024-11-28 03:51:57 +08:00
if len ( targets ) == 0 {
return nil , errors . New ( details )
}
return targets [ rand . IntN ( len ( targets ) ) ] , nil
}
// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func pickOneEcNodeAndMoveOneShard ( commandEnv * CommandEnv , averageShardsPerEcNode int , existingLocation * EcNode , collection string , vid needle . VolumeId , shardId erasure_coding . ShardId , possibleDestinationEcNodes [ ] * EcNode , applyBalancing bool ) error {
// TODO: consider volume replica info when balancing nodes
destNode , err := pickEcNodeToBalanceShardsInto ( vid , existingLocation , possibleDestinationEcNodes , nil , averageShardsPerEcNode )
if err != nil {
fmt . Printf ( "WARNING: Could not find suitable taget node for %d.%d:\n%s" , vid , shardId , err . Error ( ) )
2024-11-05 09:56:20 +08:00
return nil
}
2024-11-28 03:51:57 +08:00
fmt . Printf ( "%s moves ec shard %d.%d to %s\n" , existingLocation . info . Id , vid , shardId , destNode . info . Id )
return moveMountedShardToEcNode ( commandEnv , existingLocation , collection , vid , shardId , destNode , applyBalancing )
2024-11-05 09:56:20 +08:00
}
// pickNEcShardsToMoveFrom picks up to n shards of volume vid, one per round,
// and returns them keyed by shard ID with the node each was taken from.
//
// Nodes holding at least one shard of vid are ordered by descending shard
// count. Each round scans that list for the first node that still reports
// shards for vid, takes its first shard ID, and removes that shard from the
// node's in-memory bookkeeping via deleteEcVolumeShards. The touched entry is
// then repositioned with ensureSortedEcNodes.
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
	picked := make(map[erasure_coding.ShardId]*EcNode)

	// Only nodes that actually hold shards of this volume are candidates.
	var candidateEcNodes []*CandidateEcNode
	for _, ecNode := range ecNodes {
		count := findEcVolumeShards(ecNode, vid).ShardIdCount()
		if count > 0 {
			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
				ecNode:     ecNode,
				shardCount: count,
			})
		}
	}

	// Descending by shard count.
	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
		return b.shardCount - a.shardCount
	})
	hasMoreShards := func(i, j int) bool {
		return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
	}

	for round := 0; round < n; round++ {
		selectedEcNodeIndex := -1
		for idx, candidate := range candidateEcNodes {
			shardBits := findEcVolumeShards(candidate.ecNode, vid)
			if shardBits > 0 {
				selectedEcNodeIndex = idx
				// Take a single shard: the first ID the node reports.
				if shardIds := shardBits.ShardIds(); len(shardIds) > 0 {
					shardId := shardIds[0]
					candidate.shardCount--
					picked[shardId] = candidate.ecNode
					candidate.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
				}
				break
			}
		}
		if selectedEcNodeIndex >= 0 {
			ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, hasMoreShards)
		}
	}

	return picked
}
// collectVolumeIdToEcNodes maps each EC volume ID belonging to collection to
// the nodes that report shards for it.
//
// Only the disk info stored under the types.HardDriveType key is inspected;
// nodes without that entry are skipped. A node is appended once for every
// matching EcShardInfos entry.
func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needle.VolumeId][]*EcNode {
	vidLocations := make(map[needle.VolumeId][]*EcNode)
	hddKey := string(types.HardDriveType)

	for _, ecNode := range allEcNodes {
		if diskInfo, ok := ecNode.info.DiskInfos[hddKey]; ok {
			for _, shardInfo := range diskInfo.EcShardInfos {
				// Shards of other collections are not of interest here.
				if shardInfo.Collection != collection {
					continue
				}
				vid := needle.VolumeId(shardInfo.Id)
				vidLocations[vid] = append(vidLocations[vid], ecNode)
			}
		}
	}

	return vidLocations
}
2024-12-03 00:44:07 +08:00
// TODO: EC volumes have no topology replica placement info :( We need a better solution to resolve topology, and balancing, for those.
func volumeIdToReplicaPlacement ( commandEnv * CommandEnv , vid needle . VolumeId , nodes [ ] * EcNode , ecReplicaPlacement * super_block . ReplicaPlacement ) ( * super_block . ReplicaPlacement , error ) {
2024-11-19 10:05:06 +08:00
for _ , ecNode := range nodes {
for _ , diskInfo := range ecNode . info . DiskInfos {
for _ , volumeInfo := range diskInfo . VolumeInfos {
2024-12-03 00:44:07 +08:00
if needle . VolumeId ( volumeInfo . Id ) == vid {
return super_block . NewReplicaPlacementFromByte ( byte ( volumeInfo . ReplicaPlacement ) )
}
}
for _ , ecShardInfo := range diskInfo . EcShardInfos {
if needle . VolumeId ( ecShardInfo . Id ) == vid {
return ecReplicaPlacement , nil
2024-11-19 10:05:06 +08:00
}
}
}
}
return nil , fmt . Errorf ( "failed to resolve replica placement for volume ID %d" , vid )
}
2024-12-03 00:44:07 +08:00
func EcBalance ( commandEnv * CommandEnv , collections [ ] string , dc string , ecReplicaPlacement * super_block . ReplicaPlacement , applyBalancing bool ) ( err error ) {
2024-11-05 09:56:20 +08:00
if len ( collections ) == 0 {
return fmt . Errorf ( "no collections to balance" )
}
// collect all ec nodes
allEcNodes , totalFreeEcSlots , err := collectEcNodes ( commandEnv , dc )
if err != nil {
return err
}
if totalFreeEcSlots < 1 {
return fmt . Errorf ( "no free ec shard slots. only %d left" , totalFreeEcSlots )
}
racks := collectRacks ( allEcNodes )
for _ , c := range collections {
if err = balanceEcVolumes ( commandEnv , c , allEcNodes , racks , applyBalancing ) ; err != nil {
return err
}
}
if err := balanceEcRacks ( commandEnv , racks , applyBalancing ) ; err != nil {
return fmt . Errorf ( "balance ec racks: %v" , err )
}
return nil
}