mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-12-15 11:09:06 +08:00
580940bf82
* balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
93 lines
2.6 KiB
Go
93 lines
2.6 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"time"
|
|
)
|
|
|
|
func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
|
|
|
|
t := topic.FromPbTopic(req.GetInit().Topic)
|
|
partition := topic.FromPbPartition(req.GetInit().Partition)
|
|
localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
|
|
if localTopicPartition == nil {
|
|
stream.Send(&mq_pb.SubscribeResponse{
|
|
Message: &mq_pb.SubscribeResponse_Ctrl{
|
|
Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
|
|
Error: "not initialized",
|
|
},
|
|
},
|
|
})
|
|
return nil
|
|
}
|
|
|
|
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
|
|
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
|
|
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
|
|
isConnected := true
|
|
sleepIntervalCount := 0
|
|
defer func() {
|
|
isConnected = false
|
|
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
|
|
glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition)
|
|
}()
|
|
|
|
ctx := stream.Context()
|
|
var startTime time.Time
|
|
if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 {
|
|
startTime = time.Unix(0, startTs)
|
|
} else {
|
|
startTime = time.Now()
|
|
}
|
|
|
|
localTopicPartition.Subscribe(clientName, startTime, func() bool {
|
|
if !isConnected {
|
|
return false
|
|
}
|
|
sleepIntervalCount++
|
|
if sleepIntervalCount > 10 {
|
|
sleepIntervalCount = 10
|
|
}
|
|
time.Sleep(time.Duration(sleepIntervalCount) * 2339 * time.Millisecond)
|
|
|
|
// Check if the client has disconnected by monitoring the context
|
|
select {
|
|
case <-ctx.Done():
|
|
err := ctx.Err()
|
|
if err == context.Canceled {
|
|
// Client disconnected
|
|
return false
|
|
}
|
|
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
|
|
return false
|
|
default:
|
|
// Continue processing the request
|
|
}
|
|
|
|
return true
|
|
}, func(logEntry *filer_pb.LogEntry) error {
|
|
// reset the sleep interval count
|
|
sleepIntervalCount = 0
|
|
|
|
value := logEntry.GetData()
|
|
if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
|
|
Data: &mq_pb.DataMessage{
|
|
Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
|
|
Value: value,
|
|
TsNs: logEntry.TsNs,
|
|
},
|
|
}}); err != nil {
|
|
glog.Errorf("Error sending setup response: %v", err)
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return nil
|
|
}
|