package balancer import ( "fmt" cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) const ( MaxPartitionCount = 1024 ) type Balancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] } func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) ([]*mq_pb.BrokerPartitionAssignment, error) { // TODO lock the topic // find the topic partitions on the filer // if the topic is not found // if the request is_for_publish // create the topic // if the request is_for_subscribe // return error not found // t := topic.FromPbTopic(request.Topic) return []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:17777", FollowerBrokers: []string{"localhost:17777"}, Partition: &mq_pb.Partition{ RingSize: MaxPartitionCount, RangeStart: 0, RangeStop: MaxPartitionCount, }, }, }, nil } type BrokerStats struct { TopicPartitionCount int32 ConsumerCount int32 CpuUsagePercent int32 Stats cmap.ConcurrentMap[string, *TopicPartitionStats] } func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { bs.TopicPartitionCount = int32(len(stats.Stats)) bs.CpuUsagePercent = stats.CpuUsagePercent var consumerCount int32 currentTopicPartitions := bs.Stats.Items() for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ TopicPartition: TopicPartition{ Namespace: topicPartitionStats.Topic.Namespace, Topic: topicPartitionStats.Topic.Name, RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, }, ConsumerCount: topicPartitionStats.ConsumerCount, } consumerCount += topicPartitionStats.ConsumerCount key := tps.TopicPartition.String() bs.Stats.Set(key, tps) delete(currentTopicPartitions, key) } // remove the topic partitions that are not in the stats for key := range currentTopicPartitions { bs.Stats.Remove(key) } bs.ConsumerCount = consumerCount } type TopicPartition struct { Namespace string Topic string RangeStart int32 RangeStop int32 } type TopicPartitionStats struct { TopicPartition ConsumerCount int32 } func NewBalancer() *Balancer { return &Balancer{ Brokers: cmap.New[*BrokerStats](), } } func NewBrokerStats() *BrokerStats { return &BrokerStats{ Stats: cmap.New[*TopicPartitionStats](), } } func (tp *TopicPartition) String() string { return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop) }