seaweedfs/weed/mq/broker/broker_grpc_lookup.go

48 lines
1.7 KiB
Go
Raw Normal View History

2023-08-28 09:33:46 +08:00
package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
2023-09-20 05:08:17 +08:00
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
2023-08-28 09:33:46 +08:00
)
// FindTopicBrokers returns the brokers that are serving the topic
//
// 1. lock the topic
//
// 2. find the topic partitions on the filer
// 2.1 if the topic is not found, return error
// 2.2 if the request is_for_publish, create the topic
// 2.2.1 if the request is_for_subscribe, return error not found
// 2.2.2 if the request is_for_publish, create the topic
// 2.2 if the topic is found, return the brokers
//
// 3. unlock the topic
2023-09-20 05:08:17 +08:00
func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
if broker.currentBalancer == "" {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
if !broker.lockAsBalancer.IsLocked() {
proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.LookupTopicBrokers(ctx, request)
return nil
})
if proxyErr != nil {
return nil, proxyErr
}
return resp, err
}
2023-08-28 09:33:46 +08:00
2023-09-20 05:08:17 +08:00
ret := &mq_pb.LookupTopicBrokersResponse{}
2023-08-28 09:59:04 +08:00
ret.Topic = request.Topic
2023-09-20 05:08:17 +08:00
ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish)
return ret, err
2023-08-28 09:33:46 +08:00
}
// CheckTopicPartitionsStatus check the topic partitions on the broker
func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
return ret, nil
}