// seaweedfs/weed/mq/client/pub_client/scheduler.go (253 lines, 7.1 KiB, Go)

package pub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"sort"
"sync"
"time"
)
// EachPartitionError is the message a per-partition publish goroutine sends
// on the scheduler's error channel when publishing to its partition fails.
type EachPartitionError struct {
// The broker/partition assignment the failed publish job was started for.
*mq_pb.BrokerPartitionAssignment
// Err is the error returned by the publish job.
Err error
// generation is the scheduler generation in which the failed job was
// started; the scheduler skips errors whose generation is older than the
// current one.
generation int
}
type EachPartitionPublishJob struct {
*mq_pb.BrokerPartitionAssignment
stopChan chan bool
wg sync.WaitGroup
generation int
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
2024-01-27 06:09:57 +08:00
}
2024-01-29 04:06:58 +08:00
func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error {
2024-01-27 06:09:57 +08:00
if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
2024-01-29 04:23:20 +08:00
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
2024-01-27 06:09:57 +08:00
}
2024-01-29 04:23:20 +08:00
log.Printf("start scheduler thread for topic %s", p.config.Topic)
2024-01-27 06:09:57 +08:00
generation := 0
var errChan chan EachPartitionError
for {
2024-01-29 04:23:20 +08:00
glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
2024-01-27 06:09:57 +08:00
if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
generation++
2024-01-29 04:06:58 +08:00
glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
2024-01-27 06:09:57 +08:00
if errChan == nil {
errChan = make(chan EachPartitionError, len(assignments))
}
p.onEachAssignments(generation, assignments, errChan)
} else {
2024-01-29 04:23:20 +08:00
glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
2024-01-27 06:09:57 +08:00
time.Sleep(5 * time.Second)
continue
}
2024-01-29 04:06:58 +08:00
if generation == 1 {
wg.Done()
}
2024-01-27 06:09:57 +08:00
// wait for any error to happen. If so, consume all remaining errors, and retry
for {
select {
case eachErr := <-errChan:
2024-01-29 04:23:20 +08:00
glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
2024-01-27 06:09:57 +08:00
if eachErr.generation < generation {
continue
}
break
}
}
}
}
func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
// TODO assuming this is not re-configured so the partitions are fixed.
sort.Slice(assignments, func(i, j int) bool {
return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
})
var jobs []*EachPartitionPublishJob
hasExistingJob := len(p.jobs) == len(assignments)
for i, assignment := range assignments {
if assignment.LeaderBroker == "" {
continue
}
if hasExistingJob {
var existingJob *EachPartitionPublishJob
existingJob = p.jobs[i]
if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
existingJob.generation = generation
jobs = append(jobs, existingJob)
continue
} else {
if existingJob.LeaderBroker != "" {
close(existingJob.stopChan)
existingJob.LeaderBroker = ""
existingJob.wg.Wait()
}
}
}
// start a go routine to publish to this partition
job := &EachPartitionPublishJob{
BrokerPartitionAssignment: assignment,
stopChan: make(chan bool, 1),
generation: generation,
inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024, true),
2024-01-27 06:09:57 +08:00
}
job.wg.Add(1)
go func(job *EachPartitionPublishJob) {
defer job.wg.Done()
if err := p.doPublishToPartition(job); err != nil {
errChan <- EachPartitionError{assignment, err, generation}
}
}(job)
jobs = append(jobs, job)
// TODO assuming this is not re-configured so the partitions are fixed.
// better just re-use the existing job
p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
2024-01-27 06:09:57 +08:00
}
p.jobs = jobs
}
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
if err != nil {
return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
stream, err := brokerClient.PublishMessage(context.Background())
if err != nil {
return fmt.Errorf("create publish client: %v", err)
}
publishClient := &PublishClient{
SeaweedMessaging_PublishMessageClient: stream,
Broker: job.LeaderBroker,
}
if err = publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{
2024-01-29 04:23:20 +08:00
Topic: p.config.Topic.ToPbTopic(),
Partition: job.Partition,
AckInterval: 128,
},
},
}); err != nil {
return fmt.Errorf("send init message: %v", err)
}
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("recv init response: %v", err)
}
if resp.Error != "" {
return fmt.Errorf("init response error: %v", resp.Error)
}
go func() {
for {
_, err := publishClient.Recv()
if err != nil {
e, ok := status.FromError(err)
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
return
}
publishClient.Err = err
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
return
}
}
}()
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: data,
},
}); err != nil {
return fmt.Errorf("send publish data: %v", err)
}
}
2024-01-27 06:09:57 +08:00
return nil
}
func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err error) {
if len(bootstrapBrokers) == 0 {
return fmt.Errorf("no bootstrap brokers")
}
var lastErr error
for _, brokerAddress := range bootstrapBrokers {
err = pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
2024-01-29 04:23:20 +08:00
Topic: p.config.Topic.ToPbTopic(),
2024-01-27 06:09:57 +08:00
PartitionCount: p.config.CreateTopicPartitionCount,
})
return err
})
if err == nil {
return nil
} else {
lastErr = err
}
}
if lastErr != nil {
2024-01-29 04:23:20 +08:00
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
2024-01-27 06:09:57 +08:00
}
return nil
}
func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
if len(bootstrapBrokers) == 0 {
return nil, fmt.Errorf("no bootstrap brokers")
}
var lastErr error
for _, brokerAddress := range bootstrapBrokers {
err := pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{
2024-01-29 04:23:20 +08:00
Topic: p.config.Topic.ToPbTopic(),
2024-01-27 06:09:57 +08:00
})
2024-01-29 04:23:20 +08:00
glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
2024-01-27 06:09:57 +08:00
if err != nil {
return err
}
2024-01-29 04:06:58 +08:00
if len(lookupResp.BrokerPartitionAssignments) == 0 {
return fmt.Errorf("no broker partition assignments")
}
2024-01-27 06:09:57 +08:00
assignments = lookupResp.BrokerPartitionAssignments
return nil
})
if err == nil {
return assignments, nil
} else {
lastErr = err
}
}
2024-01-29 04:23:20 +08:00
return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
2024-01-27 06:09:57 +08:00
}