seaweedfs/weed/mq/client/sub_client/on_each_partition.go
Chris Lu dc784bf217
merge current message queue code changes (#6201)
* listing files to convert to parquet

* write parquet files

* save logs into parquet files

* pass by value

* compact logs into parquet format

* can skip existing files

* refactor

* refactor

* fix compilation

* when no partition found

* refactor

* add untested parquet file read

* rename package

* refactor

* rename files

* remove unused

* add merged log read func

* parquet wants to know the file size

* rewind by time

* pass in stop ts

* add stop ts

* adjust log

* minor

* adjust log

* skip .parquet files when reading message logs

* skip non message files

* Update subscriber_record.go

* send messages

* skip message data with only ts

* skip non log files

* update parquet-go package

* ensure a valid record type

* add new field to a record type

* Update read_parquet_to_log.go

* fix parquet file name generation

* separating reading parquet and logs

* add key field

* add skipped logs

* use in memory cache

* refactor

* refactor

* refactor

* refactor, and change compact log

* refactor

* rename

* refactor

* fix format

* prefix v to version directory
2024-11-04 12:08:25 -08:00

138 lines
4.2 KiB
Go

package sub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"reflect"
)
func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
// connect to the partition broker
return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
subscribeClient, err := client.SubscribeMessage(context.Background())
if err != nil {
return fmt.Errorf("create subscribe client: %v", err)
}
perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
if perPartitionConcurrency <= 0 {
perPartitionConcurrency = 1
}
var stopTsNs int64
if !sub.ContentConfig.StopTime.IsZero() {
stopTsNs = sub.ContentConfig.StopTime.UnixNano()
}
if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
PartitionOffset: &mq_pb.PartitionOffset{
Partition: assigned.Partition,
StartTsNs: sub.ContentConfig.StartTime.UnixNano(),
StopTsNs: stopTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
Filter: sub.ContentConfig.Filter,
FollowerBroker: assigned.FollowerBroker,
Concurrency: perPartitionConcurrency,
},
},
}); err != nil {
glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
}
glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
}
type KeyedOffset struct {
Key []byte
Offset int64
}
partitionOffsetChan := make(chan KeyedOffset, 1024)
defer func() {
close(partitionOffsetChan)
}()
executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
go func() {
for {
select {
case <-stopCh:
subscribeClient.CloseSend()
return
case ack, ok := <-partitionOffsetChan:
if !ok {
subscribeClient.CloseSend()
return
}
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
Key: ack.Key,
Sequence: ack.Offset,
},
},
})
}
}
}()
var lastErr error
for lastErr == nil {
// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
continue
}
switch m := resp.Message.(type) {
case *mq_pb.SubscribeMessageResponse_Data:
if m.Data.Ctrl != nil {
glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
continue
}
if len(m.Data.Key) == 0 {
fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
continue
}
executors.Execute(func() {
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil {
partitionOffsetChan <- KeyedOffset{
Key: m.Data.Key,
Offset: m.Data.TsNs,
}
} else {
lastErr = processErr
}
})
case *mq_pb.SubscribeMessageResponse_Ctrl:
// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF
}
}
}
return lastErr
})
}