2019-05-06 12:17:23 +08:00
package shell
import (
"flag"
"fmt"
2021-09-13 13:47:52 +08:00
"github.com/chrislusf/seaweedfs/weed/pb"
2020-09-11 15:29:25 +08:00
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
2021-02-16 18:47:02 +08:00
"github.com/chrislusf/seaweedfs/weed/storage/types"
2022-04-18 10:35:43 +08:00
"golang.org/x/exp/slices"
2019-05-06 12:17:23 +08:00
"io"
"os"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func init ( ) {
2019-06-05 16:30:24 +08:00
Commands = append ( Commands , & commandVolumeBalance { } )
2019-05-06 12:17:23 +08:00
}
type commandVolumeBalance struct {
}
func ( c * commandVolumeBalance ) Name ( ) string {
return "volume.balance"
}
func ( c * commandVolumeBalance ) Help ( ) string {
return ` balance all volumes among volume servers
2019-11-25 14:17:43 +08:00
volume . balance [ - collection ALL | EACH_COLLECTION | < collection_name > ] [ - force ] [ - dataCenter = < data_center_name > ]
2019-05-06 12:17:23 +08:00
Algorithm :
2019-05-08 05:02:01 +08:00
2019-05-06 12:17:23 +08:00
For each type of volume server ( different max volume count limit ) {
2019-05-07 04:30:12 +08:00
for each collection {
balanceWritableVolumes ( )
balanceReadOnlyVolumes ( )
}
2019-05-06 12:17:23 +08:00
}
func balanceWritableVolumes ( ) {
2020-09-12 16:01:19 +08:00
idealWritableVolumeRatio = totalWritableVolumes / totalNumberOfMaxVolumes
2019-06-11 12:32:56 +08:00
for hasMovedOneVolume {
2020-09-12 16:01:19 +08:00
sort all volume servers ordered by the localWritableVolumeRatio = localWritableVolumes to localVolumeMax
pick the volume server B with the highest localWritableVolumeRatio y
for any the volume server A with the number of writable volumes x + 1 <= idealWritableVolumeRatio * localVolumeMax {
if y > localWritableVolumeRatio {
2020-09-11 14:05:00 +08:00
if B has a writable volume id v that A does not have , and satisfy v replication requirements {
move writable volume v from A to B
}
2019-05-06 12:17:23 +08:00
}
}
}
}
func balanceReadOnlyVolumes ( ) {
//similar to balanceWritableVolumes
}
`
}
2019-06-05 16:30:24 +08:00
func ( c * commandVolumeBalance ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
2019-05-06 12:17:23 +08:00
balanceCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
2019-06-03 17:26:31 +08:00
collection := balanceCommand . String ( "collection" , "EACH_COLLECTION" , "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection" )
2019-05-07 04:30:12 +08:00
dc := balanceCommand . String ( "dataCenter" , "" , "only apply the balancing for this dataCenter" )
2019-06-03 17:26:31 +08:00
applyBalancing := balanceCommand . Bool ( "force" , false , "apply the balancing plan." )
2019-05-06 12:17:23 +08:00
if err = balanceCommand . Parse ( args ) ; err != nil {
return nil
}
2022-06-01 05:48:46 +08:00
infoAboutSimulationMode ( writer , * applyBalancing , "-force" )
2019-05-06 12:17:23 +08:00
2021-12-11 05:24:38 +08:00
if err = commandEnv . confirmIsLocked ( args ) ; err != nil {
2021-09-14 13:13:34 +08:00
return
}
2021-02-22 16:28:42 +08:00
// collect topology information
2022-02-08 16:53:55 +08:00
topologyInfo , volumeSizeLimitMb , err := collectTopologyInfo ( commandEnv , 15 * time . Second )
2019-05-06 12:17:23 +08:00
if err != nil {
return err
}
2021-02-22 16:28:42 +08:00
volumeServers := collectVolumeServersByDc ( topologyInfo , * dc )
volumeReplicas , _ := collectVolumeReplicaLocations ( topologyInfo )
diskTypes := collectVolumeDiskTypes ( topologyInfo )
2019-12-24 09:58:47 +08:00
2020-09-12 19:06:26 +08:00
if * collection == "EACH_COLLECTION" {
collections , err := ListCollectionNames ( commandEnv , true , false )
if err != nil {
return err
2019-05-06 12:17:23 +08:00
}
2020-09-12 19:06:26 +08:00
for _ , c := range collections {
2021-02-22 16:28:42 +08:00
if err = balanceVolumeServers ( commandEnv , diskTypes , volumeReplicas , volumeServers , volumeSizeLimitMb * 1024 * 1024 , c , * applyBalancing ) ; err != nil {
2019-05-06 13:28:14 +08:00
return err
}
2019-05-06 12:58:46 +08:00
}
2020-09-12 19:06:26 +08:00
} else if * collection == "ALL_COLLECTIONS" {
2021-02-22 16:28:42 +08:00
if err = balanceVolumeServers ( commandEnv , diskTypes , volumeReplicas , volumeServers , volumeSizeLimitMb * 1024 * 1024 , "ALL_COLLECTIONS" , * applyBalancing ) ; err != nil {
2020-09-12 19:06:26 +08:00
return err
}
} else {
2021-02-22 16:28:42 +08:00
if err = balanceVolumeServers ( commandEnv , diskTypes , volumeReplicas , volumeServers , volumeSizeLimitMb * 1024 * 1024 , * collection , * applyBalancing ) ; err != nil {
2020-09-12 19:06:26 +08:00
return err
}
2019-05-06 12:17:23 +08:00
}
2020-09-12 19:06:26 +08:00
2019-05-06 12:17:23 +08:00
return nil
}
2021-02-16 18:47:02 +08:00
func balanceVolumeServers ( commandEnv * CommandEnv , diskTypes [ ] types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , volumeSizeLimit uint64 , collection string , applyBalancing bool ) error {
2019-05-06 12:17:23 +08:00
2021-02-14 15:25:16 +08:00
for _ , diskType := range diskTypes {
2021-02-14 14:34:12 +08:00
if err := balanceVolumeServersByDiskType ( commandEnv , diskType , volumeReplicas , nodes , volumeSizeLimit , collection , applyBalancing ) ; err != nil {
return err
}
}
return nil
}
2021-02-16 18:47:02 +08:00
func balanceVolumeServersByDiskType ( commandEnv * CommandEnv , diskType types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , volumeSizeLimit uint64 , collection string , applyBalancing bool ) error {
2021-02-14 14:34:12 +08:00
2019-05-06 12:17:23 +08:00
for _ , n := range nodes {
2019-05-06 13:28:14 +08:00
n . selectVolumes ( func ( v * master_pb . VolumeInformationMessage ) bool {
2019-12-24 09:58:47 +08:00
if collection != "ALL_COLLECTIONS" {
2019-05-06 12:17:23 +08:00
if v . Collection != collection {
return false
}
}
2021-08-11 02:37:12 +08:00
return v . DiskType == string ( diskType )
2019-05-06 12:17:23 +08:00
} )
}
2021-08-11 02:10:09 +08:00
if err := balanceSelectedVolume ( commandEnv , diskType , volumeReplicas , nodes , capacityByMaxVolumeCount ( diskType ) , sortWritableVolumes , applyBalancing ) ; err != nil {
2020-12-13 19:40:33 +08:00
return err
}
2019-05-06 12:58:46 +08:00
return nil
2019-05-06 12:17:23 +08:00
}
2020-09-12 19:06:26 +08:00
func collectVolumeServersByDc ( t * master_pb . TopologyInfo , selectedDataCenter string ) ( nodes [ ] * Node ) {
2019-05-06 12:17:23 +08:00
for _ , dc := range t . DataCenterInfos {
2019-05-07 04:30:12 +08:00
if selectedDataCenter != "" && dc . Id != selectedDataCenter {
continue
}
2019-05-06 12:17:23 +08:00
for _ , r := range dc . RackInfos {
for _ , dn := range r . DataNodeInfos {
2020-09-12 19:06:26 +08:00
nodes = append ( nodes , & Node {
2020-02-22 13:23:25 +08:00
info : dn ,
dc : dc . Id ,
rack : r . Id ,
} )
2019-05-06 12:17:23 +08:00
}
}
}
return
}
2021-02-16 18:47:02 +08:00
func collectVolumeDiskTypes ( t * master_pb . TopologyInfo ) ( diskTypes [ ] types . DiskType ) {
2021-02-14 15:25:16 +08:00
knownTypes := make ( map [ string ] bool )
for _ , dc := range t . DataCenterInfos {
for _ , r := range dc . RackInfos {
for _ , dn := range r . DataNodeInfos {
2021-02-16 18:47:02 +08:00
for diskType , _ := range dn . DiskInfos {
if _ , found := knownTypes [ diskType ] ; ! found {
knownTypes [ diskType ] = true
2021-02-14 15:25:16 +08:00
}
}
}
}
}
for diskType , _ := range knownTypes {
2021-02-16 18:47:02 +08:00
diskTypes = append ( diskTypes , types . ToDiskType ( diskType ) )
2021-02-14 15:25:16 +08:00
}
return
}
2019-05-06 12:17:23 +08:00
type Node struct {
info * master_pb . DataNodeInfo
selectedVolumes map [ uint32 ] * master_pb . VolumeInformationMessage
2020-02-22 13:23:25 +08:00
dc string
rack string
2019-05-06 12:17:23 +08:00
}
2020-12-13 19:40:33 +08:00
type CapacityFunc func ( * master_pb . DataNodeInfo ) int
2021-02-16 18:47:02 +08:00
func capacityByMaxVolumeCount ( diskType types . DiskType ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) int {
2021-03-15 11:49:56 +08:00
diskInfo , found := info . DiskInfos [ string ( diskType ) ]
2021-02-16 20:16:46 +08:00
if ! found {
return 0
}
2021-02-16 18:47:02 +08:00
return int ( diskInfo . MaxVolumeCount )
}
2020-12-13 19:40:33 +08:00
}
2021-02-16 20:27:16 +08:00
func capacityByFreeVolumeCount ( diskType types . DiskType ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) int {
diskInfo , found := info . DiskInfos [ string ( diskType ) ]
if ! found {
return 0
}
return int ( diskInfo . MaxVolumeCount - diskInfo . VolumeCount )
}
}
2020-12-13 19:40:33 +08:00
func ( n * Node ) localVolumeRatio ( capacityFunc CapacityFunc ) float64 {
return divide ( len ( n . selectedVolumes ) , capacityFunc ( n . info ) )
2020-09-12 19:06:26 +08:00
}
2020-12-13 19:40:33 +08:00
func ( n * Node ) localVolumeNextRatio ( capacityFunc CapacityFunc ) float64 {
return divide ( len ( n . selectedVolumes ) + 1 , capacityFunc ( n . info ) )
2020-09-12 19:06:26 +08:00
}
func ( n * Node ) selectVolumes ( fn func ( v * master_pb . VolumeInformationMessage ) bool ) {
n . selectedVolumes = make ( map [ uint32 ] * master_pb . VolumeInformationMessage )
2021-02-16 18:47:02 +08:00
for _ , diskInfo := range n . info . DiskInfos {
for _ , v := range diskInfo . VolumeInfos {
if fn ( v ) {
n . selectedVolumes [ v . Id ] = v
}
2020-09-12 19:06:26 +08:00
}
}
}
2019-05-06 12:17:23 +08:00
func sortWritableVolumes ( volumes [ ] * master_pb . VolumeInformationMessage ) {
2022-04-18 10:35:43 +08:00
slices . SortFunc ( volumes , func ( a , b * master_pb . VolumeInformationMessage ) bool {
return a . Size < b . Size
2019-05-06 12:17:23 +08:00
} )
}
func sortReadOnlyVolumes ( volumes [ ] * master_pb . VolumeInformationMessage ) {
2022-04-18 10:35:43 +08:00
slices . SortFunc ( volumes , func ( a , b * master_pb . VolumeInformationMessage ) bool {
return a . Id < b . Id
2019-05-06 12:17:23 +08:00
} )
}
2021-08-11 02:10:09 +08:00
func balanceSelectedVolume ( commandEnv * CommandEnv , diskType types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , capacityFunc CapacityFunc , sortCandidatesFn func ( volumes [ ] * master_pb . VolumeInformationMessage ) , applyBalancing bool ) ( err error ) {
2020-09-12 19:06:26 +08:00
selectedVolumeCount , volumeMaxCount := 0 , 0
2020-12-19 02:39:30 +08:00
var nodesWithCapacity [ ] * Node
2019-05-06 12:17:23 +08:00
for _ , dn := range nodes {
selectedVolumeCount += len ( dn . selectedVolumes )
2020-12-19 02:39:30 +08:00
capacity := capacityFunc ( dn . info )
if capacity > 0 {
nodesWithCapacity = append ( nodesWithCapacity , dn )
}
volumeMaxCount += capacity
2019-05-06 12:17:23 +08:00
}
2020-09-12 19:06:26 +08:00
idealVolumeRatio := divide ( selectedVolumeCount , volumeMaxCount )
2019-05-06 12:17:23 +08:00
2020-09-11 14:05:00 +08:00
hasMoved := true
2019-05-06 12:17:23 +08:00
2020-12-14 16:11:52 +08:00
// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
2020-09-11 14:05:00 +08:00
for hasMoved {
hasMoved = false
2022-04-18 10:35:43 +08:00
slices . SortFunc ( nodesWithCapacity , func ( a , b * Node ) bool {
return a . localVolumeRatio ( capacityFunc ) < b . localVolumeRatio ( capacityFunc )
2019-05-06 12:17:23 +08:00
} )
2022-07-02 03:16:18 +08:00
if len ( nodesWithCapacity ) == 0 {
fmt . Printf ( "no volume server found with capacity for %s" , diskType . ReadableString ( ) )
return nil
}
2020-12-19 02:39:30 +08:00
fullNode := nodesWithCapacity [ len ( nodesWithCapacity ) - 1 ]
2020-09-11 14:05:00 +08:00
var candidateVolumes [ ] * master_pb . VolumeInformationMessage
for _ , v := range fullNode . selectedVolumes {
candidateVolumes = append ( candidateVolumes , v )
}
sortCandidatesFn ( candidateVolumes )
2020-12-19 02:39:30 +08:00
for i := 0 ; i < len ( nodesWithCapacity ) - 1 ; i ++ {
emptyNode := nodesWithCapacity [ i ]
2020-12-13 19:40:33 +08:00
if ! ( fullNode . localVolumeRatio ( capacityFunc ) > idealVolumeRatio && emptyNode . localVolumeNextRatio ( capacityFunc ) <= idealVolumeRatio ) {
2020-09-11 14:05:00 +08:00
// no more volume servers with empty slots
break
2019-05-06 12:17:23 +08:00
}
2021-08-11 03:33:29 +08:00
fmt . Fprintf ( os . Stdout , "%s %.2f %.2f:%.2f\t" , diskType . ReadableString ( ) , idealVolumeRatio , fullNode . localVolumeRatio ( capacityFunc ) , emptyNode . localVolumeNextRatio ( capacityFunc ) )
2020-09-11 15:29:25 +08:00
hasMoved , err = attemptToMoveOneVolume ( commandEnv , volumeReplicas , fullNode , candidateVolumes , emptyNode , applyBalancing )
2020-09-11 14:05:00 +08:00
if err != nil {
return
}
if hasMoved {
// moved one volume
break
2019-05-06 12:17:23 +08:00
}
}
}
2019-05-06 12:58:46 +08:00
return nil
2019-05-06 12:17:23 +08:00
}
2020-09-11 15:29:25 +08:00
func attemptToMoveOneVolume ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , candidateVolumes [ ] * master_pb . VolumeInformationMessage , emptyNode * Node , applyBalancing bool ) ( hasMoved bool , err error ) {
2020-09-11 14:05:00 +08:00
for _ , v := range candidateVolumes {
2020-09-15 14:47:11 +08:00
hasMoved , err = maybeMoveOneVolume ( commandEnv , volumeReplicas , fullNode , v , emptyNode , applyBalancing )
if err != nil {
return
2020-09-11 14:05:00 +08:00
}
2020-09-15 14:47:11 +08:00
if hasMoved {
break
}
}
return
}
func maybeMoveOneVolume ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , candidateVolume * master_pb . VolumeInformationMessage , emptyNode * Node , applyChange bool ) ( hasMoved bool , err error ) {
if candidateVolume . ReplicaPlacement > 0 {
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( candidateVolume . ReplicaPlacement ) )
if ! isGoodMove ( replicaPlacement , volumeReplicas [ candidateVolume . Id ] , fullNode , emptyNode ) {
return false , nil
}
}
if _ , found := emptyNode . selectedVolumes [ candidateVolume . Id ] ; ! found {
if err = moveVolume ( commandEnv , candidateVolume , fullNode , emptyNode , applyChange ) ; err == nil {
adjustAfterMove ( candidateVolume , volumeReplicas , fullNode , emptyNode )
return true , nil
} else {
return
2020-09-11 14:05:00 +08:00
}
}
return
}
2020-09-15 14:47:11 +08:00
func moveVolume ( commandEnv * CommandEnv , v * master_pb . VolumeInformationMessage , fullNode * Node , emptyNode * Node , applyChange bool ) error {
2019-05-06 12:17:23 +08:00
collectionPrefix := v . Collection + "_"
if v . Collection == "" {
collectionPrefix = ""
}
2020-12-14 16:11:52 +08:00
fmt . Fprintf ( os . Stdout , " moving %s volume %s%d %s => %s\n" , v . DiskType , collectionPrefix , v . Id , fullNode . info . Id , emptyNode . info . Id )
2020-09-15 14:47:11 +08:00
if applyChange {
2021-09-13 13:47:52 +08:00
return LiveMoveVolume ( commandEnv . option . GrpcDialOption , os . Stderr , needle . VolumeId ( v . Id ) , pb . NewServerAddressFromDataNode ( fullNode . info ) , pb . NewServerAddressFromDataNode ( emptyNode . info ) , 5 * time . Second , v . DiskType , false )
2019-05-06 12:17:23 +08:00
}
2019-05-06 12:58:46 +08:00
return nil
2019-05-06 12:17:23 +08:00
}
2020-09-11 15:29:25 +08:00
func isGoodMove ( placement * super_block . ReplicaPlacement , existingReplicas [ ] * VolumeReplica , sourceNode , targetNode * Node ) bool {
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id == targetNode . info . Id &&
replica . location . rack == targetNode . rack &&
replica . location . dc == targetNode . dc {
// never move to existing nodes
return false
}
}
dcs , racks := make ( map [ string ] bool ) , make ( map [ string ] int )
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id != sourceNode . info . Id {
dcs [ replica . location . DataCenter ( ) ] = true
racks [ replica . location . Rack ( ) ] ++
}
}
dcs [ targetNode . dc ] = true
racks [ fmt . Sprintf ( "%s %s" , targetNode . dc , targetNode . rack ) ] ++
2020-11-20 17:12:25 +08:00
if len ( dcs ) != placement . DiffDataCenterCount + 1 {
2020-09-11 15:29:25 +08:00
return false
}
2020-11-20 17:12:25 +08:00
if len ( racks ) != placement . DiffRackCount + placement . DiffDataCenterCount + 1 {
2020-09-11 15:29:25 +08:00
return false
}
for _ , sameRackCount := range racks {
2020-11-20 17:12:25 +08:00
if sameRackCount != placement . SameRackCount + 1 {
2020-09-11 15:29:25 +08:00
return false
}
}
return true
}
2020-09-12 16:01:19 +08:00
func adjustAfterMove ( v * master_pb . VolumeInformationMessage , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , emptyNode * Node ) {
delete ( fullNode . selectedVolumes , v . Id )
2020-09-15 14:47:11 +08:00
if emptyNode . selectedVolumes != nil {
emptyNode . selectedVolumes [ v . Id ] = v
}
2020-09-12 16:01:19 +08:00
existingReplicas := volumeReplicas [ v . Id ]
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id == fullNode . info . Id &&
replica . location . rack == fullNode . rack &&
replica . location . dc == fullNode . dc {
2021-03-15 12:29:55 +08:00
loc := newLocation ( emptyNode . dc , emptyNode . rack , emptyNode . info )
replica . location = & loc
2020-09-12 16:01:19 +08:00
return
}
}
}