seaweedfs/weed/mq/client/sub_client/subscriber.go

61 lines
2.2 KiB
Go
Raw Normal View History

2023-09-01 15:36:51 +08:00
package sub_client
import (
2024-02-06 15:14:25 +08:00
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
2023-09-01 15:36:51 +08:00
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"sync"
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
"time"
2023-09-01 15:36:51 +08:00
)
type SubscriberConfiguration struct {
ClientId string
ConsumerGroup string
ConsumerGroupInstanceId string
2024-02-06 15:14:25 +08:00
GrpcDialOption grpc.DialOption
2024-05-21 03:28:01 +08:00
MaxPartitionCount int32 // how many partitions to process concurrently
PerPartitionConcurrency int32 // how many messages to process concurrently per partition
2023-09-01 15:36:51 +08:00
}
2023-09-05 12:43:50 +08:00
type ContentConfiguration struct {
2024-02-06 15:14:25 +08:00
Topic topic.Topic
2023-09-05 12:43:50 +08:00
Filter string
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
StartTime time.Time
2023-09-05 12:43:50 +08:00
}
2024-05-20 05:52:38 +08:00
type OnEachMessageFunc func(key, value []byte) (err error)
2023-09-05 12:43:50 +08:00
type OnCompletionFunc func()
2023-09-01 15:36:51 +08:00
type TopicSubscriber struct {
2024-05-23 23:23:35 +08:00
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse
brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
bootstrapBrokers []string
waitForMoreMessage bool
activeProcessors map[topic.Partition]*ProcessorState
activeProcessorsLock sync.Mutex
2023-09-01 15:36:51 +08:00
}
2024-05-21 03:28:01 +08:00
func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
2023-09-01 15:36:51 +08:00
return &TopicSubscriber{
2024-05-23 23:23:35 +08:00
SubscriberConfig: subscriber,
ContentConfig: content,
brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024),
brokerPartitionAssignmentAckChan: make(chan *mq_pb.SubscriberToSubCoordinatorRequest, 1024),
bootstrapBrokers: bootstrapBrokers,
waitForMoreMessage: true,
activeProcessors: make(map[topic.Partition]*ProcessorState),
2023-09-01 15:36:51 +08:00
}
}
2023-09-05 12:43:50 +08:00
func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
sub.OnEachMessageFunc = onEachMessageFn
}
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-12 04:05:54 +08:00
func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
sub.OnCompletionFunc = onCompletionFn
2023-09-05 12:43:50 +08:00
}