From b0a2e9aea37937d40c9fa097803237bf89a7f908 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 21 Jan 2024 01:27:22 -0800 Subject: [PATCH] fix assignments if brokers changed --- .../mq/broker/broker_topic_conf_read_write.go | 11 ++++++++++ weed/mq/pub_balancer/allocate.go | 20 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 3294aa5cb..6c9a38e6d 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -46,5 +47,15 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. }); err != nil { return nil, err } + + // also fix assignee broker if invalid + changedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments) + if len(changedAssignments) > 0 { + glog.V(0).Infof("topic %v partition assignments changed: %v", t, changedAssignments) + if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { + return nil, err + } + } + return conf, err } diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 8fa0214ad..7822f5ed9 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -55,3 +55,23 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) } return pickedBrokers } + +func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (changedAssignments []*mq_pb.BrokerPartitionAssignment) { + for _, assignment := range assignments { + if assignment.LeaderBroker == "" { + changedAssignments = append(changedAssignments, assignment) + continue + } + if _, found := activeBrokers.Get(assignment.LeaderBroker); !found { + changedAssignments = append(changedAssignments, assignment) + continue + } + } + + // pick the brokers with the least number of partitions + pickedBrokers := pickBrokers(activeBrokers, int32(len(changedAssignments))) + for i, assignment := range changedAssignments { + assignment.LeaderBroker = pickedBrokers[i] + } + return changedAssignments +}