mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-19 06:53:32 +08:00
refactor
This commit is contained in:
parent
541140f735
commit
2828ccbb30
@ -55,16 +55,12 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
|||||||
var p topic.Partition
|
var p topic.Partition
|
||||||
if initMessage != nil {
|
if initMessage != nil {
|
||||||
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
|
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
|
||||||
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
|
localTopicPartition, err = b.loadLocalTopicPartition(t, p)
|
||||||
if localTopicPartition == nil {
|
|
||||||
localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p)
|
|
||||||
// if not created, return error
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
|
response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
|
||||||
glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
|
glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
|
||||||
return stream.Send(response)
|
return stream.Send(response)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
ackInterval = int(initMessage.AckInterval)
|
ackInterval = int(initMessage.AckInterval)
|
||||||
stream.Send(response)
|
stream.Send(response)
|
||||||
} else {
|
} else {
|
||||||
@ -148,6 +144,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *MessageQueueBroker) loadLocalTopicPartition(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
|
||||||
|
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
|
||||||
|
if localTopicPartition == nil {
|
||||||
|
localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p)
|
||||||
|
}
|
||||||
|
return localTopicPartition, err
|
||||||
|
}
|
||||||
|
|
||||||
func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
|
func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
|
||||||
self := b.option.BrokerAddress()
|
self := b.option.BrokerAddress()
|
||||||
|
|
||||||
|
@ -21,12 +21,12 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
|
|||||||
initMessage := req.GetInit()
|
initMessage := req.GetInit()
|
||||||
var brokerStats *pub_balancer.BrokerStats
|
var brokerStats *pub_balancer.BrokerStats
|
||||||
if initMessage != nil {
|
if initMessage != nil {
|
||||||
brokerStats = b.Balancer.OnBrokerConnected(initMessage.Broker)
|
brokerStats = b.Balancer.AddBroker(initMessage.Broker)
|
||||||
} else {
|
} else {
|
||||||
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
|
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
b.Balancer.OnBrokerDisconnected(initMessage.Broker, brokerStats)
|
b.Balancer.RemoveBroker(initMessage.Broker, brokerStats)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// process stats message
|
// process stats message
|
||||||
|
@ -67,6 +67,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
|
|||||||
}
|
}
|
||||||
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
|
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
|
||||||
pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
|
pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
|
||||||
|
pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnAddBroker
|
||||||
|
pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnRemoveBroker
|
||||||
|
|
||||||
go mqBroker.MasterClient.KeepConnectedToMaster()
|
go mqBroker.MasterClient.KeepConnectedToMaster()
|
||||||
|
|
||||||
|
@ -33,6 +33,8 @@ type Balancer struct {
|
|||||||
// Collected from all brokers when they connect to the broker leader
|
// Collected from all brokers when they connect to the broker leader
|
||||||
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
|
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
|
||||||
OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
|
OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
|
||||||
|
OnAddBroker func(broker string, brokerStats *BrokerStats)
|
||||||
|
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBalancer() *Balancer {
|
func NewBalancer() *Balancer {
|
||||||
@ -42,7 +44,7 @@ func NewBalancer() *Balancer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (balancer *Balancer) OnBrokerConnected(broker string) (brokerStats *BrokerStats) {
|
func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
|
||||||
var found bool
|
var found bool
|
||||||
brokerStats, found = balancer.Brokers.Get(broker)
|
brokerStats, found = balancer.Brokers.Get(broker)
|
||||||
if !found {
|
if !found {
|
||||||
@ -51,10 +53,11 @@ func (balancer *Balancer) OnBrokerConnected(broker string) (brokerStats *BrokerS
|
|||||||
brokerStats, _ = balancer.Brokers.Get(broker)
|
brokerStats, _ = balancer.Brokers.Get(broker)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
balancer.OnAddBroker(broker, brokerStats)
|
||||||
return brokerStats
|
return brokerStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func (balancer *Balancer) OnBrokerDisconnected(broker string, stats *BrokerStats) {
|
func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
|
||||||
balancer.Brokers.Remove(broker)
|
balancer.Brokers.Remove(broker)
|
||||||
|
|
||||||
// update TopicToBrokers
|
// update TopicToBrokers
|
||||||
@ -65,6 +68,7 @@ func (balancer *Balancer) OnBrokerDisconnected(broker string, stats *BrokerStats
|
|||||||
}
|
}
|
||||||
partitionSlotToBrokerList.RemoveBroker(broker)
|
partitionSlotToBrokerList.RemoveBroker(broker)
|
||||||
}
|
}
|
||||||
|
balancer.OnRemoveBroker(broker, stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
|
func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
|
||||||
|
@ -99,3 +99,11 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb
|
|||||||
cg.OnPartitionListChange(assignments)
|
cg.OnPartitionListChange(assignments)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) OnAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) OnRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
|
||||||
|
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user