This commit is contained in:
chrislu 2024-05-23 08:25:53 -07:00
parent 77a7c5c2a0
commit cdeaaf95b4
5 changed files with 19 additions and 19 deletions

View File

@ -44,9 +44,9 @@ type MessageQueueBroker struct {
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
PubBalancer *pub_balancer.PubBalancer
lockAsBalancer *cluster.LiveLock
SubCoordinator *sub_coordinator.SubCoordinator
accessLock sync.Mutex
lockAsBalancer *cluster.LiveLock
SubCoordinator *sub_coordinator.SubCoordinator
accessLock sync.Mutex
fca *sub_coordinator.FilerClientAccessor
}

View File

@ -14,10 +14,10 @@ import (
)
var (
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")

View File

@ -16,10 +16,10 @@ import (
)
var (
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")

View File

@ -65,7 +65,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
select {
case <-stopCh:
break
case ack := <- partitionOffsetChan:
case ack := <-partitionOffsetChan:
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{

View File

@ -27,16 +27,16 @@ type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func()
type TopicSubscriber struct {
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse
brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
bootstrapBrokers []string
waitForMoreMessage bool
activeProcessors map[topic.Partition]*ProcessorState
activeProcessorsLock sync.Mutex
OnCompletionFunc OnCompletionFunc
bootstrapBrokers []string
waitForMoreMessage bool
activeProcessors map[topic.Partition]*ProcessorState
activeProcessorsLock sync.Mutex
}
func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {