mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-25 03:29:10 +08:00
sync update topologyInfo
This commit is contained in:
parent
2b4112e462
commit
73a0dea16b
@ -24,7 +24,6 @@ type commandVolumeServerEvacuate struct {
|
||||
topologyInfo *master_pb.TopologyInfo
|
||||
targetServer string
|
||||
volumeRack string
|
||||
otherNodes []*Node
|
||||
}
|
||||
|
||||
func (c *commandVolumeServerEvacuate) Name() string {
|
||||
@ -98,28 +97,6 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
|
||||
return err
|
||||
}
|
||||
|
||||
if applyChange {
|
||||
stopchan := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
default:
|
||||
if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil {
|
||||
fmt.Fprintf(writer, "update topologyInfo %v", err)
|
||||
} else {
|
||||
c.topologyInfo = topologyInfo
|
||||
_, c.otherNodes = c.nodesOtherThan(
|
||||
collectVolumeServersByDc(c.topologyInfo, ""), volumeServer)
|
||||
fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes))
|
||||
}
|
||||
case <-stopchan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(stopchan)
|
||||
}
|
||||
|
||||
if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -134,18 +111,34 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
|
||||
func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
|
||||
// find this volume server
|
||||
volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
|
||||
var thisNodes []*Node
|
||||
thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer)
|
||||
thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
|
||||
if len(thisNodes) == 0 {
|
||||
return fmt.Errorf("%s is not found in this cluster", volumeServer)
|
||||
}
|
||||
|
||||
// move away normal volumes
|
||||
ticker := time.NewTicker(topologyInfoUpdateInterval)
|
||||
for _, thisNode := range thisNodes {
|
||||
for _, diskInfo := range thisNode.info.DiskInfos {
|
||||
if applyChange {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil {
|
||||
fmt.Fprintf(writer, "update topologyInfo %v", err)
|
||||
} else {
|
||||
_, otherNodesNew := c.nodesOtherThan(
|
||||
collectVolumeServersByDc(topologyInfo, ""), volumeServer)
|
||||
if len(otherNodesNew) > 0 {
|
||||
otherNodes = otherNodesNew
|
||||
c.topologyInfo = topologyInfo
|
||||
fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
|
||||
for _, vol := range diskInfo.VolumeInfos {
|
||||
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange)
|
||||
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
|
||||
if err != nil {
|
||||
fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user