mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-12-30 04:57:55 +08:00
[shell] only apply the balancing for writable volumes (#6346)
This commit is contained in:
parent
9987a65e8a
commit
0a4b1909a2
@ -24,6 +24,10 @@ func init() {
|
||||
}
|
||||
|
||||
type commandVolumeBalance struct {
|
||||
volumeSizeLimitMb uint64
|
||||
commandEnv *CommandEnv
|
||||
writable bool
|
||||
applyBalancing bool
|
||||
}
|
||||
|
||||
func (c *commandVolumeBalance) Name() string {
|
||||
@ -76,12 +80,13 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
||||
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
|
||||
racks := balanceCommand.String("racks", "", "only apply the balancing for this racks")
|
||||
nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes")
|
||||
c.writable = *(balanceCommand.Bool("writable", false, "only apply the balancing for writable volumes"))
|
||||
noLock := balanceCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")
|
||||
applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.")
|
||||
c.applyBalancing = *(balanceCommand.Bool("force", false, "apply the balancing plan."))
|
||||
if err = balanceCommand.Parse(args); err != nil {
|
||||
return nil
|
||||
}
|
||||
infoAboutSimulationMode(writer, *applyBalancing, "-force")
|
||||
infoAboutSimulationMode(writer, c.applyBalancing, "-force")
|
||||
|
||||
if *noLock {
|
||||
commandEnv.noLock = true
|
||||
@ -90,9 +95,11 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
||||
return
|
||||
}
|
||||
}
|
||||
c.commandEnv = commandEnv
|
||||
|
||||
// collect topology information
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Second)
|
||||
var topologyInfo *master_pb.TopologyInfo
|
||||
topologyInfo, c.volumeSizeLimitMb, err = collectTopologyInfo(commandEnv, 5*time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -106,13 +113,13 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, c := range collections {
|
||||
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, c, *applyBalancing); err != nil {
|
||||
for _, col := range collections {
|
||||
if err = c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, col); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, *collection, *applyBalancing); err != nil {
|
||||
if err = c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, *collection); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -120,10 +127,10 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
||||
return nil
|
||||
}
|
||||
|
||||
func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
|
||||
func (c *commandVolumeBalance) balanceVolumeServers(diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string) error {
|
||||
|
||||
for _, diskType := range diskTypes {
|
||||
if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, collection, applyBalancing); err != nil {
|
||||
if err := c.balanceVolumeServersByDiskType(diskType, volumeReplicas, nodes, collection); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -131,7 +138,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, vo
|
||||
|
||||
}
|
||||
|
||||
func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
|
||||
func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string) error {
|
||||
|
||||
for _, n := range nodes {
|
||||
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
|
||||
@ -140,10 +147,16 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT
|
||||
return false
|
||||
}
|
||||
}
|
||||
return v.DiskType == string(diskType)
|
||||
if v.DiskType != string(diskType) {
|
||||
return false
|
||||
}
|
||||
if c.writable && v.Size > c.volumeSizeLimitMb {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
if err := balanceSelectedVolume(commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
|
||||
if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -255,8 +255,8 @@ func TestBalance(t *testing.T) {
|
||||
volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "")
|
||||
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
|
||||
diskTypes := collectVolumeDiskTypes(topologyInfo)
|
||||
|
||||
if err := balanceVolumeServers(nil, diskTypes, volumeReplicas, volumeServers, "ALL_COLLECTIONS", false); err != nil {
|
||||
c := &commandVolumeBalance{}
|
||||
if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, "ALL_COLLECTIONS"); err != nil {
|
||||
t.Errorf("balance: %v", err)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user