package broker

import (
	"context"
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"time"
)

func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {

	t := topic.FromPbTopic(req.GetInit().Topic)
	partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
	localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
	if localTopicPartition == nil {
		stream.Send(&mq_pb.SubscribeMessageResponse{
			Message: &mq_pb.SubscribeMessageResponse_Ctrl{
				Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
					Error: "not initialized",
				},
			},
		})
		return nil
	}

	clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
	localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
	glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
	isConnected := true
	sleepIntervalCount := 0
	defer func() {
		isConnected = false
		localTopicPartition.Subscribers.RemoveSubscriber(clientName)
		glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition)
	}()

	ctx := stream.Context()
	var startPosition log_buffer.MessagePosition
	var inMemoryOnly bool
	if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil {
		offset := req.GetInit().GetPartitionOffset()
		if offset.StartTsNs != 0 {
			startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
		}
		if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
			startPosition = log_buffer.NewMessagePosition(1, -2)
		} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
			startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2)
		} else if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY {
			inMemoryOnly = true
			for !localTopicPartition.HasData() {
				time.Sleep(337 * time.Millisecond)
			}
			memPosition := localTopicPartition.GetEarliestInMemoryMessagePosition()
			if startPosition.Before(memPosition.Time) {
				startPosition = memPosition
			}
		}
	}

	localTopicPartition.Subscribe(clientName, startPosition, inMemoryOnly, func() bool {
		if !isConnected {
			return false
		}
		sleepIntervalCount++
		if sleepIntervalCount > 10 {
			sleepIntervalCount = 10
		}
		time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond)

		// Check if the client has disconnected by monitoring the context
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err == context.Canceled {
				// Client disconnected
				return false
			}
			glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
			return false
		default:
			// Continue processing the request
		}

		return true
	}, func(logEntry *filer_pb.LogEntry) error {
		// reset the sleep interval count
		sleepIntervalCount = 0

		value := logEntry.GetData()
		if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
			Data: &mq_pb.DataMessage{
				Key:   []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
				Value: value,
				TsNs:  logEntry.TsNs,
			},
		}}); err != nil {
			glog.Errorf("Error sending setup response: %v", err)
			return err
		}
		return nil
	})

	return nil
}