seaweedfs/weed/mq/sub_coordinator/consumer_group.go
2024-06-06 19:44:19 -07:00

112 lines
3.9 KiB
Go

package sub_coordinator
import (
"fmt"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time"
)
// ConsumerGroup holds the state of one consumer group on a topic: the
// registered group instances, the Market whose adjustments are relayed to
// them, and the plumbing needed to read the topic configuration and to stop
// the relay goroutine started by NewConsumerGroup.
type ConsumerGroup struct {
// topic is the topic this group consumes; NewConsumerGroup derives it from the protobuf topic.
topic topic.Topic
// map a consumer group instance id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
// Market emits assign/unassign adjustments on AdjustmentChan and receives
// the instances' acknowledgements through ConfirmAdjustment.
Market *Market
// reBalanceTimer is neither set nor read anywhere in this file.
// NOTE(review): possibly unused — confirm against the rest of the package.
reBalanceTimer *time.Timer
// filerClientAccessor is used to read the topic configuration from the filer.
filerClientAccessor *filer_client.FilerClientAccessor
// stopCh is closed by Shutdown to terminate the goroutine started in NewConsumerGroup.
stopCh chan struct{}
}
// NewConsumerGroup builds the consumer group for topic t and starts the
// goroutine that relays the Market's partition adjustments to the group's
// instances.
//
// The topic configuration is read from the filer once up front; the partitions
// of its broker assignments, together with reblanceSeconds converted to a
// time.Duration in seconds, are handed to NewMarket. When that read fails the
// error is logged and nil is returned, so callers must handle a nil group.
//
// The relay goroutine exits once Shutdown closes stopCh, including while it is
// waiting to hand a response to an instance.
func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor *filer_client.FilerClientAccessor) *ConsumerGroup {
	cg := &ConsumerGroup{
		topic:                  topic.FromPbTopic(t),
		ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
		filerClientAccessor:    filerClientAccessor,
		stopCh:                 make(chan struct{}),
	}

	conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic)
	if err != nil {
		glog.V(0).Infof("fail to read topic conf from filer: %v", err)
		return nil
	}
	var partitions []topic.Partition
	for _, assignment := range conf.BrokerPartitionAssignments {
		partitions = append(partitions, topic.FromPbPartition(assignment.Partition))
	}
	cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second)

	// deliver hands resp to the instance's response channel. It gives up when
	// the group is shut down first, so a stalled instance cannot keep the relay
	// goroutine alive forever. It reports whether resp was handed over.
	deliver := func(cgi *ConsumerGroupInstance, resp *mq_pb.SubscriberToSubCoordinatorResponse) bool {
		select {
		case cgi.ResponseChan <- resp:
			return true
		case <-cg.stopCh:
			return false
		}
	}

	go func() {
		for {
			select {
			case adjustment := <-cg.Market.AdjustmentChan:
				cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer))
				if !found {
					glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer)
					continue
				}

				if !adjustment.isAssign {
					unassign := &mq_pb.SubscriberToSubCoordinatorResponse{
						Message: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment_{
							UnAssignment: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment{
								Partition: adjustment.partition.ToPbPartition(),
							},
						},
					}
					if !deliver(cgi, unassign) {
						return
					}
					glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
					continue
				}

				// Re-read the configuration on every assignment so the
				// response carries the brokers currently recorded for the
				// partition.
				conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic)
				if err != nil {
					glog.V(0).Infof("fail to read topic conf from filer for assignment %v to %s: %v", adjustment.partition, adjustment.consumer, err)
					continue
				}

				matched := false
				for _, assignment := range conf.BrokerPartitionAssignments {
					if !adjustment.partition.Equals(topic.FromPbPartition(assignment.Partition)) {
						continue
					}
					matched = true
					assign := &mq_pb.SubscriberToSubCoordinatorResponse{
						Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
							Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
								PartitionAssignment: &mq_pb.BrokerPartitionAssignment{
									Partition:      adjustment.partition.ToPbPartition(),
									LeaderBroker:   assignment.LeaderBroker,
									FollowerBroker: assignment.FollowerBroker,
								},
							},
						},
					}
					if !deliver(cgi, assign) {
						return
					}
					glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
					break
				}
				if !matched {
					glog.V(0).Infof("partition %v for %s not found in topic conf, assignment not sent", adjustment.partition, adjustment.consumer)
				}
			case <-cg.stopCh:
				return
			}
		}
	}()

	return cg
}
// AckAssignment handles an instance's acknowledgement of a partition
// assignment: it prints the acknowledgement and confirms the corresponding
// assign adjustment (instance id + partition) with the Market.
func (cg *ConsumerGroup) AckAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage) {
	fmt.Printf("ack assignment %v\n", assignment)

	instanceID := cgi.InstanceId
	ackedPartition := topic.FromPbPartition(assignment.Partition)
	confirmed := &Adjustment{
		consumer:  instanceID,
		partition: ackedPartition,
		isAssign:  true,
	}
	cg.Market.ConfirmAdjustment(confirmed)
}
// AckUnAssignment handles an instance's acknowledgement that a partition was
// taken away from it: it prints the acknowledgement and confirms the
// corresponding unassign adjustment (instance id + partition) with the Market.
func (cg *ConsumerGroup) AckUnAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
	fmt.Printf("ack unassignment %v\n", assignment)

	instanceID := cgi.InstanceId
	releasedPartition := topic.FromPbPartition(assignment.Partition)
	confirmed := &Adjustment{
		consumer:  instanceID,
		partition: releasedPartition,
		isAssign:  false,
	}
	cg.Market.ConfirmAdjustment(confirmed)
}
// OnPartitionListChange receives the topic's new broker partition assignments.
// It is currently a no-op: the body is empty and assignments is ignored.
func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
}
// Shutdown closes stopCh, which makes the goroutine started by
// NewConsumerGroup return the next time it selects on that channel.
// It must be called at most once: closing an already-closed channel panics.
func (cg *ConsumerGroup) Shutdown() {
close(cg.stopCh)
}