From efb695fd931619831c57a34f61ee3bdf9e5d2339 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 2 Jan 2024 17:29:35 -0800 Subject: [PATCH] lookup existing topic partitions --- weed/mq/broker/broker_grpc_lookup.go | 3 +-- weed/mq/pub_balancer/lookup.go | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index ac13a7581..fa5f81172 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -14,7 +14,6 @@ import ( // // 2. find the topic partitions on the filer // 2.1 if the topic is not found, return error -// 2.1.1 if the request is_for_subscribe, return error not found // 2.1.2 if the request is_for_publish, create the topic // 2.2 if the topic is found, return the brokers // @@ -36,7 +35,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq ret := &mq_pb.LookupTopicBrokersResponse{} ret.Topic = request.Topic - ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6) + ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, -1) return ret, err } diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 3e103a650..33c5a864b 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -2,7 +2,6 @@ package pub_balancer import ( "errors" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -34,8 +33,8 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu } } } - if len(assignments) > 0 && len(assignments) == int(partitionCount) || !publish { - glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments) + if len(assignments) > 0 || !publish { + // glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments) return assignments, nil }