From e3c8be7f899be52c8fa9bcff35c886d69e2ca693 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 27 Mar 2024 23:55:06 -0700 Subject: [PATCH] LocalPartition shutdown --- weed/mq/topic/local_partition.go | 36 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 8b9970f20..a9f861710 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -182,26 +182,26 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa } func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { - if !p.Publishers.IsEmpty() { - return - } - if !p.Subscribers.IsEmpty() { - return - } - p.LogBuffer.ShutdownLogBuffer() - if p.followerStream != nil { - // send close to the follower - if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ - Message: &mq_pb.PublishFollowMeRequest_Close{ - Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, - }, - }); followErr != nil { - glog.Errorf("Error closing follower stream: %v", followErr) + if p.Publishers.IsEmpty() { + if p.followerStream != nil { + // send close to the follower + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Close{ + Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, + }, + }); followErr != nil { + glog.Errorf("Error closing follower stream: %v", followErr) + } + glog.V(4).Infof("closing grpcConnection to follower") + p.followerGrpcConnection.Close() + p.followerStream = nil } - glog.V(4).Infof("closing grpcConnection to follower") - p.followerGrpcConnection.Close() - p.followerStream = nil + } + + if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { + p.LogBuffer.ShutdownLogBuffer() + hasShutdown = true } return