seaweedfs/weed/shell/command_ec_balance.go

package shell

import (
	"flag"
	"fmt"
	"io"
)

func init() {
	Commands = append(Commands, &commandEcBalance{})
}

type commandEcBalance struct {
}

func (c *commandEcBalance) Name() string {
	return "ec.balance"
}

// TODO: Update help string and move to command_ec_common.go once shard replica placement logic is enabled.
func (c *commandEcBalance) Help() string {
	return `balance all ec shards among all racks and volume servers

	ec.balance [-collection EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>]

	Algorithm:

	func EcBalance() {
		for each collection:
			balanceEcVolumes(collectionName)
		for each rack:
			balanceEcRack(rack)
	}

	func balanceEcVolumes(collectionName){
		for each volume:
			doDeduplicateEcShards(volumeId)

		tracks rack~shardCount mapping
		for each volume:
			doBalanceEcShardsAcrossRacks(volumeId)

		for each volume:
			doBalanceEcShardsWithinRacks(volumeId)
	}

	// spread ec shards into more racks
	func doBalanceEcShardsAcrossRacks(volumeId){
		tracks rack~volumeIdShardCount mapping
		averageShardsPerEcRack = totalShardNumber / numRacks  // totalShardNumber is 14 for now, but later may vary per data center
		ecShardsToMove = select overflowing ec shards from racks with ec shard counts > averageShardsPerEcRack
		for each ecShardsToMove {
			destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, averageShardsPerEcRack, ecShardReplicaPlacement)
			destVolumeServers = volume servers on the destRack
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	func doBalanceEcShardsWithinRacks(volumeId){
		racks = collect all racks that the volume id is on
		for rack, shards := range racks
			doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
	}

	// move ec shards within one rack
	func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
		tracks volumeServer~volumeIdShardCount mapping
		averageShardCount = len(shards) / numVolumeServers
		volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardCount
		ecShardsToMove = select overflowing ec shards from volumeServersOverAverage
		for each ecShardsToMove {
			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount, ecShardReplicaPlacement)
			pickOneEcNodeAndMoveOneShard(destVolumeServer)
		}
	}

	// move ec shards while keeping the shard distribution for the same volume unchanged or more even
	func balanceEcRack(rack){
		averageShardCount = total shards / numVolumeServers
		for hasMovedOneEcShard {
			sort all volume servers by the number of local ec shards
			pick the volume server A with the lowest number of ec shards x
			pick the volume server B with the highest number of ec shards y
			if y > averageShardCount and x+1 <= averageShardCount {
				if B has an ec shard with volume id v that A does not have {
					move one ec shard of volume v from B to A
					hasMovedOneEcShard = true
				}
			}
		}
	}

`
}
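
// Illustrative sketch, not part of the original command: a minimal, self-contained version of
// the shard-selection step in doBalanceEcShardsAcrossRacks from the Help() pseudocode above.
// It assumes a simplified model where a rack is just a name mapped to the shard ids it holds
// for one volume; the real balancer works on the collected cluster topology instead.
func sketchPickShardsToMoveAcrossRacks(rackToShards map[string][]uint32) map[string][]uint32 {
	if len(rackToShards) == 0 {
		return nil
	}

	// averageShardsPerEcRack = totalShardNumber / numRacks, rounded up so that a rack
	// holding exactly the average is not treated as overflowing.
	totalShards := 0
	for _, shards := range rackToShards {
		totalShards += len(shards)
	}
	averageShardsPerEcRack := (totalShards + len(rackToShards) - 1) / len(rackToShards)

	// ecShardsToMove = the shards beyond the average on each overflowing rack.
	overflow := make(map[string][]uint32)
	for rack, shards := range rackToShards {
		if len(shards) > averageShardsPerEcRack {
			overflow[rack] = shards[averageShardsPerEcRack:]
		}
	}
	return overflow
}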
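
// Illustrative sketch, not part of the original command: the destination-picking step of
// doBalanceEcShardsWithinOneRack from the Help() pseudocode above. Given how many shards of
// one volume each server in the rack already holds, it picks an under-loaded server to receive
// one more shard. The map-based model and the tie-breaking rule are assumptions; the real
// picker also considers total shard counts and the configured shard replica placement.
func sketchPickDestinationServer(serverToVolumeShardCount map[string]int, averageShardCount int) (string, bool) {
	bestServer, bestCount := "", -1
	for server, count := range serverToVolumeShardCount {
		if count >= averageShardCount {
			// already at or above the per-server average for this volume
			continue
		}
		// prefer the server with the fewest shards of this volume; break ties by name
		if bestCount < 0 || count < bestCount || (count == bestCount && server < bestServer) {
			bestServer, bestCount = server, count
		}
	}
	return bestServer, bestCount >= 0
}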
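
// Illustrative sketch, not part of the original command: the rack-level loop of balanceEcRack
// from the Help() pseudocode above. Servers are modelled as a map from server name to the
// volume ids of the ec shards they hold (an assumption made for brevity). One shard of a
// volume v moves from the busiest server B to the least busy server A only when A holds no
// shard of v yet, and the loop stops once no such move improves the balance.
func sketchBalanceOneRack(serverToVolumeIds map[string][]uint32) {
	if len(serverToVolumeIds) == 0 {
		return
	}
	totalShards := 0
	for _, volumeIds := range serverToVolumeIds {
		totalShards += len(volumeIds)
	}
	averageShardCount := totalShards / len(serverToVolumeIds)

	for hasMoved := true; hasMoved; {
		hasMoved = false

		// pick the server A with the fewest local shards and the server B with the most
		var serverA, serverB string
		for server, volumeIds := range serverToVolumeIds {
			if serverA == "" || len(volumeIds) < len(serverToVolumeIds[serverA]) {
				serverA = server
			}
			if serverB == "" || len(volumeIds) > len(serverToVolumeIds[serverB]) {
				serverB = server
			}
		}
		x, y := len(serverToVolumeIds[serverA]), len(serverToVolumeIds[serverB])
		if !(y > averageShardCount && x+1 <= averageShardCount) {
			break
		}

		// move one shard whose volume id A does not already have
		hasOnA := make(map[uint32]bool, x)
		for _, v := range serverToVolumeIds[serverA] {
			hasOnA[v] = true
		}
		for i, v := range serverToVolumeIds[serverB] {
			if !hasOnA[v] {
				serverToVolumeIds[serverB] = append(serverToVolumeIds[serverB][:i], serverToVolumeIds[serverB][i+1:]...)
				serverToVolumeIds[serverA] = append(serverToVolumeIds[serverA], v)
				hasMoved = true
				break
			}
		}
	}
}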

func (c *commandEcBalance) HasTag(CommandTag) bool {
	return false
}

func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
	dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
	shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
	applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
	if err = balanceCommand.Parse(args); err != nil {
		return nil
	}
	infoAboutSimulationMode(writer, *applyBalancing, "-force")

	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	var collections []string
	if *collection == "EACH_COLLECTION" {
		collections, err = ListCollectionNames(commandEnv, false, true)
		if err != nil {
			return err
		}
	} else {
		collections = append(collections, *collection)
	}
	fmt.Printf("balanceEcVolumes collections %+v\n", len(collections))

	rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement)
	if err != nil {
		return err
	}

	return EcBalance(commandEnv, collections, *dc, rp, *applyBalancing)
}
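
// Illustrative usage only, not part of the original file: a typical session in `weed shell`,
// assuming the cluster lock is taken first (see confirmIsLocked above). Without -force the
// command only prints the planned moves, as infoAboutSimulationMode indicates; the collection
// and data center names below are placeholders.
//
//	> lock
//	> ec.balance -collection EACH_COLLECTION -dataCenter dc1
//	> ec.balance -collection my_collection -force
//	> unlock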