mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-07 11:35:40 +08:00
128 lines
4.0 KiB
Go
128 lines
4.0 KiB
Go
|
package pub_balancer
|
||
|
|
||
|
import (
|
||
|
cmap "github.com/orcaman/concurrent-map/v2"
|
||
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||
|
"math/rand"
|
||
|
"modernc.org/mathutil"
|
||
|
"sort"
|
||
|
)
|
||
|
|
||
|
func (balancer *Balancer) RepairTopics() []BalanceAction {
|
||
|
action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
|
||
|
return []BalanceAction{action}
|
||
|
}
|
||
|
|
||
|
// TopicPartitionInfo records which brokers report hosting one topic partition.
type TopicPartitionInfo struct {
	// Leader is the broker whose stats mark it as leader of the partition.
	Leader string
	// Followers are the brokers that report the partition without being its leader.
	Followers []string
}
|
||
|
|
||
|
// RepairMissingTopicPartitions check the stats of all brokers,
|
||
|
// and repair the missing topic partitions on the brokers.
|
||
|
func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) {
|
||
|
|
||
|
// find all topic partitions
|
||
|
topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo)
|
||
|
for brokerStatsItem := range brokers.IterBuffered() {
|
||
|
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
|
||
|
for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
|
||
|
topicPartitionStat := topicPartitionStatsItem.Val
|
||
|
topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic]
|
||
|
if !found {
|
||
|
topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo)
|
||
|
topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo
|
||
|
}
|
||
|
tpi, found := topicPartitionToInfo[topicPartitionStat.Partition]
|
||
|
if !found {
|
||
|
tpi = &TopicPartitionInfo{}
|
||
|
topicPartitionToInfo[topicPartitionStat.Partition] = tpi
|
||
|
}
|
||
|
if topicPartitionStat.IsLeader {
|
||
|
tpi.Leader = broker
|
||
|
} else {
|
||
|
tpi.Followers = append(tpi.Followers, broker)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// collect all brokers as candidates
|
||
|
candidates := make([]string, 0, brokers.Count())
|
||
|
for brokerStatsItem := range brokers.IterBuffered() {
|
||
|
candidates = append(candidates, brokerStatsItem.Key)
|
||
|
}
|
||
|
|
||
|
// find the missing topic partitions
|
||
|
for t, topicPartitionToInfo := range topicToTopicPartitions {
|
||
|
missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo)
|
||
|
for _, partition := range missingPartitions {
|
||
|
actions = append(actions, BalanceActionCreate{
|
||
|
TopicPartition: topic.TopicPartition{
|
||
|
Topic: t,
|
||
|
Partition: partition,
|
||
|
},
|
||
|
TargetBroker: candidates[rand.Intn(len(candidates))],
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return actions
|
||
|
}
|
||
|
|
||
|
func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) {
|
||
|
|
||
|
// find the missing topic partitions
|
||
|
var partitions []topic.Partition
|
||
|
for partition := range info {
|
||
|
partitions = append(partitions, partition)
|
||
|
}
|
||
|
return findMissingPartitions(partitions, MaxPartitionCount)
|
||
|
}
|
||
|
|
||
|
// findMissingPartitions find the missing partitions
|
||
|
func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) {
|
||
|
// sort the partitions by range start
|
||
|
sort.Slice(partitions, func(i, j int) bool {
|
||
|
return partitions[i].RangeStart < partitions[j].RangeStart
|
||
|
})
|
||
|
|
||
|
// calculate the average partition size
|
||
|
var covered int32
|
||
|
for _, partition := range partitions {
|
||
|
covered += partition.RangeStop - partition.RangeStart
|
||
|
}
|
||
|
averagePartitionSize := covered / int32(len(partitions))
|
||
|
|
||
|
// find the missing partitions
|
||
|
var coveredWatermark int32
|
||
|
i := 0
|
||
|
for i < len(partitions) {
|
||
|
partition := partitions[i]
|
||
|
if partition.RangeStart > coveredWatermark {
|
||
|
upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart)
|
||
|
missingPartitions = append(missingPartitions, topic.Partition{
|
||
|
RangeStart: coveredWatermark,
|
||
|
RangeStop: upperBound,
|
||
|
RingSize: ringSize,
|
||
|
})
|
||
|
coveredWatermark = upperBound
|
||
|
if coveredWatermark == partition.RangeStop {
|
||
|
i++
|
||
|
}
|
||
|
} else {
|
||
|
coveredWatermark = partition.RangeStop
|
||
|
i++
|
||
|
}
|
||
|
}
|
||
|
for coveredWatermark < ringSize {
|
||
|
upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize)
|
||
|
missingPartitions = append(missingPartitions, topic.Partition{
|
||
|
RangeStart: coveredWatermark,
|
||
|
RangeStop: upperBound,
|
||
|
RingSize: ringSize,
|
||
|
})
|
||
|
coveredWatermark = upperBound
|
||
|
}
|
||
|
return missingPartitions
|
||
|
}
|