2024-05-21 03:32:12 +08:00
package sub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
2024-11-05 04:08:25 +08:00
"reflect"
2024-05-21 03:32:12 +08:00
)
2024-05-21 23:42:04 +08:00
func ( sub * TopicSubscriber ) onEachPartition ( assigned * mq_pb . BrokerPartitionAssignment , stopCh chan struct { } ) error {
2024-05-21 03:32:12 +08:00
// connect to the partition broker
return pb . WithBrokerGrpcClient ( true , assigned . LeaderBroker , sub . SubscriberConfig . GrpcDialOption , func ( client mq_pb . SeaweedMessagingClient ) error {
subscribeClient , err := client . SubscribeMessage ( context . Background ( ) )
if err != nil {
return fmt . Errorf ( "create subscribe client: %v" , err )
}
perPartitionConcurrency := sub . SubscriberConfig . PerPartitionConcurrency
if perPartitionConcurrency <= 0 {
perPartitionConcurrency = 1
}
2024-11-05 04:08:25 +08:00
var stopTsNs int64
if ! sub . ContentConfig . StopTime . IsZero ( ) {
stopTsNs = sub . ContentConfig . StopTime . UnixNano ( )
}
2024-05-21 03:32:12 +08:00
if err = subscribeClient . Send ( & mq_pb . SubscribeMessageRequest {
Message : & mq_pb . SubscribeMessageRequest_Init {
Init : & mq_pb . SubscribeMessageRequest_InitMessage {
ConsumerGroup : sub . SubscriberConfig . ConsumerGroup ,
ConsumerId : sub . SubscriberConfig . ConsumerGroupInstanceId ,
Topic : sub . ContentConfig . Topic . ToPbTopic ( ) ,
PartitionOffset : & mq_pb . PartitionOffset {
Partition : assigned . Partition ,
2024-11-05 04:08:25 +08:00
StartTsNs : sub . ContentConfig . StartTime . UnixNano ( ) ,
StopTsNs : stopTsNs ,
2024-05-21 03:32:12 +08:00
StartType : mq_pb . PartitionOffsetStartType_EARLIEST_IN_MEMORY ,
} ,
Filter : sub . ContentConfig . Filter ,
FollowerBroker : assigned . FollowerBroker ,
Concurrency : perPartitionConcurrency ,
} ,
} ,
} ) ; err != nil {
glog . V ( 0 ) . Infof ( "subscriber %s connected to partition %+v at %v: %v" , sub . ContentConfig . Topic , assigned . Partition , assigned . LeaderBroker , err )
}
glog . V ( 0 ) . Infof ( "subscriber %s connected to partition %+v at %v" , sub . ContentConfig . Topic , assigned . Partition , assigned . LeaderBroker )
if sub . OnCompletionFunc != nil {
defer sub . OnCompletionFunc ( )
}
type KeyedOffset struct {
Key [ ] byte
Offset int64
}
partitionOffsetChan := make ( chan KeyedOffset , 1024 )
defer func ( ) {
close ( partitionOffsetChan )
} ( )
executors := util . NewLimitedConcurrentExecutor ( int ( perPartitionConcurrency ) )
go func ( ) {
2024-05-21 23:42:04 +08:00
for {
select {
case <- stopCh :
2024-05-28 08:30:16 +08:00
subscribeClient . CloseSend ( )
return
2024-05-30 15:27:44 +08:00
case ack , ok := <- partitionOffsetChan :
if ! ok {
subscribeClient . CloseSend ( )
return
}
2024-05-21 23:42:04 +08:00
subscribeClient . SendMsg ( & mq_pb . SubscribeMessageRequest {
Message : & mq_pb . SubscribeMessageRequest_Ack {
Ack : & mq_pb . SubscribeMessageRequest_AckMessage {
Key : ack . Key ,
Sequence : ack . Offset ,
} ,
2024-05-21 03:32:12 +08:00
} ,
2024-05-21 23:42:04 +08:00
} )
}
2024-05-21 03:32:12 +08:00
}
} ( )
var lastErr error
for lastErr == nil {
// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp , err := subscribeClient . Recv ( )
if err != nil {
return fmt . Errorf ( "subscribe recv: %v" , err )
}
if resp . Message == nil {
glog . V ( 0 ) . Infof ( "subscriber %s/%s received nil message" , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
continue
}
switch m := resp . Message . ( type ) {
case * mq_pb . SubscribeMessageResponse_Data :
2024-05-30 14:34:39 +08:00
if m . Data . Ctrl != nil {
2024-08-13 00:45:51 +08:00
glog . V ( 2 ) . Infof ( "subscriber %s received control from producer:%s isClose:%v" , sub . SubscriberConfig . ConsumerGroup , m . Data . Ctrl . PublisherName , m . Data . Ctrl . IsClose )
2024-05-30 14:34:39 +08:00
continue
}
2024-11-05 04:08:25 +08:00
if len ( m . Data . Key ) == 0 {
fmt . Printf ( "empty key %+v, type %v\n" , m , reflect . TypeOf ( m ) )
continue
}
2024-05-21 03:32:12 +08:00
executors . Execute ( func ( ) {
processErr := sub . OnEachMessageFunc ( m . Data . Key , m . Data . Value )
if processErr == nil {
partitionOffsetChan <- KeyedOffset {
Key : m . Data . Key ,
Offset : m . Data . TsNs ,
}
} else {
lastErr = processErr
}
} )
case * mq_pb . SubscribeMessageResponse_Ctrl :
// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m . Ctrl . IsEndOfStream || m . Ctrl . IsEndOfTopic {
return io . EOF
}
}
}
return lastErr
} )
}