seaweedfs/weed/mq/pub_balancer/balance.go

74 lines
2.4 KiB
Go
Raw Normal View History

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
package pub_balancer
import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"google.golang.org/grpc"
)
/*
* Assuming a topic has [x,y] number of partitions when publishing, and there are b number of brokers.
* and p is the number of partitions per topic.
* if the broker number b <= x, then p = x.
* if the broker number x < b < y, then x <= p <= b.
* if the broker number b >= y, x <= p <= y
Balance topic partitions to brokers
===================================
When the goal is to make sure that low traffic partitions can be merged, (and p >= x, and after last rebalance interval):
1. Calculate the average load(throughput) of partitions per topic.
2. If any two neighboring partitions have a load that is less than the average load, merge them.
3. If min(b, y) < p, then merge two neighboring partitions that have the least combined load.
When the goal is to make sure that high traffic partitions can be split, (and p < y and p < b, and after last rebalance interval):
1. Calculate the average number of partitions per broker.
2. If any partition has a load that is more than the average load, split it into two partitions.
When the goal is to make sure that each broker has the same number of partitions:
1. Calculate the average number of partitions per broker.
2. For the brokers that have more than the average number of partitions, move the partitions to the brokers that have less than the average number of partitions.
*/
type BalanceAction interface {
}
type BalanceActionMerge struct {
Before []topic.TopicPartition
After topic.TopicPartition
}
type BalanceActionSplit struct {
Before topic.TopicPartition
After []topic.TopicPartition
}
type BalanceActionMove struct {
TopicPartition topic.TopicPartition
SourceBroker string
TargetBroker string
}
type BalanceActionCreate struct {
TopicPartition topic.TopicPartition
TargetBroker string
}
// BalancePublishers check the stats of all brokers,
// and balance the publishers to the brokers.
2024-05-22 00:56:30 +08:00
func (balancer *PubBalancer) BalancePublishers() []BalanceAction {
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
return []BalanceAction{action}
}
2024-05-22 00:56:30 +08:00
func (balancer *PubBalancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
for _, action := range actions {
switch action.(type) {
case *BalanceActionMove:
err = balancer.ExecuteBalanceActionMove(action.(*BalanceActionMove), grpcDialOption)
}
if err != nil {
return err
}
}
return nil
}