seaweedfs/weed/mq/client/pub_client/publisher.go

74 lines
1.8 KiB
Go
Raw Normal View History

2023-08-29 00:02:12 +08:00
package pub_client
2023-08-28 04:13:14 +08:00
2023-08-28 08:50:59 +08:00
import (
2023-08-29 00:02:12 +08:00
"github.com/rdleal/intervalst/interval"
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
2024-01-29 04:23:20 +08:00
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
2023-08-28 08:50:59 +08:00
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
2024-04-13 14:36:15 +08:00
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
2023-08-28 08:50:59 +08:00
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
2024-01-29 04:30:08 +08:00
"log"
2023-09-05 12:43:30 +08:00
"sync"
2023-08-28 08:50:59 +08:00
)
2023-08-29 00:02:12 +08:00
type PublisherConfiguration struct {
2024-04-13 13:33:00 +08:00
Topic topic.Topic
PartitionCount int32
Brokers []string
PublisherName string // for debugging
2024-04-13 14:36:15 +08:00
RecordType *schema_pb.RecordType
2023-08-29 00:02:12 +08:00
}
2023-09-05 12:43:30 +08:00
type PublishClient struct {
2024-01-06 09:10:43 +08:00
mq_pb.SeaweedMessaging_PublishMessageClient
2023-09-05 12:43:30 +08:00
Broker string
Err error
}
2023-08-29 00:02:12 +08:00
type TopicPublisher struct {
partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32]
2023-09-05 12:43:30 +08:00
grpcDialOption grpc.DialOption
sync.Mutex // protects grpc
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
config *PublisherConfiguration
2024-01-27 06:09:57 +08:00
jobs []*EachPartitionPublishJob
2023-08-29 00:02:12 +08:00
}
2023-08-28 08:50:59 +08:00
2024-01-29 04:23:20 +08:00
func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher {
2024-01-29 04:30:08 +08:00
tp := &TopicPublisher{
partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int {
return int(a - b)
}),
2023-09-05 12:43:30 +08:00
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
config: config,
2023-08-28 08:50:59 +08:00
}
2024-01-29 04:30:08 +08:00
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
2024-01-29 05:09:30 +08:00
if err := tp.startSchedulerThread(&wg); err != nil {
2024-01-29 04:30:08 +08:00
log.Println(err)
return
}
}()
wg.Wait()
return tp
2023-08-29 00:02:12 +08:00
}
2023-08-28 08:50:59 +08:00
2023-09-08 14:55:19 +08:00
func (p *TopicPublisher) Shutdown() error {
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, inputBuffer := range inputBuffers {
inputBuffer.CloseInput()
}
}
2024-01-29 06:27:32 +08:00
for _, job := range p.jobs {
job.wg.Wait()
}
2023-09-08 14:55:19 +08:00
return nil
}