seaweedfs/weed/mq/client/pub_client/publish.go
chrislu 841fafd0a8 publish to input buffer
currently, the input buffer may not exist when start to publish
2024-01-27 23:43:22 -08:00

26 lines
592 B
Go

package pub_client
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {
hashKey = -hashKey
}
inputBuffer, found := p.partition2Buffer.Floor(hashKey+1, hashKey+1)
if !found {
return fmt.Errorf("no input buffer found for key %d", hashKey)
}
return inputBuffer.Enqueue(&mq_pb.DataMessage{
Key: key,
Value: value,
})
}