This commit is contained in:
chrislu 2024-01-22 09:00:22 -08:00
parent 47924afa1c
commit 861ad732ca

View File

@ -54,11 +54,21 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
}
for _, bpa := range resp.BrokerPartitionAssignments {
fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker)
if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments); assignErr != nil {
return nil, assignErr
}
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
return resp, err
}
func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) error {
for _, bpa := range assignments {
fmt.Printf("create topic %s partition %+v on %s\n", t, bpa.Partition, bpa.LeaderBroker)
if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
_, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
Topic: request.Topic,
Topic: t,
BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
{
Partition: bpa.Partition,
@ -68,7 +78,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
IsDraining: false,
})
if doCreateErr != nil {
return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr)
return fmt.Errorf("do create topic %s on %s: %v", t, bpa.LeaderBroker, doCreateErr)
}
brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
if !found {
@ -77,16 +87,13 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
}
}
brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
brokerStats.RegisterAssignment(t, bpa.Partition)
return nil
}); doCreateErr != nil {
return nil, doCreateErr
return doCreateErr
}
}
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
return resp, err
return nil
}
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment