diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 882e7ddf9..020f459b6 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -84,12 +84,12 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke } // collect current consumer group instance ids - var consumerInstanceIds []string + var consumerInstances []*ConsumerGroupInstance for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() { - consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId) + consumerInstances = append(consumerInstances, consumerGroupInstance) } - cg.mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, consumerInstanceIds) + cg.mapping.BalanceToConsumerInstances(partitionSlotToBrokerList, consumerInstances) // convert cg.mapping currentMapping to map of consumer group instance id to partition slots consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance) diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go index c7f104af1..c5c4f6866 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go @@ -23,8 +23,8 @@ func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping { // 2. allow one consumer instance to be down unexpectedly // without affecting the processing power utilization -func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string) { - if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstanceIds) == 0 { +func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstances []*ConsumerGroupInstance) { + if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 { return } newVersion := time.Now().UnixNano() @@ -35,7 +35,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotT } else { prevMapping = nil } - newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstanceIds, prevMapping) + newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstances, prevMapping) if pcm.currentMapping != nil { pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping) if len(pcm.prevMappings) > 10 { @@ -45,7 +45,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotT pcm.currentMapping = newMapping } -func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) { +func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstances []*ConsumerGroupInstance, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) { // collect previous consumer instance ids prevConsumerInstanceIds := make(map[string]struct{}) if prevMapping != nil { @@ -57,8 +57,8 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI } // collect current consumer instance ids currConsumerInstanceIds := make(map[string]struct{}) - for _, consumerInstanceId := range consumerInstanceIds { - currConsumerInstanceIds[consumerInstanceId] = struct{}{} + for _, consumerInstance := range consumerInstances { + currConsumerInstanceIds[consumerInstance.InstanceId] = struct{}{} } // check deleted consumer instances @@ -106,25 +106,25 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI } } // average number of partitions that are assigned to each consumer instance - averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstanceIds)) + averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstances)) // assign unassigned partition slots to consumer instances that is underloaded consumerInstanceIdsIndex := 0 for _, newPartitionSlot := range newPartitionSlots { if newPartitionSlot.AssignedInstanceId == "" { - for avoidDeadLoop := len(consumerInstanceIds); avoidDeadLoop > 0; avoidDeadLoop-- { - consumerInstanceId := consumerInstanceIds[consumerInstanceIdsIndex] - if float32(consumerInstancePartitionCount[consumerInstanceId]) < averageConsumerInstanceLoad { - newPartitionSlot.AssignedInstanceId = consumerInstanceId - consumerInstancePartitionCount[consumerInstanceId]++ + for avoidDeadLoop := len(consumerInstances); avoidDeadLoop > 0; avoidDeadLoop-- { + consumerInstance := consumerInstances[consumerInstanceIdsIndex] + if float32(consumerInstancePartitionCount[consumerInstance.InstanceId]) < averageConsumerInstanceLoad { + newPartitionSlot.AssignedInstanceId = consumerInstance.InstanceId + consumerInstancePartitionCount[consumerInstance.InstanceId]++ consumerInstanceIdsIndex++ - if consumerInstanceIdsIndex >= len(consumerInstanceIds) { + if consumerInstanceIdsIndex >= len(consumerInstances) { consumerInstanceIdsIndex = 0 } break } else { consumerInstanceIdsIndex++ - if consumerInstanceIdsIndex >= len(consumerInstanceIds) { + if consumerInstanceIdsIndex >= len(consumerInstances) { consumerInstanceIdsIndex = 0 } } diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go index 9a9abe011..7dcfc6f9b 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go @@ -9,7 +9,7 @@ import ( func Test_doBalanceSticky(t *testing.T) { type args struct { partitions []*pub_balancer.PartitionSlotToBroker - consumerInstanceIds []string + consumerInstanceIds []*ConsumerGroupInstance prevMapping *PartitionSlotToConsumerInstanceList } tests := []struct { @@ -26,7 +26,12 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 100, }, }, - consumerInstanceIds: []string{"consumer-instance-1"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + }, prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ @@ -46,7 +51,16 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 100, }, }, - consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-2", + MaxPartitionCount: 1, + }, + }, prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ @@ -70,7 +84,12 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 100, }, }, - consumerInstanceIds: []string{"consumer-instance-1"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + }, prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ @@ -99,7 +118,16 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 100, }, }, - consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-2", + MaxPartitionCount: 1, + }, + }, prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ @@ -128,7 +156,16 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 100, }, }, - consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-2", + MaxPartitionCount: 1, + }, + }, prevMapping: &PartitionSlotToConsumerInstanceList{ PartitionSlots: []*PartitionSlotToConsumerInstance{ { @@ -170,7 +207,20 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 100, }, }, - consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-2", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-3", + MaxPartitionCount: 1, + }, + }, prevMapping: &PartitionSlotToConsumerInstanceList{ PartitionSlots: []*PartitionSlotToConsumerInstance{ { @@ -216,7 +266,16 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 150, }, }, - consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-2", + MaxPartitionCount: 1, + }, + }, prevMapping: &PartitionSlotToConsumerInstanceList{ PartitionSlots: []*PartitionSlotToConsumerInstance{ { @@ -267,7 +326,20 @@ func Test_doBalanceSticky(t *testing.T) { RangeStop: 150, }, }, - consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"}, + consumerInstanceIds: []*ConsumerGroupInstance{ + { + InstanceId: "consumer-instance-1", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-2", + MaxPartitionCount: 1, + }, + { + InstanceId: "consumer-instance-3", + MaxPartitionCount: 1, + }, + }, prevMapping: &PartitionSlotToConsumerInstanceList{ PartitionSlots: []*PartitionSlotToConsumerInstance{ {