package sub_coordinator

import (
	"fmt"
	cmap "github.com/orcaman/concurrent-map/v2"
	"github.com/seaweedfs/seaweedfs/weed/filer_client"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"time"
)

type ConsumerGroup struct {
	topic topic.Topic
	// map a consumer group instance id to a consumer group instance
	ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
	Market                 *Market
	reBalanceTimer         *time.Timer
	filerClientAccessor    *filer_client.FilerClientAccessor
	stopCh                 chan struct{}
}

func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor *filer_client.FilerClientAccessor) *ConsumerGroup {
	cg := &ConsumerGroup{
		topic:                  topic.FromPbTopic(t),
		ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
		filerClientAccessor:    filerClientAccessor,
		stopCh:                 make(chan struct{}),
	}
	if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
		var partitions []topic.Partition
		for _, assignment := range conf.BrokerPartitionAssignments {
			partitions = append(partitions, topic.FromPbPartition(assignment.Partition))
		}
		cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second)
	} else {
		glog.V(0).Infof("fail to read topic conf from filer: %v", err)
		return nil
	}

	go func() {
		for {
			select {
			case adjustment := <-cg.Market.AdjustmentChan:
				cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer))
				if !found {
					glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer)
					continue
				}
				if adjustment.isAssign {
					if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
						for _, assignment := range conf.BrokerPartitionAssignments {
							if adjustment.partition.Equals(topic.FromPbPartition(assignment.Partition)) {
								cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
									Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
										Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
											PartitionAssignment: &mq_pb.BrokerPartitionAssignment{
												Partition:      adjustment.partition.ToPbPartition(),
												LeaderBroker:   assignment.LeaderBroker,
												FollowerBroker: assignment.FollowerBroker,
											},
										},
									},
								}
								glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
								break
							}
						}
					}
				} else {
					cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
						Message: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment_{
							UnAssignment: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment{
								Partition: adjustment.partition.ToPbPartition(),
							},
						},
					}
					glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
				}
			case <-cg.stopCh:
				return
			}
		}
	}()

	return cg
}

func (cg *ConsumerGroup) AckAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage) {
	fmt.Printf("ack assignment %v\n", assignment)
	cg.Market.ConfirmAdjustment(&Adjustment{
		consumer:  cgi.InstanceId,
		partition: topic.FromPbPartition(assignment.Partition),
		isAssign:  true,
	})
}
func (cg *ConsumerGroup) AckUnAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
	fmt.Printf("ack unassignment %v\n", assignment)
	cg.Market.ConfirmAdjustment(&Adjustment{
		consumer:  cgi.InstanceId,
		partition: topic.FromPbPartition(assignment.Partition),
		isAssign:  false,
	})
}

func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
}

func (cg *ConsumerGroup) Shutdown() {
	close(cg.stopCh)
}