diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index e7749f94b..fa75f87fe 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -108,15 +108,15 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s Namespace: sub.ContentConfig.Namespace, Name: sub.ContentConfig.Topic, }, - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, + PartitionOffset: &mq_pb.PartitionOffset{ + Partition: &mq_pb.Partition{ + RingSize: partition.RingSize, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + TsNs: sub.alreadyProcessedTsNs, }, Filter: sub.ContentConfig.Filter, - Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{ - StartTimestampNs: sub.alreadyProcessedTsNs, - }, }, }, }) @@ -148,6 +148,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { + glog.V(0).Infof("subscriber %s/%s/%s received nil message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) continue } switch m := resp.Message.(type) {