diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 9292a6184..c1984c05e 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -37,6 +37,9 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. glog.V(0).Infof("read topic %s conf: %v", request.Topic, err) } else { err = b.ensureTopicActiveAssignments(t, resp) + // no need to assign directly. + // The added or updated assignees will read from filer directly. + // The gone assignees will die by themselves. } if err == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) @@ -55,10 +58,6 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) } - if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, true); assignErr != nil { - return nil, assignErr - } - glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) return resp, err