mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-12-12 00:19:14 +08:00
295 lines
8.5 KiB
Go
295 lines
8.5 KiB
Go
package pub_client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"log"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
type EachPartitionError struct {
|
|
*mq_pb.BrokerPartitionAssignment
|
|
Err error
|
|
generation int
|
|
}
|
|
|
|
type EachPartitionPublishJob struct {
|
|
*mq_pb.BrokerPartitionAssignment
|
|
stopChan chan bool
|
|
wg sync.WaitGroup
|
|
generation int
|
|
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
|
|
}
|
|
|
|
func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
|
|
|
|
if err := p.doConfigureTopic(); err != nil {
|
|
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
|
|
}
|
|
|
|
log.Printf("start scheduler thread for topic %s", p.config.Topic)
|
|
|
|
generation := 0
|
|
var errChan chan EachPartitionError
|
|
for {
|
|
glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
|
|
if assignments, err := p.doLookupTopicPartitions(); err == nil {
|
|
generation++
|
|
glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
|
|
if errChan == nil {
|
|
errChan = make(chan EachPartitionError, len(assignments))
|
|
}
|
|
p.onEachAssignments(generation, assignments, errChan)
|
|
} else {
|
|
glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
|
|
if generation == 1 {
|
|
wg.Done()
|
|
}
|
|
|
|
// wait for any error to happen. If so, consume all remaining errors, and retry
|
|
for {
|
|
select {
|
|
case eachErr := <-errChan:
|
|
glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
|
|
if eachErr.generation < generation {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
|
|
// TODO assuming this is not re-configured so the partitions are fixed.
|
|
sort.Slice(assignments, func(i, j int) bool {
|
|
return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
|
|
})
|
|
var jobs []*EachPartitionPublishJob
|
|
hasExistingJob := len(p.jobs) == len(assignments)
|
|
for i, assignment := range assignments {
|
|
if assignment.LeaderBroker == "" {
|
|
continue
|
|
}
|
|
if hasExistingJob {
|
|
var existingJob *EachPartitionPublishJob
|
|
existingJob = p.jobs[i]
|
|
if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
|
|
existingJob.generation = generation
|
|
jobs = append(jobs, existingJob)
|
|
continue
|
|
} else {
|
|
if existingJob.LeaderBroker != "" {
|
|
close(existingJob.stopChan)
|
|
existingJob.LeaderBroker = ""
|
|
existingJob.wg.Wait()
|
|
}
|
|
}
|
|
}
|
|
|
|
// start a go routine to publish to this partition
|
|
job := &EachPartitionPublishJob{
|
|
BrokerPartitionAssignment: assignment,
|
|
stopChan: make(chan bool, 1),
|
|
generation: generation,
|
|
inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
|
|
}
|
|
job.wg.Add(1)
|
|
go func(job *EachPartitionPublishJob) {
|
|
defer job.wg.Done()
|
|
if err := p.doPublishToPartition(job); err != nil {
|
|
errChan <- EachPartitionError{assignment, err, generation}
|
|
}
|
|
}(job)
|
|
jobs = append(jobs, job)
|
|
// TODO assuming this is not re-configured so the partitions are fixed.
|
|
// better just re-use the existing job
|
|
p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
|
|
}
|
|
p.jobs = jobs
|
|
}
|
|
|
|
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
|
|
|
|
log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
|
|
|
|
grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
|
|
if err != nil {
|
|
return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
|
|
}
|
|
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
|
|
stream, err := brokerClient.PublishMessage(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("create publish client: %v", err)
|
|
}
|
|
publishClient := &PublishClient{
|
|
SeaweedMessaging_PublishMessageClient: stream,
|
|
Broker: job.LeaderBroker,
|
|
}
|
|
if err = publishClient.Send(&mq_pb.PublishMessageRequest{
|
|
Message: &mq_pb.PublishMessageRequest_Init{
|
|
Init: &mq_pb.PublishMessageRequest_InitMessage{
|
|
Topic: p.config.Topic.ToPbTopic(),
|
|
Partition: job.Partition,
|
|
AckInterval: 128,
|
|
FollowerBrokers: job.FollowerBrokers,
|
|
PublisherName: p.config.PublisherName,
|
|
},
|
|
},
|
|
}); err != nil {
|
|
return fmt.Errorf("send init message: %v", err)
|
|
}
|
|
// process the hello message
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
return fmt.Errorf("recv init response: %v", err)
|
|
}
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("init response error: %v", resp.Error)
|
|
}
|
|
|
|
var publishedTsNs int64
|
|
hasMoreData := int32(1)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
ackResp, err := publishClient.Recv()
|
|
if err != nil {
|
|
e, _ := status.FromError(err)
|
|
if e.Code() == codes.Unknown && e.Message() == "EOF" {
|
|
log.Printf("publish to %s EOF", publishClient.Broker)
|
|
return
|
|
}
|
|
publishClient.Err = err
|
|
log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
|
|
return
|
|
}
|
|
if ackResp.Error != "" {
|
|
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
|
|
log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
|
|
return
|
|
}
|
|
if ackResp.AckSequence > 0 {
|
|
log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
|
|
}
|
|
if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
publishCounter := 0
|
|
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
|
|
if data.Ctrl != nil && data.Ctrl.IsClose {
|
|
// need to set this before sending to brokers, to avoid timing issue
|
|
atomic.StoreInt32(&hasMoreData, 0)
|
|
}
|
|
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
|
|
Message: &mq_pb.PublishMessageRequest_Data{
|
|
Data: data,
|
|
},
|
|
}); err != nil {
|
|
return fmt.Errorf("send publish data: %v", err)
|
|
}
|
|
publishCounter++
|
|
atomic.StoreInt64(&publishedTsNs, data.TsNs)
|
|
}
|
|
if publishCounter > 0 {
|
|
wg.Wait()
|
|
} else {
|
|
// CloseSend would cancel the context on the server side
|
|
if err := publishClient.CloseSend(); err != nil {
|
|
return fmt.Errorf("close send: %v", err)
|
|
}
|
|
}
|
|
|
|
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *TopicPublisher) doConfigureTopic() (err error) {
|
|
if len(p.config.Brokers) == 0 {
|
|
return fmt.Errorf("no bootstrap brokers")
|
|
}
|
|
var lastErr error
|
|
for _, brokerAddress := range p.config.Brokers {
|
|
err = pb.WithBrokerGrpcClient(false,
|
|
brokerAddress,
|
|
p.grpcDialOption,
|
|
func(client mq_pb.SeaweedMessagingClient) error {
|
|
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
|
|
Topic: p.config.Topic.ToPbTopic(),
|
|
PartitionCount: p.config.PartitionCount,
|
|
RecordType: p.config.RecordType, // TODO schema upgrade
|
|
})
|
|
return err
|
|
})
|
|
if err == nil {
|
|
lastErr = nil
|
|
return nil
|
|
} else {
|
|
lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
|
|
}
|
|
}
|
|
|
|
if lastErr != nil {
|
|
return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
|
|
if len(p.config.Brokers) == 0 {
|
|
return nil, fmt.Errorf("no bootstrap brokers")
|
|
}
|
|
var lastErr error
|
|
for _, brokerAddress := range p.config.Brokers {
|
|
err := pb.WithBrokerGrpcClient(false,
|
|
brokerAddress,
|
|
p.grpcDialOption,
|
|
func(client mq_pb.SeaweedMessagingClient) error {
|
|
lookupResp, err := client.LookupTopicBrokers(context.Background(),
|
|
&mq_pb.LookupTopicBrokersRequest{
|
|
Topic: p.config.Topic.ToPbTopic(),
|
|
})
|
|
glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(lookupResp.BrokerPartitionAssignments) == 0 {
|
|
return fmt.Errorf("no broker partition assignments")
|
|
}
|
|
|
|
assignments = lookupResp.BrokerPartitionAssignments
|
|
|
|
return nil
|
|
})
|
|
if err == nil {
|
|
return assignments, nil
|
|
} else {
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
|
|
|
|
}
|