2023-08-29 00:02:12 +08:00
|
|
|
package pub_client
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2024-04-13 14:36:15 +08:00
|
|
|
"github.com/golang/protobuf/proto"
|
2023-12-12 04:05:54 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
|
2023-08-29 00:02:12 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
2024-04-13 14:36:15 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
2023-08-29 00:02:12 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
2024-03-21 03:25:40 +08:00
|
|
|
"time"
|
2023-08-29 00:02:12 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
func (p *TopicPublisher) Publish(key, value []byte) error {
|
2024-05-02 23:48:51 +08:00
|
|
|
if p.config.RecordType != nil {
|
|
|
|
return fmt.Errorf("record type is set, use PublishRecord instead")
|
|
|
|
}
|
|
|
|
return p.doPublish(key, value)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *TopicPublisher) doPublish(key, value []byte) error {
|
2023-12-12 04:05:54 +08:00
|
|
|
hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
|
2023-08-29 00:02:12 +08:00
|
|
|
if hashKey < 0 {
|
|
|
|
hashKey = -hashKey
|
|
|
|
}
|
2024-01-28 15:43:22 +08:00
|
|
|
inputBuffer, found := p.partition2Buffer.Floor(hashKey+1, hashKey+1)
|
2023-08-29 00:02:12 +08:00
|
|
|
if !found {
|
2024-01-28 15:43:22 +08:00
|
|
|
return fmt.Errorf("no input buffer found for key %d", hashKey)
|
2023-08-29 00:02:12 +08:00
|
|
|
}
|
2024-01-28 15:43:22 +08:00
|
|
|
|
|
|
|
return inputBuffer.Enqueue(&mq_pb.DataMessage{
|
|
|
|
Key: key,
|
|
|
|
Value: value,
|
2024-03-21 03:25:40 +08:00
|
|
|
TsNs: time.Now().UnixNano(),
|
2024-01-28 15:43:22 +08:00
|
|
|
})
|
2023-08-29 00:02:12 +08:00
|
|
|
}
|
2024-03-31 16:28:40 +08:00
|
|
|
|
2024-04-13 14:36:15 +08:00
|
|
|
func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.RecordValue) error {
|
|
|
|
// serialize record value
|
|
|
|
value, err := proto.Marshal(recordValue)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to marshal record value: %v", err)
|
|
|
|
}
|
|
|
|
|
2024-05-02 23:48:51 +08:00
|
|
|
return p.doPublish(key, value)
|
2024-04-13 14:36:15 +08:00
|
|
|
}
|
|
|
|
|
2024-03-31 16:28:40 +08:00
|
|
|
func (p *TopicPublisher) FinishPublish() error {
|
|
|
|
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
|
|
|
|
for _, inputBuffer := range inputBuffers {
|
|
|
|
inputBuffer.Enqueue(&mq_pb.DataMessage{
|
2024-05-21 02:03:56 +08:00
|
|
|
TsNs: time.Now().UnixNano(),
|
2024-04-01 07:35:46 +08:00
|
|
|
Ctrl: &mq_pb.ControlMessage{
|
|
|
|
IsClose: true,
|
|
|
|
},
|
2024-03-31 16:28:40 +08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|