seaweedfs/weed/mq/client/pub_client/scheduler.go

182 lines
4.8 KiB
Go
Raw Normal View History

2024-01-27 06:09:57 +08:00
package pub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sort"
"sync"
"time"
)
type EachPartitionError struct {
*mq_pb.BrokerPartitionAssignment
Err error
generation int
}
type EachPartitionPublishJob struct {
*mq_pb.BrokerPartitionAssignment
stopChan chan bool
wg sync.WaitGroup
generation int
}
func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
return err
}
generation := 0
var errChan chan EachPartitionError
for {
glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation, p.namespace, p.topic)
if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
generation++
glog.V(0).Infof("start generation %d", generation)
if errChan == nil {
errChan = make(chan EachPartitionError, len(assignments))
}
p.onEachAssignments(generation, assignments, errChan)
} else {
glog.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
time.Sleep(5 * time.Second)
continue
}
// wait for any error to happen. If so, consume all remaining errors, and retry
for {
select {
case eachErr := <-errChan:
glog.Errorf("gen %d publish to topic %s/%s partition %v: %v", eachErr.generation, p.namespace, p.topic, eachErr.Partition, eachErr.Err)
if eachErr.generation < generation {
continue
}
break
}
}
}
}
func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
// TODO assuming this is not re-configured so the partitions are fixed.
sort.Slice(assignments, func(i, j int) bool {
return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
})
var jobs []*EachPartitionPublishJob
hasExistingJob := len(p.jobs) == len(assignments)
for i, assignment := range assignments {
if assignment.LeaderBroker == "" {
continue
}
if hasExistingJob {
var existingJob *EachPartitionPublishJob
existingJob = p.jobs[i]
if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
existingJob.generation = generation
jobs = append(jobs, existingJob)
continue
} else {
if existingJob.LeaderBroker != "" {
close(existingJob.stopChan)
existingJob.LeaderBroker = ""
existingJob.wg.Wait()
}
}
}
// start a go routine to publish to this partition
job := &EachPartitionPublishJob{
BrokerPartitionAssignment: assignment,
stopChan: make(chan bool, 1),
generation: generation,
}
job.wg.Add(1)
go func(job *EachPartitionPublishJob) {
defer job.wg.Done()
if err := p.doPublishToPartition(job); err != nil {
errChan <- EachPartitionError{assignment, err, generation}
}
}(job)
jobs = append(jobs, job)
}
p.jobs = jobs
}
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
return nil
}
func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err error) {
if len(bootstrapBrokers) == 0 {
return fmt.Errorf("no bootstrap brokers")
}
var lastErr error
for _, brokerAddress := range bootstrapBrokers {
err = pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
Topic: &mq_pb.Topic{
Namespace: p.namespace,
Name: p.topic,
},
PartitionCount: p.config.CreateTopicPartitionCount,
})
return err
})
if err == nil {
return nil
} else {
lastErr = err
}
}
if lastErr != nil {
return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
}
return nil
}
func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
if len(bootstrapBrokers) == 0 {
return nil, fmt.Errorf("no bootstrap brokers")
}
var lastErr error
for _, brokerAddress := range bootstrapBrokers {
err := pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{
Topic: &mq_pb.Topic{
Namespace: p.namespace,
Name: p.topic,
},
})
glog.V(0).Infof("lookup topic %s/%s: %v", p.namespace, p.topic, lookupResp)
if err != nil {
return err
}
assignments = lookupResp.BrokerPartitionAssignments
return nil
})
if err == nil {
return assignments, nil
} else {
lastErr = err
}
}
return nil, fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, lastErr)
}