fix deadlock when broadcasting to clients (#6184)

fix deadlock when broadcasting to clients

When the master transfers leadership, the old master disconnects from all
filers and volume servers. If the cluster is big, the number of broadcast
messages may exceed the channel's maximum length of 100; then, if
KeepConnected is not listening on the channel during the disconnect, the
broadcast will deadlock, and the whole cluster will stop serving!
This commit is contained in:
wyang 2024-11-04 15:20:48 +08:00 committed by GitHub
parent 0f2c3648dc
commit a7973ed7d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 5 deletions

View File

@ -4,12 +4,13 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"net" "net"
"sort" "sort"
"time" "time"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
@ -89,7 +90,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
ms.UnRegisterUuids(dn.Ip, dn.Port) ms.UnRegisterUuids(dn.Ip, dn.Port)
if len(message.DeletedVids) > 0 || len(message.DeletedEcVids) > 0 { if ms.Topo.IsLeader() && (len(message.DeletedVids) > 0 || len(message.DeletedEcVids) > 0) {
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
} }
} }
@ -338,8 +339,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) { func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) {
ms.clientChansLock.RLock() ms.clientChansLock.RLock()
for _, ch := range ms.clientChans { for client, ch := range ms.clientChans {
ch <- message select {
case ch <- message:
glog.V(4).Infof("send message to %s", client)
default:
stats.MasterBroadcastToFullErrorCounter.Inc()
glog.Errorf("broadcastToClients %s message full", client)
}
} }
ms.clientChansLock.RUnlock() ms.clientChansLock.RUnlock()
} }

View File

@ -94,6 +94,14 @@ var (
Help: "Counter of master pick for write error", Help: "Counter of master pick for write error",
}) })
MasterBroadcastToFullErrorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "master",
Name: "broadcast_to_full",
Help: "Counter of master broadcast send to full message channel err",
})
MasterLeaderChangeCounter = prometheus.NewCounterVec( MasterLeaderChangeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: Namespace, Namespace: Namespace,
@ -314,6 +322,7 @@ func init() {
Gather.MustRegister(MasterReplicaPlacementMismatch) Gather.MustRegister(MasterReplicaPlacementMismatch)
Gather.MustRegister(MasterVolumeLayoutWritable) Gather.MustRegister(MasterVolumeLayoutWritable)
Gather.MustRegister(MasterVolumeLayoutCrowded) Gather.MustRegister(MasterVolumeLayoutCrowded)
Gather.MustRegister(MasterBroadcastToFullErrorCounter)
Gather.MustRegister(FilerRequestCounter) Gather.MustRegister(FilerRequestCounter)
Gather.MustRegister(FilerHandlerCounter) Gather.MustRegister(FilerHandlerCounter)