Merge pull request #3338 from kmlebedev/issues/3083

Roll back the onPeerUpdate implementation of automatic clean-up of failed servers in favor of a synchronous ping
This commit is contained in:
Chris Lu 2022-08-01 08:23:10 -07:00 committed by GitHub
commit b59bc607bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 51 deletions

View File

@ -102,7 +102,7 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres
if err != nil {
errLvl := glog.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") {
errLvl = glog.Level(1)
errLvl = glog.Level(4)
}
glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
}

View File

@ -1,7 +1,6 @@
package weed_server
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"net/http"
@ -32,9 +31,8 @@ import (
)
const (
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
RaftServerRemovalTime = 72 * time.Minute
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
)
type MasterOption struct {
@ -65,9 +63,6 @@ type MasterServer struct {
boundedLeaderChan chan int
onPeerUpdateDoneCn chan string
onPeerUpdateDoneCnExist bool
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
@ -118,7 +113,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
ms.onPeerUpdateDoneCn = make(chan string)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
@ -351,50 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress)
isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
if update.IsAdd {
if isLeader {
raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName {
raftServerFound = true
}
}
if !raftServerFound {
glog.V(0).Infof("adding new raft server: %s", peerName)
ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
if update.IsAdd && isLeader {
raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName {
raftServerFound = true
}
}
if ms.onPeerUpdateDoneCnExist {
ms.onPeerUpdateDoneCn <- peerName
if !raftServerFound {
glog.V(0).Infof("adding new raft server: %s", peerName)
ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
} else if isLeader {
go func(peerName string) {
raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
for {
select {
case <-raftServerRemovalTimeAfter:
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: peerName,
Force: false,
})
return err
})
if err != nil {
glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
}
glog.V(0).Infof("old raft server %s removed", peerName)
return
case peerDone := <-ms.onPeerUpdateDoneCn:
if peerName == peerDone {
glog.V(0).Infof("raft server %s remove canceled", peerName)
return
}
}
}
}(peerName)
ms.onPeerUpdateDoneCnExist = true
}
}